Compare commits

..

12 Commits

Author SHA1 Message Date
Pratik Mankawde
0e815aa1ac feat: Migrate production entry points from Boost.Coroutine to C++20 coroutines
Migrate all production coroutine entry points from Boost.Coroutine
to C++20 std::coroutine using the CoroTask/CoroTaskRunner primitives:

- RipplePathFind: Replace Coro suspend/resume with co_await pattern,
  add cv timeout for graceful shutdown.
- ServerHandler: Replace Coro-based processRequest with CoroTask,
  simplify coroutine lifecycle management.
- GRPCServer: Replace Coro with CoroTask for streaming RPC handlers.
- Remove Coro usage from Context.h aggregate initialization.
- Add exception handling in coroutine bodies to prevent unhandled
  exceptions from escaping the coroutine frame.
2026-03-25 15:48:10 +00:00
Pratik Mankawde
21149a81e3 feat: Add C++20 coroutine primitives: CoroTask, CoroTaskRunner, JobQueueAwaiter
Add C++20 std::coroutine based task primitives for the JobQueue:

- CoroTask<T>: A coroutine return type with RAII ownership semantics
  and symmetric transfer for efficient resumption.
- CoroTaskRunner: Manages coroutine lifecycle on the JobQueue with
  suspend/resume tracking, LocalValue preservation, and graceful
  shutdown support.
- JobQueueAwaiter: External awaiter combining yield+post atomically.
- yieldAndPost(): Inline awaiter workaround for GCC-12 codegen bug
  where external awaiters at multiple co_await points corrupt the
  coroutine state machine resume index.
- CoroTask_test: Comprehensive test suite covering task lifecycle,
  suspend/resume, shutdown, and value-returning coroutines.
- BoostToStdCoroutineSwitchPlan.md: Migration plan documentation.
2026-03-25 15:46:44 +00:00
Pratik Mankawde
b78202a99a docs: Add Boost to C++20 coroutine migration plan
Comprehensive migration plan documenting the switch from
Boost.Coroutine2 to C++20 standard coroutines in rippled, including
research analysis, implementation phases, risk assessment, and
testing strategy.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-25 15:44:19 +00:00
Alex Kremer
403fd7c649 fix: More clang-tidy issues found after merging to develop (#6640)
Co-authored-by: Ayaz Salikhov <mathbunnyru@users.noreply.github.com>
Co-authored-by: Bart <bthomee@users.noreply.github.com>
2026-03-25 14:28:28 +00:00
Ayaz Salikhov
dfed0481f7 docs: Rewrite conan docs for custom recipes (#6647) 2026-03-25 14:25:33 +00:00
Bart
0dc0c8e912 docs: Update LICENSE.md year to present (#6636)
Co-authored-by: Bart <11445373+bthomee@users.noreply.github.com>
2026-03-25 14:24:10 +00:00
Ayaz Salikhov
0510ee47d7 chore: Update some external dependencies (#6642) 2026-03-25 10:44:14 +00:00
Ayaz Salikhov
589c9c694c chore: Update external dependencies due to upstream merge (#6630) 2026-03-24 23:18:41 +00:00
Jingchen
4096623ae1 chore: Remove the forward declarations that cause build errors when unity build is enabled (#6633)
Signed-off-by: JCW <a1q123456@users.noreply.github.com>
2026-03-24 23:00:41 +00:00
Alex Kremer
dda162087f docs: Add note about clang-tidy installation (#6634) 2026-03-24 21:18:03 +00:00
Mayukha Vadari
85a4015a64 fix: Assorted Permissioned Domain fixes (#6587) 2026-03-24 18:53:57 +00:00
Mayukha Vadari
f7bb4018fa fix: Assorted Vault fixes (#6607) 2026-03-24 18:53:49 +00:00
60 changed files with 4511 additions and 1369 deletions

View File

@@ -99,15 +99,14 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
continue
# RHEL:
# - 9 using GCC 12: Debug and Release on linux/amd64
# (Release is required for RPM packaging).
# - 9 using GCC 12: Debug on linux/amd64.
# - 10 using Clang: Release on linux/amd64.
if os["distro_name"] == "rhel":
skip = True
if os["distro_version"] == "9":
if (
f"{os['compiler_name']}-{os['compiler_version']}" == "gcc-12"
and build_type in ["Debug", "Release"]
and build_type == "Debug"
and architecture["platform"] == "linux/amd64"
):
skip = False
@@ -122,8 +121,7 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
continue
# Ubuntu:
# - Jammy using GCC 12: Debug on linux/arm64, Release on
# linux/amd64 (Release is required for DEB packaging).
# - Jammy using GCC 12: Debug on linux/arm64.
# - Noble using GCC 14: Release on linux/amd64.
# - Noble using Clang 18: Debug on linux/amd64.
# - Noble using Clang 19: Release on linux/arm64.
@@ -136,12 +134,6 @@ def generate_strategy_matrix(all: bool, config: Config) -> list:
and architecture["platform"] == "linux/arm64"
):
skip = False
if (
f"{os['compiler_name']}-{os['compiler_version']}" == "gcc-12"
and build_type == "Release"
and architecture["platform"] == "linux/amd64"
):
skip = False
elif os["distro_version"] == "noble":
if (
f"{os['compiler_name']}-{os['compiler_version']}" == "gcc-14"

View File

@@ -1,66 +0,0 @@
name: Manual Package Build
on:
workflow_dispatch:
inputs:
pkg_type:
description: "Package type"
required: true
type: choice
options:
- deb
- rpm
- both
artifact_run_id:
description: "Run ID to download binary artifact from (leave empty for latest on this branch)"
required: false
type: string
version:
description: "Version override (leave empty to auto-detect)"
required: false
type: string
pkg_release:
description: "Package release number (default: 1)"
required: false
type: string
default: "1"
defaults:
run:
shell: bash
jobs:
generate-version:
runs-on: ubuntu-latest
outputs:
version: ${{ inputs.version || steps.version.outputs.version }}
steps:
- name: Checkout repository
if: ${{ !inputs.version }}
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Generate version
if: ${{ !inputs.version }}
id: version
uses: ./.github/actions/generate-version
package-deb:
if: ${{ inputs.pkg_type == 'deb' || inputs.pkg_type == 'both' }}
needs: generate-version
uses: ./.github/workflows/reusable-package.yml
with:
pkg_type: deb
artifact_name: xrpld-ubuntu-jammy-gcc-12-amd64-release
version: ${{ needs.generate-version.outputs.version }}
pkg_release: ${{ inputs.pkg_release }}
container_image: ghcr.io/xrplf/ci/ubuntu-jammy:gcc-12
package-rpm:
if: ${{ inputs.pkg_type == 'rpm' || inputs.pkg_type == 'both' }}
needs: generate-version
uses: ./.github/workflows/reusable-package.yml
with:
pkg_type: rpm
artifact_name: xrpld-rhel-9-gcc-12-amd64-release
version: ${{ needs.generate-version.outputs.version }}
pkg_release: ${{ inputs.pkg_release }}
container_image: ghcr.io/xrplf/ci/rhel-9:gcc-12

View File

@@ -67,7 +67,6 @@ jobs:
.github/workflows/reusable-build-test.yml
.github/workflows/reusable-clang-tidy.yml
.github/workflows/reusable-clang-tidy-files.yml
.github/workflows/reusable-package.yml
.github/workflows/reusable-strategy-matrix.yml
.github/workflows/reusable-test.yml
.github/workflows/reusable-upload-recipe.yml
@@ -82,8 +81,6 @@ jobs:
CMakeLists.txt
conanfile.py
conan.lock
package/**
- name: Check whether to run
# This step determines whether the rest of the workflow should
# run. The rest of the workflow will run if this job runs AND at
@@ -140,39 +137,6 @@ jobs:
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
generate-version:
needs: should-run
if: ${{ needs.should-run.outputs.go == 'true' }}
runs-on: ubuntu-latest
outputs:
version: ${{ steps.version.outputs.version }}
steps:
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Generate version
id: version
uses: ./.github/actions/generate-version
package-deb:
needs: [should-run, build-test, generate-version]
if: ${{ needs.should-run.outputs.go == 'true' }}
uses: ./.github/workflows/reusable-package.yml
with:
pkg_type: deb
artifact_name: xrpld-ubuntu-jammy-gcc-12-amd64-release
version: ${{ needs.generate-version.outputs.version }}
container_image: ghcr.io/xrplf/ci/ubuntu-jammy:gcc-12
package-rpm:
needs: [should-run, build-test, generate-version]
if: ${{ needs.should-run.outputs.go == 'true' }}
uses: ./.github/workflows/reusable-package.yml
with:
pkg_type: rpm
artifact_name: xrpld-rhel-9-gcc-12-amd64-release
version: ${{ needs.generate-version.outputs.version }}
container_image: ghcr.io/xrplf/ci/rhel-9:gcc-12
upload-recipe:
needs:
- should-run

View File

@@ -1,5 +1,5 @@
# This workflow uploads the libxrpl recipe to the Conan remote and builds
# release packages when a versioned tag is pushed.
# This workflow uploads the libxrpl recipe to the Conan remote when a versioned
# tag is pushed.
name: Tag
on:
@@ -22,49 +22,3 @@ jobs:
secrets:
remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}
remote_password: ${{ secrets.CONAN_REMOTE_PASSWORD }}
build-test:
if: ${{ github.repository == 'XRPLF/rippled' }}
uses: ./.github/workflows/reusable-build-test.yml
strategy:
fail-fast: true
matrix:
os: [linux]
with:
ccache_enabled: false
os: ${{ matrix.os }}
strategy_matrix: minimal
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
generate-version:
if: ${{ github.repository == 'XRPLF/rippled' }}
runs-on: ubuntu-latest
outputs:
version: ${{ steps.version.outputs.version }}
steps:
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Generate version
id: version
uses: ./.github/actions/generate-version
package-deb:
needs: [build-test, generate-version]
if: ${{ github.repository == 'XRPLF/rippled' }}
uses: ./.github/workflows/reusable-package.yml
with:
pkg_type: deb
artifact_name: xrpld-ubuntu-jammy-gcc-12-amd64-release
version: ${{ needs.generate-version.outputs.version }}
container_image: ghcr.io/xrplf/ci/ubuntu-jammy:gcc-12
package-rpm:
needs: [build-test, generate-version]
if: ${{ github.repository == 'XRPLF/rippled' }}
uses: ./.github/workflows/reusable-package.yml
with:
pkg_type: rpm
artifact_name: xrpld-rhel-9-gcc-12-amd64-release
version: ${{ needs.generate-version.outputs.version }}
container_image: ghcr.io/xrplf/ci/rhel-9:gcc-12

View File

@@ -38,8 +38,6 @@ on:
- "CMakeLists.txt"
- "conanfile.py"
- "conan.lock"
- "package/**"
- ".github/workflows/reusable-package.yml"
# Run at 06:32 UTC on every day of the week from Monday through Friday. This
# will force all dependencies to be rebuilt, which is useful to verify that
@@ -79,7 +77,7 @@ jobs:
strategy:
fail-fast: ${{ github.event_name == 'merge_group' }}
matrix:
os: [linux]
os: [linux, macos, windows]
with:
# Enable ccache only for events targeting the XRPLF repository, since
# other accounts will not have access to our remote cache storage.
@@ -100,32 +98,3 @@ jobs:
secrets:
remote_username: ${{ secrets.CONAN_REMOTE_USERNAME }}
remote_password: ${{ secrets.CONAN_REMOTE_PASSWORD }}
generate-version:
runs-on: ubuntu-latest
outputs:
version: ${{ steps.version.outputs.version }}
steps:
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Generate version
id: version
uses: ./.github/actions/generate-version
package-deb:
needs: [build-test, generate-version]
uses: ./.github/workflows/reusable-package.yml
with:
pkg_type: deb
artifact_name: xrpld-ubuntu-jammy-gcc-12-amd64-release
version: ${{ needs.generate-version.outputs.version }}
container_image: ghcr.io/xrplf/ci/ubuntu-jammy:gcc-12
package-rpm:
needs: [build-test, generate-version]
uses: ./.github/workflows/reusable-package.yml
with:
pkg_type: rpm
artifact_name: xrpld-rhel-9-gcc-12-amd64-release
version: ${{ needs.generate-version.outputs.version }}
container_image: ghcr.io/xrplf/ci/rhel-9:gcc-12

View File

@@ -51,5 +51,5 @@ jobs:
if: ${{ always() && !cancelled() && (!inputs.check_only_changed || needs.determine-files.outputs.any_cpp_changed == 'true' || needs.determine-files.outputs.clang_tidy_config_changed == 'true') }}
uses: ./.github/workflows/reusable-clang-tidy-files.yml
with:
files: ${{ needs.determine-files.outputs.clang_tidy_config_changed == 'true' && '' || (inputs.check_only_changed && needs.determine-files.outputs.all_changed_files || '') }}
files: ${{ (needs.determine-files.outputs.clang_tidy_config_changed != 'true' && inputs.check_only_changed) && needs.determine-files.outputs.all_changed_files || '' }}
create_issue_on_failure: ${{ inputs.create_issue_on_failure }}

View File

@@ -1,76 +0,0 @@
# Build a Linux package (DEB or RPM) from a pre-built binary artifact.
name: Package
on:
workflow_call:
inputs:
pkg_type:
description: "Package type to build: deb or rpm."
required: true
type: string
artifact_name:
description: "Name of the pre-built binary artifact to download."
required: true
type: string
version:
description: "Version string used for naming the output artifact."
required: true
type: string
pkg_release:
description: "Package release number. Increment when repackaging the same executable."
required: false
type: string
default: "1"
container_image:
description: "Container image to use for packaging."
required: true
type: string
defaults:
run:
shell: bash
env:
BUILD_DIR: build
jobs:
package:
name: ${{ inputs.pkg_type }} (${{ inputs.version }})
runs-on: ["self-hosted", "Linux", "X64", "heavy"]
container: ${{ inputs.container_image }}
timeout-minutes: 30
steps:
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
- name: Download pre-built binary
uses: actions/download-artifact@v4
with:
name: ${{ inputs.artifact_name }}
path: ${{ env.BUILD_DIR }}
- name: Make binary executable
run: chmod +x ${{ env.BUILD_DIR }}/xrpld
- name: Generate RPM spec from template
if: ${{ inputs.pkg_type == 'rpm' }}
run: |
mkdir -p ${{ env.BUILD_DIR }}/package/rpm
sed -e "s/@xrpld_version@/${{ inputs.version }}/" \
-e "s/@pkg_release@/${{ inputs.pkg_release }}/" \
package/rpm/xrpld.spec.in > ${{ env.BUILD_DIR }}/package/rpm/xrpld.spec
- name: Build package
run: |
./package/build_pkg.sh ${{ inputs.pkg_type }} . ${{ env.BUILD_DIR }} "${{ inputs.version }}" "${{ inputs.pkg_release }}"
- name: Upload package artifact
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: xrpld-${{ inputs.pkg_type }}-${{ inputs.version }}
path: |
${{ env.BUILD_DIR }}/debbuild/*.deb
${{ env.BUILD_DIR }}/debbuild/*.ddeb
${{ env.BUILD_DIR }}/rpmbuild/RPMS/**/*.rpm
if-no-files-found: error

View File

@@ -125,9 +125,9 @@ default profile.
### Patched recipes
The recipes in Conan Center occasionally need to be patched for compatibility
with the latest version of `xrpld`. We maintain a fork of the Conan Center
[here](https://github.com/XRPLF/conan-center-index/) containing the patches.
Occasionally, we need patched recipes or recipes not present in Conan Center.
We maintain a fork of the Conan Center Index
[here](https://github.com/XRPLF/conan-center-index/) containing the modified and newly added recipes.
To ensure our patched recipes are used, you must add our Conan remote at a
higher index than the default Conan Center remote, so it is consulted first. You
@@ -137,19 +137,11 @@ can do this by running:
conan remote add --index 0 xrplf https://conan.ripplex.io
```
Alternatively, you can pull the patched recipes into the repository and use them
locally:
Alternatively, you can pull our recipes from the repository and export them locally:
```bash
# Extract the version number from the lockfile.
function extract_version {
version=$(cat conan.lock | sed -nE "s@.+${1}/(.+)#.+@\1@p" | head -n1)
echo ${version}
}
# Define which recipes to export.
recipes=('ed25519' 'grpc' 'nudb' 'openssl' 'secp256k1' 'snappy' 'soci')
folders=('all' 'all' 'all' '3.x.x' 'all' 'all' 'all')
recipes=('abseil' 'ed25519' 'grpc' 'm4' 'mpt-crypto' 'nudb' 'openssl' 'secp256k1' 'snappy' 'soci' 'wasm-xrplf' 'wasmi')
# Selectively check out the recipes from our CCI fork.
cd external
@@ -158,29 +150,19 @@ cd conan-center-index
git init
git remote add origin git@github.com:XRPLF/conan-center-index.git
git sparse-checkout init
for ((index = 1; index <= ${#recipes[@]}; index++)); do
recipe=${recipes[index]}
folder=${folders[index]}
echo "Checking out recipe '${recipe}' from folder '${folder}'..."
git sparse-checkout add recipes/${recipe}/${folder}
for recipe in "${recipes[@]}"; do
echo "Checking out recipe '${recipe}'..."
git sparse-checkout add recipes/${recipe}
done
git fetch origin master
git checkout master
cd ../..
# Export the recipes into the local cache.
for ((index = 1; index <= ${#recipes[@]}; index++)); do
recipe=${recipes[index]}
folder=${folders[index]}
version=$(extract_version ${recipe})
echo "Exporting '${recipe}/${version}' from '${recipe}/${folder}'..."
conan export --version $(extract_version ${recipe}) \
external/conan-center-index/recipes/${recipe}/${folder}
done
./export_all.sh
cd ../../
```
In the case we switch to a newer version of a dependency that still requires a
patch, it will be necessary for you to pull in the changes and re-export the
patch or add a new dependency, it will be necessary for you to pull in the changes and re-export the
updated dependencies with the newer version. However, if we switch to a newer
version that no longer requires a patch, no action is required on your part, as
the new recipe will be automatically pulled from the official Conan Center.
@@ -189,6 +171,8 @@ the new recipe will be automatically pulled from the official Conan Center.
> You might need to add `--lockfile=""` to your `conan install` command
> to avoid automatic use of the existing `conan.lock` file when you run
> `conan export` manually on your machine
>
> This is not recommended though, as you might end up using different revisions of recipes.
### Conan profile tweaks
@@ -204,39 +188,14 @@ Possible values are ['5.0', '5.1', '6.0', '6.1', '7.0', '7.3', '8.0', '8.1',
Read "http://docs.conan.io/2/knowledge/faq.html#error-invalid-setting"
```
you need to amend the list of compiler versions in
`$(conan config home)/settings.yml`, by appending the required version number(s)
you need to add your compiler to the list of compiler versions in
`$(conan config home)/settings_user.yml`, by adding the required version number(s)
to the `version` array specific for your compiler. For example:
```yaml
apple-clang:
version:
[
"5.0",
"5.1",
"6.0",
"6.1",
"7.0",
"7.3",
"8.0",
"8.1",
"9.0",
"9.1",
"10.0",
"11.0",
"12.0",
"13",
"13.0",
"13.1",
"14",
"14.0",
"15",
"15.0",
"16",
"16.0",
"17",
"17.0",
]
compiler:
apple-clang:
version: ["17.0"]
```
#### Multiple compilers

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,142 @@
# Boost.Coroutine to C++20 Migration — Task List
> Parent document: [BoostToStdCoroutineSwitchPlan.md](BoostToStdCoroutineSwitchPlan.md)
---
## Milestone 1: New Coroutine Primitives
- [ ] **1.1** Design `CoroTask<T>` class with `promise_type`
- Define `promise_type` with `initial_suspend`, `final_suspend`, `unhandled_exception`, `return_value`/`return_void`
- Implement `FinalAwaiter` for continuation support
- Implement move-only RAII handle wrapper
- Support both `CoroTask<T>` and `CoroTask<void>`
- [ ] **1.2** Design and implement `JobQueueAwaiter`
- `await_suspend()` calls `jq_.addJob(type, name, [h]{ h.resume(); })`
- Handle `addJob()` failure (shutdown) — resume with error flag or throw
- Integrate `nSuspend_` counter increment/decrement
- [ ] **1.3** Implement `LocalValues` swap in new coroutine resume path
- Before `handle.resume()`: save thread-local, install coroutine-local
- After `handle.resume()` returns: restore thread-local
- Ensure this works when coroutine migrates between threads
- [ ] **1.4** Add `postCoroTask()` template to `JobQueue`
- Accept callable returning `CoroTask<void>`
- Schedule initial execution on JobQueue (mirror `postCoro()` behavior)
- Return a handle/shared_ptr for join/cancel
- [ ] **1.5** Write unit tests (`src/test/core/CoroTask_test.cpp`)
- Test `CoroTask<void>` runs to completion
- Test `CoroTask<int>` returns value
- Test exception propagation across co_await
- Test coroutine destruction before completion
- Test `JobQueueAwaiter` schedules on correct thread
- Test `LocalValue` isolation across 4+ coroutines
- Test shutdown rejection (addJob returns false)
- Test `correct_order` equivalent (yield → join → post → complete)
- Test `incorrect_order` equivalent (post → yield → complete)
- Test multiple sequential co_await points
- [ ] **1.6** Verify build on GCC 12+, Clang 16+
- [ ] **1.7** Run ASAN + TSAN on new tests
- [ ] **1.8** Run full `--unittest` suite (no regressions)
- [ ] **1.9** Self-review and create PR #1
---
## Milestone 2: Entry Point Migration
- [ ] **2.1** Migrate `ServerHandler::onRequest()` (`ServerHandler.cpp:287`)
- Replace `m_jobQueue.postCoro(jtCLIENT_RPC, ...)` with `postCoroTask()`
- Update lambda to return `CoroTask<void>` (add `co_return`)
- Update `processSession` to accept new coroutine type
- [ ] **2.2** Migrate `ServerHandler::onWSMessage()` (`ServerHandler.cpp:325`)
- Replace `m_jobQueue.postCoro(jtCLIENT_WEBSOCKET, ...)` with `postCoroTask()`
- Update lambda signature
- [ ] **2.3** Migrate `GRPCServer::CallData::process()` (`GRPCServer.cpp:102`)
- Replace `app_.getJobQueue().postCoro(JobType::jtRPC, ...)` with `postCoroTask()`
- Update `process(shared_ptr<Coro> coro)` overload signature
- [ ] **2.4** Update `RPC::Context` (`Context.h:27`)
- Replace `std::shared_ptr<JobQueue::Coro> coro{}` with new coroutine wrapper type
- Ensure all code that accesses `context.coro` compiles
- [ ] **2.5** Update `ServerHandler.h` signatures
- `processSession()` and `processRequest()` parameter types
- [ ] **2.6** Update `GRPCServer.h` signatures
- `process()` method parameter types
- [ ] **2.7** Run full `--unittest` suite
- [ ] **2.8** Manual smoke test: HTTP + WS + gRPC RPC requests
- [ ] **2.9** Run ASAN + TSAN
- [ ] **2.10** Self-review and create PR #2
---
## Milestone 3: Handler Migration
- [ ] **3.1** Migrate `doRipplePathFind()` (`RipplePathFind.cpp`)
- Replace `context.coro->yield()` with `co_await PathFindAwaiter{...}`
- Replace continuation lambda's `coro->post()` / `coro->resume()` with awaiter scheduling
- Handle shutdown case (post failure) in awaiter
- [ ] **3.2** Create `PathFindAwaiter` (or use generic `JobQueueAwaiter`)
- Encapsulate the continuation + yield pattern from `RipplePathFind.cpp` lines 108-132
- [ ] **3.3** Update `Path_test.cpp`
- Replace `postCoro` usage with `postCoroTask`
- Ensure `context.coro` usage matches new type
- [ ] **3.4** Update `AMMTest.cpp`
- Replace `postCoro` usage with `postCoroTask`
- [ ] **3.5** Rewrite `Coroutine_test.cpp` for new API
- `correct_order`: postCoroTask → co_await → join → resume → complete
- `incorrect_order`: post before yield equivalent
- `thread_specific_storage`: 4 coroutines with LocalValue isolation
- [ ] **3.6** Update `JobQueue_test.cpp` `testPostCoro`
- Migrate to `postCoroTask` API
- [ ] **3.7** Verify `ripple_path_find` works end-to-end with new coroutines
- [ ] **3.8** Test shutdown-during-pathfind scenario
- [ ] **3.9** Run full `--unittest` suite
- [ ] **3.10** Run ASAN + TSAN
- [ ] **3.11** Self-review and create PR #3
---
## Milestone 4: Cleanup & Validation
- [ ] **4.1** Delete `include/xrpl/core/Coro.ipp`
- [ ] **4.2** Remove from `JobQueue.h`:
- `#include <boost/coroutine2/all.hpp>`
- `struct Coro_create_t`
- `class Coro` (entire class)
- `postCoro()` template
- Comment block (lines 322-377) describing old race condition
- [ ] **4.3** Update `cmake/deps/Boost.cmake`:
- Remove `coroutine` from `find_package(Boost REQUIRED COMPONENTS ...)`
- Remove `Boost::coroutine` from `target_link_libraries`
- [ ] **4.4** Update `cmake/XrplInterface.cmake`:
- Remove `BOOST_COROUTINES2_NO_DEPRECATION_WARNING`
- [ ] **4.5** Run memory benchmark
- Create N=1000 coroutines, compare RSS: before vs after
- Document results
- [ ] **4.6** Run context switch benchmark
- 100K yield/resume cycles, compare latency: before vs after
- Document results
- [ ] **4.7** Run RPC throughput benchmark
- Concurrent `ripple_path_find` requests, compare throughput
- Document results
- [ ] **4.8** Run full `--unittest` suite
- [ ] **4.9** Run ASAN, TSAN, UBSan
- Confirm `__asan_handle_no_return` warnings are gone
- [ ] **4.10** Verify build on all supported compilers
- [ ] **4.11** Self-review and create PR #4
- [ ] **4.12** Document final benchmark results in PR description

View File

@@ -133,7 +133,6 @@ endif()
include(XrplCore)
include(XrplInstall)
include(XrplPackaging)
include(XrplValidatorKeys)
if(tests)

View File

@@ -259,6 +259,10 @@ There is a Continuous Integration job that runs clang-tidy on pull requests. The
This ensures that configuration changes don't introduce new warnings across the codebase.
### Installing clang-tidy
See the [environment setup guide](./docs/build/environment.md#clang-tidy) for platform-specific installation instructions.
### Running clang-tidy locally
Before running clang-tidy, you must build the project to generate required files (particularly protobuf headers). Refer to [`BUILD.md`](./BUILD.md) for build instructions.
@@ -266,10 +270,15 @@ Before running clang-tidy, you must build the project to generate required files
Then run clang-tidy on your local changes:
```
run-clang-tidy -p build src tests
run-clang-tidy -p build src include tests
```
This will check all source files in the `src` and `tests` directories using the compile commands from your `build` directory.
This will check all source files in the `src`, `include` and `tests` directories using the compile commands from your `build` directory.
If you wish to automatically fix whatever clang-tidy finds _and_ is capable of fixing, add `-fix` to the above command:
```
run-clang-tidy -p build -fix src include tests
```
## Contracts and instrumentation

View File

@@ -1,7 +1,7 @@
ISC License
Copyright (c) 2011, Arthur Britto, David Schwartz, Jed McCaleb, Vinnie Falco, Bob Way, Eric Lombrozo, Nikolaos D. Bougalis, Howard Hinnant.
Copyright (c) 2012-2025, the XRP Ledger developers.
Copyright (c) 2012-present, the XRP Ledger developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above

View File

@@ -12,14 +12,14 @@ if(is_root_project AND TARGET xrpld)
install(
FILES "${CMAKE_CURRENT_SOURCE_DIR}/cfg/xrpld-example.cfg"
DESTINATION "${CMAKE_INSTALL_SYSCONFDIR}"
DESTINATION "${CMAKE_INSTALL_SYSCONFDIR}/xrpld"
RENAME xrpld.cfg
COMPONENT runtime
)
install(
FILES "${CMAKE_CURRENT_SOURCE_DIR}/cfg/validators-example.txt"
DESTINATION "${CMAKE_INSTALL_SYSCONFDIR}"
DESTINATION "${CMAKE_INSTALL_SYSCONFDIR}/xrpld"
RENAME validators.txt
COMPONENT runtime
)

View File

@@ -1,147 +0,0 @@
#[===================================================================[
Linux packaging support: RPM and Debian targets + install tests
#]===================================================================]
if(NOT CMAKE_INSTALL_PREFIX STREQUAL "/opt/xrpld")
message(
STATUS
"Packaging targets require -DCMAKE_INSTALL_PREFIX=/opt/xrpld "
"(current: '${CMAKE_INSTALL_PREFIX}'); skipping."
)
return()
endif()
# Generate the RPM spec from template (substitutes @xrpld_version@, @pkg_release@).
if(NOT DEFINED pkg_release)
set(pkg_release 1)
endif()
configure_file(
${CMAKE_SOURCE_DIR}/package/rpm/xrpld.spec.in
${CMAKE_BINARY_DIR}/package/rpm/xrpld.spec
@ONLY
)
find_program(RPMBUILD_EXECUTABLE rpmbuild)
if(RPMBUILD_EXECUTABLE)
add_custom_target(
package-rpm
COMMAND
${CMAKE_SOURCE_DIR}/package/build_pkg.sh rpm ${CMAKE_SOURCE_DIR}
${CMAKE_BINARY_DIR}
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
COMMENT "Building RPM package"
VERBATIM
)
else()
message(STATUS "rpmbuild not found; 'package-rpm' target not available")
endif()
find_program(DPKG_BUILDPACKAGE_EXECUTABLE dpkg-buildpackage)
if(DPKG_BUILDPACKAGE_EXECUTABLE)
add_custom_target(
package-deb
COMMAND
${CMAKE_SOURCE_DIR}/package/build_pkg.sh deb ${CMAKE_SOURCE_DIR}
${CMAKE_BINARY_DIR} ${xrpld_version}
WORKING_DIRECTORY ${CMAKE_BINARY_DIR}
COMMENT "Building Debian package"
VERBATIM
)
else()
message(
STATUS
"dpkg-buildpackage not found; 'package-deb' target not available"
)
endif()
#[===================================================================[
CTest fixtures for package install verification (requires docker)
#]===================================================================]
find_program(DOCKER_EXECUTABLE docker)
if(NOT DOCKER_EXECUTABLE)
message(STATUS "docker not found; package install tests not available")
return()
endif()
set(DEB_TEST_IMAGE "geerlingguy/docker-ubuntu2204-ansible:latest")
set(RPM_TEST_IMAGE "geerlingguy/docker-rockylinux9-ansible:latest")
foreach(PKG deb rpm)
if(PKG STREQUAL "deb")
set(IMAGE ${DEB_TEST_IMAGE})
else()
set(IMAGE ${RPM_TEST_IMAGE})
endif()
# Fixture: start container
add_test(
NAME ${PKG}_container_start
COMMAND
sh -c
"docker rm -f xrpld_${PKG}_install_test 2>/dev/null || true && \
docker run --rm -d \
--name xrpld_${PKG}_install_test \
--memory=45g --memory-swap=45g \
--privileged \
--cgroupns host \
--volume '${CMAKE_SOURCE_DIR}:/root:ro' \
--volume /sys/fs/cgroup:/sys/fs/cgroup:rw \
--tmpfs /tmp --tmpfs /run --tmpfs /run/lock \
${IMAGE} \
/usr/sbin/init"
)
set_tests_properties(
${PKG}_container_start
PROPERTIES FIXTURES_SETUP ${PKG}_container LABELS packaging
)
# Fixture: stop container
# On CI: always stop. Locally: leave running on failure for diagnosis.
add_test(
NAME ${PKG}_container_stop
COMMAND
sh -c
"if [ -n \"$CI\" ] || ! docker exec xrpld_${PKG}_install_test test -f /tmp/test_failed 2>/dev/null; then \
docker rm -f xrpld_${PKG}_install_test; \
else \
echo 'Tests failed — leaving xrpld_${PKG}_install_test running for diagnosis'; \
echo 'Clean up with: docker rm -f xrpld_${PKG}_install_test'; \
fi"
)
set_tests_properties(
${PKG}_container_stop
PROPERTIES FIXTURES_CLEANUP ${PKG}_container LABELS packaging
)
# Install package and run smoke test
add_test(
NAME ${PKG}_install
COMMAND
docker exec -w /root xrpld_${PKG}_install_test bash
/root/package/test/smoketest.sh local
)
set_tests_properties(
${PKG}_install
PROPERTIES
FIXTURES_REQUIRED ${PKG}_container
FIXTURES_SETUP ${PKG}_installed
LABELS packaging
TIMEOUT 600
)
# Validate install paths and compat symlinks
add_test(
NAME ${PKG}_install_paths
COMMAND
docker exec -w /root xrpld_${PKG}_install_test sh
/root/package/test/check_install_paths.sh
)
set_tests_properties(
${PKG}_install_paths
PROPERTIES
FIXTURES_REQUIRED "${PKG}_container;${PKG}_installed"
LABELS packaging
TIMEOUT 60
)
endforeach()

View File

@@ -1,16 +1,16 @@
{
"version": "0.5",
"requires": [
"zlib/1.3.1#b8bc2603263cf7eccbd6e17e66b0ed76%1765850150.075",
"zlib/1.3.1#cac0f6daea041b0ccf42934163defb20%1765284699.337",
"xxhash/0.8.3#681d36a0a6111fc56e5e45ea182c19cc%1765850149.987",
"sqlite3/3.49.1#8631739a4c9b93bd3d6b753bac548a63%1765850149.926",
"soci/4.0.3#a9f8d773cd33e356b5879a4b0564f287%1765850149.46",
"snappy/1.1.10#968fef506ff261592ec30c574d4a7809%1765850147.878",
"secp256k1/0.7.1#3a61e95e220062ef32c48d019e9c81f7%1770306721.686",
"secp256k1/0.7.1#481881709eb0bdd0185a12b912bbe8ad%1770910500.329",
"rocksdb/10.5.1#4a197eca381a3e5ae8adf8cffa5aacd0%1765850186.86",
"re2/20230301#ca3b241baec15bd31ea9187150e0b333%1765850148.103",
"protobuf/6.32.1#f481fd276fc23a33b85a3ed1e898b693%1765850161.038",
"openssl/3.5.5#05a4ac5b7323f7a329b2db1391d9941f%1769599205.414",
"re2/20251105#8579cfd0bda4daf0683f9e3898f964b4%1772560729.95",
"protobuf/6.32.1#b54f00da2e0f61d821330b5b638b0f80%1768401317.762",
"openssl/3.5.5#e6399de266349245a4542fc5f6c71552%1774367199.56",
"nudb/2.0.9#0432758a24204da08fee953ec9ea03cb%1769436073.32",
"lz4/1.10.0#59fc63cac7f10fbe8e05c7e62c2f3504%1765850143.914",
"libiconv/1.17#1e65319e945f2d31941a9d28cc13c058%1765842973.492",
@@ -18,27 +18,27 @@
"libarchive/3.8.1#ffee18995c706e02bf96e7a2f7042e0d%1765850144.736",
"jemalloc/5.3.0#e951da9cf599e956cebc117880d2d9f8%1729241615.244",
"gtest/1.17.0#5224b3b3ff3b4ce1133cbdd27d53ee7d%1768312129.152",
"grpc/1.72.0#f244a57bff01e708c55a1100b12e1589%1765850193.734",
"grpc/1.72.0#aaade9421980b2d926dbfb613d56c38a%1774376249.106",
"ed25519/2015.03#ae761bdc52730a843f0809bdf6c1b1f6%1765850143.772",
"date/3.0.4#862e11e80030356b53c2c38599ceb32b%1765850143.772",
"c-ares/1.34.5#5581c2b62a608b40bb85d965ab3ec7c8%1765850144.336",
"c-ares/1.34.6#545240bb1c40e2cacd4362d6b8967650%1766500685.317",
"bzip2/1.0.8#c470882369c2d95c5c77e970c0c7e321%1765850143.837",
"boost/1.90.0#d5e8defe7355494953be18524a7f135b%1769454080.269",
"abseil/20250127.0#99262a368bd01c0ccca8790dfced9719%1766517936.993"
"abseil/20250127.0#bb0baf1f362bc4a725a24eddd419b8f7%1774365460.196"
],
"build_requires": [
"zlib/1.3.1#b8bc2603263cf7eccbd6e17e66b0ed76%1765850150.075",
"strawberryperl/5.32.1.1#707032463aa0620fa17ec0d887f5fe41%1765850165.196",
"protobuf/6.32.1#f481fd276fc23a33b85a3ed1e898b693%1765850161.038",
"zlib/1.3.1#cac0f6daea041b0ccf42934163defb20%1765284699.337",
"strawberryperl/5.32.1.1#8d114504d172cfea8ea1662d09b6333e%1751971032.423",
"protobuf/6.32.1#b54f00da2e0f61d821330b5b638b0f80%1768401317.762",
"nasm/2.16.01#31e26f2ee3c4346ecd347911bd126904%1765850144.707",
"msys2/cci.latest#eea83308ad7e9023f7318c60d5a9e6cb%1770199879.083",
"m4/1.4.19#70dc8bbb33e981d119d2acc0175cf381%1763158052.846",
"cmake/4.2.0#ae0a44f44a1ef9ab68fd4b3e9a1f8671%1765850153.937",
"cmake/3.31.10#313d16a1aa16bbdb2ca0792467214b76%1765850153.479",
"b2/5.3.3#107c15377719889654eb9a162a673975%1765850144.355",
"msys2/cci.latest#d22fe7b2808f5fd34d0a7923ace9c54f%1770657326.649",
"m4/1.4.19#5d7a4994e5875d76faf7acf3ed056036%1774365463.87",
"cmake/4.3.0#b939a42e98f593fb34d3a8c5cc860359%1773780142.26",
"cmake/3.31.11#f325c933f618a1fcebc1e1c0babfd1ba%1769622857.944",
"b2/5.4.2#ffd6084a119587e70f11cd45d1a386e2%1766594659.866",
"automake/1.16.5#b91b7c384c3deaa9d535be02da14d04f%1755524470.56",
"autoconf/2.71#51077f068e61700d65bb05541ea1e4b0%1731054366.86",
"abseil/20250127.0#99262a368bd01c0ccca8790dfced9719%1766517936.993"
"abseil/20250127.0#bb0baf1f362bc4a725a24eddd419b8f7%1774365460.196"
],
"python_requires": [],
"overrides": {
@@ -46,13 +46,13 @@
null,
"boost/1.90.0"
],
"protobuf/5.27.0": [
"protobuf/[>=5.27.0 <7]": [
"protobuf/6.32.1"
],
"lz4/1.9.4": [
"lz4/1.10.0"
],
"sqlite3/3.44.2": [
"sqlite3/[>=3.44 <4]": [
"sqlite3/3.49.1"
],
"boost/1.83.0": [

View File

@@ -1,5 +1,5 @@
import re
import os
import re
from conan.tools.cmake import CMake, CMakeToolchain, cmake_layout

View File

@@ -70,6 +70,7 @@ words:
- coeffs
- coldwallet
- compr
- cppcoro
- conanfile
- conanrun
- confs
@@ -93,14 +94,11 @@ words:
- desync
- desynced
- determ
- disablerepo
- distro
- doxyfile
- dxrpl
- enablerepo
- endmacro
- exceptioned
- EXPECT_STREQ
- Falco
- fcontext
- finalizers
@@ -108,6 +106,7 @@ words:
- fmtdur
- fsanitize
- funclets
- gantt
- gcov
- gcovr
- ghead
@@ -151,10 +150,10 @@ words:
- ltype
- mcmodel
- MEMORYSTATUSEX
- Mankawde
- Merkle
- Metafuncton
- misprediction
- missingok
- mptbalance
- MPTDEX
- mptflags
@@ -185,9 +184,7 @@ words:
- NOLINT
- NOLINTNEXTLINE
- nonxrp
- noreplace
- noripple
- notifempty
- nudb
- nullptr
- nunl
@@ -202,15 +199,16 @@ words:
- permissioned
- pointee
- populator
- pratik
- preauth
- preauthorization
- preauthorize
- preauthorizes
- preclaim
- preun
- protobuf
- protos
- ptrs
- Pratik
- pushd
- pyenv
- pyparsing
@@ -218,6 +216,8 @@ words:
- queuable
- Raphson
- replayer
- repost
- reposts
- rerere
- retriable
- RIPD
@@ -242,15 +242,14 @@ words:
- sfields
- shamap
- shamapitem
- shlibs
- sidechain
- SIGGOOD
- sle
- sles
- soci
- socidb
- SRPMS
- sslws
- stackful
- statsd
- STATSDCOLLECTOR
- stissue
@@ -277,8 +276,8 @@ words:
- txn
- txns
- txs
- ubsan
- UBSAN
- ubsan
- umant
- unacquired
- unambiguity
@@ -306,6 +305,7 @@ words:
- venv
- vfalco
- vinnie
- wasmi
- wextra
- wptr
- writeme
@@ -314,6 +314,7 @@ words:
- xbridge
- xchain
- ximinez
- EXPECT_STREQ
- XMACRO
- xrpkuwait
- xrpl

View File

@@ -109,3 +109,32 @@ Install CMake with Homebrew too:
```
brew install cmake
```
## Clang-tidy
Clang-tidy is required to run static analysis checks locally (see [CONTRIBUTING.md](../../CONTRIBUTING.md)).
It is not required to build the project. Currently this project uses clang-tidy version 21.
### Linux
LLVM 21 is not available in the default Debian 12 (Bookworm) repositories.
Install it using the official LLVM apt installer:
```
wget https://apt.llvm.org/llvm.sh
chmod +x llvm.sh
sudo ./llvm.sh 21
sudo apt install --yes clang-tidy-21
```
Then use `run-clang-tidy-21` when running clang-tidy locally.
### macOS
Install LLVM 21 via Homebrew:
```
brew install llvm@21
```
Then use `run-clang-tidy` from the LLVM 21 Homebrew prefix when running clang-tidy locally.

View File

@@ -0,0 +1,699 @@
#pragma once
#include <xrpl/beast/utility/instrumentation.h>
#include <coroutine>
#include <exception>
#include <type_traits>
#include <utility>
#include <variant>
namespace xrpl {
template <typename T = void>
class CoroTask;
/**
* CoroTask<void> -- coroutine return type for void-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<void>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - exception_ : std::exception_ptr |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_void() |
* | + unhandled_exception() |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter
* +-----------------------------------------------+
* | await_suspend(h): |
* | if continuation_ set -> symmetric transfer |
* | else -> noop_coroutine |
* +-----------------------------------------------+
*
* Design Notes
* ------------
* - Lazy start: initial_suspend returns suspend_always, so the coroutine
* body does not execute until the handle is explicitly resumed.
* - Symmetric transfer: await_suspend returns a coroutine_handle instead
* of void/bool, allowing the scheduler to jump directly to the next
* coroutine without growing the call stack.
* - Continuation chaining: when one CoroTask is co_await-ed inside
* another, the caller's handle is stored as continuation_ so
* FinalAwaiter can resume it when this task finishes.
* - Move-only: the handle is exclusively owned; copy is deleted.
*
* Usage Examples
* ==============
*
* 1. Basic void coroutine (the most common case in rippled):
*
* CoroTask<void> doWork(std::shared_ptr<CoroTaskRunner> runner) {
* // do something
* co_await runner->suspend(); // yield control
* // resumed later via runner->post() or runner->resume()
* co_return;
* }
*
* 2. co_await-ing one CoroTask<void> from another (chaining):
*
* CoroTask<void> inner() {
* // ...
* co_return;
* }
* CoroTask<void> outer() {
* co_await inner(); // continuation_ links outer -> inner
* co_return; // FinalAwaiter resumes outer
* }
*
* 3. Exceptions propagate through co_await:
*
* CoroTask<void> failing() {
* throw std::runtime_error("oops");
* co_return;
* }
* CoroTask<void> caller() {
* try { co_await failing(); }
* catch (std::runtime_error const&) { // caught here }
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Dangling references in coroutine parameters.
* Coroutine parameters are copied into the frame, but references
* are NOT -- they are stored as-is. If the referent goes out of scope
* before the coroutine finishes, you get use-after-free.
*
* // BROKEN -- local dies before coroutine runs:
* CoroTask<void> bad(int& ref) { co_return; }
* void launch() {
* int local = 42;
* auto task = bad(local); // frame stores &local
* } // local destroyed; frame holds dangling ref
*
* // FIX -- pass by value, or ensure lifetime via shared_ptr.
*
* BUG-RISK: GCC 14 corrupts reference captures in coroutine lambdas.
* When a lambda that returns CoroTask captures by reference ([&]),
* GCC 14 may generate a corrupted coroutine frame. Always capture
* by explicit pointer-to-value instead:
*
* // BROKEN on GCC 14:
* jq.postCoroTask(t, n, [&](auto) -> CoroTask<void> { ... });
*
* // FIX -- capture pointers explicitly:
* jq.postCoroTask(t, n, [ptr = &val](auto) -> CoroTask<void> { ... });
*
* BUG-RISK: Resuming a destroyed or completed CoroTask.
* Calling handle().resume() after the coroutine has already run to
* completion (done() == true) is undefined behavior. The CoroTaskRunner
* guards against this with an XRPL_ASSERT, but standalone usage of
* CoroTask must check done() before resuming.
*
* BUG-RISK: Moving a CoroTask that is being awaited.
* If task A is co_await-ed by task B (so A.continuation_ == B), moving
* or destroying A will invalidate the continuation link. Never move
* or reassign a CoroTask while it is mid-execution or being awaited.
*
* LIMITATION: CoroTask is fire-and-forget for the top-level owner.
* There is no built-in notification when the coroutine finishes.
* The caller must use external synchronization (e.g. CoroTaskRunner::join
* or a gate/condition_variable) to know when it is done.
*
* LIMITATION: No cancellation token.
* There is no way to cancel a suspended CoroTask from outside. The
* coroutine body must cooperatively check a flag (e.g. jq_.isStopping())
* after each co_await and co_return early if needed.
*
* LIMITATION: Stackless -- cannot suspend from nested non-coroutine calls.
* If a coroutine calls a regular function that wants to "yield", it
* cannot. Only the immediate coroutine body can use co_await.
* This is acceptable for rippled because all yield() sites are shallow.
*/
template <>
class CoroTask<void>
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise. Compiler uses this to manage coroutine state.
* Stores the exception (if any) and the continuation handle for
* symmetric transfer back to the awaiting coroutine.
*/
struct promise_type
{
// Captured exception from the coroutine body, rethrown in
// await_resume() when this task is co_await-ed by a caller.
std::exception_ptr exception_;
// Handle to the coroutine that is co_await-ing this task.
// Set by await_suspend(). FinalAwaiter uses it for symmetric
// transfer back to the caller. Null if this is a top-level task.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. The coroutine body does not execute until the
* handle is explicitly resumed (e.g. by CoroTaskRunner::resume).
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Awaiter returned by final_suspend(). Uses symmetric transfer:
* if a continuation exists, transfers control directly to it
* (tail-call, no stack growth). Otherwise returns noop_coroutine
* so the coroutine frame stays alive for the owner to destroy.
*/
struct FinalAwaiter
{
/**
* Always false. We need await_suspend to run for
* symmetric transfer.
*/
bool
await_ready() noexcept
{
return false;
}
/**
* Symmetric transfer: returns the continuation handle so
* the compiler emits a tail-call instead of a nested resume.
* If no continuation is set, returns noop_coroutine to
* suspend at final_suspend without destroying the frame.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
/**
* Returns FinalAwaiter for symmetric transfer at coroutine end.
*/
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return;` (void coroutine).
*/
void
return_void()
{
}
/**
* Called by the compiler when an exception escapes the coroutine
* body. Captures it for later rethrowing in await_resume().
*/
void
unhandled_exception()
{
exception_ = std::current_exception();
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `co_await someCoroTask;` --
/**
* Always false. This task is lazy, so co_await always suspends
* the caller to set up the continuation link.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores the caller's handle as our continuation, then returns
* our handle for symmetric transfer (caller suspends, we resume).
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
XRPL_ASSERT(handle_, "xrpl::CoroTask<void>::await_suspend : handle is valid");
handle_.promise().continuation_ = caller;
return handle_; // Symmetric transfer
}
/**
* Called in the awaiting coroutine's context after this task
* completes. Rethrows any exception captured by
* unhandled_exception().
*/
void
await_resume()
{
XRPL_ASSERT(handle_, "xrpl::CoroTask<void>::await_resume : handle is valid");
if (auto& ep = handle_.promise().exception_)
std::rethrow_exception(ep);
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
/**
* CoroTask<T> -- coroutine return type for value-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<T>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - result_ : variant<monostate, T, |
* | exception_ptr> |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_value(T) -> stores in result_[1] |
* | + unhandled_exception -> stores in result_[2] |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter (same symmetric-transfer pattern as CoroTask<void>)
*
* Value Extraction
* ----------------
* await_resume() inspects the variant:
* - index 2 (exception_ptr) -> rethrow
* - index 1 (T) -> return value via move
*
* Usage Examples
* ==============
*
* 1. Simple value return:
*
* CoroTask<int> computeAnswer() { co_return 42; }
*
* CoroTask<void> caller() {
* int v = co_await computeAnswer(); // v == 42
* }
*
* 2. Chaining value-returning coroutines:
*
* CoroTask<int> add(int a, int b) { co_return a + b; }
* CoroTask<int> doubleSum(int a, int b) {
* int s = co_await add(a, b);
* co_return s * 2;
* }
*
* 3. Exception propagation from inner to outer:
*
* CoroTask<int> failing() {
* throw std::runtime_error("bad");
* co_return 0; // never reached
* }
* CoroTask<void> caller() {
* try {
* int v = co_await failing(); // throws here
* } catch (std::runtime_error const& e) {
* // e.what() == "bad"
* }
* }
*
* Caveats / Pitfalls (in addition to CoroTask<void> caveats above)
* ================================================================
*
* BUG-RISK: await_resume() moves the value out of the variant.
* Calling co_await on the same CoroTask<T> instance twice is undefined
* behavior -- the second call will see a moved-from T. CoroTask is
* single-shot: one co_return, one co_await.
*
* BUG-RISK: T must be move-constructible.
* return_value(T) takes by value and moves into the variant.
* Types that are not movable cannot be used as T.
*
* LIMITATION: No co_yield support.
* CoroTask<T> only supports a single co_return. It does not implement
* yield_value(), so using co_yield inside a CoroTask<T> coroutine is a
* compile error. For streaming values, a different return type
* (e.g. Generator<T>) would be needed.
*
* LIMITATION: Result is only accessible via co_await.
* There is no .get() or .result() method. The value can only be
* extracted by co_await-ing the CoroTask<T> from inside another
* coroutine. For extracting results in non-coroutine code, pass a
* pointer to the caller and write through it (as the tests do).
*/
template <typename T>
class CoroTask
{
static_assert(
std::is_move_constructible_v<T>,
"CoroTask<T> requires T to be move-constructible");
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise for value-returning coroutines.
* Stores the result as a variant: monostate (not yet set),
* T (co_return value), or exception_ptr (unhandled exception).
*/
struct promise_type
{
// Tri-state result:
// index 0 (monostate) -- coroutine has not yet completed
// index 1 (T) -- co_return value stored here
// index 2 (exception) -- unhandled exception captured here
std::variant<std::monostate, T, std::exception_ptr> result_;
// Handle to the coroutine co_await-ing this task. Used by
// FinalAwaiter for symmetric transfer. Null for top-level tasks.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. Coroutine body does not run until explicitly resumed.
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Symmetric-transfer awaiter at coroutine completion.
* Same pattern as CoroTask<void>::FinalAwaiter.
*/
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
/**
* Returns continuation for symmetric transfer, or
* noop_coroutine if this is a top-level task.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return value;`.
* Moves the value into result_ at index 1.
*
* @param value The value to store
*/
void
return_value(T value)
{
result_.template emplace<1>(std::move(value));
}
/**
* Captures unhandled exceptions at index 2 of result_.
* Rethrown later in await_resume().
*/
void
unhandled_exception()
{
result_.template emplace<2>(std::current_exception());
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `T val = co_await someCoroTask;` --
/**
* Always false. co_await always suspends to set up continuation.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores caller as continuation, returns our handle for
* symmetric transfer.
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
XRPL_ASSERT(handle_, "xrpl::CoroTask<T>::await_suspend : handle is valid");
handle_.promise().continuation_ = caller;
return handle_;
}
/**
* Extracts the result: rethrows if exception, otherwise moves
* the T value out of the variant. Single-shot: calling twice
* on the same task is undefined (moved-from T).
*
* @return The co_return-ed value
*/
T
await_resume()
{
XRPL_ASSERT(handle_, "xrpl::CoroTask<T>::await_resume : handle is valid");
auto& result = handle_.promise().result_;
if (auto* ep = std::get_if<2>(&result))
std::rethrow_exception(*ep);
return std::get<1>(std::move(result));
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
} // namespace xrpl

View File

@@ -0,0 +1,374 @@
#pragma once
/**
* @file CoroTaskRunner.ipp
*
* CoroTaskRunner inline implementation.
*
* This file contains the business logic for managing C++20 coroutines
* on the JobQueue. It is included at the bottom of JobQueue.h.
*
* Data Flow: suspend / post / resume cycle
* =========================================
*
* coroutine body CoroTaskRunner JobQueue
* -------------- -------------- --------
* |
* co_await runner->suspend()
* |
* +--- await_suspend ------> onSuspend()
* | ++nSuspend_ ------------> nSuspend_
* | [coroutine is now suspended]
* |
* . (externally or by yieldAndPost())
* .
* +--- (caller calls) -----> post()
* | ++runCount_
* | addJob(resume) ----------> job enqueued
* | |
* | [worker picks up]
* | |
* +--- <----- resume() <-----------------------------------+
* | --nSuspend_ ------> nSuspend_
* | swap in LocalValues (lvs_)
* | task_.handle().resume()
* | |
* | [coroutine body continues here]
* | |
* | swap out LocalValues
* | --runCount_
* | cv_.notify_all()
* v
*
* Thread Safety
* =============
* - mutex_ : guards task_.handle().resume() so that post()-before-suspend
* races cannot resume the coroutine while it is still running.
* (See the race condition discussion in JobQueue.h)
* - mutex_run_ : guards runCount_ counter; used by join() to wait until
* all in-flight resume operations complete.
* - jq_.m_mutex: guards nSuspend_ increments/decrements.
*
* Common Mistakes When Modifying This File
* =========================================
*
* 1. Changing lock ordering.
* resume() acquires locks sequentially (never held simultaneously):
* jq_.m_mutex (released immediately), then mutex_ (held across resume),
* then mutex_run_ (released after decrement). post() acquires only
* mutex_run_. Any new code path must follow the same order.
*
* 2. Removing the shared_from_this() capture in post().
* The lambda passed to addJob captures [this, sp = shared_from_this()].
* If you remove sp, 'this' can be destroyed before the job runs,
* causing use-after-free. The sp capture is load-bearing.
*
* 3. Forgetting to decrement nSuspend_ on a new code path.
* Every ++nSuspend_ must have a matching --nSuspend_. If you add a new
* suspension path (e.g. a new awaiter) and forget to decrement on resume
* or on failure, JobQueue::stop() will hang.
*
* 4. Calling task_.handle().resume() without holding mutex_.
* This allows a race where the coroutine runs on two threads
* simultaneously. Always hold mutex_ around resume().
*
* 5. Swapping LocalValues outside of the mutex_ critical section.
* The swap-in and swap-out of LocalValues must bracket the resume()
* call. If you move the swap-out before the lock_guard(mutex_) is
* released, you break LocalValue isolation for any code that runs
* after the coroutine suspends but before the lock is dropped.
*/
namespace xrpl {
/**
* Construct a CoroTaskRunner. Sets runCount_ to 0; does not
* create the coroutine. Call init() afterwards.
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
create_t,
JobQueue& jq,
JobType type,
std::string const& name)
: jq_(jq), type_(type), name_(name), runCount_(0)
{
}
/**
* Initialize with a coroutine-returning callable.
* Stores the callable on the heap (FuncStore) so it outlives the
* coroutine frame. Coroutine frames store a reference to the
* callable's implicit object parameter (the lambda). If the callable
* is a temporary, that reference dangles after the caller returns.
* Keeping the callable alive here ensures the coroutine's captures
* remain valid.
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
template <class F>
void
JobQueue::CoroTaskRunner::init(F&& f)
{
using Fn = std::decay_t<F>;
auto store = std::make_unique<FuncStore<Fn>>(std::forward<F>(f));
task_ = store->func(shared_from_this());
storedFunc_ = std::move(store);
}
/**
* Destructor. Waits for any in-flight resume() to complete, then
* asserts (debug) that the coroutine has finished or
* expectEarlyExit() was called.
*
* The join() call is necessary because with async dispatch the
* coroutine runs on a worker thread. The gate signal (which wakes
* the test thread) can arrive before resume() has set finished_.
* join() synchronizes via mutex_run_, establishing a happens-before
* edge: finished_ = true -> unlock(mutex_run_) in resume() ->
* lock(mutex_run_) in join() -> read finished_.
*/
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
{
#ifndef NDEBUG
join();
XRPL_ASSERT(finished_, "xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
#endif
}
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
*/
inline void
JobQueue::CoroTaskRunner::onSuspend()
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
/**
* Decrement nSuspend_ without resuming.
*/
inline void
JobQueue::CoroTaskRunner::onUndoSuspend()
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
/**
* Return a SuspendAwaiter whose await_suspend() increments nSuspend_
* before the coroutine actually suspends. The caller must later call
* post() or resume() to continue execution.
*
* @return Awaiter for use with `co_await runner->suspend()`
*/
inline auto
JobQueue::CoroTaskRunner::suspend()
{
/**
* Custom awaiter for suspend(). Always suspends (await_ready
* returns false) and increments nSuspend_ in await_suspend().
*/
struct SuspendAwaiter
{
CoroTaskRunner& runner_; // The runner that owns this coroutine.
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Called when the coroutine suspends. Increments nSuspend_
* so the JobQueue knows a coroutine is waiting.
*/
void
await_suspend(std::coroutine_handle<>) const
{
runner_.onSuspend();
}
void
await_resume() const noexcept
{
}
};
return SuspendAwaiter{*this};
}
/**
* Suspend and immediately repost on the JobQueue. Equivalent to
* `co_await JobQueueAwaiter{runner}` but uses an inline struct
* to work around a GCC-12 codegen bug (see declaration in JobQueue.h).
*
* If the JobQueue is stopping (post fails), the suspend count is
* undone and the coroutine is resumed immediately via h.resume().
*
* @return An inline YieldPostAwaiter
*/
inline auto
JobQueue::CoroTaskRunner::yieldAndPost()
{
struct YieldPostAwaiter
{
CoroTaskRunner& runner_;
bool
await_ready() const noexcept
{
return false;
}
void
await_suspend(std::coroutine_handle<> h)
{
runner_.onSuspend();
if (!runner_.post())
{
runner_.onUndoSuspend();
h.resume();
}
}
void
await_resume() const noexcept
{
}
};
return YieldPostAwaiter{*this};
}
/**
* Schedule coroutine resumption as a job on the JobQueue.
* A shared_ptr capture (sp) prevents this CoroTaskRunner from being
* destroyed while the job is queued but not yet executed.
*
* @return false if the JobQueue rejected the job (shutting down)
*/
inline bool
JobQueue::CoroTaskRunner::post()
{
{
std::lock_guard lk(mutex_run_);
++runCount_;
}
// sp prevents 'this' from being destroyed while the job is pending
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Undo the runCount_ increment.
std::lock_guard lk(mutex_run_);
--runCount_;
cv_.notify_all();
return false;
}
/**
* Resume the coroutine on the current thread.
*
* Steps:
* 1. Decrement nSuspend_ (under jq_.m_mutex)
* 2. Swap in this coroutine's LocalValues for thread-local isolation
* 3. Resume the coroutine handle (under mutex_)
* 4. Swap out LocalValues, restoring the thread's previous state
* 5. Decrement runCount_ and notify join() waiters
*
* @pre post() must have been called before resume(). Direct calls
* without a prior post() will corrupt runCount_ and break join().
* Note: runCount_ is NOT incremented here — post() already did that.
* This ensures join() stays blocked for the entire post->resume lifetime.
*/
inline void
JobQueue::CoroTaskRunner::resume()
{
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(
task_.handle() && !task_.done(),
"xrpl::JobQueue::CoroTaskRunner::resume : task handle is valid and not done");
task_.handle().resume();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
if (task_.done())
{
finished_ = true;
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
// Use std::move (not task_ = {}) so task_.handle_ is null BEFORE the
// frame is destroyed. operator= would destroy the frame while handle_
// still holds the old value -- a re-entrancy hazard on GCC-12 if
// frame destruction triggers runner cleanup.
[[maybe_unused]] auto completed = std::move(task_);
}
std::lock_guard lk(mutex_run_);
--runCount_;
cv_.notify_all();
}
/**
* @return true if the coroutine has not yet run to completion
*/
inline bool
JobQueue::CoroTaskRunner::runnable() const
{
// After normal completion, task_ is reset to break the shared_ptr cycle
// (handle_ becomes null). A null handle means the coroutine is done.
return task_.handle() && !task_.done();
}
/**
* Handle early termination when the coroutine never ran (e.g. JobQueue
* is stopping). Decrements nSuspend_ and destroys the coroutine frame
* to break the shared_ptr cycle: frame -> lambda -> runner -> frame.
*/
inline void
JobQueue::CoroTaskRunner::expectEarlyExit()
{
if (!finished_)
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
finished_ = true;
}
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
// The coroutine is at initial_suspend and never ran user code, so
// destroying it is safe. Use std::move (not task_ = {}) so
// task_.handle_ is null before the frame is destroyed.
{
[[maybe_unused]] auto completed = std::move(task_);
}
storedFunc_.reset();
}
/**
* Block until all pending/active resume operations complete.
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0 or
* finished_ becomes true. The finished_ check handles the case
* where resume() is called directly (without post()), which
* decrements runCount_ below zero. In that scenario runCount_
* never returns to 0, but finished_ becoming true guarantees
* the coroutine is done and no more resumes will occur.
*/
inline void
JobQueue::CoroTaskRunner::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return runCount_ == 0 || finished_; });
}
} // namespace xrpl

View File

@@ -2,6 +2,7 @@
#include <xrpl/basics/LocalValue.h>
#include <xrpl/core/ClosureCounter.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobTypeData.h>
#include <xrpl/core/JobTypes.h>
#include <xrpl/core/detail/Workers.h>
@@ -10,6 +11,7 @@
#include <boost/context/protected_fixedsize_stack.hpp>
#include <boost/coroutine2/all.hpp>
#include <coroutine>
#include <set>
namespace xrpl {
@@ -120,6 +122,419 @@ public:
join();
};
/** C++20 coroutine lifecycle manager. Replaces Coro for new code.
*
* Class / Inheritance / Dependency Diagram
* =========================================
*
* std::enable_shared_from_this<CoroTaskRunner>
* ^
* | (public inheritance)
* |
* CoroTaskRunner
* +---------------------------------------------------+
* | - lvs_ : detail::LocalValues |
* | - jq_ : JobQueue& |
* | - type_ : JobType |
* | - name_ : std::string |
* | - runCount_ : int (in-flight resumes) |
* | - mutex_ : std::mutex (coroutine guard) |
* | - mutex_run_ : std::mutex (join guard) |
* | - cv_ : condition_variable |
* | - task_ : CoroTask<void> |
* | - storedFunc_ : unique_ptr<FuncBase> (type-erased)|
* +---------------------------------------------------+
* | + init(F&&) : set up coroutine callable |
* | + onSuspend() : ++jq_.nSuspend_ |
* | + onUndoSuspend() : --jq_.nSuspend_ |
* | + suspend() : returns SuspendAwaiter |
* | + post() : schedule resume on JobQueue |
* | + resume() : resume coroutine on caller |
* | + runnable() : !task_.done() |
* | + expectEarlyExit() : teardown for failed post |
* | + join() : block until not running |
* +---------------------------------------------------+
* | |
* | owns | references
* v v
* CoroTask<void> JobQueue
* (coroutine frame) (thread pool + nSuspend_)
*
* FuncBase / FuncStore<F> (type-erased heap storage
* for the coroutine lambda)
*
* Coroutine Lifecycle (Control Flow)
* ===================================
*
* Caller thread JobQueue worker thread
* ------------- ----------------------
* postCoroTask(f)
* |
* +-- check stopping_ (reject if JQ shutting down)
* +-- ++nSuspend_ (lazy start counts as suspended)
* +-- make_shared<CoroTaskRunner>
* +-- init(f)
* | +-- store lambda on heap (FuncStore)
* | +-- task_ = f(shared_from_this())
* | [coroutine created, suspended at initial_suspend]
* +-- post()
* | +-- ++runCount_
* | +-- addJob(type_, [resume]{})
* | resume()
* | |
* | +-- --nSuspend_
* | +-- swap in LocalValues
* | +-- task_.handle().resume()
* | | [coroutine body runs]
* | | ...
* | | co_await suspend()
* | | +-- ++nSuspend_
* | | [coroutine suspends]
* | +-- swap out LocalValues
* | +-- --runCount_
* | +-- cv_.notify_all()
* |
* post() <-- called externally or by yieldAndPost()
* +-- ++runCount_
* +-- addJob(type_, [resume]{})
* resume()
* |
* +-- [coroutine body continues]
* +-- co_return
* +-- --runCount_
* +-- cv_.notify_all()
* join()
* +-- cv_.wait([]{runCount_ == 0})
* +-- [done]
*
* Usage Examples
* ==============
*
* 1. Fire-and-forget coroutine (most common pattern):
*
* jq.postCoroTask(jtCLIENT, "MyWork",
* [](auto runner) -> CoroTask<void> {
* doSomeWork();
* co_await runner->suspend(); // yield to other jobs
* doMoreWork();
* co_return;
* });
*
* 2. Manually controlling suspend / resume (external trigger):
*
* auto runner = jq.postCoroTask(jtCLIENT, "ExtTrigger",
* [&result](auto runner) -> CoroTask<void> {
* startAsyncOperation(callback);
* co_await runner->suspend();
* // callback called runner->post() to get here
* result = collectResult();
* co_return;
* });
* // ... later, from the callback:
* runner->post(); // reschedule the coroutine on the JobQueue
*
* 3. Using yieldAndPost() for automatic suspend + repost:
*
* jq.postCoroTask(jtCLIENT, "AutoRepost",
* [](auto runner) -> CoroTask<void> {
* step1();
* co_await runner->yieldAndPost(); // yield + auto-repost
* step2();
* co_await runner->yieldAndPost();
* step3();
* co_return;
* });
*
* 4. Checking shutdown after co_await (cooperative cancellation):
*
* jq.postCoroTask(jtCLIENT, "Cancellable",
* [&jq](auto runner) -> CoroTask<void> {
* while (moreWork()) {
* co_await runner->yieldAndPost();
* if (jq.isStopping())
* co_return; // bail out cleanly
* processNextItem();
* }
* co_return;
* });
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Calling suspend() without a matching post()/resume().
* After co_await runner->suspend(), the coroutine is parked and
* nSuspend_ is incremented. If nothing ever calls post() or
* resume(), the coroutine is leaked and JobQueue::stop() will
* hang forever waiting for nSuspend_ to reach zero.
*
* BUG-RISK: Calling post() on an already-running coroutine.
* post() schedules a resume() job. If the coroutine has not
* actually suspended yet (no co_await executed), the resume job
* will try to call handle().resume() while the coroutine is still
* running on another thread. This is UB. The mutex_ prevents
* data corruption but the logic is wrong — always co_await
* suspend() before calling post(). (The test incorrect_order()
* shows this works only because mutex_ serializes the calls.)
*
* BUG-RISK: Dropping the shared_ptr<CoroTaskRunner> before join().
* The CoroTaskRunner destructor asserts that finished_ is true
* (the coroutine completed). If you let the last shared_ptr die
* while the coroutine is still running or suspended, you get an
* assertion failure in debug and UB in release. Always call
* join() or expectEarlyExit() first.
*
* BUG-RISK: Lambda captures outliving the coroutine frame.
* The lambda passed to postCoroTask is heap-allocated (FuncStore)
* to prevent dangling. But objects captured by pointer still need
* their own lifetime management. If you capture a raw pointer to
* a stack variable, and the stack frame exits before the coroutine
* finishes, the pointer dangles. Use shared_ptr or ensure the
* pointed-to object outlives the coroutine.
*
* BUG-RISK: Forgetting co_return in a void coroutine.
* If the coroutine body falls off the end without co_return,
* the compiler may silently treat it as co_return (per standard),
* but some compilers warn. Always write explicit co_return.
*
* LIMITATION: CoroTaskRunner only supports CoroTask<void>.
* The task_ member is CoroTask<void>. To return values from
* the top-level coroutine, write through a captured pointer
* (as the tests demonstrate), or co_await inner CoroTask<T>
* coroutines that return values.
*
* LIMITATION: One coroutine per CoroTaskRunner.
* init() must be called exactly once. You cannot reuse a
* CoroTaskRunner to run a second coroutine. Create a new one
* via postCoroTask() instead.
*
* LIMITATION: No timeout on join().
* join() blocks indefinitely. If the coroutine is suspended
* and never posted, join() will deadlock. Use timed waits
* on the gate pattern (condition_variable + wait_for) in tests.
*/
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
{
private:
// Per-coroutine thread-local storage. Swapped in before resume()
// and swapped out after, so each coroutine sees its own LocalValue
// state regardless of which worker thread executes it.
detail::LocalValues lvs_;
// Back-reference to the owning JobQueue. Used to post jobs,
// increment/decrement nSuspend_, and acquire jq_.m_mutex.
JobQueue& jq_;
// Job type passed to addJob() when posting this coroutine.
JobType type_;
// Human-readable name for this coroutine job (for logging).
std::string name_;
// Number of in-flight resume operations (pending + active).
// Incremented by post(), decremented when resume() finishes.
// Guarded by mutex_run_. join() blocks until this reaches 0.
//
// A counter (not a bool) is needed because post() can be called
// from within the coroutine body (e.g. via yieldAndPost()),
// enqueuing a second resume while the first is still running.
// A bool would be clobbered: R2.post() sets true, then R1's
// cleanup sets false — losing the fact that R2 is still pending.
int runCount_;
// Serializes all coroutine resume() calls, preventing concurrent
// execution of the coroutine body on multiple threads. Handles the
// race where post() enqueues a resume before the coroutine has
// actually suspended (post-before-suspend pattern).
std::mutex mutex_;
// Guards runCount_. Used with cv_ for join() to wait
// until all pending/active resume operations complete.
std::mutex mutex_run_;
// Notified when runCount_ reaches zero, allowing
// join() waiters to wake up.
std::condition_variable cv_;
// The coroutine handle wrapper. Owns the coroutine frame.
// Set by init(). Reset to empty in resume() upon coroutine
// completion (to break the shared_ptr cycle) or in
// expectEarlyExit() on early termination.
CoroTask<void> task_;
/**
* Type-erased base for heap-stored callables.
* Prevents the coroutine lambda from being destroyed before
* the coroutine frame is done with it.
*
* @see FuncStore
*/
struct FuncBase
{
virtual ~FuncBase() = default;
};
/**
* Concrete type-erased storage for a callable of type F.
* The coroutine frame stores a reference to the lambda's implicit
* object parameter. If the lambda is a temporary, that reference
* dangles after the call returns. FuncStore keeps it alive on
* the heap for the lifetime of the CoroTaskRunner.
*/
template <class F>
struct FuncStore : FuncBase
{
F func; // The stored callable (coroutine lambda).
explicit FuncStore(F&& f) : func(std::move(f))
{
}
};
// Heap-allocated callable storage. Set by init(), ensures the
// lambda outlives the coroutine frame that references it.
std::unique_ptr<FuncBase> storedFunc_;
// True once the coroutine has completed or expectEarlyExit() was
// called. Asserted in the destructor (debug) to catch leaked
// runners. Available in all builds to guard expectEarlyExit()
// against double-decrementing nSuspend_.
bool finished_ = false;
public:
/**
* Tag type for private construction. Prevents external code
* from constructing CoroTaskRunner directly. Use postCoroTask().
*/
struct create_t
{
explicit create_t() = default;
};
/**
* Construct a CoroTaskRunner. Private by convention (create_t tag).
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
CoroTaskRunner(CoroTaskRunner const&) = delete;
CoroTaskRunner&
operator=(CoroTaskRunner const&) = delete;
/**
* Destructor. Asserts (debug) that the coroutine has finished
* or expectEarlyExit() was called.
*/
~CoroTaskRunner();
/**
* Initialize with a coroutine-returning callable.
* Must be called exactly once, after the object is managed by
* shared_ptr (because init uses shared_from_this internally).
* This is handled automatically by postCoroTask().
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
template <class F>
void
init(F&& f);
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
* Called when the coroutine is about to suspend. Every call
* must be balanced by a corresponding decrement (via resume()
* or onUndoSuspend()), or JobQueue::stop() will hang.
*/
void
onSuspend();
/**
* Decrement nSuspend_ without resuming.
* Used to undo onSuspend() when a scheduled post() fails
* (e.g. JobQueue is stopping).
*/
void
onUndoSuspend();
/**
* Suspend the coroutine.
* The awaiter's await_suspend() increments nSuspend_ before the
* coroutine actually suspends. The caller must later call post()
* or resume() to continue execution.
*
* @return An awaiter for use with `co_await runner->suspend()`
*/
auto
suspend();
/**
* Suspend the coroutine and immediately repost it on the
* JobQueue. Combines suspend() + post() atomically inside
* await_suspend, so there is no window where an external
* event could race between the two.
*
* Equivalent to JobQueueAwaiter but defined as an inline
* awaiter returned from a member function. This avoids a
* GCC-12 coroutine codegen bug where an external awaiter
* struct (JobQueueAwaiter) used at multiple co_await points
* corrupts the coroutine state machine's resume index,
* causing the coroutine to hang on the third resumption.
*
* @return An awaiter for use with `co_await runner->yieldAndPost()`
*/
auto
yieldAndPost();
/**
* Schedule coroutine resumption as a job on the JobQueue.
* Captures shared_from_this() to prevent this runner from being
* destroyed while the job is queued.
*
* @return true if the job was accepted; false if the JobQueue
* is stopping (caller must handle cleanup)
*/
bool
post();
/**
* Resume the coroutine on the current thread.
* Decrements nSuspend_, swaps in LocalValues, resumes the
* coroutine handle, swaps out LocalValues, and notifies join()
* waiters. Lock ordering (sequential, non-overlapping):
* jq_.m_mutex -> mutex_ -> mutex_run_.
*
* @pre post() must have been called before resume(). Direct
* calls without a prior post() will corrupt runCount_
* and break join().
*/
void
resume();
/**
* @return true if the coroutine has not yet run to completion
*/
bool
runnable() const;
/**
* Handle early termination when the coroutine never ran.
* Decrements nSuspend_ and destroys the coroutine frame to
* break the shared_ptr cycle (frame -> lambda -> runner -> frame).
* Called by postCoroTask() when post() fails.
*/
void
expectEarlyExit();
/**
* Block until all pending/active resume operations complete.
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0.
* Warning: deadlocks if the coroutine is suspended and never posted.
*/
void
join();
};
using JobFunction = std::function<void()>;
JobQueue(
@@ -166,6 +581,19 @@ public:
std::shared_ptr<Coro>
postCoro(JobType t, std::string const& name, F&& f);
/** Creates a C++20 coroutine and adds a job to the queue to run it.
@param t The type of job.
@param name Name of the job.
@param f Callable with signature
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
*/
template <class F>
std::shared_ptr<CoroTaskRunner>
postCoroTask(JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
int
@@ -380,6 +808,7 @@ private:
} // namespace xrpl
#include <xrpl/core/Coro.ipp>
#include <xrpl/core/CoroTaskRunner.ipp>
namespace xrpl {
@@ -402,4 +831,69 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
return coro;
}
// postCoroTask — entry point for launching a C++20 coroutine on the JobQueue.
//
// Control Flow
// ============
//
// postCoroTask(t, name, f)
// |
// +-- 1. Check stopping_ — reject if JQ shutting down
// |
// +-- 2. ++nSuspend_ (mirrors Boost Coro ctor's implicit yield)
// | The coroutine is "suspended" from the JobQueue's perspective
// | even though it hasn't run yet — this keeps the JQ shutdown
// | logic correct (it waits for nSuspend_ to reach 0).
// |
// +-- 3. Create CoroTaskRunner (shared_ptr, ref-counted)
// |
// +-- 4. runner->init(f)
// | +-- Heap-allocate the lambda (FuncStore) to prevent
// | | dangling captures in the coroutine frame
// | +-- task_ = f(shared_from_this())
// | [coroutine created but NOT started — lazy initial_suspend]
// |
// +-- 5. runner->post()
// | +-- addJob(type_, [resume]{}) → resume on worker thread
// | +-- failure (JQ stopping):
// | +-- runner->expectEarlyExit()
// | | --nSuspend_, destroy coroutine frame
// | +-- return nullptr
//
// Why async post() instead of synchronous resume()?
// ==================================================
// The initial dispatch MUST use async post() so the coroutine body runs on
// a JobQueue worker thread, not the caller's thread. resume() swaps the
// caller's thread-local LocalValues with the coroutine's private copy.
// If the coroutine mutates LocalValues (e.g. thread_specific_storage test),
// those mutations bleed back into the caller's thread-local state after the
// swap-out, corrupting subsequent tests that share the same thread pool.
// Async post() avoids this by running the coroutine on a worker thread whose
// LocalValues are managed by the thread pool, not by the caller.
//
template <class F>
std::shared_ptr<JobQueue::CoroTaskRunner>
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
{
// Reject if the JQ is shutting down — matches addJob()'s stopping_ check.
// Must check before incrementing nSuspend_ to avoid leaving an orphan
// count that would cause stop() to hang.
if (stopping_)
return nullptr;
{
std::lock_guard lock(m_mutex);
++nSuspend_;
}
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
runner->init(std::forward<F>(f));
if (!runner->post())
{
runner->expectEarlyExit();
runner.reset();
}
return runner;
}
} // namespace xrpl

View File

@@ -0,0 +1,206 @@
#pragma once
#include <xrpl/core/JobQueue.h>
#include <coroutine>
#include <memory>
namespace xrpl {
/**
* Awaiter that suspends and immediately reschedules on the JobQueue.
* Equivalent to calling yield() followed by post() in the old Coro API.
*
* Usage:
* co_await JobQueueAwaiter{runner};
*
* What it waits for: The coroutine is re-queued as a job and resumes
* when a worker thread picks it up.
*
* Which thread resumes: A JobQueue worker thread.
*
* What await_resume() returns: void.
*
* Dependency Diagram
* ==================
*
* JobQueueAwaiter
* +----------------------------------------------+
* | + runner : shared_ptr<CoroTaskRunner> |
* +----------------------------------------------+
* | + await_ready() -> false (always suspend) |
* | + await_suspend() -> bool (suspend or cancel) |
* | + await_resume() -> void |
* +----------------------------------------------+
* | |
* | uses | uses
* v v
* CoroTaskRunner JobQueue
* .onSuspend() (via runner->post() -> addJob)
* .onUndoSuspend()
* .post()
*
* Control Flow (await_suspend)
* ============================
*
* co_await JobQueueAwaiter{runner}
* |
* +-- await_ready() -> false
* +-- await_suspend(handle)
* |
* +-- runner->onSuspend() // ++nSuspend_
* +-- runner->post() // addJob to JobQueue
* | |
* | +-- success? return noop_coroutine()
* | | // coroutine stays suspended;
* | | // worker thread will call resume()
* | +-- failure? (JQ stopping)
* | +-- runner->onUndoSuspend() // --nSuspend_
* | +-- return handle // symmetric transfer back
* | // coroutine continues immediately
* | // so it can clean up and co_return
*
* DEPRECATED — prefer `co_await runner->yieldAndPost()`
* =====================================================
*
* GCC-12 has a coroutine codegen bug where using this external awaiter
* struct at multiple co_await points in the same coroutine corrupts the
* state machine's resume index. After the second co_await, the third
* resumption enters handle().resume() but never reaches await_resume()
* or any subsequent user code — the coroutine hangs indefinitely.
*
* The fix is `co_await runner->yieldAndPost()`, which defines the
* awaiter as an inline struct inside a CoroTaskRunner member function.
* GCC-12 handles inline awaiters correctly at multiple co_await points.
*
* This struct is retained for single-use scenarios and documentation
* purposes. For any code that may use co_await in a loop or at
* multiple points, always use `runner->yieldAndPost()`.
*
* Usage Examples
* ==============
*
* 1. Yield and auto-repost (preferred — works on all compilers):
*
* CoroTask<void> handler(auto runner) {
* doPartA();
* co_await runner->yieldAndPost(); // yield + repost
* doPartB(); // runs on a worker thread
* co_return;
* }
*
* 2. Multiple yield points in a loop:
*
* CoroTask<void> batchProcessor(auto runner) {
* for (auto& item : items) {
* process(item);
* co_await runner->yieldAndPost(); // let other jobs run
* }
* co_return;
* }
*
* 3. Graceful shutdown — checking after resume:
*
* CoroTask<void> longTask(auto runner, JobQueue& jq) {
* while (hasWork()) {
* co_await runner->yieldAndPost();
* // If JQ is stopping, await_suspend resumes the coroutine
* // immediately without re-queuing. Always check
* // isStopping() to decide whether to proceed:
* if (jq.isStopping())
* co_return;
* doNextChunk();
* }
* co_return;
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Using a stale or null runner.
* The runner shared_ptr must be valid and point to the CoroTaskRunner
* that owns the coroutine currently executing. Passing a runner from
* a different coroutine, or a default-constructed shared_ptr, is UB.
*
* BUG-RISK: Assuming resume happens on the same thread.
* After co_await, the coroutine resumes on whatever worker thread
* picks up the job. Do not rely on thread-local state unless it is
* managed through LocalValue (which CoroTaskRunner automatically
* swaps in/out).
*
* BUG-RISK: Ignoring the shutdown path.
* When the JobQueue is stopping, post() fails and await_suspend()
* resumes the coroutine immediately (symmetric transfer back to h).
* The coroutine body continues on the same thread. If your code
* after co_await assumes it was re-queued and is running on a worker
* thread, that assumption breaks during shutdown. Always handle the
* "JQ is stopping" case, either by checking jq.isStopping() or by
* letting the coroutine fall through to co_return naturally.
*
* DIFFERENCE from runner->suspend() + runner->post():
* Both JobQueueAwaiter and yieldAndPost() combine suspend + post
* in one atomic operation. With the manual suspend()/post() pattern,
* there is a window between the two calls where an external event
* could race. The atomic awaiters remove that window — onSuspend()
* and post() happen within the same await_suspend() call while the
* coroutine is guaranteed to be suspended. Use yieldAndPost() unless
* you need an external party to decide *when* to call post().
*/
struct JobQueueAwaiter
{
// The CoroTaskRunner that owns the currently executing coroutine.
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Increment nSuspend (equivalent to yield()) and schedule resume
* on the JobQueue (equivalent to post()). If the JobQueue is
* stopping, undoes the suspend count and transfers back to the
* coroutine so it can clean up and co_return.
*
* Returns a coroutine_handle<> (symmetric transfer) instead of
* bool to work around a GCC-12 codegen bug where bool-returning
* await_suspend leaves the coroutine in an invalid state —
* neither properly suspended nor resumed — causing a hang.
*
* WARNING: GCC-12 has an additional codegen bug where using this
* external awaiter struct at multiple co_await points in the same
* coroutine corrupts the state machine's resume index, causing the
* coroutine to hang on the third resumption. Prefer
* `co_await runner->yieldAndPost()` which uses an inline awaiter
* that GCC-12 handles correctly.
*
* @return noop_coroutine() to stay suspended (job posted);
* the caller's handle to resume immediately (JQ stopping)
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> h)
{
XRPL_ASSERT(runner, "xrpl::JobQueueAwaiter::await_suspend : runner is valid");
runner->onSuspend();
if (!runner->post())
{
// JobQueue is stopping. Undo the suspend count and
// transfer back to the coroutine so it can clean up
// and co_return.
runner->onUndoSuspend();
return h;
}
return std::noop_coroutine();
}
void
await_resume() const noexcept
{
}
};
} // namespace xrpl

View File

@@ -1,118 +0,0 @@
# Linux Packaging
This directory contains all files needed to build RPM and Debian packages for `xrpld`.
## Directory layout
```
package/
build_pkg.sh Staging and build script (called by CMake targets and CI)
rpm/
xrpld.spec.in RPM spec template (substitutes @xrpld_version@, @pkg_release@)
deb/
debian/ Debian control files (control, rules, install, links, conffiles, ...)
shared/
xrpld.service systemd unit file (used by both RPM and DEB)
xrpld.sysusers sysusers.d config (used by both RPM and DEB)
xrpld.tmpfiles tmpfiles.d config (used by both RPM and DEB)
xrpld.logrotate logrotate config (installed to /opt/xrpld/bin/, user activates)
update-xrpld.sh auto-update script (installed to /opt/xrpld/bin/)
update-xrpld-cron cron entry for auto-update (installed to /opt/xrpld/bin/)
test/
smoketest.sh Package install smoke test
check_install_paths.sh Verify install paths and compat symlinks
```
## Prerequisites
| Package type | Container | Tool required |
| ------------ | -------------------------------------- | --------------------------------------------------------------- |
| RPM | `ghcr.io/xrplf/ci/rhel-9:gcc-12` | `rpmbuild` |
| DEB | `ghcr.io/xrplf/ci/ubuntu-jammy:gcc-12` | `dpkg-buildpackage`, `debhelper (>= 13)`, `dh-sequence-systemd` |
## Building packages
### Via CI (recommended)
The `reusable-package.yml` workflow downloads a pre-built `xrpld` binary artifact
and calls `build_pkg.sh` directly. No CMake configure or build step is needed in
the packaging job.
### Via CMake (local development)
Configure with the required install prefix, then invoke the target:
```bash
cmake \
-DCMAKE_INSTALL_PREFIX=/opt/xrpld \
-Dxrpld=ON \
-Dtests=OFF \
..
# RPM (in RHEL container):
cmake --build . --target package-rpm
# DEB (in Debian/Ubuntu container):
cmake --build . --target package-deb
```
The `cmake/XrplPackaging.cmake` module gates each target on whether the required
tool (`rpmbuild` / `dpkg-buildpackage`) is present at configure time, so
configuring on a host that lacks one simply omits the corresponding target.
`CMAKE_INSTALL_PREFIX` must be `/opt/xrpld`; if it is not, both targets are
skipped with a `STATUS` message.
## How `build_pkg.sh` works
`build_pkg.sh <pkg_type> <src_dir> <build_dir> [version] [pkg_release]` stages
all files and invokes the platform build tool. It resolves `src_dir` and
`build_dir` to absolute paths, then calls `stage_common()` to copy the binary,
config files, and shared support files into the staging area.
### RPM
1. Creates the standard `rpmbuild/{BUILD,BUILDROOT,RPMS,SOURCES,SPECS,SRPMS}` tree inside the build directory.
2. Copies the generated `xrpld.spec` and all source files (binary, configs, service files) into `SOURCES/`.
3. Runs `rpmbuild -bb`. The spec uses manual `install` commands to place files.
4. Output: `rpmbuild/RPMS/x86_64/xrpld-*.rpm`
### DEB
1. Creates a staging source tree at `debbuild/source/` inside the build directory.
2. Stages the binary, configs, `README.md`, and `LICENSE.md`.
3. Copies `package/deb/debian/` control files into `debbuild/source/debian/`.
4. Copies shared service/sysusers/tmpfiles into `debian/` where `dh_installsystemd`, `dh_installsysusers`, and `dh_installtmpfiles` pick them up automatically.
5. Generates a minimal `debian/changelog` (pre-release versions use `~` instead of `-`).
6. Runs `dpkg-buildpackage -b --no-sign`. `debian/rules` uses manual `install` commands.
7. Output: `debbuild/*.deb` and `debbuild/*.ddeb` (dbgsym package)
## Post-build verification
```bash
# DEB
dpkg-deb -c debbuild/*.deb | grep -E 'systemd|sysusers|tmpfiles'
lintian -I debbuild/*.deb
# RPM
rpm -qlp rpmbuild/RPMS/x86_64/*.rpm
```
## Reproducibility
The following environment variables improve build reproducibility. They are not
set automatically by `build_pkg.sh`; set them manually if needed:
```bash
export SOURCE_DATE_EPOCH=$(git log -1 --pretty=%ct)
export TZ=UTC
export LC_ALL=C.UTF-8
export GZIP=-n
export DEB_BUILD_OPTIONS="noautodbgsym reproducible=+fixfilepath"
```
## TODO
- Port debsigs signing instructions and integrate into CI.
- Port RPM GPG signing setup (key import + `%{?_gpg_sign}` in spec).
- Introduce a virtual package for key rotation.

View File

@@ -1,91 +0,0 @@
#!/usr/bin/env bash
# Build an RPM or Debian package from a pre-built xrpld binary.
#
# Usage: build_pkg.sh <pkg_type> <src_dir> <build_dir> [version] [pkg_release]
# pkg_type : rpm | deb
# src_dir : path to repository root
# build_dir : directory containing the pre-built xrpld binary
# version : package version string (e.g. 2.4.0-b1)
# pkg_release : package release number (default: 1)
set -euo pipefail
PKG_TYPE="${1:?pkg_type required}"
SRC_DIR="$(cd "${2:?src_dir required}" && pwd)"
BUILD_DIR="$(cd "${3:?build_dir required}" && pwd)"
VERSION="${4:-1.0.0}"
PKG_RELEASE="${5:-1}"
SHARED="${SRC_DIR}/package/shared"
# Stage files common to both package types into a target directory.
stage_common() {
local dest="$1"
cp "${BUILD_DIR}/xrpld" "${dest}/xrpld"
cp "${SRC_DIR}/cfg/xrpld-example.cfg" "${dest}/xrpld.cfg"
cp "${SRC_DIR}/cfg/validators-example.txt" "${dest}/validators.txt"
cp "${SHARED}/xrpld.logrotate" "${dest}/xrpld.logrotate"
cp "${SHARED}/update-xrpld.sh" "${dest}/update-xrpld.sh"
cp "${SHARED}/update-xrpld-cron" "${dest}/update-xrpld-cron"
}
build_rpm() {
local topdir="${BUILD_DIR}/rpmbuild"
mkdir -p "${topdir}"/{BUILD,BUILDROOT,RPMS,SOURCES,SPECS,SRPMS}
cp "${BUILD_DIR}/package/rpm/xrpld.spec" "${topdir}/SPECS/xrpld.spec"
stage_common "${topdir}/SOURCES"
cp "${SHARED}/xrpld.service" "${topdir}/SOURCES/xrpld.service"
cp "${SHARED}/xrpld.sysusers" "${topdir}/SOURCES/xrpld.sysusers"
cp "${SHARED}/xrpld.tmpfiles" "${topdir}/SOURCES/xrpld.tmpfiles"
set -x
rpmbuild -bb \
--define "_topdir ${topdir}" \
"${topdir}/SPECS/xrpld.spec"
}
build_deb() {
local staging="${BUILD_DIR}/debbuild/source"
rm -rf "${staging}"
mkdir -p "${staging}"
stage_common "${staging}"
cp "${SRC_DIR}/README.md" "${staging}/"
cp "${SRC_DIR}/LICENSE.md" "${staging}/"
# debian/ control files
cp -r "${SRC_DIR}/package/deb/debian" "${staging}/debian"
# Shared support files for dh_installsystemd / sysusers / tmpfiles
cp "${SHARED}/xrpld.service" "${staging}/debian/xrpld.service"
cp "${SHARED}/xrpld.sysusers" "${staging}/debian/xrpld.sysusers"
cp "${SHARED}/xrpld.tmpfiles" "${staging}/debian/xrpld.tmpfiles"
# Generate debian/changelog (pre-release versions use ~ instead of -).
local deb_version="${VERSION//-/\~}"
# TODO: Add facility for generating the changelog
cat > "${staging}/debian/changelog" <<EOF
xrpld (${deb_version}-${PKG_RELEASE}) unstable; urgency=medium
* Release ${VERSION}.
-- XRPL Foundation <contact@xrplf.org> $(LC_ALL=C date -u -R)
EOF
chmod +x "${staging}/debian/rules"
set -x
cd "${staging}"
dpkg-buildpackage -b --no-sign -d
}
case "${PKG_TYPE}" in
rpm) build_rpm ;;
deb) build_deb ;;
*)
echo "Unknown package type: ${PKG_TYPE}" >&2
exit 1
;;
esac

View File

@@ -1,33 +0,0 @@
Source: xrpld
Section: net
Priority: optional
Maintainer: XRPL Foundation <contact@xrpl.org>
Rules-Requires-Root: no
Build-Depends:
debhelper-compat (= 13),
Standards-Version: 4.7.0
Homepage: https://github.com/XRPLF/rippled
Vcs-Git: https://github.com/XRPLF/rippled.git
Vcs-Browser: https://github.com/XRPLF/rippled
Package: xrpld
Section: net
Priority: optional
Architecture: any
Depends:
${shlibs:Depends},
${misc:Depends}
Description: XRP Ledger daemon
xrpld is the reference implementation of the XRP Ledger protocol.
It participates in the peer-to-peer XRP Ledger network, processes
transactions, and maintains the ledger database.
Package: rippled
Architecture: all
Section: oldlibs
Priority: optional
Depends: xrpld, ${misc:Depends}
Description: transitional package - use xrpld
The rippled package has been renamed to xrpld. This transitional
package ensures a smooth upgrade and can be safely removed after
xrpld is installed.

View File

@@ -1,20 +0,0 @@
Format: http://www.debian.org/doc/packaging-manuals/copyright-format/1.0/
Upstream-Name: rippled
Source: https://github.com/XRPLF/rippled
Files: *
Copyright: 2012-2025 Ripple Labs Inc.
License: ISC
License: ISC
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

View File

@@ -1,37 +0,0 @@
#!/usr/bin/make -f
export DH_VERBOSE = 1
export DH_OPTIONS = -v
%:
dh $@
override_dh_auto_configure override_dh_auto_build override_dh_auto_test:
@:
override_dh_auto_install:
install -Dm0755 xrpld debian/tmp/opt/xrpld/bin/xrpld
install -Dm0644 xrpld.cfg debian/tmp/opt/xrpld/etc/xrpld.cfg
install -Dm0644 validators.txt debian/tmp/opt/xrpld/etc/validators.txt
install -Dm0644 xrpld.logrotate debian/tmp/opt/xrpld/bin/xrpld.logrotate
install -Dm0755 update-xrpld.sh debian/tmp/opt/xrpld/bin/update-xrpld.sh
install -Dm0644 update-xrpld-cron debian/tmp/opt/xrpld/bin/update-xrpld-cron
install -Dm0644 README.md debian/tmp/usr/share/doc/xrpld/README.md
install -Dm0644 LICENSE.md debian/tmp/usr/share/doc/xrpld/LICENSE.md
override_dh_installsystemd:
dh_installsystemd
# see if this still works
# dh_installsystemd --no-start
override_dh_installsysusers:
dh_installsysusers
override_dh_installtmpfiles:
dh_installtmpfiles
override_dh_install:
dh_install
override_dh_dwz:
@:

View File

@@ -1 +0,0 @@
3.0 (quilt)

View File

@@ -1,2 +0,0 @@
/opt/xrpld/etc/xrpld.cfg
/opt/xrpld/etc/validators.txt

View File

@@ -1,10 +0,0 @@
opt/xrpld/bin/xrpld
opt/xrpld/bin/xrpld.logrotate
opt/xrpld/bin/update-xrpld.sh
opt/xrpld/bin/update-xrpld-cron
opt/xrpld/etc/xrpld.cfg
opt/xrpld/etc/validators.txt
usr/share/doc/xrpld/README.md
usr/share/doc/xrpld/LICENSE.md

View File

@@ -1,13 +0,0 @@
opt/xrpld/etc etc/opt/xrpld
opt/xrpld/bin/xrpld usr/bin/xrpld
## remove when "rippled" deprecated
opt/xrpld/bin/xrpld opt/xrpld/bin/rippled
opt/xrpld/bin/xrpld usr/bin/rippled
opt/xrpld/bin/xrpld usr/local/bin/rippled
opt/xrpld/etc/xrpld.cfg opt/xrpld/etc/rippled.cfg
var/log/xrpld var/log/rippled
var/lib/xrpld var/lib/rippled
opt/xrpld opt/ripple
etc/opt/xrpld etc/opt/ripple

View File

@@ -1,113 +0,0 @@
%global xrpld_version @xrpld_version@
%global pkg_release @pkg_release@
%global _opt_prefix /opt/xrpld
%global ver_base %(v=%{xrpld_version}; echo ${v%%-*})
%global _has_dash %(v=%{xrpld_version}; [ "${v#*-}" != "$v" ] && echo 1 || echo 0)
%if 0%{?_has_dash}
%global ver_suffix %(v=%{xrpld_version}; printf %s "${v#*-}")
%endif
Name: xrpld
Version: %{ver_base}
Release: %{?ver_suffix:0.%{ver_suffix}.}%{pkg_release}%{?dist}
Summary: XRP Ledger daemon
License: ISC
URL: https://github.com/XRPLF/rippled
Source0: xrpld
Source1: xrpld.cfg
Source2: validators.txt
Source3: xrpld.service
Source4: xrpld.sysusers
Source5: xrpld.tmpfiles
Source6: xrpld.logrotate
Source7: update-xrpld.sh
Source8: update-xrpld-cron
BuildArch: x86_64
BuildRequires: systemd-rpm-macros
%undefine _debugsource_packages
%debug_package
%{?systemd_requires}
%{?sysusers_requires_compat}
%description
xrpld is the reference implementation of the XRP Ledger protocol. It
participates in the peer-to-peer XRP Ledger network, processes
transactions, and maintains the ledger database.
%install
rm -rf %{buildroot}
# Suppress debugsource subpackage — no source files in the build tree.
touch %{_builddir}/debugsourcefiles.list
# Install binary and config files.
install -Dm0755 %{SOURCE0} %{buildroot}%{_opt_prefix}/bin/xrpld
install -Dm0644 %{SOURCE1} %{buildroot}%{_opt_prefix}/etc/xrpld.cfg
install -Dm0644 %{SOURCE2} %{buildroot}%{_opt_prefix}/etc/validators.txt
# Compatibility symlinks (matches debian/xrpld.links).
mkdir -p %{buildroot}/etc/opt %{buildroot}/usr/bin %{buildroot}/usr/local/bin \
%{buildroot}/var/log %{buildroot}/var/lib
ln -s %{_opt_prefix}/etc %{buildroot}/etc/opt/xrpld
ln -s %{_opt_prefix}/bin/xrpld %{buildroot}/usr/bin/xrpld
## remove when "rippled" deprecated
ln -s xrpld %{buildroot}%{_opt_prefix}/bin/rippled
ln -s %{_opt_prefix}/bin/xrpld %{buildroot}/usr/bin/rippled
ln -s %{_opt_prefix}/bin/xrpld %{buildroot}/usr/local/bin/rippled
ln -s xrpld.cfg %{buildroot}%{_opt_prefix}/etc/rippled.cfg
ln -s %{_opt_prefix} %{buildroot}/opt/ripple
ln -s /etc/opt/xrpld %{buildroot}/etc/opt/ripple
ln -s xrpld %{buildroot}/var/log/rippled
ln -s xrpld %{buildroot}/var/lib/rippled
# Install systemd/sysusers/tmpfiles support files.
install -Dm0644 %{SOURCE3} %{buildroot}%{_unitdir}/xrpld.service
install -Dm0644 %{SOURCE4} %{buildroot}%{_sysusersdir}/xrpld.conf
install -Dm0644 %{SOURCE5} %{buildroot}%{_tmpfilesdir}/xrpld.conf
install -Dm0644 %{SOURCE6} %{buildroot}%{_opt_prefix}/bin/xrpld.logrotate
install -Dm0755 %{SOURCE7} %{buildroot}%{_opt_prefix}/bin/update-xrpld.sh
install -Dm0644 %{SOURCE8} %{buildroot}%{_opt_prefix}/bin/update-xrpld-cron
%pre
%sysusers_create_compat %{SOURCE4}
%post
%systemd_post xrpld.service
%preun
%systemd_preun xrpld.service
%postun
%systemd_postun_with_restart xrpld.service
%files
%dir %{_opt_prefix}
%dir %{_opt_prefix}/bin
%{_opt_prefix}/bin/xrpld
%{_opt_prefix}/bin/xrpld.logrotate
%{_opt_prefix}/bin/update-xrpld.sh
%{_opt_prefix}/bin/update-xrpld-cron
%{_opt_prefix}/bin/rippled
/usr/bin/xrpld
/usr/bin/rippled
/usr/local/bin/rippled
%dir %{_opt_prefix}/etc
%config(noreplace) %{_opt_prefix}/etc/xrpld.cfg
%config(noreplace) %{_opt_prefix}/etc/validators.txt
%{_opt_prefix}/etc/rippled.cfg
/etc/opt/xrpld
/etc/opt/ripple
/opt/ripple
%{_unitdir}/xrpld.service
%{_sysusersdir}/xrpld.conf
%{_tmpfilesdir}/xrpld.conf
/var/log/rippled
/var/lib/rippled
%ghost %dir /var/opt/ripple
%ghost %dir /var/opt/ripple/lib
%ghost %dir /var/opt/ripple/log

View File

@@ -1,9 +0,0 @@
# For automatic updates, symlink this file to /etc/cron.d/
# Do not remove the newline at the end of this cron script
# bash required for use of RANDOM below.
SHELL=/bin/bash
PATH=/sbin;/bin;/usr/sbin;/usr/bin
# invoke check/update script with random delay up to 59 mins
0 * * * * root sleep $((RANDOM*3540/32768)) && /opt/xrpld/bin/update-xrpld.sh

View File

@@ -1,64 +0,0 @@
#!/usr/bin/env bash
# auto-update script for xrpld daemon
# Check for sudo/root permissions
if [[ $(id -u) -ne 0 ]] ; then
echo "This update script must be run as root or sudo"
exit 1
fi
LOCKDIR=/tmp/xrpld-update.lock
UPDATELOG=/var/log/xrpld/update.log
function cleanup {
# If this directory isn't removed, future updates will fail.
rmdir $LOCKDIR
}
# Use mkdir to check if process is already running. mkdir is atomic, as against file create.
if ! mkdir $LOCKDIR 2>/dev/null; then
echo $(date -u) "lockdir exists - won't proceed." >> $UPDATELOG
exit 1
fi
trap cleanup EXIT
source /etc/os-release
can_update=false
if [[ "$ID" == "ubuntu" || "$ID" == "debian" ]] ; then
# Silent update
apt-get update -qq
# The next line is an "awk"ward way to check if the package needs to be updated.
XRPLD=$(apt-get install -s --only-upgrade xrpld | awk '/^Inst/ { print $2 }')
test "$XRPLD" == "xrpld" && can_update=true
function apply_update {
apt-get install xrpld -qq
}
elif [[ "$ID" == "fedora" || "$ID" == "centos" || "$ID" == "rhel" || "$ID" == "scientific" ]] ; then
RIPPLE_REPO=${RIPPLE_REPO-stable}
yum --disablerepo=* --enablerepo=ripple-$RIPPLE_REPO clean expire-cache
yum check-update -q --enablerepo=ripple-$RIPPLE_REPO xrpld || can_update=true
function apply_update {
yum update -y --enablerepo=ripple-$RIPPLE_REPO xrpld
}
else
echo "unrecognized distro!"
exit 1
fi
# Do the actual update and restart the service after reloading systemctl daemon.
if [ "$can_update" = true ] ; then
exec 3>&1 1>>${UPDATELOG} 2>&1
set -e
apply_update
systemctl daemon-reload
systemctl restart xrpld.service
echo $(date -u) "xrpld daemon updated."
else
echo $(date -u) "no updates available" >> $UPDATELOG
fi

View File

@@ -1,15 +0,0 @@
/var/log/xrpld/*.log {
daily
minsize 200M
rotate 7
nocreate
missingok
notifempty
compress
compresscmd /usr/bin/nice
compressoptions -n19 ionice -c3 gzip
compressext .gz
postrotate
/opt/xrpld/bin/xrpld --conf /etc/opt/xrpld/xrpld.cfg logrotate
endscript
}

View File

@@ -1,15 +0,0 @@
[Unit]
Description=XRP Ledger Daemon
After=network-online.target
Wants=network-online.target
[Service]
Type=simple
ExecStart=/opt/xrpld/bin/xrpld --net --silent --conf /etc/opt/xrpld/xrpld.cfg
Restart=on-failure
User=xrpld
Group=xrpld
LimitNOFILE=65536
[Install]
WantedBy=multi-user.target

View File

@@ -1 +0,0 @@
u xrpld - "XRP Ledger daemon" /var/lib/xrpld /sbin/nologin

View File

@@ -1,2 +0,0 @@
d /var/opt/ripple/lib 0750 xrpld xrpld -
d /var/opt/ripple/log 0750 xrpld xrpld -

View File

@@ -1,50 +0,0 @@
#!/usr/bin/env sh
# Validate installed paths and compat symlinks for xrpld packages.
set -e
set -x
trap 'test $? -ne 0 && touch /tmp/test_failed' EXIT
check() { test $1 "$2" || { echo "FAIL: $1 $2"; exit 1; }; }
check_resolves_to() {
actual=$(readlink -f "$1")
[ "$actual" = "$2" ] || { echo "FAIL: $1 resolves to $actual, expected $2"; exit 1; }
}
# var dirs (compat symlinks)
check -L /var/log/rippled
check -L /var/lib/rippled
# compat directory symlinks — existence and resolved target
check -L /opt/ripple
check_resolves_to /opt/ripple /opt/xrpld
check -L /etc/opt/xrpld
check_resolves_to /etc/opt/xrpld /opt/xrpld/etc
check -L /etc/opt/ripple
check_resolves_to /etc/opt/ripple /opt/xrpld/etc
# config accessible via all expected paths
check -f /opt/xrpld/etc/xrpld.cfg
check -f /opt/xrpld/etc/rippled.cfg
check -f /etc/opt/xrpld/xrpld.cfg
check -f /etc/opt/xrpld/rippled.cfg
check -f /etc/opt/ripple/xrpld.cfg
check -f /etc/opt/ripple/rippled.cfg
if systemctl is-system-running >/dev/null 2>&1; then
# service file sanity check
SERVICE=$(systemctl cat xrpld)
echo "$SERVICE" | grep -q 'ExecStart=/opt/xrpld/bin/xrpld' || { echo "FAIL: ExecStart wrong"; echo "$SERVICE"; exit 1; }
echo "$SERVICE" | grep -q 'User=xrpld' || { echo "FAIL: User not xrpld"; echo "$SERVICE"; exit 1; }
fi
# binary accessible via all expected paths
/opt/xrpld/bin/xrpld --version
/opt/xrpld/bin/rippled --version
/opt/ripple/bin/xrpld --version
/opt/ripple/bin/rippled --version
/usr/bin/xrpld --version
/usr/bin/rippled --version
/usr/local/bin/rippled --version

View File

@@ -1,76 +0,0 @@
#!/usr/bin/env bash
# Install a locally-built package and run basic verification.
#
# Usage: smoketest.sh local
# Expects packages in build/{dpkg,rpm}/packages/ or build/debbuild/ / build/rpmbuild/RPMS/
set -x
trap 'test $? -ne 0 && touch /tmp/test_failed' EXIT
install_from=$1
. /etc/os-release
case ${ID} in
ubuntu|debian)
pkgtype="dpkg"
;;
fedora|centos|rhel|rocky|almalinux)
pkgtype="rpm"
;;
*)
echo "unrecognized distro!"
exit 1
;;
esac
if [ "${install_from}" != "local" ]; then
echo "only 'local' install mode is supported"
exit 1
fi
# Install the package
if [ "${pkgtype}" = "dpkg" ] ; then
apt-get -y update
# Find .deb files — check both possible output locations
debs=$(find build/debbuild/ build/dpkg/packages/ -name '*.deb' ! -name '*dbgsym*' 2>/dev/null | head -5)
if [ -z "$debs" ]; then
echo "No .deb files found"
exit 1
fi
dpkg --no-debsig -i $debs || apt-get -y install -f
elif [ "${pkgtype}" = "rpm" ] ; then
# Find .rpm files — check both possible output locations
rpms=$(find build/rpmbuild/RPMS/ build/rpm/packages/ -name '*.rpm' \
! -name '*debug*' ! -name '*devel*' ! -name '*.src.rpm' 2>/dev/null | head -5)
if [ -z "$rpms" ]; then
echo "No .rpm files found"
exit 1
fi
rpm -i $rpms
fi
# Verify installed version
VERSION_OUTPUT=$(/opt/xrpld/bin/xrpld --version)
INSTALLED=$(echo "$VERSION_OUTPUT" | head -1 | awk '{print $NF}')
echo "Installed version: ${INSTALLED}"
# Run unit tests
if [ -n "${CI:-}" ]; then
unittest_jobs=$(nproc)
else
unittest_jobs=16
fi
cd /tmp
/opt/xrpld/bin/xrpld --unittest --unittest-jobs ${unittest_jobs} > /tmp/unittest_results || true
cd -
num_failures=$(tail /tmp/unittest_results -n1 | grep -oP '\d+(?= failures)')
if [ "${num_failures:-0}" -ne 0 ]; then
echo "$num_failures unit test(s) failed:"
grep 'failed:' /tmp/unittest_results
exit 1
fi
# Compat path checks
"$(dirname "${BASH_SOURCE[0]}")/check_install_paths.sh"

View File

@@ -433,8 +433,7 @@ doWithdraw(
j) < amount)
{
// LCOV_EXCL_START
JLOG(j.error()) << "LoanBrokerCoverWithdraw: negative balance of "
"broker cover assets.";
JLOG(j.error()) << "doWithdraw: negative balance of broker cover assets.";
return tefINTERNAL;
// LCOV_EXCL_STOP
}

View File

@@ -69,7 +69,7 @@ forEachItem(
for (auto const& key : sle->getFieldV256(sfIndexes))
f(view.read(keylet::child(key)));
auto const next = sle->getFieldU64(sfIndexNext);
if (!next)
if (next == 0u)
return;
pos = keylet::page(root, next);
}

View File

@@ -1,6 +1,7 @@
#include <xrpl/ledger/helpers/MPTokenHelpers.h>
//
#include <xrpl/basics/Log.h>
#include <xrpl/ledger/View.h>
#include <xrpl/ledger/helpers/AccountRootHelpers.h>
#include <xrpl/ledger/helpers/CredentialHelpers.h>
#include <xrpl/ledger/helpers/DirectoryHelpers.h>
@@ -12,21 +13,6 @@
namespace xrpl {
// Forward declarations for functions that remain in View.h/cpp
bool
isVaultPseudoAccountFrozen(
ReadView const& view,
AccountID const& account,
MPTIssue const& mptShare,
int depth);
[[nodiscard]] TER
dirLink(
ApplyView& view,
AccountID const& owner,
std::shared_ptr<SLE>& object,
SF_UINT64 const& node = sfOwnerNode);
bool
isGlobalFrozen(ReadView const& view, MPTIssue const& mptIssue)
{
@@ -83,7 +69,7 @@ transferRate(ReadView const& view, MPTID const& issuanceID)
// which represents 50% of 1,000,000,000
if (auto const sle = view.read(keylet::mptIssuance(issuanceID));
sle && sle->isFieldPresent(sfTransferFee))
return Rate{1'000'000'000u + 10'000 * sle->getFieldU16(sfTransferFee)};
return Rate{1'000'000'000u + (10'000 * sle->getFieldU16(sfTransferFee))};
return parityRate;
}
@@ -149,7 +135,7 @@ authorizeMPToken(
// When a holder wants to unauthorize/delete a MPT, the ledger must
// - delete mptokenKey from owner directory
// - delete the MPToken
if (flags & tfMPTUnauthorize)
if ((flags & tfMPTUnauthorize) != 0)
{
auto const mptokenKey = keylet::mptoken(mptIssuanceID, account);
auto const sleMpt = view.peek(mptokenKey);
@@ -229,7 +215,7 @@ authorizeMPToken(
// Issuer wants to unauthorize the holder, unset lsfMPTAuthorized on
// their MPToken
if (flags & tfMPTUnauthorize)
if ((flags & tfMPTUnauthorize) != 0)
{
flagsOut &= ~lsfMPTAuthorized;
}
@@ -490,7 +476,7 @@ canTransfer(
if (!sleIssuance)
return tecOBJECT_NOT_FOUND;
if (!(sleIssuance->getFieldU32(sfFlags) & lsfMPTCanTransfer))
if (!sleIssuance->isFlag(lsfMPTCanTransfer))
{
if (from != (*sleIssuance)[sfIssuer] && to != (*sleIssuance)[sfIssuer])
return TER{tecNO_AUTH};

View File

@@ -224,10 +224,10 @@ trustCreate(
bSetHigh ? sfLowLimit : sfHighLimit,
STAmount(Issue{saBalance.getCurrency(), bSetDst ? uSrcAccountID : uDstAccountID}));
if (uQualityIn)
if (uQualityIn != 0u)
sleRippleState->setFieldU32(bSetHigh ? sfHighQualityIn : sfLowQualityIn, uQualityIn);
if (uQualityOut)
if (uQualityOut != 0u)
sleRippleState->setFieldU32(bSetHigh ? sfHighQualityOut : sfLowQualityOut, uQualityOut);
std::uint32_t uFlags = bSetHigh ? lsfHighReserve : lsfLowReserve;
@@ -327,16 +327,16 @@ updateTrustLine(
// Sender balance was positive.
&& after <= beast::zero
// Sender is zero or negative.
&& (flags & (!bSenderHigh ? lsfLowReserve : lsfHighReserve))
&& ((flags & (!bSenderHigh ? lsfLowReserve : lsfHighReserve)) != 0u)
// Sender reserve is set.
&& static_cast<bool>(flags & (!bSenderHigh ? lsfLowNoRipple : lsfHighNoRipple)) !=
static_cast<bool>(sle->getFlags() & lsfDefaultRipple) &&
!(flags & (!bSenderHigh ? lsfLowFreeze : lsfHighFreeze)) &&
((flags & (!bSenderHigh ? lsfLowFreeze : lsfHighFreeze)) == 0u) &&
!state->getFieldAmount(!bSenderHigh ? sfLowLimit : sfHighLimit)
// Sender trust limit is 0.
&& !state->getFieldU32(!bSenderHigh ? sfLowQualityIn : sfHighQualityIn)
&& (state->getFieldU32(!bSenderHigh ? sfLowQualityIn : sfHighQualityIn) == 0u)
// Sender quality in is 0.
&& !state->getFieldU32(!bSenderHigh ? sfLowQualityOut : sfHighQualityOut))
&& (state->getFieldU32(!bSenderHigh ? sfLowQualityOut : sfHighQualityOut) == 0u))
// Sender quality out is 0.
{
// VFALCO Where is the line being deleted?
@@ -348,7 +348,7 @@ updateTrustLine(
// Balance is zero, receiver reserve is clear.
if (!after // Balance is zero.
&& !(flags & (bSenderHigh ? lsfLowReserve : lsfHighReserve)))
&& ((flags & (bSenderHigh ? lsfLowReserve : lsfHighReserve)) == 0u))
return true;
}
return false;
@@ -539,11 +539,12 @@ requireAuth(ReadView const& view, Issue const& issue, AccountID const& account,
// If this is a weak or legacy check, or if the account has a line, fail if
// auth is required and not set on the line
if (auto const issuerAccount = view.read(keylet::account(issue.account));
issuerAccount && (*issuerAccount)[sfFlags] & lsfRequireAuth)
issuerAccount && (((*issuerAccount)[sfFlags] & lsfRequireAuth) != 0u))
{
if (trustLine)
{
return ((*trustLine)[sfFlags] & ((account > issue.account) ? lsfLowAuth : lsfHighAuth))
return (((*trustLine)[sfFlags] &
((account > issue.account) ? lsfLowAuth : lsfHighAuth)) != 0u)
? tesSUCCESS
: TER{tecNO_AUTH};
}
@@ -575,7 +576,7 @@ canTransfer(ReadView const& view, Issue const& issue, AccountID const& from, Acc
bool const issuerHigh = issuerId > account;
return line->isFlag(issuerHigh ? lsfHighNoRipple : lsfLowNoRipple);
}
return sleIssuer->isFlag(lsfDefaultRipple) == false;
return !sleIssuer->isFlag(lsfDefaultRipple);
};
// Fail if rippling disabled on both trust lines
@@ -748,7 +749,7 @@ deleteAMMTrustLine(
}
auto const uFlags = !ammLow ? lsfLowReserve : lsfHighReserve;
if (!(sleState->getFlags() & uFlags))
if ((sleState->getFlags() & uFlags) == 0u)
return tecINTERNAL; // LCOV_EXCL_LINE
adjustOwnerCount(view, !ammLow ? sleLow : sleHigh, -1, j);

View File

@@ -570,17 +570,18 @@ rippleCreditIOU(
// Sender balance was positive.
&& saBalance <= beast::zero
// Sender is zero or negative.
&& (uFlags & (!bSenderHigh ? lsfLowReserve : lsfHighReserve))
&& ((uFlags & (!bSenderHigh ? lsfLowReserve : lsfHighReserve)) != 0u)
// Sender reserve is set.
&& static_cast<bool>(uFlags & (!bSenderHigh ? lsfLowNoRipple : lsfHighNoRipple)) !=
static_cast<bool>(
view.read(keylet::account(uSenderID))->getFlags() & lsfDefaultRipple) &&
!(uFlags & (!bSenderHigh ? lsfLowFreeze : lsfHighFreeze)) &&
((uFlags & (!bSenderHigh ? lsfLowFreeze : lsfHighFreeze)) == 0u) &&
!sleRippleState->getFieldAmount(!bSenderHigh ? sfLowLimit : sfHighLimit)
// Sender trust limit is 0.
&& !sleRippleState->getFieldU32(!bSenderHigh ? sfLowQualityIn : sfHighQualityIn)
&& (sleRippleState->getFieldU32(!bSenderHigh ? sfLowQualityIn : sfHighQualityIn) == 0u)
// Sender quality in is 0.
&& !sleRippleState->getFieldU32(!bSenderHigh ? sfLowQualityOut : sfHighQualityOut))
&&
(sleRippleState->getFieldU32(!bSenderHigh ? sfLowQualityOut : sfHighQualityOut) == 0u))
// Sender quality out is 0.
{
// Clear the reserve of the sender, possibly delete the line!
@@ -592,7 +593,7 @@ rippleCreditIOU(
// Balance is zero, receiver reserve is clear.
bDelete = !saBalance // Balance is zero.
&& !(uFlags & (bSenderHigh ? lsfLowReserve : lsfHighReserve));
&& ((uFlags & (bSenderHigh ? lsfLowReserve : lsfHighReserve)) == 0u);
// Receiver reserve is clear.
}

View File

@@ -92,7 +92,7 @@ sharesToAssetsWithdraw(
std::shared_ptr<SLE const> const& issuance,
STAmount const& shares)
{
XRPL_ASSERT(!shares.negative(), "xrpl::sharesToAssetsDeposit : non-negative shares");
XRPL_ASSERT(!shares.negative(), "xrpl::sharesToAssetsWithdraw : non-negative shares");
XRPL_ASSERT(
shares.asset() == vault->at(sfShareMPTID),
"xrpl::sharesToAssetsWithdraw : shares and vault match");

View File

@@ -100,8 +100,6 @@ PermissionedDomainSet::doApply()
Keylet const pdKeylet =
keylet::permissionedDomain(account_, ctx_.tx.getFieldU32(sfSequence));
auto slePd = std::make_shared<SLE>(pdKeylet);
if (!slePd)
return tefINTERNAL; // LCOV_EXCL_LINE
slePd->setAccountID(sfOwner, account_);
slePd->setFieldU32(sfSequence, ctx_.tx.getFieldU32(sfSequence));

View File

@@ -55,7 +55,7 @@ VaultDelete::preclaim(PreclaimContext const& ctx)
if (!sleMPT)
{
// LCOV_EXCL_START
JLOG(ctx.j.error()) << "VaultDeposit: missing issuance of vault shares.";
JLOG(ctx.j.error()) << "VaultDelete: missing issuance of vault shares.";
return tecOBJECT_NOT_FOUND;
// LCOV_EXCL_STOP
}
@@ -63,7 +63,7 @@ VaultDelete::preclaim(PreclaimContext const& ctx)
if (sleMPT->at(sfIssuer) != vault->getAccountID(sfAccount))
{
// LCOV_EXCL_START
JLOG(ctx.j.error()) << "VaultDeposit: invalid owner of vault shares.";
JLOG(ctx.j.error()) << "VaultDelete: invalid owner of vault shares.";
return tecNO_PERMISSION;
// LCOV_EXCL_STOP
}
@@ -200,8 +200,6 @@ VaultDelete::doApply()
// Destroy the vault.
view().erase(vault);
associateAsset(*vault, asset);
return tesSUCCESS;
}

View File

@@ -8,6 +8,7 @@
#include <xrpld/rpc/detail/Tuning.h>
#include <xrpl/beast/unit_test.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/protocol/ApiVersion.h>
@@ -131,7 +132,6 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -155,11 +155,11 @@ public:
Json::Value result;
gate g;
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;
@@ -242,28 +242,27 @@ public:
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
Json::Value result;
gate g;
// Test RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_src_cur source currencies.
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), RPC::Tuning::max_src_cur + 1);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));
@@ -271,22 +270,22 @@ public:
// Test RPC::Tuning::max_auto_src_cur source currencies.
for (auto i = 0; i < (RPC::Tuning::max_auto_src_cur - 1); ++i)
env.trust(Account("alice")[std::to_string(i + 100)](100), "bob");
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(!result.isMember(jss::error));
// Test more than RPC::Tuning::max_auto_src_cur source currencies.
env.trust(Account("alice")["AUD"](100), "bob");
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = rpf(Account("alice"), Account("bob"), 0);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(result.isMember(jss::error));

View File

@@ -50,7 +50,7 @@ class io_latency_probe_test : public beast::unit_test::suite, public beast::test
bool done = false;
boost::system::error_code wait_err;
while (--num_samples)
while (--num_samples > 0u)
{
auto const start{MeasureClock::now()};
done = false;

View File

@@ -0,0 +1,537 @@
#include <test/jtx.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/JobQueueAwaiter.h>
#include <chrono>
#include <mutex>
namespace xrpl {
namespace test {
/**
* Test suite for the C++20 coroutine primitives: CoroTask, CoroTaskRunner,
* and JobQueueAwaiter.
*
* Dependency Diagram
* ==================
*
* CoroTask_test
* +-------------------------------------------------+
* | + gate (inner class) : condition_variable helper |
* +-------------------------------------------------+
* | uses
* v
* jtx::Env --> JobQueue::postCoroTask()
* |
* +-- CoroTaskRunner (suspend / post / resume)
* +-- CoroTask<void> / CoroTask<T>
* +-- JobQueueAwaiter
*
* Test Coverage Matrix
* ====================
*
* Test | Primitives exercised
* --------------------------+----------------------------------------------
* testVoidCompletion | CoroTask<void> basic lifecycle
* testCorrectOrder | suspend() -> join() -> post() -> complete
* testIncorrectOrder | post() before suspend() (race-safe path)
* testJobQueueAwaiter | JobQueueAwaiter suspend + auto-repost
* testThreadSpecificStorage | LocalValue isolation across coroutines
* testExceptionPropagation | unhandled_exception() in promise_type
* testMultipleYields | N sequential suspend/resume cycles
* testValueReturn | CoroTask<T> co_return value
* testValueException | CoroTask<T> exception via co_await
* testValueChaining | nested CoroTask<T> -> CoroTask<T>
* testShutdownRejection | postCoroTask returns nullptr when stopping
*/
class CoroTask_test : public beast::unit_test::suite
{
public:
/**
* Simple one-shot gate for synchronizing between test thread
* and coroutine worker threads. signal() sets the flag;
* wait_for() blocks until signaled or timeout.
*/
class gate
{
private:
std::condition_variable cv_;
std::mutex mutex_;
bool signaled_ = false;
public:
/**
* Block until signaled or timeout expires.
*
* @param rel_time Maximum duration to wait
*
* @return true if signaled before timeout
*/
template <class Rep, class Period>
bool
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
{
std::unique_lock<std::mutex> lk(mutex_);
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
signaled_ = false;
return b;
}
/**
* Signal the gate, waking any waiting thread.
*/
void
signal()
{
std::lock_guard lk(mutex_);
signaled_ = true;
cv_.notify_all();
}
};
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
/**
* CoroTask<void> runs to completion and runner becomes non-runnable.
*/
void
testVoidCompletion()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("void completion");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* Correct order: suspend, join, post, complete.
* Mirrors existing Coroutine_test::correct_order.
*/
void
testCorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("correct order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
*rp = runner;
g1p->signal();
co_await runner->suspend();
g2p->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g1.wait_for(5s));
runner->join();
runner->post();
BEAST_EXPECT(g2.wait_for(5s));
runner->join();
}
/**
* Incorrect order: post() before suspend(). Verifies the
* race-safe path. Mirrors Coroutine_test::incorrect_order.
*/
void
testIncorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto runner) -> CoroTask<void> {
runner->post();
co_await runner->suspend();
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
/**
* JobQueueAwaiter suspend + auto-repost across multiple yield points.
*/
void
testJobQueueAwaiter()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("JobQueueAwaiter");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int step = 0;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [sp = &step, gp = &g](auto runner) -> CoroTask<void> {
*sp = 1;
co_await runner->yieldAndPost();
*sp = 2;
co_await runner->yieldAndPost();
*sp = 3;
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(step == 3);
}
/**
* Per-coroutine LocalValue isolation. Each coroutine sees its own
* copy of thread-local state. Mirrors Coroutine_test::thread_specific_storage.
*/
void
testThreadSpecificStorage()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("thread specific storage");
Env env(*this);
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
gate g;
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -1);
*lv = -2;
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(
jtCLIENT,
"CoroTaskTest",
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
(*ap)[id] = runner;
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == -1);
**lvp = id;
this->BEAST_EXPECT(**lvp == id);
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
for (auto const& r : a)
{
r->post();
BEAST_EXPECT(g.wait_for(5s));
r->join();
}
for (auto const& r : a)
{
r->post();
r->join();
}
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
}
/**
* Exception thrown in coroutine body is caught by
* promise_type::unhandled_exception(). Coroutine completes.
*/
void
testExceptionPropagation()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("exception propagation");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
throw std::runtime_error("test exception");
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
// The exception is caught by promise_type::unhandled_exception()
// and the coroutine is considered done
BEAST_EXPECT(!runner->runnable());
}
/**
* Multiple sequential suspend/resume cycles via co_await.
*/
void
testMultipleYields()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("multiple yields");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int counter = 0;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, cp = &counter, gp = &g](auto runner) -> CoroTask<void> {
*rp = runner;
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 1);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 2);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 3);
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> returns a value via co_return. Outer coroutine
* extracts it with co_await.
*/
void
testValueReturn()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value return");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> { co_return 42; };
*rp = co_await inner();
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 42);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> propagates exceptions from inner coroutines.
* Outer coroutine catches via try/catch around co_await.
*/
void
testValueException()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value exception");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
bool caught = false;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [cp = &caught, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> {
throw std::runtime_error("inner error");
co_return 0;
};
try
{
co_await inner();
}
catch (std::runtime_error const& e)
{
*cp = true;
}
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(caught);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> chaining. Nested value-returning coroutines
* compose via co_await.
*/
void
testValueChaining()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value chaining");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
auto mul = [add](int a, int b) -> CoroTask<int> {
int sum = co_await add(a, b);
co_return sum * 2;
};
*rp = co_await mul(3, 4);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 14); // (3 + 4) * 2
BEAST_EXPECT(!runner->runnable());
}
/**
* postCoroTask returns nullptr when JobQueue is stopping.
*/
void
testShutdownRejection()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("shutdown rejection");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
// Stop the JobQueue
env.app().getJobQueue().stop();
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [](auto) -> CoroTask<void> { co_return; });
BEAST_EXPECT(!runner);
}
void
run() override
{
testVoidCompletion();
testCorrectOrder();
testIncorrectOrder();
testJobQueueAwaiter();
testThreadSpecificStorage();
testExceptionPropagation();
testMultipleYields();
testValueReturn();
testValueException();
testValueChaining();
testShutdownRejection();
}
};
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
} // namespace test
} // namespace xrpl

View File

@@ -6,6 +6,7 @@
#include <xrpld/rpc/RPCHandler.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/protocol/ApiVersion.h>
#include <xrpl/protocol/STParsedJSON.h>
#include <xrpl/resource/Fees.h>
@@ -198,7 +199,6 @@ AMMTest::find_paths_request(
c,
Role::USER,
{},
{},
RPC::apiVersionIfUnspecified},
{},
{}};
@@ -220,11 +220,11 @@ AMMTest::find_paths_request(
Json::Value result;
gate g;
app.getJobQueue().postCoro(jtCLIENT, "RPC-Client", [&](auto const& coro) {
app.getJobQueue().postCoroTask(jtCLIENT, "RPC-Client", [&](auto) -> CoroTask<void> {
context.params = std::move(params);
context.coro = coro;
RPC::doCommand(context, result);
g.signal();
co_return;
});
using namespace std::chrono_literals;

View File

@@ -1438,7 +1438,6 @@ ApplicationImp::setup(boost::program_options::variables_map const& cmdline)
c,
Role::ADMIN,
{},
{},
RPC::apiMaximumSupportedVersion},
jvCommand};

View File

@@ -3,6 +3,7 @@
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/resource/Fees.h>
namespace xrpl {
@@ -99,13 +100,14 @@ GRPCServerImpl::CallData<Request, Response>::process()
// ensures that finished is always true when this CallData object
// is returned as a tag in handleRpcs(), after sending the response
finished_ = true;
auto coro = app_.getJobQueue().postCoro(
JobType::jtRPC, "gRPC-Client", [thisShared](std::shared_ptr<JobQueue::Coro> coro) {
thisShared->process(coro);
auto runner = app_.getJobQueue().postCoroTask(
JobType::jtRPC, "gRPC-Client", [thisShared](auto) -> CoroTask<void> {
thisShared->processRequest();
co_return;
});
// If coro is null, then the JobQueue has already been shutdown
if (!coro)
// If runner is null, then the JobQueue has already been shutdown
if (!runner)
{
grpc::Status status{grpc::StatusCode::INTERNAL, "Job Queue is already stopped"};
responder_.FinishWithError(status, this);
@@ -114,7 +116,7 @@ GRPCServerImpl::CallData<Request, Response>::process()
template <class Request, class Response>
void
GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::Coro> coro)
GRPCServerImpl::CallData<Request, Response>::processRequest()
{
try
{
@@ -156,7 +158,6 @@ GRPCServerImpl::CallData<Request, Response>::process(std::shared_ptr<JobQueue::C
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
request_};

View File

@@ -206,9 +206,12 @@ private:
clone() override;
private:
// process the request. Called inside the coroutine passed to JobQueue
/**
* Process the gRPC request. Called inside the CoroTask lambda
* posted to the JobQueue by process().
*/
void
process(std::shared_ptr<JobQueue::Coro> coro);
processRequest();
// return load type of this RPC
Resource::Charge

View File

@@ -3,7 +3,6 @@
#include <xrpld/rpc/Role.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/server/InfoSub.h>
namespace xrpl {
@@ -24,7 +23,6 @@ struct Context
LedgerMaster& ledgerMaster;
Resource::Consumer& consumer;
Role role;
std::shared_ptr<JobQueue::Coro> coro{};
InfoSub::pointer infoSub{};
unsigned int apiVersion;
};

View File

@@ -169,21 +169,17 @@ public:
private:
Json::Value
processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv);
processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv);
void
processSession(std::shared_ptr<Session> const&, std::shared_ptr<JobQueue::Coro> coro);
processSession(std::shared_ptr<Session> const&);
void
processRequest(
Port const& port,
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output const&,
std::shared_ptr<JobQueue::Coro> coro,
Output&&,
std::string_view forwardedFor,
std::string_view user);

View File

@@ -14,6 +14,7 @@
#include <xrpl/basics/make_SSLContext.h>
#include <xrpl/beast/net/IPAddressConversion.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/json/json_reader.h>
#include <xrpl/json/to_string.h>
@@ -284,9 +285,17 @@ ServerHandler::onRequest(Session& session)
}
std::shared_ptr<Session> detachedSession = session.detach();
auto const postResult = m_jobQueue.postCoro(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](std::shared_ptr<JobQueue::Coro> coro) {
processSession(detachedSession, coro);
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_RPC, "RPC-Client", [this, detachedSession](auto) -> CoroTask<void> {
try
{
processSession(detachedSession);
}
catch (std::exception const& e)
{
JLOG(m_journal.error()) << "RPC-Client coroutine exception: " << e.what();
}
co_return;
});
if (postResult == nullptr)
{
@@ -322,17 +331,26 @@ ServerHandler::onWSMessage(
JLOG(m_journal.trace()) << "Websocket received '" << jv << "'";
auto const postResult = m_jobQueue.postCoro(
auto const postResult = m_jobQueue.postCoroTask(
jtCLIENT_WEBSOCKET,
"WS-Client",
[this, session, jv = std::move(jv)](std::shared_ptr<JobQueue::Coro> const& coro) {
auto const jr = this->processSession(session, coro, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
[this, session, jv = std::move(jv)](auto) -> CoroTask<void> {
try
{
auto const jr = this->processSession(session, jv);
auto const s = to_string(jr);
auto const n = s.length();
boost::beast::multi_buffer sb(n);
sb.commit(
boost::asio::buffer_copy(sb.prepare(n), boost::asio::buffer(s.c_str(), n)));
session->send(std::make_shared<StreambufWSMsg<decltype(sb)>>(std::move(sb)));
session->complete();
}
catch (std::exception const& e)
{
JLOG(m_journal.error()) << "WS-Client coroutine exception: " << e.what();
}
co_return;
});
if (postResult == nullptr)
{
@@ -377,10 +395,7 @@ logDuration(Json::Value const& request, T const& duration, beast::Journal& journ
}
Json::Value
ServerHandler::processSession(
std::shared_ptr<WSSession> const& session,
std::shared_ptr<JobQueue::Coro> const& coro,
Json::Value const& jv)
ServerHandler::processSession(std::shared_ptr<WSSession> const& session, Json::Value const& jv)
{
auto is = std::static_pointer_cast<WSInfoSub>(session->appDefined);
if (is->getConsumer().disconnect(m_journal))
@@ -447,7 +462,6 @@ ServerHandler::processSession(
app_.getLedgerMaster(),
is->getConsumer(),
role,
coro,
is,
apiVersion},
jv,
@@ -518,18 +532,14 @@ ServerHandler::processSession(
return jr;
}
// Run as a coroutine.
void
ServerHandler::processSession(
std::shared_ptr<Session> const& session,
std::shared_ptr<JobQueue::Coro> coro)
ServerHandler::processSession(std::shared_ptr<Session> const& session)
{
processRequest(
session->port(),
buffers_to_string(session->request().body().data()),
session->remoteAddress().at_port(0),
makeOutput(*session),
coro,
forwardedFor(session->request()),
[&] {
auto const iter = session->request().find("X-User");
@@ -569,8 +579,7 @@ ServerHandler::processRequest(
Port const& port,
std::string const& request,
beast::IP::Endpoint const& remoteIPAddress,
Output const& output,
std::shared_ptr<JobQueue::Coro> coro,
Output&& output,
std::string_view forwardedFor,
std::string_view user)
{
@@ -830,7 +839,6 @@ ServerHandler::processRequest(
app_.getLedgerMaster(),
usage,
role,
coro,
InfoSub::pointer(),
apiVersion},
params,

View File

@@ -7,6 +7,9 @@
#include <xrpl/protocol/RPCErr.h>
#include <xrpl/resource/Fees.h>
#include <condition_variable>
#include <mutex>
namespace xrpl {
// This interface is deprecated.
@@ -37,98 +40,40 @@ doRipplePathFind(RPC::JsonContext& context)
PathRequest::pointer request;
lpLedger = context.ledgerMaster.getClosedLedger();
// It doesn't look like there's much odd happening here, but you should
// be aware this code runs in a JobQueue::Coro, which is a coroutine.
// And we may be flipping around between threads. Here's an overview:
//
// 1. We're running doRipplePathFind() due to a call to
// ripple_path_find. doRipplePathFind() is currently running
// inside of a JobQueue::Coro using a JobQueue thread.
//
// 2. doRipplePathFind's call to makeLegacyPathRequest() enqueues the
// path-finding request. That request will (probably) run at some
// indeterminate future time on a (probably different) JobQueue
// thread.
//
// 3. As a continuation from that path-finding JobQueue thread, the
// coroutine we're currently running in (!) is posted to the
// JobQueue. Because it is a continuation, that post won't
// happen until the path-finding request completes.
//
// 4. Once the continuation is enqueued, and we have reason to think
// the path-finding job is likely to run, then the coroutine we're
// running in yield()s. That means it surrenders its thread in
// the JobQueue. The coroutine is suspended, but ready to run,
// because it is kept resident by a shared_ptr in the
// path-finding continuation.
//
// 5. If all goes well then path-finding runs on a JobQueue thread
// and executes its continuation. The continuation posts this
// same coroutine (!) to the JobQueue.
//
// 6. When the JobQueue calls this coroutine, this coroutine resumes
// from the line below the coro->yield() and returns the
// path-finding result.
//
// With so many moving parts, what could go wrong?
//
// Just in terms of the JobQueue refusing to add jobs at shutdown
// there are two specific things that can go wrong.
//
// 1. The path-finding Job queued by makeLegacyPathRequest() might be
// rejected (because we're shutting down).
//
// Fortunately this problem can be addressed by looking at the
// return value of makeLegacyPathRequest(). If
// makeLegacyPathRequest() cannot get a thread to run the path-find
// on, then it returns an empty request.
//
// 2. The path-finding job might run, but the Coro::post() might be
// rejected by the JobQueue (because we're shutting down).
//
// We handle this case by resuming (not posting) the Coro.
// By resuming the Coro, we allow the Coro to run to completion
// on the current thread instead of requiring that it run on a
// new thread from the JobQueue.
//
// Both of these failure modes are hard to recreate in a unit test
// because they are so dependent on inter-thread timing. However
// the failure modes can be observed by synchronously (inside the
// rippled source code) shutting down the application. The code to
// do so looks like this:
//
// context.app.signalStop();
// while (! context.app.getJobQueue().jobCounter().joined()) { }
//
// The first line starts the process of shutting down the app.
// The second line waits until no more jobs can be added to the
// JobQueue before letting the thread continue.
//
// May 2017
// makeLegacyPathRequest enqueues a path-finding job that runs
// asynchronously. We block this thread with a condition_variable
// until the path-finding continuation signals completion.
// If makeLegacyPathRequest cannot schedule the job (e.g. during
// shutdown), it returns an empty request and we skip the wait.
// Replaces the old Coro yield/resume pattern with synchronous
// blocking, eliminating shutdown race conditions.
std::mutex mtx;
std::condition_variable cv;
bool pathDone = false;
jvResult = context.app.getPathRequests().makeLegacyPathRequest(
request,
[&context]() {
// Copying the shared_ptr keeps the coroutine alive up
// through the return. Otherwise the storage under the
// captured reference could evaporate when we return from
// coroCopy->resume(). This is not strictly necessary, but
// will make maintenance easier.
std::shared_ptr<JobQueue::Coro> coroCopy{context.coro};
if (!coroCopy->post())
[&]() {
{
// The post() failed, so we won't get a thread to let
// the Coro finish. We'll call Coro::resume() so the
// Coro can finish on our thread. Otherwise the
// application will hang on shutdown.
coroCopy->resume();
std::lock_guard lk(mtx);
pathDone = true;
}
cv.notify_one();
},
context.consumer,
lpLedger,
context.params);
if (request)
{
context.coro->yield();
using namespace std::chrono_literals;
std::unique_lock lk(mtx);
if (!cv.wait_for(lk, 30s, [&] { return pathDone; }))
{
// Path-finding continuation never fired (e.g. shutdown
// race or unexpected failure). Return an internal error
// rather than blocking the RPC thread indefinitely.
return rpcError(rpcINTERNAL);
}
jvResult = request->doStatus(context.params);
}