Compare commits

...

23 Commits

Author SHA1 Message Date
Bart
c9df784c4e ci: Use updated prepare-runner in actions and worfklows (#2889) 2026-01-08 20:13:49 +00:00
Alex Kremer
a9787b131e feat: Basic support for channels (#2859)
This PR implements go-like channels wrapper (on top of asio experimental
channels).
In the future this will be integrated into the AsyncFramework.

---------

Co-authored-by: Sergey Kuznetsov <skuznetsov@ripple.com>
2026-01-08 14:21:46 +00:00
Sergey Kuznetsov
9f76eabf0a feat: Option to save cache asyncronously (#2883)
This PR adds an option to save cache to file asynchronously in parallel
with shutting down the rest of Clio services.
2026-01-07 17:20:56 +00:00
github-actions[bot]
79c08fc735 style: Update pre-commit hooks (#2875)
Co-authored-by: mathbunnyru <12270691+mathbunnyru@users.noreply.github.com>
2026-01-05 01:10:10 +00:00
dependabot[bot]
2c9c5634ad ci: [DEPENDABOT] Bump actions/cache from 4.3.0 to 5.0.1 (#2871)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: Ayaz Salikhov <mathbunnyru@users.noreply.github.com>
2025-12-23 01:46:14 +00:00
dependabot[bot]
850333528c ci: [DEPENDABOT] Bump docker/setup-buildx-action from 3.11.1 to 3.12.0 (#2870)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-22 11:52:41 +00:00
github-actions[bot]
8da4194fe2 style: clang-tidy auto fixes (#2874)
Co-authored-by: godexsoft <385326+godexsoft@users.noreply.github.com>
2025-12-22 11:52:23 +00:00
dependabot[bot]
4dece23ede ci: [DEPENDABOT] Bump docker/setup-buildx-action from 3.11.1 to 3.12.0 in /.github/actions/build-docker-image (#2872)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-22 11:52:04 +00:00
Alex Kremer
2327e81b0b fix: WorkQueue contention (#2866)
Co-authored-by: Ayaz Salikhov <mathbunnyru@users.noreply.github.com>
2025-12-19 15:26:55 +00:00
Bart
5269ea0223 ci: Remove unnecessary creation of build directory (#2867) 2025-12-18 15:15:00 +00:00
github-actions[bot]
89fbcbf66a style: clang-tidy auto fixes (#2862) 2025-12-17 10:19:20 +00:00
Ayaz Salikhov
4b731a92ae ci: Update shared actions (#2852) 2025-12-16 20:54:48 +00:00
Ayaz Salikhov
7600e740a0 revert: "refactor: Add writing command to etl::SystemState" (#2860) 2025-12-16 15:06:35 +00:00
dependabot[bot]
db9a460867 ci: [DEPENDABOT] Bump actions/upload-artifact from 5.0.0 to 6.0.0 in /.github/actions/code-coverage (#2858)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-15 12:36:09 +00:00
dependabot[bot]
d5b0329e70 ci: [DEPENDABOT] Bump codecov/codecov-action from 5.5.1 to 5.5.2 (#2857)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-15 12:35:58 +00:00
dependabot[bot]
612434677a ci: [DEPENDABOT] Bump actions/upload-artifact from 5.0.0 to 6.0.0 (#2856)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-15 12:35:49 +00:00
dependabot[bot]
5a5a79fe30 ci: [DEPENDABOT] Bump actions/download-artifact from 6.0.0 to 7.0.0 (#2855)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-15 12:35:19 +00:00
dependabot[bot]
b1a49fdaab ci: [DEPENDABOT] Bump peter-evans/create-pull-request from 7.0.11 to 8.0.0 (#2854)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-15 12:35:11 +00:00
dependabot[bot]
f451996944 ci: [DEPENDABOT] Bump tj-actions/changed-files from 46.0.5 to 47.0.1 (#2853)
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
2025-12-15 12:34:55 +00:00
Ayaz Salikhov
488bb05d22 chore: Add a script to regenerate conan lockfile (#2849) 2025-12-12 16:40:00 +00:00
Ayaz Salikhov
f2c4275f61 ci: Put debian package to release (#2850) 2025-12-12 16:39:53 +00:00
github-actions[bot]
e9b98cf5b3 style: clang-tidy auto fixes (#2848)
Co-authored-by: godexsoft <385326+godexsoft@users.noreply.github.com>
2025-12-11 09:51:03 +00:00
Sergey Kuznetsov
3aa1854129 refactor: Add writing command to etl::SystemState (#2842) 2025-12-10 16:29:29 +00:00
35 changed files with 1551 additions and 157 deletions

View File

@@ -14,7 +14,7 @@ runs:
using: composite
steps:
- name: Get number of processors
uses: XRPLF/actions/.github/actions/get-nproc@046b1620f6bfd6cd0985dc82c3df02786801fe0a
uses: XRPLF/actions/get-nproc@cf0433aa74563aead044a1e395610c96d65a37cf
id: nproc
with:
subtract: ${{ inputs.nproc_subtract }}

View File

@@ -50,7 +50,7 @@ runs:
- uses: docker/setup-qemu-action@c7c53464625b32c7a7e944ae62b3e17d2b600130 # v3.7.0
with:
cache-image: false
- uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
- uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # v3.12.0
- uses: docker/metadata-action@c299e40c65443455700f0fdfc63efafe5b349051 # v5.10.0
id: meta

View File

@@ -24,7 +24,7 @@ runs:
-j8 --exclude-throw-branches
- name: Archive coverage report
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: coverage-report.xml
path: build/coverage_report.xml

View File

@@ -21,10 +21,6 @@ inputs:
runs:
using: composite
steps:
- name: Create build directory
shell: bash
run: mkdir -p "${{ inputs.build_dir }}"
- name: Run conan
shell: bash
env:

25
.github/scripts/conan/regenerate_lockfile.sh vendored Executable file
View File

@@ -0,0 +1,25 @@
#!/usr/bin/env bash
set -ex
TEMP_DIR=$(mktemp -d)
trap "rm -rf $TEMP_DIR" EXIT
echo "Using temporary CONAN_HOME: $TEMP_DIR"
# We use a temporary Conan home to avoid polluting the user's existing Conan
# configuration and to not use local cache (which leads to non-reproducible lockfiles).
export CONAN_HOME="$TEMP_DIR"
# Ensure that the xrplf remote is the first to be consulted, so any recipes we
# patched are used. We also add it there to not created huge diff when the
# official Conan Center Index is updated.
conan remote add --force --index 0 xrplf https://conan.ripplex.io
# Delete any existing lockfile.
rm -f conan.lock
# Create a new lockfile that is compatible with macOS.
# It should also work on Linux.
conan lock create . \
--profile:all=.github/scripts/conan/apple-clang-17.profile

View File

@@ -52,7 +52,7 @@ jobs:
- name: Download Clio binary from artifact
if: ${{ inputs.artifact_name != null }}
uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v7.0.0
with:
name: ${{ inputs.artifact_name }}
path: ./docker/clio/artifact/

View File

@@ -92,24 +92,6 @@ jobs:
secrets:
CODECOV_TOKEN: ${{ secrets.CODECOV_TOKEN }}
package:
name: Build packages
uses: ./.github/workflows/reusable-build.yml
with:
runs_on: heavy
container: '{ "image": "ghcr.io/xrplf/clio-ci:067449c3f8ae6755ea84752ea2962b589fe56c8f" }'
conan_profile: gcc
build_type: Release
download_ccache: true
upload_ccache: false
code_coverage: false
static: true
upload_clio_server: false
package: true
targets: package
analyze_build_time: false
check_config:
name: Check Config Description
needs: build-and-test
@@ -120,7 +102,7 @@ jobs:
steps:
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
- uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v7.0.0
with:
name: clio_server_Linux_Release_gcc

View File

@@ -29,9 +29,9 @@ jobs:
fetch-depth: 0
- name: Prepare runner
uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
with:
disable_ccache: true
enable_ccache: false
- name: Update libXRPL version requirement
run: |
@@ -59,7 +59,7 @@ jobs:
run: strip build/clio_tests
- name: Upload clio_tests
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: clio_tests_check_libxrpl
path: build/clio_tests
@@ -72,7 +72,7 @@ jobs:
image: ghcr.io/xrplf/clio-ci:067449c3f8ae6755ea84752ea2962b589fe56c8f
steps:
- uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
- uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v7.0.0
with:
name: clio_tests_check_libxrpl

View File

@@ -44,9 +44,9 @@ jobs:
fetch-depth: 0
- name: Prepare runner
uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
with:
disable_ccache: true
enable_ccache: false
- name: Run conan
uses: ./.github/actions/conan
@@ -59,7 +59,7 @@ jobs:
conan_profile: ${{ env.CONAN_PROFILE }}
- name: Get number of processors
uses: XRPLF/actions/.github/actions/get-nproc@046b1620f6bfd6cd0985dc82c3df02786801fe0a
uses: XRPLF/actions/get-nproc@cf0433aa74563aead044a1e395610c96d65a37cf
id: nproc
- name: Run clang-tidy (several times)
@@ -107,7 +107,7 @@ jobs:
- name: Create PR with fixes
if: ${{ steps.files_changed.outcome != 'success' && github.event_name != 'pull_request' }}
uses: peter-evans/create-pull-request@22a9089034f40e5a961c8808d113e2c98fb63676 # v7.0.11
uses: peter-evans/create-pull-request@98357b18bf14b5342f975ff684046ec3b2a07725 # v8.0.0
env:
GH_REPO: ${{ github.repository }}
GH_TOKEN: ${{ github.token }}

View File

@@ -27,9 +27,9 @@ jobs:
lfs: true
- name: Prepare runner
uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
with:
disable_ccache: true
enable_ccache: false
- name: Create build directory
run: mkdir build_docs

View File

@@ -68,6 +68,24 @@ jobs:
download_ccache: false
upload_ccache: false
package:
name: Build debian package
uses: ./.github/workflows/reusable-build.yml
with:
runs_on: heavy
container: '{ "image": "ghcr.io/xrplf/clio-ci:067449c3f8ae6755ea84752ea2962b589fe56c8f" }'
conan_profile: gcc
build_type: Release
download_ccache: false
upload_ccache: false
code_coverage: false
static: true
upload_clio_server: false
package: true
targets: package
analyze_build_time: false
analyze_build_time:
name: Analyze Build Time
@@ -109,7 +127,7 @@ jobs:
echo "date=$(date +'%Y%m%d')" >> $GITHUB_OUTPUT
nightly_release:
needs: [build-and-test, get_date]
needs: [build-and-test, package, get_date]
uses: ./.github/workflows/reusable-release.yml
with:
delete_pattern: "nightly-*"

View File

@@ -8,7 +8,7 @@ on:
jobs:
run-hooks:
uses: XRPLF/actions/.github/workflows/pre-commit.yml@34790936fae4c6c751f62ec8c06696f9c1a5753a
uses: XRPLF/actions/.github/workflows/pre-commit.yml@5ca417783f0312ab26d6f48b85c78edf1de99bbd
with:
runs_on: heavy
container: '{ "image": "ghcr.io/xrplf/clio-pre-commit:067449c3f8ae6755ea84752ea2962b589fe56c8f" }'

View File

@@ -45,8 +45,26 @@ jobs:
upload_ccache: false
expected_version: ${{ github.event_name == 'push' && github.ref_name || '' }}
package:
name: Build debian package
uses: ./.github/workflows/reusable-build.yml
with:
runs_on: heavy
container: '{ "image": "ghcr.io/xrplf/clio-ci:067449c3f8ae6755ea84752ea2962b589fe56c8f" }'
conan_profile: gcc
build_type: Release
download_ccache: false
upload_ccache: false
code_coverage: false
static: true
upload_clio_server: false
package: true
targets: package
analyze_build_time: false
release:
needs: build-and-test
needs: [build-and-test, package]
uses: ./.github/workflows/reusable-release.yml
with:
delete_pattern: ""

View File

@@ -88,7 +88,7 @@ jobs:
steps:
- name: Cleanup workspace
if: ${{ runner.os == 'macOS' }}
uses: XRPLF/actions/.github/actions/cleanup-workspace@ea9970b7c211b18f4c8bcdb28c29f5711752029f
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
@@ -99,9 +99,9 @@ jobs:
ref: ${{ github.ref }}
- name: Prepare runner
uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
with:
disable_ccache: ${{ !inputs.download_ccache }}
enable_ccache: ${{ inputs.download_ccache }}
- name: Setup conan on macOS
if: ${{ runner.os == 'macOS' }}
@@ -117,7 +117,7 @@ jobs:
- name: Restore ccache cache
if: ${{ inputs.download_ccache && github.ref != 'refs/heads/develop' }}
uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/restore@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1
with:
path: ${{ env.CCACHE_DIR }}
key: ${{ steps.cache_key.outputs.key }}
@@ -154,7 +154,7 @@ jobs:
- name: Upload build time analyze report
if: ${{ inputs.analyze_build_time }}
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: build_time_report_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
path: build_time_report.txt
@@ -167,7 +167,7 @@ jobs:
- name: Save ccache cache
if: ${{ inputs.upload_ccache && github.ref == 'refs/heads/develop' }}
uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
uses: actions/cache/save@9255dc7a253b0ccc959486e2bca901246202afeb # v5.0.1
with:
path: ${{ env.CCACHE_DIR }}
key: ${{ steps.cache_key.outputs.key }}
@@ -182,28 +182,28 @@ jobs:
- name: Upload clio_server
if: ${{ inputs.upload_clio_server && !inputs.code_coverage && !inputs.analyze_build_time }}
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: clio_server_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
path: build/clio_server
- name: Upload clio_tests
if: ${{ !inputs.code_coverage && !inputs.analyze_build_time && !inputs.package }}
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: clio_tests_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
path: build/clio_tests
- name: Upload clio_integration_tests
if: ${{ !inputs.code_coverage && !inputs.analyze_build_time && !inputs.package }}
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: clio_integration_tests_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
path: build/clio_integration_tests
- name: Upload Clio Linux package
if: ${{ inputs.package }}
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: clio_deb_package_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
path: build/*.deb

View File

@@ -60,15 +60,23 @@ jobs:
fetch-depth: 0
- name: Prepare runner
uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
with:
disable_ccache: true
enable_ccache: false
- uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
- uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v7.0.0
with:
path: release_artifacts
pattern: clio_server_*
- name: Prepare release artifacts
run: .github/scripts/prepare-release-artifacts.sh release_artifacts
- uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v7.0.0
with:
path: release_artifacts
pattern: clio_deb_package_*
- name: Create release notes
env:
RELEASE_HEADER: ${{ inputs.header }}
@@ -86,11 +94,8 @@ jobs:
git-cliff "${BASE_COMMIT}..HEAD" --ignore-tags "nightly|-b|-rc"
cat CHANGELOG.md >> "${RUNNER_TEMP}/release_notes.md"
- name: Prepare release artifacts
run: .github/scripts/prepare-release-artifacts.sh release_artifacts
- name: Upload release notes
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: release_notes_${{ inputs.version }}
path: "${RUNNER_TEMP}/release_notes.md"
@@ -122,4 +127,4 @@ jobs:
--target "${GITHUB_SHA}" \
${DRAFT_OPTION} \
--notes-file "${RUNNER_TEMP}/release_notes.md" \
./release_artifacts/clio_server*
./release_artifacts/clio_*

View File

@@ -52,13 +52,13 @@ jobs:
steps:
- name: Cleanup workspace
if: ${{ runner.os == 'macOS' }}
uses: XRPLF/actions/.github/actions/cleanup-workspace@ea9970b7c211b18f4c8bcdb28c29f5711752029f
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
fetch-depth: 0
- uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
- uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v7.0.0
with:
name: clio_tests_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
@@ -85,7 +85,7 @@ jobs:
- name: Upload sanitizer report
if: ${{ env.SANITIZER_IGNORE_ERRORS == 'true' && steps.check_report.outputs.found_report == 'true' }}
uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
with:
name: sanitizer_report_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
path: .sanitizer-report/*
@@ -124,7 +124,7 @@ jobs:
steps:
- name: Cleanup workspace
if: ${{ runner.os == 'macOS' }}
uses: XRPLF/actions/.github/actions/cleanup-workspace@ea9970b7c211b18f4c8bcdb28c29f5711752029f
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
- name: Spin up scylladb
if: ${{ runner.os == 'macOS' }}
@@ -146,7 +146,7 @@ jobs:
sleep 5
done
- uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
- uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v7.0.0
with:
name: clio_integration_tests_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}

View File

@@ -21,14 +21,14 @@ jobs:
fetch-depth: 0
- name: Download report artifact
uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
uses: actions/download-artifact@37930b1c2abaa49bbe596cd826c3c89aef350131 # v7.0.0
with:
name: coverage-report.xml
path: build
- name: Upload coverage report
if: ${{ hashFiles('build/coverage_report.xml') != '' }}
uses: codecov/codecov-action@5a1091511ad55cbe89839c7260b706298ca349f7 # v5.5.1
uses: codecov/codecov-action@671740ac38dd9b0130fbe1cec585b89eea48d3de # v5.5.2
with:
files: build/coverage_report.xml
fail_ci_if_error: true

View File

@@ -60,7 +60,7 @@ jobs:
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@24d32ffd492484c1d75e0c0b894501ddb9d30d62 # v47.0.0
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # v47.0.1
with:
files: "docker/compilers/gcc/**"
@@ -98,7 +98,7 @@ jobs:
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c # v46.0.5
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # v47.0.1
with:
files: "docker/compilers/gcc/**"
@@ -136,12 +136,12 @@ jobs:
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@24d32ffd492484c1d75e0c0b894501ddb9d30d62 # v47.0.0
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # v47.0.1
with:
files: "docker/compilers/gcc/**"
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # v3.12.0
- name: Login to GitHub Container Registry
if: ${{ github.event_name != 'pull_request' }}
@@ -187,7 +187,7 @@ jobs:
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@24d32ffd492484c1d75e0c0b894501ddb9d30d62 # v47.0.0
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # v47.0.1
with:
files: "docker/compilers/clang/**"
@@ -223,7 +223,7 @@ jobs:
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@24d32ffd492484c1d75e0c0b894501ddb9d30d62 # v47.0.0
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # v47.0.1
with:
files: "docker/tools/**"
@@ -254,7 +254,7 @@ jobs:
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@ed68ef82c095e0d48ec87eccea555d944a631a4c # v46.0.5
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # v47.0.1
with:
files: "docker/tools/**"
@@ -285,12 +285,12 @@ jobs:
- name: Get changed files
id: changed-files
uses: tj-actions/changed-files@24d32ffd492484c1d75e0c0b894501ddb9d30d62 # v47.0.0
uses: tj-actions/changed-files@e0021407031f5be11a464abee9a0776171c79891 # v47.0.1
with:
files: "docker/tools/**"
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
uses: docker/setup-buildx-action@8d2750c68a42422c14e847fe6c8ac0403b4cbd6f # v3.12.0
- name: Login to GitHub Container Registry
if: ${{ github.event_name != 'pull_request' }}

View File

@@ -22,6 +22,7 @@ on:
- .github/actions/conan/action.yml
- ".github/scripts/conan/**"
- "!.github/scripts/conan/regenerate_lockfile.sh"
- conanfile.py
- conan.lock
@@ -32,6 +33,7 @@ on:
- .github/actions/conan/action.yml
- ".github/scripts/conan/**"
- "!.github/scripts/conan/regenerate_lockfile.sh"
- conanfile.py
- conan.lock
@@ -76,9 +78,9 @@ jobs:
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- name: Prepare runner
uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
uses: XRPLF/actions/prepare-runner@65da1c59e81965eeb257caa3587b9d45066fb925
with:
disable_ccache: true
enable_ccache: false
- name: Setup conan on macOS
if: ${{ runner.os == 'macOS' }}

View File

@@ -29,12 +29,12 @@ repos:
# Autoformat: YAML, JSON, Markdown, etc.
- repo: https://github.com/rbubley/mirrors-prettier
rev: 3c603eae8faac85303ae675fd33325cff699a797 # frozen: v3.7.3
rev: 14abee445aea04b39069c19b4bd54efff6775819 # frozen: v3.7.4
hooks:
- id: prettier
- repo: https://github.com/igorshubovych/markdownlint-cli
rev: c8fd5003603dd6f12447314ecd935ba87c09aff5 # frozen: v0.46.0
rev: 76b3d32d3f4b965e1d6425253c59407420ae2c43 # frozen: v0.47.0
hooks:
- id: markdownlint-fix
exclude: LICENSE.md
@@ -59,7 +59,7 @@ repos:
]
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 2892f1f81088477370d4fbc56545c05d33d2493f # frozen: 25.11.0
rev: 831207fd435b47aeffdf6af853097e64322b4d44 # frozen: 25.12.0
hooks:
- id: black
@@ -94,7 +94,7 @@ repos:
language: script
- repo: https://github.com/pre-commit/mirrors-clang-format
rev: 4c26f99731e7c22a047c35224150ee9e43d7c03e # frozen: v21.1.6
rev: 75ca4ad908dc4a99f57921f29b7e6c1521e10b26 # frozen: v21.1.8
hooks:
- id: clang-format
args: [--style=file]

View File

@@ -9,10 +9,12 @@ target_sources(
util/async/ExecutionContextBenchmarks.cpp
# Logger
util/log/LoggerBenchmark.cpp
# WorkQueue
rpc/WorkQueueBenchmarks.cpp
)
include(deps/gbench)
target_include_directories(clio_benchmark PRIVATE .)
target_link_libraries(clio_benchmark PUBLIC clio_util benchmark::benchmark_main spdlog::spdlog)
target_link_libraries(clio_benchmark PUBLIC clio_util clio_rpc benchmark::benchmark_main spdlog::spdlog)
set_target_properties(clio_benchmark PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})

View File

@@ -0,0 +1,122 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "rpc/WorkQueue.hpp"
#include "util/Assert.hpp"
#include "util/config/Array.hpp"
#include "util/config/ConfigConstraints.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/config/ConfigValue.hpp"
#include "util/config/Types.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Prometheus.hpp"
#include <benchmark/benchmark.h>
#include <boost/asio/steady_timer.hpp>
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <mutex>
using namespace rpc;
using namespace util::config;
namespace {
auto const kCONFIG = ClioConfigDefinition{
{"prometheus.compress_reply", ConfigValue{ConfigType::Boolean}.defaultValue(true)},
{"prometheus.enabled", ConfigValue{ConfigType::Boolean}.defaultValue(true)},
{"log.channels.[].channel", Array{ConfigValue{ConfigType::String}}},
{"log.channels.[].level", Array{ConfigValue{ConfigType::String}}},
{"log.level", ConfigValue{ConfigType::String}.defaultValue("info")},
{"log.format", ConfigValue{ConfigType::String}.defaultValue(R"(%Y-%m-%d %H:%M:%S.%f %^%3!l:%n%$ - %v)")},
{"log.is_async", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"log.enable_console", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"log.directory", ConfigValue{ConfigType::String}.optional()},
{"log.rotation_size", ConfigValue{ConfigType::Integer}.defaultValue(2048).withConstraint(gValidateUint32)},
{"log.directory_max_files", ConfigValue{ConfigType::Integer}.defaultValue(25).withConstraint(gValidateUint32)},
{"log.tag_style", ConfigValue{ConfigType::String}.defaultValue("none")},
};
// this should be a fixture but it did not work with Args very well
void
init()
{
static std::once_flag kONCE;
std::call_once(kONCE, [] {
PrometheusService::init(kCONFIG);
(void)util::LogService::init(kCONFIG);
});
}
} // namespace
static void
benchmarkWorkQueue(benchmark::State& state)
{
init();
auto const total = static_cast<size_t>(state.range(0));
auto const numThreads = static_cast<uint32_t>(state.range(1));
auto const maxSize = static_cast<uint32_t>(state.range(2));
auto const delayMs = static_cast<uint32_t>(state.range(3));
for (auto _ : state) {
std::atomic_size_t totalExecuted = 0uz;
std::atomic_size_t totalQueued = 0uz;
state.PauseTiming();
WorkQueue queue(numThreads, maxSize);
state.ResumeTiming();
for (auto i = 0uz; i < total; ++i) {
totalQueued += static_cast<std::size_t>(queue.postCoro(
[&delayMs, &totalExecuted](auto yield) {
++totalExecuted;
boost::asio::steady_timer timer(yield.get_executor(), std::chrono::milliseconds{delayMs});
timer.async_wait(yield);
},
/* isWhiteListed = */ false
));
}
queue.stop();
ASSERT(totalExecuted == totalQueued, "Totals don't match");
ASSERT(totalQueued <= total, "Queued more than requested");
ASSERT(totalQueued >= maxSize, "Queued less than maxSize");
}
}
// Usage example:
/*
./clio_benchmark \
--benchmark_repetitions=10 \
--benchmark_display_aggregates_only=true \
--benchmark_min_time=1x \
--benchmark_filter="WorkQueue"
*/
// TODO: figure out what happens on 1 thread
BENCHMARK(benchmarkWorkQueue)
->ArgsProduct({{1'000, 10'000, 100'000}, {2, 4, 8}, {0, 5'000}, {10, 100, 250}})
->Unit(benchmark::kMillisecond);

View File

@@ -97,30 +97,14 @@ Now you should be able to download the prebuilt dependencies (including `xrpl` p
#### Conan lockfile
To achieve reproducible dependencies, we use [Conan lockfile](https://docs.conan.io/2/tutorial/versioning/lockfiles.html).
To achieve reproducible dependencies, we use a [Conan lockfile](https://docs.conan.io/2/tutorial/versioning/lockfiles.html).
The `conan.lock` file in the repository contains a "snapshot" of the current dependencies.
It is implicitly used when running `conan` commands, you don't need to specify it.
You have to update this file every time you add a new dependency or change a revision or version of an existing dependency.
> [!NOTE]
> Conan uses local cache by default when creating a lockfile.
>
> To ensure, that lockfile creation works the same way on all developer machines, you should clear the local cache before creating a new lockfile.
To create a new lockfile, run the following commands in the repository root:
```bash
conan remove '*' --confirm
rm conan.lock
# This ensure that xrplf remote is the first to be consulted
conan remote add --force --index 0 xrplf https://conan.ripplex.io
conan lock create .
```
> [!NOTE]
> If some dependencies are exclusive for some OS, you may need to run the last command for them adding `--profile:all <PROFILE>`.
To update a lockfile, run from the repository root: `./.github/scripts/conan/regenerate_lockfile.sh`
## Building Clio

View File

@@ -457,6 +457,14 @@ This document provides a list of all available Clio configuration properties in
- **Constraints**: None
- **Description**: Max allowed difference between the latest sequence in DB and in cache file. If the cache file is too old (contains too low latest sequence) Clio will reject using it.
### cache.file.async_save
- **Required**: True
- **Type**: boolean
- **Default value**: `False`
- **Constraints**: None
- **Description**: When false, Clio waits for cache saving to finish before shutting down. When true, cache saving runs in parallel with other shutdown operations.
### log.channels.[].channel
- **Required**: False

View File

@@ -30,7 +30,9 @@
namespace data {
LedgerCacheSaver::LedgerCacheSaver(util::config::ClioConfigDefinition const& config, LedgerCacheInterface const& cache)
: cacheFilePath_(config.maybeValue<std::string>("cache.file.path")), cache_(cache)
: cacheFilePath_(config.maybeValue<std::string>("cache.file.path"))
, cache_(cache)
, isAsync_(config.get<bool>("cache.file.async_save"))
{
}
@@ -56,6 +58,9 @@ LedgerCacheSaver::save()
LOG(util::LogService::error()) << "Error saving LedgerCache to file: " << success.error();
}
});
if (not isAsync_) {
waitToFinish();
}
}
void

View File

@@ -53,6 +53,7 @@ class LedgerCacheSaver {
std::optional<std::string> cacheFilePath_;
std::reference_wrapper<LedgerCacheInterface const> cache_;
std::optional<std::thread> savingThread_;
bool isAsync_;
public:
/**

View File

@@ -34,8 +34,8 @@
#include <cstddef>
#include <cstdint>
#include <functional>
#include <optional>
#include <utility>
#include <vector>
namespace rpc {
@@ -122,7 +122,7 @@ WorkQueue::dispatcherLoop(boost::asio::yield_context yield)
// all ongoing tasks must be completed before stopping fully
while (not stopping_ or size() > 0) {
std::vector<TaskType> batch;
std::optional<TaskType> task;
{
auto state = dispatcherState_.lock();
@@ -130,43 +130,31 @@ WorkQueue::dispatcherLoop(boost::asio::yield_context yield)
if (state->empty()) {
state->isIdle = true;
} else {
for (auto count = 0uz; count < kTAKE_HIGH_PRIO and not state->high.empty(); ++count) {
batch.push_back(std::move(state->high.front()));
state->high.pop();
}
if (not state->normal.empty()) {
batch.push_back(std::move(state->normal.front()));
state->normal.pop();
}
task = state->popNext();
}
}
if (not stopping_ and batch.empty()) {
if (not stopping_ and not task.has_value()) {
waitTimer_.expires_at(std::chrono::steady_clock::time_point::max());
boost::system::error_code ec;
waitTimer_.async_wait(yield[ec]);
} else {
for (auto task : std::move(batch)) {
util::spawn(
ioc_,
[this, spawnedAt = std::chrono::system_clock::now(), task = std::move(task)](auto yield) mutable {
auto const takenAt = std::chrono::system_clock::now();
auto const waited =
std::chrono::duration_cast<std::chrono::microseconds>(takenAt - spawnedAt).count();
} else if (task.has_value()) {
util::spawn(
ioc_,
[this, spawnedAt = std::chrono::system_clock::now(), task = std::move(*task)](auto yield) mutable {
auto const takenAt = std::chrono::system_clock::now();
auto const waited =
std::chrono::duration_cast<std::chrono::microseconds>(takenAt - spawnedAt).count();
++queued_.get();
durationUs_.get() += waited;
LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size();
++queued_.get();
durationUs_.get() += waited;
LOG(log_.info()) << "WorkQueue wait time: " << waited << ", queue size: " << size();
task(yield);
task(yield);
--curSize_.get();
}
);
}
boost::asio::post(ioc_.get_executor(), yield); // yield back to avoid hijacking the thread
--curSize_.get();
}
);
}
}

View File

@@ -38,7 +38,9 @@
#include <cstdint>
#include <functional>
#include <limits>
#include <optional>
#include <queue>
#include <utility>
namespace rpc {
@@ -79,6 +81,7 @@ private:
QueueType normal;
bool isIdle = false;
size_t highPriorityCounter = 0;
void
push(Priority priority, auto&& task)
@@ -96,6 +99,26 @@ private:
{
return high.empty() and normal.empty();
}
[[nodiscard]] std::optional<TaskType>
popNext()
{
if (not high.empty() and (highPriorityCounter < kTAKE_HIGH_PRIO or normal.empty())) {
auto task = std::move(high.front());
high.pop();
++highPriorityCounter;
return task;
}
if (not normal.empty()) {
auto task = std::move(normal.front());
normal.pop();
highPriorityCounter = 0;
return task;
}
return std::nullopt;
}
};
private:

381
src/util/Channel.hpp Normal file
View File

@@ -0,0 +1,381 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include <boost/asio/any_io_executor.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/concurrent_channel.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/system/detail/error_code.hpp>
#include <concepts>
#include <cstddef>
#include <memory>
#include <optional>
#include <type_traits>
#include <utility>
namespace util {
#ifdef __clang__
namespace detail {
// Forward declaration for compile-time check
template <typename T>
struct ChannelInstantiated;
} // namespace detail
#endif
/**
* @brief Represents a go-like channel, a multi-producer (Sender) multi-consumer (Receiver) thread-safe data pipe.
* @note Use INSTANTIATE_CHANNEL_FOR_CLANG macro when using this class. See docs at the bottom of the file for more
* details.
*
* @tparam T The type of data the channel transfers
*/
template <typename T>
class Channel {
private:
class ControlBlock {
using InternalChannelType = boost::asio::experimental::concurrent_channel<void(boost::system::error_code, T)>;
boost::asio::any_io_executor executor_;
InternalChannelType ch_;
public:
ControlBlock(auto&& context, std::size_t capacity) : executor_(context.get_executor()), ch_(context, capacity)
{
}
[[nodiscard]] InternalChannelType&
channel()
{
return ch_;
}
void
close()
{
if (not isClosed()) {
ch_.close();
// Workaround for Boost bug: close() alone doesn't cancel pending async operations.
// We must call cancel() to unblock them. The bug also causes cancel() to return
// error_code 0 instead of channel_cancelled, so async operations must check
// isClosed() to detect this case.
// https://github.com/chriskohlhoff/asio/issues/1575
ch_.cancel();
}
}
[[nodiscard]] bool
isClosed() const
{
return not ch_.is_open();
}
};
/**
* @brief This is used to close the channel once either all Senders or all Receivers are destroyed
*/
struct Guard {
std::shared_ptr<ControlBlock> shared;
~Guard()
{
shared->close();
}
};
/**
* @brief The sending end of a channel.
*
* Sender is copyable and movable. The channel remains open as long as at least one Sender exists.
* When all Sender instances are destroyed, the channel is closed and receivers will receive std::nullopt.
*/
class Sender {
std::shared_ptr<ControlBlock> shared_;
std::shared_ptr<Guard> guard_;
public:
/**
* @brief Constructs a Sender from a shared control block.
* @param shared The shared control block managing the channel state
*/
Sender(std::shared_ptr<ControlBlock> shared)
: shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
Sender(Sender&&) = default;
Sender(Sender const&) = default;
Sender&
operator=(Sender&&) = default;
Sender&
operator=(Sender const&) = default;
/**
* @brief Asynchronously sends data through the channel using a coroutine.
*
* Blocks the coroutine until the data is sent or the channel is closed.
*
* @tparam D The type of data to send (must be convertible to T)
* @param data The data to send
* @param yield The Boost.Asio yield context for coroutine suspension
* @return true if the data was sent successfully, false if the channel is closed
*/
template <typename D>
bool
asyncSend(D&& data, boost::asio::yield_context yield)
requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
{
boost::system::error_code ecIn, ecOut;
shared_->channel().async_send(ecIn, std::forward<D>(data), yield[ecOut]);
// Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
if (not ecOut and shared_->isClosed())
return false;
return not ecOut;
}
/**
* @brief Asynchronously sends data through the channel using a callback.
*
* The callback is invoked when the send operation completes.
*
* @tparam D The type of data to send (must be convertible to T)
* @param data The data to send
* @param fn Callback function invoked with true if successful, false if the channel is closed
*/
template <typename D>
void
asyncSend(D&& data, std::invocable<bool> auto&& fn)
requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
{
boost::system::error_code ecIn;
shared_->channel().async_send(
ecIn,
std::forward<D>(data),
[fn = std::forward<decltype(fn)>(fn), shared = shared_](boost::system::error_code ec) mutable {
// Workaround: asio channels bug returns ec=0 on cancel, check isClosed() instead
if (not ec and shared->isClosed()) {
fn(false);
return;
}
fn(not ec);
}
);
}
/**
* @brief Attempts to send data through the channel without blocking.
*
* @tparam D The type of data to send (must be convertible to T)
* @param data The data to send
* @return true if the data was sent successfully, false if the channel is full or closed
*/
template <typename D>
bool
trySend(D&& data)
requires(std::convertible_to<std::remove_cvref_t<D>, std::remove_cvref_t<T>>)
{
boost::system::error_code ec;
return shared_->channel().try_send(ec, std::forward<D>(data));
}
};
/**
* @brief The receiving end of a channel.
*
* Receiver is copyable and movable. Multiple receivers can consume from the same channel concurrently.
* When all Receiver instances are destroyed, the channel is closed and senders will fail to send.
*/
class Receiver {
std::shared_ptr<ControlBlock> shared_;
std::shared_ptr<Guard> guard_;
public:
/**
* @brief Constructs a Receiver from a shared control block.
* @param shared The shared control block managing the channel state
*/
Receiver(std::shared_ptr<ControlBlock> shared)
: shared_(std::move(shared)), guard_(std::make_shared<Guard>(shared_)) {};
Receiver(Receiver&&) = default;
Receiver(Receiver const&) = default;
Receiver&
operator=(Receiver&&) = default;
Receiver&
operator=(Receiver const&) = default;
/**
* @brief Attempts to receive data from the channel without blocking.
*
* @return std::optional containing the received value, or std::nullopt if the channel is empty or closed
*/
std::optional<T>
tryReceive()
{
std::optional<T> result;
shared_->channel().try_receive([&result](boost::system::error_code ec, auto&& value) {
if (not ec)
result = std::forward<decltype(value)>(value);
});
return result;
}
/**
* @brief Asynchronously receives data from the channel using a coroutine.
*
* Blocks the coroutine until data is available or the channel is closed.
*
* @param yield The Boost.Asio yield context for coroutine suspension
* @return std::optional containing the received value, or std::nullopt if the channel is closed
*/
[[nodiscard]] std::optional<T>
asyncReceive(boost::asio::yield_context yield)
{
boost::system::error_code ec;
auto value = shared_->channel().async_receive(yield[ec]);
if (ec)
return std::nullopt;
return value;
}
/**
* @brief Asynchronously receives data from the channel using a callback.
*
* The callback is invoked when data is available or the channel is closed.
*
* @param fn Callback function invoked with std::optional containing the value, or std::nullopt if closed
*/
void
asyncReceive(std::invocable<std::optional<std::remove_cvref_t<T>>> auto&& fn)
{
shared_->channel().async_receive(
[fn = std::forward<decltype(fn)>(fn)](boost::system::error_code ec, T&& value) mutable {
if (ec) {
fn(std::optional<T>(std::nullopt));
return;
}
fn(std::make_optional<T>(std::move(value)));
}
);
}
/**
* @brief Checks if the channel is closed.
*
* A channel is closed when all Sender instances have been destroyed.
*
* @return true if the channel is closed, false otherwise
*/
[[nodiscard]] bool
isClosed() const
{
return shared_->isClosed();
}
};
public:
/**
* @brief Factory function to create channel components.
* @param context A supported context type (either io_context or thread_pool)
* @param capacity Size of the internal buffer on the channel
* @return A pair of Sender and Receiver
*/
static std::pair<Sender, Receiver>
create(auto&& context, std::size_t capacity)
{
#ifdef __clang__
static_assert(
util::detail::ChannelInstantiated<T>::value,
"When using Channel<T> with Clang, you must add INSTANTIATE_CHANNEL_FOR_CLANG(T) "
"to one .cpp file. See documentation at the bottom of Channel.hpp for details."
);
#endif
auto shared = std::make_shared<ControlBlock>(std::forward<decltype(context)>(context), capacity);
auto sender = Sender{shared};
auto receiver = Receiver{std::move(shared)};
return {std::move(sender), std::move(receiver)};
}
};
} // namespace util
// ================================================================================================
// Clang/Apple Clang Workaround for Boost.Asio Experimental Channels
// ================================================================================================
//
// IMPORTANT: When using Channel<T> with Clang or Apple Clang, you MUST add the following line
// to ONE .cpp file that uses Channel<T>:
//
// INSTANTIATE_CHANNEL_FOR_CLANG(YourType)
//
// Example:
// // In ChannelTests.cpp or any .cpp file that uses Channel<int>:
// #include "util/Channel.hpp"
// INSTANTIATE_CHANNEL_FOR_CLANG(int)
//
// Why this is needed:
// Boost.Asio's experimental concurrent_channel has a bug where close() doesn't properly cancel
// pending async operations. When using cancellation signals (which we do in our workaround),
// Clang generates vtable references for internal cancellation_handler types but Boost.Asio
// doesn't provide the definitions, causing linker errors:
//
// Undefined symbols for architecture arm64:
// "boost::asio::detail::cancellation_handler<...>::call(boost::asio::cancellation_type)"
// "boost::asio::detail::cancellation_handler<...>::destroy()"
//
// This macro explicitly instantiates the required template specializations.
//
// See: https://github.com/chriskohlhoff/asio/issues/1575
//
#ifdef __clang__
#include <boost/asio/cancellation_signal.hpp>
#include <boost/asio/experimental/channel_traits.hpp>
#include <boost/asio/experimental/detail/channel_service.hpp>
namespace util::detail {
// Tag type used to verify that INSTANTIATE_CHANNEL_FOR_CLANG was called for a given type
template <typename T>
struct ChannelInstantiated : std::false_type {};
} // namespace util::detail
#define INSTANTIATE_CHANNEL_FOR_CLANG(T) \
/* NOLINTNEXTLINE(cppcoreguidelines-virtual-class-destructor) */ \
template class boost::asio::detail::cancellation_handler< \
boost::asio::experimental::detail::channel_service<boost::asio::detail::posix_mutex>:: \
op_cancellation<boost::asio::experimental::channel_traits<>, void(boost::system::error_code, T)>>; \
namespace util::detail { \
template <> \
struct ChannelInstantiated<T> : std::true_type {}; \
}
#else
// No workaround needed for non-Clang compilers
#define INSTANTIATE_CHANNEL_FOR_CLANG(T)
#endif

View File

@@ -361,6 +361,7 @@ getClioConfig()
{"cache.load", ConfigValue{ConfigType::String}.defaultValue("async").withConstraint(gValidateLoadMode)},
{"cache.file.path", ConfigValue{ConfigType::String}.optional()},
{"cache.file.max_sequence_age", ConfigValue{ConfigType::Integer}.defaultValue(5000)},
{"cache.file.async_save", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
{"log.channels.[].channel",
Array{ConfigValue{ConfigType::String}.optional().withConstraint(gValidateChannelName)}},

View File

@@ -282,6 +282,9 @@ This document provides a list of all available Clio configuration properties in
KV{.key = "cache.file.max_sequence_age",
.value = "Max allowed difference between the latest sequence in DB and in cache file. If the cache file is "
"too old (contains too low latest sequence) Clio will reject using it."},
KV{.key = "cache.file.async_save",
.value = "When false, Clio waits for cache saving to finish before shutting down. When true, "
"cache saving runs in parallel with other shutdown operations."},
KV{.key = "log.channels.[].channel", .value = "The name of the log channel."},
KV{.key = "log.channels.[].level", .value = "The log level for the specific log channel."},
KV{.key = "log.level",

View File

@@ -167,6 +167,7 @@ target_sources(
util/AccountUtilsTests.cpp
util/AssertTests.cpp
util/BytesConverterTests.cpp
util/ChannelTests.cpp
util/CoroutineTest.cpp
util/MoveTrackerTests.cpp
util/ObservableValueTest.cpp

View File

@@ -47,17 +47,23 @@ struct LedgerCacheSaverTest : virtual testing::Test {
constexpr static auto kFILE_PATH = "./cache.bin";
static ClioConfigDefinition
generateConfig(bool cacheFilePathHasValue)
generateConfig(bool cacheFilePathHasValue, bool asyncSave)
{
auto config = ClioConfigDefinition{{
{"cache.file.path", ConfigValue{ConfigType::String}.optional()},
{"cache.file.async_save", ConfigValue{ConfigType::Boolean}.defaultValue(false)},
}};
ConfigFileJson jsonFile{boost::json::object{}};
if (cacheFilePathHasValue) {
auto const jsonObject =
boost::json::parse(fmt::format(R"JSON({{"cache": {{"file": {{"path": "{}"}}}}}})JSON", kFILE_PATH))
.as_object();
auto const jsonObject = boost::json::parse(
fmt::format(
R"JSON({{"cache": {{"file": {{"path": "{}", "async_save": {} }} }} }})JSON",
kFILE_PATH,
asyncSave
)
)
.as_object();
jsonFile = ConfigFileJson{jsonObject};
}
auto const errors = config.parse(jsonFile);
@@ -68,7 +74,7 @@ struct LedgerCacheSaverTest : virtual testing::Test {
TEST_F(LedgerCacheSaverTest, SaveSuccessfully)
{
auto const config = generateConfig(true);
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
LedgerCacheSaver saver{config, cache};
EXPECT_CALL(cache, saveToFile(kFILE_PATH)).WillOnce(testing::Return(std::expected<void, std::string>{}));
@@ -79,7 +85,7 @@ TEST_F(LedgerCacheSaverTest, SaveSuccessfully)
TEST_F(LedgerCacheSaverTest, SaveWithError)
{
auto const config = generateConfig(true);
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
LedgerCacheSaver saver{config, cache};
EXPECT_CALL(cache, saveToFile(kFILE_PATH))
@@ -91,7 +97,7 @@ TEST_F(LedgerCacheSaverTest, SaveWithError)
TEST_F(LedgerCacheSaverTest, NoSaveWhenPathNotConfigured)
{
auto const config = generateConfig(false);
auto const config = generateConfig(/* cacheFilePathHasValue = */ false, /* asyncSave = */ true);
LedgerCacheSaver saver{config, cache};
saver.save();
@@ -100,7 +106,7 @@ TEST_F(LedgerCacheSaverTest, NoSaveWhenPathNotConfigured)
TEST_F(LedgerCacheSaverTest, DestructorWaitsForCompletion)
{
auto const config = generateConfig(true);
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
std::binary_semaphore semaphore{1};
std::atomic_bool saveCompleted{false};
@@ -123,7 +129,7 @@ TEST_F(LedgerCacheSaverTest, DestructorWaitsForCompletion)
TEST_F(LedgerCacheSaverTest, WaitToFinishCanBeCalledMultipleTimes)
{
auto const config = generateConfig(true);
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
LedgerCacheSaver saver{config, cache};
EXPECT_CALL(cache, saveToFile(kFILE_PATH));
@@ -135,7 +141,7 @@ TEST_F(LedgerCacheSaverTest, WaitToFinishCanBeCalledMultipleTimes)
TEST_F(LedgerCacheSaverTest, WaitToFinishWithoutSaveIsSafe)
{
auto const config = generateConfig(true);
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
LedgerCacheSaver saver{config, cache};
EXPECT_NO_THROW(saver.waitToFinish());
}
@@ -144,13 +150,61 @@ struct LedgerCacheSaverAssertTest : LedgerCacheSaverTest, common::util::WithMock
TEST_F(LedgerCacheSaverAssertTest, MultipleSavesNotAllowed)
{
auto const config = generateConfig(true);
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
LedgerCacheSaver saver{config, cache};
std::binary_semaphore semaphore{0};
EXPECT_CALL(cache, saveToFile(kFILE_PATH));
EXPECT_CALL(cache, saveToFile(kFILE_PATH)).WillOnce([&](auto&&) {
semaphore.acquire();
return std::expected<void, std::string>{};
});
saver.save();
EXPECT_CLIO_ASSERT_FAIL({ saver.save(); });
semaphore.release();
saver.waitToFinish();
}
TEST_F(LedgerCacheSaverTest, SyncSaveWaitsForCompletion)
{
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ false);
std::atomic_bool saveCompleted{false};
EXPECT_CALL(cache, saveToFile(kFILE_PATH)).WillOnce([&]() {
std::this_thread::sleep_for(std::chrono::milliseconds(50));
saveCompleted = true;
return std::expected<void, std::string>{};
});
LedgerCacheSaver saver{config, cache};
saver.save();
EXPECT_TRUE(saveCompleted);
}
TEST_F(LedgerCacheSaverTest, AsyncSaveDoesNotWaitForCompletion)
{
auto const config = generateConfig(/* cacheFilePathHasValue = */ true, /* asyncSave = */ true);
std::binary_semaphore saveStarted{0};
std::binary_semaphore continueExecution{0};
std::atomic_bool saveCompleted{false};
EXPECT_CALL(cache, saveToFile(kFILE_PATH)).WillOnce([&]() {
saveStarted.release();
continueExecution.acquire();
saveCompleted = true;
return std::expected<void, std::string>{};
});
LedgerCacheSaver saver{config, cache};
saver.save();
EXPECT_TRUE(saveStarted.try_acquire_for(std::chrono::seconds{5}));
EXPECT_FALSE(saveCompleted);
continueExecution.release();
saver.waitToFinish();
EXPECT_TRUE(saveCompleted);
}

View File

@@ -985,13 +985,13 @@ static auto
generateSingleFlagTests()
{
return std::vector<SingleFlagTest>{
{"Locked", ripple::lsfMPTLocked, "mpt_locked"},
{"CanLock", ripple::lsfMPTCanLock, "mpt_can_lock"},
{"RequireAuth", ripple::lsfMPTRequireAuth, "mpt_require_auth"},
{"CanEscrow", ripple::lsfMPTCanEscrow, "mpt_can_escrow"},
{"CanTrade", ripple::lsfMPTCanTrade, "mpt_can_trade"},
{"CanTransfer", ripple::lsfMPTCanTransfer, "mpt_can_transfer"},
{"CanClawback", ripple::lsfMPTCanClawback, "mpt_can_clawback"},
{.testName = "Locked", .flag = ripple::lsfMPTLocked, .expectedJsonKey = "mpt_locked"},
{.testName = "CanLock", .flag = ripple::lsfMPTCanLock, .expectedJsonKey = "mpt_can_lock"},
{.testName = "RequireAuth", .flag = ripple::lsfMPTRequireAuth, .expectedJsonKey = "mpt_require_auth"},
{.testName = "CanEscrow", .flag = ripple::lsfMPTCanEscrow, .expectedJsonKey = "mpt_can_escrow"},
{.testName = "CanTrade", .flag = ripple::lsfMPTCanTrade, .expectedJsonKey = "mpt_can_trade"},
{.testName = "CanTransfer", .flag = ripple::lsfMPTCanTransfer, .expectedJsonKey = "mpt_can_transfer"},
{.testName = "CanClawback", .flag = ripple::lsfMPTCanClawback, .expectedJsonKey = "mpt_can_clawback"},
};
}
@@ -1059,14 +1059,30 @@ static auto
generateSingleMutableFlagTests()
{
return std::vector<SingleMutableFlagTest>{
{"CanMutateCanLock", ripple::lsmfMPTCanMutateCanLock, "mpt_can_mutate_can_lock"},
{"CanMutateRequireAuth", ripple::lsmfMPTCanMutateRequireAuth, "mpt_can_mutate_require_auth"},
{"CanMutateCanEscrow", ripple::lsmfMPTCanMutateCanEscrow, "mpt_can_mutate_can_escrow"},
{"CanMutateCanTrade", ripple::lsmfMPTCanMutateCanTrade, "mpt_can_mutate_can_trade"},
{"CanMutateCanTransfer", ripple::lsmfMPTCanMutateCanTransfer, "mpt_can_mutate_can_transfer"},
{"CanMutateCanClawback", ripple::lsmfMPTCanMutateCanClawback, "mpt_can_mutate_can_clawback"},
{"CanMutateMetadata", ripple::lsmfMPTCanMutateMetadata, "mpt_can_mutate_metadata"},
{"CanMutateTransferFee", ripple::lsmfMPTCanMutateTransferFee, "mpt_can_mutate_transfer_fee"},
{.testName = "CanMutateCanLock",
.mutableFlag = ripple::lsmfMPTCanMutateCanLock,
.expectedJsonKey = "mpt_can_mutate_can_lock"},
{.testName = "CanMutateRequireAuth",
.mutableFlag = ripple::lsmfMPTCanMutateRequireAuth,
.expectedJsonKey = "mpt_can_mutate_require_auth"},
{.testName = "CanMutateCanEscrow",
.mutableFlag = ripple::lsmfMPTCanMutateCanEscrow,
.expectedJsonKey = "mpt_can_mutate_can_escrow"},
{.testName = "CanMutateCanTrade",
.mutableFlag = ripple::lsmfMPTCanMutateCanTrade,
.expectedJsonKey = "mpt_can_mutate_can_trade"},
{.testName = "CanMutateCanTransfer",
.mutableFlag = ripple::lsmfMPTCanMutateCanTransfer,
.expectedJsonKey = "mpt_can_mutate_can_transfer"},
{.testName = "CanMutateCanClawback",
.mutableFlag = ripple::lsmfMPTCanMutateCanClawback,
.expectedJsonKey = "mpt_can_mutate_can_clawback"},
{.testName = "CanMutateMetadata",
.mutableFlag = ripple::lsmfMPTCanMutateMetadata,
.expectedJsonKey = "mpt_can_mutate_metadata"},
{.testName = "CanMutateTransferFee",
.mutableFlag = ripple::lsmfMPTCanMutateTransferFee,
.expectedJsonKey = "mpt_can_mutate_transfer_fee"},
};
}

View File

@@ -0,0 +1,759 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "util/Assert.hpp"
#include "util/Channel.hpp"
#include "util/Mutex.hpp"
#include "util/OverloadSet.hpp"
#include "util/Spawn.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp>
#include <boost/asio/spawn.hpp>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include <boost/asio/thread_pool.hpp>
#include <boost/system/detail/error_code.hpp>
#include <fmt/format.h>
#include <gmock/gmock.h>
#include <gtest/gtest.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <memory>
#include <optional>
#include <semaphore>
#include <string>
#include <utility>
#include <variant>
#include <vector>
using namespace testing;
namespace {
constexpr auto kDEFAULT_THREAD_POOL_SIZE = 4;
constexpr auto kTEST_TIMEOUT = std::chrono::seconds{10};
constexpr auto kNUM_SENDERS = 3uz;
constexpr auto kNUM_RECEIVERS = 3uz;
constexpr auto kVALUES_PER_SENDER = 500uz;
constexpr auto kTOTAL_EXPECTED = kNUM_SENDERS * kVALUES_PER_SENDER;
enum class ContextType { IOContext, ThreadPool };
constexpr int
generateValue(std::size_t senderId, std::size_t i)
{
return static_cast<int>((senderId * 100) + i);
}
std::vector<int>
generateExpectedValues()
{
std::vector<int> expectedValues;
expectedValues.reserve(kTOTAL_EXPECTED);
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
for (auto i = 0uz; i < kVALUES_PER_SENDER; ++i) {
expectedValues.push_back(generateValue(senderId, i));
}
}
std::ranges::sort(expectedValues);
return expectedValues;
}
std::vector<int> const kEXPECTED_VALUES = generateExpectedValues();
std::string
contextTypeToString(ContextType type)
{
return type == ContextType::IOContext ? "IOContext" : "ThreadPool";
}
class ContextWrapper {
public:
using ContextVariant = std::variant<boost::asio::io_context, boost::asio::thread_pool>;
explicit ContextWrapper(ContextType type)
: context_([type] {
if (type == ContextType::IOContext)
return ContextVariant(std::in_place_type_t<boost::asio::io_context>());
if (type == ContextType::ThreadPool)
return ContextVariant(std::in_place_type_t<boost::asio::thread_pool>(), kDEFAULT_THREAD_POOL_SIZE);
ASSERT(false, "Unknown new type of context");
std::unreachable();
}())
{
}
template <typename Fn>
void
withExecutor(Fn&& fn)
{
std::visit(std::forward<Fn>(fn), context_);
}
void
run()
{
std::visit(
util::OverloadSet{
[](boost::asio::io_context& context) { context.run_for(kTEST_TIMEOUT); },
[](boost::asio::thread_pool& context) { context.join(); },
},
context_
);
}
private:
ContextVariant context_;
};
} // namespace
class ChannelSpawnTest : public TestWithParam<ContextType> {
protected:
ChannelSpawnTest() : context_(GetParam())
{
}
ContextWrapper context_;
};
class ChannelCallbackTest : public TestWithParam<ContextType> {
protected:
ChannelCallbackTest() : context_(GetParam())
{
}
ContextWrapper context_;
};
TEST_P(ChannelSpawnTest, MultipleSendersOneReceiver)
{
context_.withExecutor([this](auto& executor) {
auto [sender, receiver] = util::Channel<int>::create(executor, 10);
util::Mutex<std::vector<int>> receivedValues;
util::spawn(executor, [&receiver, &receivedValues](boost::asio::yield_context yield) mutable {
while (true) {
auto value = receiver.asyncReceive(yield);
if (not value.has_value())
break;
receivedValues.lock()->push_back(*value);
}
});
{
auto localSender = std::move(sender);
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
util::spawn(executor, [senderCopy = localSender, senderId](boost::asio::yield_context yield) mutable {
for (auto i = 0uz; i < kVALUES_PER_SENDER; ++i) {
if (not senderCopy.asyncSend(generateValue(senderId, i), yield))
break;
}
});
}
}
context_.run();
EXPECT_EQ(receivedValues.lock()->size(), kTOTAL_EXPECTED);
std::ranges::sort(receivedValues.lock().get());
EXPECT_EQ(receivedValues.lock().get(), kEXPECTED_VALUES);
});
}
TEST_P(ChannelSpawnTest, MultipleSendersMultipleReceivers)
{
context_.withExecutor([this](auto& executor) {
auto [sender, receiver] = util::Channel<int>::create(executor, 10);
util::Mutex<std::vector<int>> receivedValues;
std::vector<decltype(receiver)> receivers(kNUM_RECEIVERS, receiver);
for (auto receiverId = 0uz; receiverId < kNUM_RECEIVERS; ++receiverId) {
util::spawn(
executor,
[&receiverRef = receivers[receiverId], &receivedValues](boost::asio::yield_context yield) mutable {
while (true) {
auto value = receiverRef.asyncReceive(yield);
if (not value.has_value())
break;
receivedValues.lock()->push_back(*value);
}
}
);
}
{
auto localSender = std::move(sender);
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
util::spawn(executor, [senderCopy = localSender, senderId](boost::asio::yield_context yield) mutable {
for (auto i = 0uz; i < kVALUES_PER_SENDER; ++i) {
auto const value = generateValue(senderId, i);
if (not senderCopy.asyncSend(value, yield))
break;
}
});
}
}
context_.run();
EXPECT_EQ(receivedValues.lock()->size(), kTOTAL_EXPECTED);
std::ranges::sort(receivedValues.lock().get());
EXPECT_EQ(receivedValues.lock().get(), kEXPECTED_VALUES);
});
}
TEST_P(ChannelSpawnTest, ChannelClosureScenarios)
{
context_.withExecutor([this](auto& executor) {
std::atomic_bool testCompleted{false};
util::spawn(executor, [&executor, &testCompleted](boost::asio::yield_context yield) mutable {
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
EXPECT_FALSE(receiver.isClosed());
bool success = sender.asyncSend(42, yield);
EXPECT_TRUE(success);
auto value = receiver.asyncReceive(yield);
EXPECT_TRUE(value.has_value());
EXPECT_EQ(*value, 42);
{
[[maybe_unused]] auto tempSender = std::move(sender);
}
EXPECT_TRUE(receiver.isClosed());
auto closedValue = receiver.asyncReceive(yield);
EXPECT_FALSE(closedValue.has_value());
testCompleted = true;
});
context_.run();
EXPECT_TRUE(testCompleted);
});
}
TEST_P(ChannelSpawnTest, TrySendTryReceiveMethods)
{
context_.withExecutor([this](auto& executor) {
std::atomic_bool testCompleted{false};
util::spawn(executor, [&executor, &testCompleted](boost::asio::yield_context) mutable {
auto [sender, receiver] = util::Channel<int>::create(executor, 3);
EXPECT_FALSE(receiver.tryReceive().has_value());
EXPECT_TRUE(sender.trySend(42));
EXPECT_TRUE(sender.trySend(43));
EXPECT_TRUE(sender.trySend(44));
EXPECT_FALSE(sender.trySend(45)); // channel full
auto value1 = receiver.tryReceive();
EXPECT_TRUE(value1.has_value());
EXPECT_EQ(*value1, 42);
auto value2 = receiver.tryReceive();
EXPECT_TRUE(value2.has_value());
EXPECT_EQ(*value2, 43);
EXPECT_TRUE(sender.trySend(46));
auto value3 = receiver.tryReceive();
EXPECT_TRUE(value3.has_value());
EXPECT_EQ(*value3, 44);
auto value4 = receiver.tryReceive();
EXPECT_TRUE(value4.has_value());
EXPECT_EQ(*value4, 46);
EXPECT_FALSE(receiver.tryReceive().has_value());
testCompleted = true;
});
context_.run();
EXPECT_TRUE(testCompleted);
});
}
TEST_P(ChannelSpawnTest, TryMethodsWithClosedChannel)
{
context_.withExecutor([this](auto& executor) {
std::atomic_bool testCompleted{false};
util::spawn(executor, [&executor, &testCompleted](boost::asio::yield_context) mutable {
auto [sender, receiver] = util::Channel<int>::create(executor, 3);
EXPECT_TRUE(sender.trySend(42));
EXPECT_TRUE(sender.trySend(43));
{
[[maybe_unused]] auto tempSender = std::move(sender);
}
EXPECT_TRUE(receiver.isClosed());
auto value1 = receiver.tryReceive();
EXPECT_TRUE(value1.has_value());
EXPECT_EQ(*value1, 42);
auto value2 = receiver.tryReceive();
EXPECT_TRUE(value2.has_value());
EXPECT_EQ(*value2, 43);
EXPECT_FALSE(receiver.tryReceive().has_value());
testCompleted = true;
});
context_.run();
EXPECT_TRUE(testCompleted);
});
}
INSTANTIATE_TEST_SUITE_P(
SpawnTests,
ChannelSpawnTest,
Values(ContextType::IOContext, ContextType::ThreadPool),
[](TestParamInfo<ContextType> const& info) { return contextTypeToString(info.param); }
);
TEST_P(ChannelCallbackTest, MultipleSendersOneReceiver)
{
context_.withExecutor([this](auto& executor) {
auto [sender, receiver] = util::Channel<int>::create(executor, 10);
util::Mutex<std::vector<int>> receivedValues;
auto receiveNext = [&receiver, &receivedValues](this auto&& self) -> void {
if (receivedValues.lock()->size() >= kTOTAL_EXPECTED)
return;
receiver.asyncReceive([&receivedValues, self = std::forward<decltype(self)>(self)](auto value) {
if (value.has_value()) {
receivedValues.lock()->push_back(*value);
self();
}
});
};
boost::asio::post(executor, receiveNext);
{
auto localSender = std::move(sender);
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
auto senderCopy = localSender;
boost::asio::post(executor, [senderCopy = std::move(senderCopy), senderId, &executor]() mutable {
auto sendNext = [senderCopy = std::move(senderCopy),
senderId,
&executor](this auto&& self, std::size_t i) -> void {
if (i >= kVALUES_PER_SENDER)
return;
senderCopy.asyncSend(
generateValue(senderId, i),
[self = std::forward<decltype(self)>(self), &executor, i](bool success) mutable {
if (success)
boost::asio::post(executor, [self = std::move(self), i]() mutable { self(i + 1); });
}
);
};
sendNext(0);
});
}
}
context_.run();
EXPECT_EQ(receivedValues.lock()->size(), kTOTAL_EXPECTED);
std::ranges::sort(receivedValues.lock().get());
EXPECT_EQ(receivedValues.lock().get(), kEXPECTED_VALUES);
});
}
TEST_P(ChannelCallbackTest, MultipleSendersMultipleReceivers)
{
context_.withExecutor([this](auto& executor) {
auto [sender, receiver] = util::Channel<int>::create(executor, 10);
util::Mutex<std::vector<int>> receivedValues;
std::vector<decltype(receiver)> receivers(kNUM_RECEIVERS, receiver);
for (auto receiverId = 0uz; receiverId < kNUM_RECEIVERS; ++receiverId) {
auto& receiverRef = receivers[receiverId];
auto receiveNext = [&receiverRef, &receivedValues](this auto&& self) -> void {
receiverRef.asyncReceive([&receivedValues, self = std::forward<decltype(self)>(self)](auto value) {
if (value.has_value()) {
receivedValues.lock()->push_back(*value);
self();
}
});
};
boost::asio::post(executor, receiveNext);
}
{
auto localSender = std::move(sender);
for (auto senderId = 0uz; senderId < kNUM_SENDERS; ++senderId) {
auto senderCopy = localSender;
boost::asio::post(executor, [senderCopy = std::move(senderCopy), senderId, &executor]() mutable {
auto sendNext = [senderCopy = std::move(senderCopy),
senderId,
&executor](this auto&& self, std::size_t i) -> void {
if (i >= kVALUES_PER_SENDER)
return;
senderCopy.asyncSend(
generateValue(senderId, i),
[self = std::forward<decltype(self)>(self), &executor, i](bool success) mutable {
if (success)
boost::asio::post(executor, [self = std::move(self), i]() mutable { self(i + 1); });
}
);
};
sendNext(0);
});
}
}
context_.run();
EXPECT_EQ(receivedValues.lock()->size(), kTOTAL_EXPECTED);
std::ranges::sort(receivedValues.lock().get());
EXPECT_EQ(receivedValues.lock().get(), kEXPECTED_VALUES);
});
}
TEST_P(ChannelCallbackTest, ChannelClosureScenarios)
{
context_.withExecutor([this](auto& executor) {
std::atomic_bool testCompleted{false};
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
auto receiverPtr = std::make_shared<decltype(receiver)>(std::move(receiver));
auto senderPtr = std::make_shared<std::optional<decltype(sender)>>(std::move(sender));
EXPECT_FALSE(receiverPtr->isClosed());
senderPtr->value().asyncSend(42, [&executor, receiverPtr, senderPtr, &testCompleted](bool success) {
EXPECT_TRUE(success);
receiverPtr->asyncReceive([&executor, receiverPtr, senderPtr, &testCompleted](auto value) {
EXPECT_TRUE(value.has_value());
EXPECT_EQ(*value, 42);
boost::asio::post(executor, [&executor, receiverPtr, senderPtr, &testCompleted]() {
senderPtr->reset();
EXPECT_TRUE(receiverPtr->isClosed());
boost::asio::post(executor, [receiverPtr, &testCompleted]() {
receiverPtr->asyncReceive([&testCompleted](auto closedValue) {
EXPECT_FALSE(closedValue.has_value());
testCompleted = true;
});
});
});
});
});
context_.run();
EXPECT_TRUE(testCompleted);
});
}
TEST_P(ChannelCallbackTest, TrySendTryReceiveMethods)
{
context_.withExecutor([this](auto& executor) {
std::atomic_bool testCompleted{false};
auto [sender, receiver] = util::Channel<int>::create(executor, 2);
auto receiverPtr = std::make_shared<decltype(receiver)>(std::move(receiver));
auto senderPtr = std::make_shared<decltype(sender)>(std::move(sender));
boost::asio::post(executor, [receiverPtr, senderPtr, &testCompleted]() {
EXPECT_FALSE(receiverPtr->tryReceive().has_value());
EXPECT_TRUE(senderPtr->trySend(100));
EXPECT_TRUE(senderPtr->trySend(101));
EXPECT_FALSE(senderPtr->trySend(102)); // channel full
auto value1 = receiverPtr->tryReceive();
EXPECT_TRUE(value1.has_value());
EXPECT_EQ(*value1, 100);
EXPECT_TRUE(senderPtr->trySend(103));
auto value2 = receiverPtr->tryReceive();
EXPECT_TRUE(value2.has_value());
EXPECT_EQ(*value2, 101);
auto value3 = receiverPtr->tryReceive();
EXPECT_TRUE(value3.has_value());
EXPECT_EQ(*value3, 103);
testCompleted = true;
});
context_.run();
EXPECT_TRUE(testCompleted);
});
}
TEST_P(ChannelCallbackTest, TryMethodsWithClosedChannel)
{
context_.withExecutor([this](auto& executor) {
std::atomic_bool testCompleted{false};
auto [sender, receiver] = util::Channel<int>::create(executor, 3);
auto receiverPtr = std::make_shared<decltype(receiver)>(std::move(receiver));
auto senderPtr = std::make_shared<std::optional<decltype(sender)>>(std::move(sender));
boost::asio::post(executor, [receiverPtr, senderPtr, &testCompleted]() {
EXPECT_TRUE(senderPtr->value().trySend(100));
EXPECT_TRUE(senderPtr->value().trySend(101));
senderPtr->reset();
EXPECT_TRUE(receiverPtr->isClosed());
auto value1 = receiverPtr->tryReceive();
EXPECT_TRUE(value1.has_value());
EXPECT_EQ(*value1, 100);
auto value2 = receiverPtr->tryReceive();
EXPECT_TRUE(value2.has_value());
EXPECT_EQ(*value2, 101);
EXPECT_FALSE(receiverPtr->tryReceive().has_value());
testCompleted = true;
});
context_.run();
EXPECT_TRUE(testCompleted);
});
}
INSTANTIATE_TEST_SUITE_P(
CallbackTests,
ChannelCallbackTest,
Values(ContextType::IOContext, ContextType::ThreadPool),
[](TestParamInfo<ContextType> const& info) { return contextTypeToString(info.param); }
);
TEST(ChannelTest, MultipleSenderCopiesErrorHandling)
{
boost::asio::io_context executor;
bool testCompleted = false;
util::spawn(executor, [&executor, &testCompleted](boost::asio::yield_context yield) mutable {
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
bool success = sender.asyncSend(42, yield);
EXPECT_TRUE(success);
auto value = receiver.asyncReceive(yield);
EXPECT_TRUE(value.has_value());
EXPECT_EQ(*value, 42);
auto senderCopy = sender;
{
[[maybe_unused]] auto tempSender = std::move(sender);
// tempSender destroyed here, but senderCopy still exists
}
EXPECT_FALSE(receiver.isClosed());
{
[[maybe_unused]] auto tempSender = std::move(senderCopy);
// now all senders are destroyed, channel should close
}
EXPECT_TRUE(receiver.isClosed());
auto closedValue = receiver.asyncReceive(yield);
EXPECT_FALSE(closedValue.has_value());
testCompleted = true;
});
executor.run_for(kTEST_TIMEOUT);
EXPECT_TRUE(testCompleted);
}
TEST(ChannelTest, ChannelClosesWhenAllSendersDestroyed)
{
boost::asio::io_context executor;
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
EXPECT_FALSE(receiver.isClosed());
auto senderCopy = sender;
{
[[maybe_unused]] auto temp = std::move(sender);
}
EXPECT_FALSE(receiver.isClosed()); // one sender still exists
{
[[maybe_unused]] auto temp = std::move(senderCopy);
}
EXPECT_TRUE(receiver.isClosed()); // all senders destroyed
}
TEST(ChannelTest, ChannelClosesWhenAllReceiversDestroyed)
{
boost::asio::io_context executor;
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
EXPECT_TRUE(sender.trySend(42));
auto receiverCopy = receiver;
{
[[maybe_unused]] auto temp = std::move(receiver);
}
EXPECT_TRUE(sender.trySend(43)); // one receiver still exists, can send
{
[[maybe_unused]] auto temp = std::move(receiverCopy);
}
EXPECT_FALSE(sender.trySend(44)); // all receivers destroyed, channel closed
}
TEST(ChannelTest, ChannelPreservesOrderFIFO)
{
boost::asio::io_context executor;
bool testCompleted = false;
std::vector<int> const valuesToSend = {42, 7, 99, 13, 5, 88, 21, 3, 67, 54};
util::spawn(executor, [&executor, &testCompleted, &valuesToSend](boost::asio::yield_context yield) mutable {
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
std::vector<int> receivedValues;
// Spawn a receiver coroutine that collects all values
util::spawn(executor, [&receiver, &receivedValues](boost::asio::yield_context yield) mutable {
auto value = receiver.asyncReceive(yield);
while (value.has_value()) {
receivedValues.push_back(*value);
value = receiver.asyncReceive(yield);
}
});
// Send all values
for (int const value : valuesToSend) {
EXPECT_TRUE(sender.asyncSend(value, yield));
}
// Close sender to signal end of data
{
[[maybe_unused]] auto temp = std::move(sender);
}
// Give receiver time to process all values
boost::asio::steady_timer timer(executor, std::chrono::milliseconds{50});
timer.async_wait(yield);
// Verify received values match sent values in the same order
EXPECT_EQ(receivedValues, valuesToSend);
testCompleted = true;
});
executor.run_for(kTEST_TIMEOUT);
EXPECT_TRUE(testCompleted);
}
TEST(ChannelTest, AsyncReceiveWakesUpWhenSenderDestroyed)
{
boost::asio::io_context executor;
bool testCompleted = false;
auto [sender, receiver] = util::Channel<int>::create(executor, 5);
auto senderPtr = std::make_shared<decltype(sender)>(std::move(sender));
util::spawn(
executor,
[&receiver, senderPtr = std::move(senderPtr), &testCompleted, &executor](boost::asio::yield_context) mutable {
// Start receiving - this will block because no data is sent
auto receiveTask = [&receiver, &testCompleted](boost::asio::yield_context yield) {
auto const value = receiver.asyncReceive(yield);
EXPECT_FALSE(value.has_value()); // Should receive nullopt when sender is destroyed
testCompleted = true;
};
util::spawn(executor, receiveTask);
senderPtr.reset();
}
);
executor.run_for(kTEST_TIMEOUT);
EXPECT_TRUE(testCompleted);
}
// This test verifies the workaround for a bug in boost::asio::experimental::concurrent_channel where close() does not
// cancel pending async operations. Our Channel wrapper calls cancel() after close() to ensure pending operations are
// unblocked.
// See: https://github.com/chriskohlhoff/asio/issues/1575
TEST(ChannelTest, PendingAsyncSendsAreCancelledOnClose)
{
boost::asio::thread_pool pool{4};
static constexpr auto kPENDING_NUM_SENDERS = 10uz;
// Channel with capacity 0 - all sends will block waiting for a receiver
auto [sender, receiver] = util::Channel<int>::create(pool, 0);
std::atomic<std::size_t> completedSends{0};
std::counting_semaphore<kPENDING_NUM_SENDERS> semaphore{kPENDING_NUM_SENDERS};
// Spawn multiple senders that will all block (no receiver is consuming)
for (auto i = 0uz; i < kPENDING_NUM_SENDERS; ++i) {
util::spawn(
pool, [senderCopy = sender, i, &completedSends, &semaphore](boost::asio::yield_context yield) mutable {
semaphore.release(1);
EXPECT_FALSE(senderCopy.asyncSend(static_cast<int>(i), yield));
++completedSends;
}
);
}
semaphore.acquire();
// Close the channel by destroying the only receiver we have.
// Our workaround calls cancel() after close() to unblock pending operations
{
[[maybe_unused]] auto r = std::move(receiver);
}
// All senders should complete (unblocked by our cancel() workaround)
pool.join();
// All sends should have completed (returned false due to closed channel)
EXPECT_EQ(completedSends, kPENDING_NUM_SENDERS);
}
INSTANTIATE_CHANNEL_FOR_CLANG(int);