Compare commits


6 Commits

Author       SHA1        Message                                                           Date
Alex Kremer  d053a1f8c8  Merge branch 'develop' into release/2.6.0                         2025-10-20 14:05:37 +01:00
Alex Kremer  c12601c0f5  Merge branch 'develop' into release/2.6.0                         2025-10-09 16:58:32 +01:00
Alex Kremer  381009807f  Merge remote-tracking branch 'origin/develop' into release/2.6.0  2025-10-08 13:52:49 +01:00
Alex Kremer  f530216688  Merge branch 'develop' into release/2.6.0                         2025-10-03 16:30:41 +01:00
Alex Kremer  0cccb1a2c0  Merge branch 'develop' into release/2.6.0                         2025-10-01 13:51:22 +01:00
Alex Kremer  1803990aa8  Use xrpl/2.6.1-rc1                                                2025-09-18 17:07:59 +01:00
284 changed files with 8711 additions and 8206 deletions

@@ -49,7 +49,6 @@ IndentFunctionDeclarationAfterType: false
 IndentWidth: 4
 IndentWrappedFunctionNames: false
 IndentRequiresClause: true
-InsertNewlineAtEOF: true
 RequiresClausePosition: OwnLine
 KeepEmptyLinesAtTheStartOfBlocks: false
 MaxEmptyLinesToKeep: 1

@@ -54,7 +54,7 @@ format:
   _help_max_pargs_hwrap:
   - If a positional argument group contains more than this many
   - arguments, then force it to a vertical layout.
-  max_pargs_hwrap: 5
+  max_pargs_hwrap: 6
   _help_max_rows_cmdline:
   - If a cmdline positional group consumes more than this many
   - lines without nesting, then invalidate the layout (and nest)

@@ -5,27 +5,25 @@ inputs:
   targets:
     description: Space-separated build target names
     default: all
-  nproc_subtract:
-    description: The number of processors to subtract when calculating parallelism.
+  subtract_threads:
+    description: An option for the action get-threads-number.
     required: true
     default: "0"
 runs:
   using: composite
   steps:
-    - name: Get number of processors
-      uses: XRPLF/actions/.github/actions/get-nproc@046b1620f6bfd6cd0985dc82c3df02786801fe0a
-      id: nproc
+    - name: Get number of threads
+      uses: ./.github/actions/get-threads-number
+      id: number_of_threads
       with:
-        subtract: ${{ inputs.nproc_subtract }}
+        subtract_threads: ${{ inputs.subtract_threads }}
    - name: Build targets
      shell: bash
-      env:
-        CMAKE_TARGETS: ${{ inputs.targets }}
      run: |
        cd build
        cmake \
          --build . \
-          --parallel "${{ steps.nproc.outputs.nproc }}" \
-          --target ${CMAKE_TARGETS}
+          --parallel "${{ steps.number_of_threads.outputs.threads_number }}" \
+          --target ${{ inputs.targets }}
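Both variants rely on shell word-splitting to hand a space-separated targets list to CMake: the expansion is deliberately left unquoted so each name becomes its own argument. A minimal sketch of that behavior (target names and build directory are illustrative):

# Unquoted expansion fans out into one argument per target,
# which `cmake --build --target` accepts (CMake 3.15+).
TARGETS="clio_tests clio_integration_tests"
cmake --build build --parallel 4 --target ${TARGETS}
# Quoted, it would be passed as a single, nonexistent target name:
# cmake --build build --target "${TARGETS}"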

@@ -47,12 +47,12 @@ runs:
       username: ${{ github.repository_owner }}
       password: ${{ env.GITHUB_TOKEN }}
-    - uses: docker/setup-qemu-action@c7c53464625b32c7a7e944ae62b3e17d2b600130 # v3.7.0
+    - uses: docker/setup-qemu-action@29109295f81e9208d7d86ff1c6c12d2833863392 # v3.6.0
       with:
         cache-image: false
     - uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
-    - uses: docker/metadata-action@318604b99e75e41977312d83839a89be02ca4893 # v5.9.0
+    - uses: docker/metadata-action@c1e51972afc2121e065aed6d45c65596fe445f3f # v5.8.0
       id: meta
       with:
         images: ${{ inputs.images }}

@@ -1,41 +0,0 @@
-name: Cache key
-description: Generate cache key for ccache
-inputs:
-  conan_profile:
-    description: Conan profile name
-    required: true
-  build_type:
-    description: Current build type (e.g. Release, Debug)
-    required: true
-    default: Release
-  code_coverage:
-    description: Whether code coverage is on
-    required: true
-    default: "false"
-outputs:
-  key:
-    description: Generated cache key for ccache
-    value: ${{ steps.key_without_commit.outputs.key }}-${{ steps.git_common_ancestor.outputs.commit }}
-  restore_keys:
-    description: Cache restore keys for fallback
-    value: ${{ steps.key_without_commit.outputs.key }}
-runs:
-  using: composite
-  steps:
-    - name: Find common commit
-      id: git_common_ancestor
-      uses: ./.github/actions/git-common-ancestor
-    - name: Set cache key without commit
-      id: key_without_commit
-      shell: bash
-      env:
-        RUNNER_OS: ${{ runner.os }}
-        BUILD_TYPE: ${{ inputs.build_type }}
-        CODE_COVERAGE: ${{ inputs.code_coverage == 'true' && '-code_coverage' || '' }}
-        CONAN_PROFILE: ${{ inputs.conan_profile }}
-      run: |
-        echo "key=clio-ccache-${RUNNER_OS}-${BUILD_TYPE}${CODE_COVERAGE}-${CONAN_PROFILE}-develop" >> "${GITHUB_OUTPUT}"

@@ -44,7 +44,6 @@ runs:
     - name: Run cmake
       shell: bash
       env:
-        BUILD_DIR: "${{ inputs.build_dir }}"
         BUILD_TYPE: "${{ inputs.build_type }}"
         SANITIZER_OPTION: |-
           ${{ endsWith(inputs.conan_profile, '.asan') && '-Dsan=address' ||
@@ -59,7 +58,7 @@ runs:
         PACKAGE: "${{ inputs.package == 'true' && 'ON' || 'OFF' }}"
       run: |
         cmake \
-          -B "${BUILD_DIR}" \
+          -B ${{inputs.build_dir}} \
           -S . \
           -G Ninja \
           -DCMAKE_TOOLCHAIN_FILE:FILEPATH=build/generators/conan_toolchain.cmake \

@@ -24,7 +24,7 @@ runs:
           -j8 --exclude-throw-branches
     - name: Archive coverage report
-      uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
+      uses: actions/upload-artifact@v4
       with:
         name: coverage-report.xml
         path: build/coverage_report.xml

@@ -28,14 +28,11 @@ runs:
     - name: Run conan
       shell: bash
       env:
-        BUILD_DIR: "${{ inputs.build_dir }}"
         CONAN_BUILD_OPTION: "${{ inputs.force_conan_source_build == 'true' && '*' || 'missing' }}"
-        BUILD_TYPE: "${{ inputs.build_type }}"
-        CONAN_PROFILE: "${{ inputs.conan_profile }}"
       run: |
         conan \
           install . \
-          -of "${BUILD_DIR}" \
-          -b "${CONAN_BUILD_OPTION}" \
-          -s "build_type=${BUILD_TYPE}" \
-          --profile:all "${CONAN_PROFILE}"
+          -of build \
+          -b "$CONAN_BUILD_OPTION" \
+          -s "build_type=${{ inputs.build_type }}" \
+          --profile:all "${{ inputs.conan_profile }}"

@@ -28,17 +28,12 @@ runs:
     - name: Create an issue
       id: create_issue
       shell: bash
-      env:
-        ISSUE_BODY: ${{ inputs.body }}
-        ISSUE_ASSIGNEES: ${{ inputs.assignees }}
-        ISSUE_LABELS: ${{ inputs.labels }}
-        ISSUE_TITLE: ${{ inputs.title }}
       run: |
-        echo -e "${ISSUE_BODY}" > issue.md
+        echo -e '${{ inputs.body }}' > issue.md
         gh issue create \
-          --assignee "${ISSUE_ASSIGNEES}" \
-          --label "${ISSUE_LABELS}" \
-          --title "${ISSUE_TITLE}" \
+          --assignee '${{ inputs.assignees }}' \
+          --label '${{ inputs.labels }}' \
+          --title '${{ inputs.title }}' \
           --body-file ./issue.md \
          > create_issue.log
        created_issue="$(sed 's|.*/||' create_issue.log)"

@@ -0,0 +1,36 @@
+name: Get number of threads
+description: Determines number of threads to use on macOS and Linux
+inputs:
+  subtract_threads:
+    description: How many threads to subtract from the calculated number
+    required: true
+    default: "0"
+outputs:
+  threads_number:
+    description: Number of threads to use
+    value: ${{ steps.number_of_threads_export.outputs.num }}
+runs:
+  using: composite
+  steps:
+    - name: Get number of threads on mac
+      id: mac_threads
+      if: ${{ runner.os == 'macOS' }}
+      shell: bash
+      run: echo "num=$(($(sysctl -n hw.logicalcpu) - 2))" >> $GITHUB_OUTPUT
+    - name: Get number of threads on Linux
+      id: linux_threads
+      if: ${{ runner.os == 'Linux' }}
+      shell: bash
+      run: echo "num=$(($(nproc) - 2))" >> $GITHUB_OUTPUT
+    - name: Shift and export number of threads
+      id: number_of_threads_export
+      shell: bash
+      run: |
+        num_of_threads="${{ steps.mac_threads.outputs.num || steps.linux_threads.outputs.num }}"
+        shift_by="${{ inputs.subtract_threads }}"
+        shifted="$((num_of_threads - shift_by))"
+        echo "num=$(( shifted > 1 ? shifted : 1 ))" >> $GITHUB_OUTPUT

@@ -0,0 +1,38 @@
+name: Restore cache
+description: Find and restores ccache cache
+inputs:
+  conan_profile:
+    description: Conan profile name
+    required: true
+  ccache_dir:
+    description: Path to .ccache directory
+    required: true
+  build_type:
+    description: Current build type (e.g. Release, Debug)
+    required: true
+    default: Release
+  code_coverage:
+    description: Whether code coverage is on
+    required: true
+    default: "false"
+outputs:
+  ccache_cache_hit:
+    description: True if ccache cache has been downloaded
+    value: ${{ steps.ccache_cache.outputs.cache-hit }}
+runs:
+  using: composite
+  steps:
+    - name: Find common commit
+      id: git_common_ancestor
+      uses: ./.github/actions/git-common-ancestor
+    - name: Restore ccache cache
+      uses: actions/cache/restore@v4
+      id: ccache_cache
+      if: ${{ env.CCACHE_DISABLE != '1' }}
+      with:
+        path: ${{ inputs.ccache_dir }}
+        key: clio-ccache-${{ runner.os }}-${{ inputs.build_type }}${{ inputs.code_coverage == 'true' && '-code_coverage' || '' }}-${{ inputs.conan_profile }}-develop-${{ steps.git_common_ancestor.outputs.commit }}

.github/actions/save-cache/action.yml (vendored, new file, 38 lines)

@@ -0,0 +1,38 @@
+name: Save cache
+description: Save ccache cache for develop branch
+inputs:
+  conan_profile:
+    description: Conan profile name
+    required: true
+  ccache_dir:
+    description: Path to .ccache directory
+    required: true
+  build_type:
+    description: Current build type (e.g. Release, Debug)
+    required: true
+    default: Release
+  code_coverage:
+    description: Whether code coverage is on
+    required: true
+    default: "false"
+  ccache_cache_hit:
+    description: Whether ccache cache has been downloaded
+    required: true
+  ccache_cache_miss_rate:
+    description: How many ccache cache misses happened
+runs:
+  using: composite
+  steps:
+    - name: Find common commit
+      id: git_common_ancestor
+      uses: ./.github/actions/git-common-ancestor
+    - name: Save ccache cache
+      if: ${{ inputs.ccache_cache_hit != 'true' || inputs.ccache_cache_miss_rate == '100.0' }}
+      uses: actions/cache/save@v4
+      with:
+        path: ${{ inputs.ccache_dir }}
+        key: clio-ccache-${{ runner.os }}-${{ inputs.build_type }}${{ inputs.code_coverage == 'true' && '-code_coverage' || '' }}-${{ inputs.conan_profile }}-develop-${{ steps.git_common_ancestor.outputs.commit }}

@@ -91,6 +91,19 @@ updates:
       prefix: "ci: [DEPENDABOT] "
     target-branch: develop
+  - package-ecosystem: github-actions
+    directory: .github/actions/get-threads-number/
+    schedule:
+      interval: weekly
+      day: monday
+      time: "04:00"
+      timezone: Etc/GMT
+    reviewers:
+      - XRPLF/clio-dev-team
+    commit-message:
+      prefix: "ci: [DEPENDABOT] "
+    target-branch: develop
   - package-ecosystem: github-actions
     directory: .github/actions/git-common-ancestor/
     schedule:
@@ -105,7 +118,20 @@ updates:
     target-branch: develop
   - package-ecosystem: github-actions
-    directory: .github/actions/cache-key/
+    directory: .github/actions/restore-cache/
+    schedule:
+      interval: weekly
+      day: monday
+      time: "04:00"
+      timezone: Etc/GMT
+    reviewers:
+      - XRPLF/clio-dev-team
+    commit-message:
+      prefix: "ci: [DEPENDABOT] "
+    target-branch: develop
+  - package-ecosystem: github-actions
+    directory: .github/actions/save-cache/
     schedule:
       interval: weekly
       day: monday

@@ -4,7 +4,7 @@ build_type=Release
 compiler=apple-clang
 compiler.cppstd=20
 compiler.libcxx=libc++
-compiler.version=17.0
+compiler.version=17
 os=Macos
 [conf]

@@ -4,7 +4,7 @@ import json
 LINUX_OS = ["heavy", "heavy-arm64"]
 LINUX_CONTAINERS = [
-    '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }'
+    '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'
 ]
 LINUX_COMPILERS = ["gcc", "clang"]

@@ -31,16 +31,15 @@ TESTS=$($TEST_BINARY --gtest_list_tests | awk '/^ / {print suite $1} !/^ / {su
 OUTPUT_DIR="./.sanitizer-report"
 mkdir -p "$OUTPUT_DIR"
-export TSAN_OPTIONS="die_after_fork=0"
-export MallocNanoZone='0' # for MacOSX
 for TEST in $TESTS; do
-    OUTPUT_FILE="$OUTPUT_DIR/${TEST//\//_}.log"
-    $TEST_BINARY --gtest_filter="$TEST" > "$OUTPUT_FILE" 2>&1
+    OUTPUT_FILE="$OUTPUT_DIR/${TEST//\//_}"
+    export TSAN_OPTIONS="log_path=\"$OUTPUT_FILE\" die_after_fork=0"
+    export ASAN_OPTIONS="log_path=\"$OUTPUT_FILE\""
+    export UBSAN_OPTIONS="log_path=\"$OUTPUT_FILE\""
+    export MallocNanoZone='0' # for MacOSX
+    $TEST_BINARY --gtest_filter="$TEST" > /dev/null 2>&1
     if [ $? -ne 0 ]; then
         echo "'$TEST' failed a sanitizer check."
-    else
-        rm "$OUTPUT_FILE"
     fi
 done
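The rewritten loop no longer captures test output to a .log file; it points each sanitizer at a per-test log path instead. Sanitizer runtimes conventionally append the reporting process's PID to log_path, which is presumably why the report check later globs the directory rather than testing one exact file name. A sketch of that behavior (binary and test names are illustrative):

mkdir -p ./.sanitizer-report
# With log_path set, a failing run is expected to produce
# ./.sanitizer-report/MySuite.MyTest.<pid> (assumed ASan/TSan/UBSan behavior).
export ASAN_OPTIONS="log_path=./.sanitizer-report/MySuite.MyTest"
./clio_tests --gtest_filter=MySuite.MyTest > /dev/null 2>&1 || true
ls ./.sanitizer-report/   # e.g. MySuite.MyTest.12345 on a failure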

@@ -38,37 +38,32 @@ on:
         description: Whether to strip clio binary
         default: true
-defaults:
-  run:
-    shell: bash
 jobs:
   build_and_publish_image:
     name: Build and publish image
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Download Clio binary from artifact
         if: ${{ inputs.artifact_name != null }}
-        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
+        uses: actions/download-artifact@v5
         with:
           name: ${{ inputs.artifact_name }}
           path: ./docker/clio/artifact/
       - name: Download Clio binary from url
         if: ${{ inputs.clio_server_binary_url != null }}
-        env:
-          BINARY_URL: ${{ inputs.clio_server_binary_url }}
-          BINARY_SHA256: ${{ inputs.binary_sha256 }}
+        shell: bash
         run: |
-          wget "${BINARY_URL}" -P ./docker/clio/artifact/
-          if [ "$(sha256sum ./docker/clio/clio_server | awk '{print $1}')" != "${BINARY_SHA256}" ]; then
+          wget "${{inputs.clio_server_binary_url}}" -P ./docker/clio/artifact/
+          if [ "$(sha256sum ./docker/clio/clio_server | awk '{print $1}')" != "${{inputs.binary_sha256}}" ]; then
             echo "Binary sha256 sum doesn't match"
             exit 1
           fi
       - name: Unpack binary
+        shell: bash
         run: |
           sudo apt update && sudo apt install -y tar unzip
           cd docker/clio/artifact
@@ -85,6 +80,7 @@ jobs:
       - name: Strip binary
         if: ${{ inputs.strip_binary }}
+        shell: bash
         run: strip ./docker/clio/clio_server
       - name: Set GHCR_REPO

@@ -33,10 +33,6 @@ concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}-${{ github.ref == 'refs/heads/develop' && github.run_number || 'branch' }}
   cancel-in-progress: true
-defaults:
-  run:
-    shell: bash
 jobs:
   build-and-test:
     name: Build and Test
@@ -49,7 +45,7 @@ jobs:
         build_type: [Release, Debug]
         container:
           [
-            '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }',
+            '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }',
           ]
         static: [true]
@@ -79,11 +75,11 @@ jobs:
     uses: ./.github/workflows/reusable-build.yml
     with:
       runs_on: heavy
-      container: '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }'
+      container: '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'
       conan_profile: gcc
       build_type: Debug
       download_ccache: true
-      upload_ccache: true
+      upload_ccache: false
       code_coverage: true
       static: true
       upload_clio_server: false
@@ -98,7 +94,7 @@ jobs:
     uses: ./.github/workflows/reusable-build.yml
     with:
       runs_on: heavy
-      container: '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }'
+      container: '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'
       conan_profile: gcc
       build_type: Release
       download_ccache: true
@@ -115,16 +111,17 @@ jobs:
     needs: build-and-test
     runs-on: heavy
     container:
-      image: ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7
+      image: ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
-      - uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
+      - uses: actions/checkout@v4
+      - uses: actions/download-artifact@v5
        with:
          name: clio_server_Linux_Release_gcc
       - name: Compare Config Description
+        shell: bash
        run: |
          repoConfigFile=docs/config-description.md
          configDescriptionFile=config_description_new.md

@@ -12,33 +12,31 @@ concurrency:
 env:
   CONAN_PROFILE: gcc
-defaults:
-  run:
-    shell: bash
 jobs:
   build:
     name: Build Clio / `libXRPL ${{ github.event.client_payload.version }}`
     runs-on: heavy
     container:
-      image: ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7
+      image: ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 0
       - name: Prepare runner
-        uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
+        uses: XRPLF/actions/.github/actions/prepare-runner@7951b682e5a2973b28b0719a72f01fc4b0d0c34f
         with:
           disable_ccache: true
       - name: Update libXRPL version requirement
+        shell: bash
         run: |
           sed -i.bak -E "s|'xrpl/[a-zA-Z0-9\\.\\-]+'|'xrpl/${{ github.event.client_payload.conan_ref }}'|g" conanfile.py
           rm -f conanfile.py.bak
       - name: Update conan lockfile
+        shell: bash
         run: |
           conan lock create . --profile:all ${{ env.CONAN_PROFILE }}
@@ -59,7 +57,7 @@ jobs:
         run: strip build/clio_tests
       - name: Upload clio_tests
-        uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
+        uses: actions/upload-artifact@v4
         with:
           name: clio_tests_check_libxrpl
           path: build/clio_tests
@@ -69,10 +67,10 @@ jobs:
     needs: build
     runs-on: heavy
     container:
-      image: ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7
+      image: ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d
     steps:
-      - uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
+      - uses: actions/download-artifact@v5
         with:
           name: clio_tests_check_libxrpl
@@ -92,7 +90,7 @@ jobs:
       issues: write
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Create an issue
         uses: ./.github/actions/create-issue

@@ -5,26 +5,20 @@ on:
     types: [opened, edited, reopened, synchronize]
     branches: [develop]
-defaults:
-  run:
-    shell: bash
 jobs:
   check_title:
     runs-on: ubuntu-latest
     steps:
-      - uses: ytanikin/pr-conventional-commits@b72758283dcbee706975950e96bc4bf323a8d8c0 # 1.4.2
+      - uses: ytanikin/pr-conventional-commits@b72758283dcbee706975950e96bc4bf323a8d8c0 # v1.4.2
        with:
          task_types: '["build","feat","fix","docs","test","ci","style","refactor","perf","chore"]'
          add_label: false
          custom_labels: '{"build":"build", "feat":"enhancement", "fix":"bug", "docs":"documentation", "test":"testability", "ci":"ci", "style":"refactoring", "refactor":"refactoring", "perf":"performance", "chore":"tooling"}'
       - name: Check if message starts with upper-case letter
-        env:
-          PR_TITLE: ${{ github.event.pull_request.title }}
        run: |
-          if [[ ! "${PR_TITLE}" =~ ^[a-z]+:\ [\[A-Z] ]]; then
+          if [[ ! "${{ github.event.pull_request.title }}" =~ ^[a-z]+:\ [\[A-Z] ]]; then
            echo "Error: PR title must start with an upper-case letter."
            exit 1
          fi
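Apart from moving the title from an env var into an inline expression, both sides apply the same bash regex: a lower-case type, a colon and a space, then an upper-case letter or a literal '['. A quick local check with made-up titles:

check() { [[ "$1" =~ ^[a-z]+:\ [\[A-Z] ]] && echo "ok: $1" || echo "rejected: $1"; }
check "fix: Correct the cache key"   # ok: type, colon, space, upper-case
check "feat: [Docs] Add logging"     # ok: a leading '[' is also accepted
check "fix: lower-case subject"      # rejected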

@@ -22,16 +22,12 @@ env:
   CONAN_PROFILE: clang
   LLVM_TOOLS_VERSION: 20
-defaults:
-  run:
-    shell: bash
 jobs:
   clang_tidy:
     if: github.event_name != 'push' || contains(github.event.head_commit.message, 'clang-tidy auto fixes')
     runs-on: heavy
     container:
-      image: ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7
+      image: ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d
     permissions:
       contents: write
@@ -39,15 +35,22 @@ jobs:
       pull-requests: write
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 0
       - name: Prepare runner
-        uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
+        uses: XRPLF/actions/.github/actions/prepare-runner@7951b682e5a2973b28b0719a72f01fc4b0d0c34f
         with:
           disable_ccache: true
+      - name: Restore cache
+        uses: ./.github/actions/restore-cache
+        id: restore_cache
+        with:
+          conan_profile: ${{ env.CONAN_PROFILE }}
+          ccache_dir: ${{ env.CCACHE_DIR }}
       - name: Run conan
         uses: ./.github/actions/conan
         with:
@@ -58,24 +61,27 @@ jobs:
         with:
           conan_profile: ${{ env.CONAN_PROFILE }}
-      - name: Get number of processors
-        uses: XRPLF/actions/.github/actions/get-nproc@046b1620f6bfd6cd0985dc82c3df02786801fe0a
-        id: nproc
+      - name: Get number of threads
+        uses: ./.github/actions/get-threads-number
+        id: number_of_threads
       - name: Run clang-tidy
         continue-on-error: true
+        shell: bash
         id: run_clang_tidy
         run: |
-          run-clang-tidy-${{ env.LLVM_TOOLS_VERSION }} -p build -j "${{ steps.nproc.outputs.nproc }}" -fix -quiet 1>output.txt
+          run-clang-tidy-${{ env.LLVM_TOOLS_VERSION }} -p build -j "${{ steps.number_of_threads.outputs.threads_number }}" -fix -quiet 1>output.txt
       - name: Fix local includes and clang-format style
         if: ${{ steps.run_clang_tidy.outcome != 'success' }}
+        shell: bash
         run: |
           pre-commit run --all-files fix-local-includes || true
           pre-commit run --all-files clang-format || true
       - name: Print issues found
         if: ${{ steps.run_clang_tidy.outcome != 'success' }}
+        shell: bash
         run: |
           sed -i '/error\||/!d' ./output.txt
           cat output.txt
@@ -120,4 +126,5 @@ jobs:
       - name: Fail the job
         if: ${{ steps.run_clang_tidy.outcome != 'success' }}
+        shell: bash
         run: exit 1

@@ -10,24 +10,20 @@ concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}
   cancel-in-progress: true
-defaults:
-  run:
-    shell: bash
 jobs:
   build:
     runs-on: ubuntu-latest
     container:
-      image: ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7
+      image: ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d
     steps:
       - name: Checkout
-        uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+        uses: actions/checkout@v4
         with:
           lfs: true
       - name: Prepare runner
-        uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
+        uses: XRPLF/actions/.github/actions/prepare-runner@7951b682e5a2973b28b0719a72f01fc4b0d0c34f
         with:
           disable_ccache: true
@@ -43,10 +39,10 @@ jobs:
         run: cmake --build . --target docs
       - name: Setup Pages
-        uses: actions/configure-pages@983d7736d9b0ae728b81ab479565c72886d7745b # v5.0.0
+        uses: actions/configure-pages@v5
       - name: Upload artifact
-        uses: actions/upload-pages-artifact@7b1f4a764d45c48632c6b24a0339c27f5614fb0b # v4.0.0
+        uses: actions/upload-pages-artifact@v4
         with:
           path: build_docs/html
           name: docs-develop
@@ -66,6 +62,6 @@ jobs:
     steps:
       - name: Deploy to GitHub Pages
         id: deployment
-        uses: actions/deploy-pages@d6db90164ac5ed86f2b6aed7e0febac5b3c0c03e # v4.0.5
+        uses: actions/deploy-pages@v4
         with:
           artifact_name: docs-develop

@@ -23,10 +23,6 @@ concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}
   cancel-in-progress: true
-defaults:
-  run:
-    shell: bash
 jobs:
   build-and-test:
     name: Build and Test
@@ -43,17 +39,17 @@ jobs:
             conan_profile: gcc
             build_type: Release
             static: true
-            container: '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }'
+            container: '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'
           - os: heavy
             conan_profile: gcc
             build_type: Debug
             static: true
-            container: '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }'
+            container: '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'
           - os: heavy
             conan_profile: gcc.ubsan
             build_type: Release
             static: false
-            container: '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }'
+            container: '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'
     uses: ./.github/workflows/reusable-build-test.yml
     with:
@@ -77,7 +73,7 @@ jobs:
         include:
           - os: heavy
             conan_profile: clang
-            container: '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }'
+            container: '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'
             static: true
           - os: macos15
             conan_profile: apple-clang
@@ -97,29 +93,18 @@ jobs:
       targets: all
       analyze_build_time: true
-  get_date:
-    name: Get Date
-    runs-on: ubuntu-latest
-    outputs:
-      date: ${{ steps.get_date.outputs.date }}
-    steps:
-      - name: Get current date
-        id: get_date
-        run: |
-          echo "date=$(date +'%Y%m%d')" >> $GITHUB_OUTPUT
   nightly_release:
-    needs: [build-and-test, get_date]
+    needs: build-and-test
     uses: ./.github/workflows/reusable-release.yml
     with:
-      delete_pattern: "nightly-*"
+      overwrite_release: true
       prerelease: true
-      title: "Clio development build (nightly-${{ needs.get_date.outputs.date }})"
-      version: nightly-${{ needs.get_date.outputs.date }}
+      title: "Clio development (nightly) build"
+      version: nightly
       header: >
         > **Note:** Please remember that this is a development release and it is not recommended for production use.
-        Changelog (including previous releases): <https://github.com/XRPLF/clio/commits/nightly-${{ needs.get_date.outputs.date }}>
+        Changelog (including previous releases): <https://github.com/XRPLF/clio/commits/nightly>
       generate_changelog: false
       draft: false
@@ -145,7 +130,7 @@ jobs:
       issues: write
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Create an issue
         uses: ./.github/actions/create-issue
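The two sides differ in how nightlies are tagged: one reuses a single "nightly" tag and overwrites it, the other stamps each nightly with the current date and prunes older ones by pattern. A sketch of the dated scheme:

date_tag="nightly-$(date +'%Y%m%d')"
echo "${date_tag}"   # e.g. nightly-20251020; combined with delete_pattern
                     # "nightly-*", the release job can prune earlier nightlies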

@@ -1,8 +1,8 @@
 name: Pre-commit auto-update
 on:
-  schedule:
   # every first day of the month
+  schedule:
     - cron: "0 0 1 * *"
   pull_request:
     branches: [release/*, develop]

@@ -8,7 +8,7 @@ on:
 jobs:
   run-hooks:
-    uses: XRPLF/actions/.github/workflows/pre-commit.yml@34790936fae4c6c751f62ec8c06696f9c1a5753a
+    uses: XRPLF/actions/.github/workflows/pre-commit.yml@a8d7472b450eb53a1e5228f64552e5974457a21a
     with:
       runs_on: heavy
-      container: '{ "image": "ghcr.io/xrplf/clio-pre-commit:c117f470f2ef954520ab5d1c8a5ed2b9e68d6f8a" }'
+      container: '{ "image": "ghcr.io/xrplf/clio-pre-commit:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'

@@ -29,7 +29,7 @@ jobs:
             conan_profile: gcc
             build_type: Release
             static: true
-            container: '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }'
+            container: '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'
     uses: ./.github/workflows/reusable-build-test.yml
     with:
@@ -49,7 +49,7 @@ jobs:
     needs: build-and-test
     uses: ./.github/workflows/reusable-release.yml
     with:
-      delete_pattern: ""
+      overwrite_release: false
       prerelease: ${{ contains(github.ref_name, '-') }}
       title: "${{ github.ref_name }}"
       version: "${{ github.ref_name }}"

@@ -75,10 +75,6 @@ on:
       CODECOV_TOKEN:
         required: false
-defaults:
-  run:
-    shell: bash
 jobs:
   build:
     name: Build
@@ -90,7 +86,7 @@ jobs:
         if: ${{ runner.os == 'macOS' }}
         uses: XRPLF/actions/.github/actions/cleanup-workspace@ea9970b7c211b18f4c8bcdb28c29f5711752029f
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 0
           # We need to fetch tags to have correct version in the release
@@ -99,31 +95,25 @@ jobs:
           ref: ${{ github.ref }}
       - name: Prepare runner
-        uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
+        uses: XRPLF/actions/.github/actions/prepare-runner@7951b682e5a2973b28b0719a72f01fc4b0d0c34f
         with:
           disable_ccache: ${{ !inputs.download_ccache }}
       - name: Setup conan on macOS
         if: ${{ runner.os == 'macOS' }}
+        shell: bash
         run: ./.github/scripts/conan/init.sh
-      - name: Generate cache key
-        uses: ./.github/actions/cache-key
-        id: cache_key
+      - name: Restore cache
+        if: ${{ inputs.download_ccache }}
+        uses: ./.github/actions/restore-cache
+        id: restore_cache
         with:
           conan_profile: ${{ inputs.conan_profile }}
+          ccache_dir: ${{ env.CCACHE_DIR }}
           build_type: ${{ inputs.build_type }}
           code_coverage: ${{ inputs.code_coverage }}
-      - name: Restore ccache cache
-        if: ${{ inputs.download_ccache && github.ref != 'refs/heads/develop' }}
-        uses: actions/cache/restore@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
-        with:
-          path: ${{ env.CCACHE_DIR }}
-          key: ${{ steps.cache_key.outputs.key }}
-          restore-keys: |
-            ${{ steps.cache_key.outputs.restore_keys }}
       - name: Run conan
         uses: ./.github/actions/conan
         with:
@@ -151,26 +141,24 @@ jobs:
           ClangBuildAnalyzer --all build/ build_time_report.bin
           ClangBuildAnalyzer --analyze build_time_report.bin > build_time_report.txt
           cat build_time_report.txt
+        shell: bash
       - name: Upload build time analyze report
         if: ${{ inputs.analyze_build_time }}
-        uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
+        uses: actions/upload-artifact@v4
         with:
           name: build_time_report_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
           path: build_time_report.txt
-      - name: Show ccache's statistics and zero it
+      - name: Show ccache's statistics
         if: ${{ inputs.download_ccache }}
+        shell: bash
+        id: ccache_stats
         run: |
-          ccache --show-stats
-          ccache --zero-stats
+          ccache -s > /tmp/ccache.stats
+          miss_rate=$(cat /tmp/ccache.stats | grep 'Misses' | head -n1 | sed 's/.*(\(.*\)%).*/\1/')
+          echo "miss_rate=${miss_rate}" >> $GITHUB_OUTPUT
+          cat /tmp/ccache.stats
-      - name: Save ccache cache
-        if: ${{ inputs.upload_ccache && github.ref == 'refs/heads/develop' }}
-        uses: actions/cache/save@0057852bfaa89a56745cba8c7296529d2fc39830 # v4.3.0
-        with:
-          path: ${{ env.CCACHE_DIR }}
-          key: ${{ steps.cache_key.outputs.key }}
       - name: Strip unit_tests
         if: ${{ !endsWith(inputs.conan_profile, 'san') && !inputs.code_coverage && !inputs.analyze_build_time }}
@@ -182,32 +170,44 @@ jobs:
       - name: Upload clio_server
         if: ${{ inputs.upload_clio_server && !inputs.code_coverage && !inputs.analyze_build_time }}
-        uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
+        uses: actions/upload-artifact@v4
         with:
           name: clio_server_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
           path: build/clio_server
       - name: Upload clio_tests
         if: ${{ !inputs.code_coverage && !inputs.analyze_build_time && !inputs.package }}
-        uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
+        uses: actions/upload-artifact@v4
         with:
           name: clio_tests_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
           path: build/clio_tests
       - name: Upload clio_integration_tests
         if: ${{ !inputs.code_coverage && !inputs.analyze_build_time && !inputs.package }}
-        uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
+        uses: actions/upload-artifact@v4
         with:
           name: clio_integration_tests_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
           path: build/clio_integration_tests
       - name: Upload Clio Linux package
         if: ${{ inputs.package }}
-        uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
+        uses: actions/upload-artifact@v4
         with:
           name: clio_deb_package_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
           path: build/*.deb
+      - name: Save cache
+        if: ${{ inputs.upload_ccache && github.ref == 'refs/heads/develop' }}
+        uses: ./.github/actions/save-cache
+        with:
+          conan_profile: ${{ inputs.conan_profile }}
+          ccache_dir: ${{ env.CCACHE_DIR }}
+          build_type: ${{ inputs.build_type }}
+          code_coverage: ${{ inputs.code_coverage }}
+          ccache_cache_hit: ${{ steps.restore_cache.outputs.ccache_cache_hit }}
+          ccache_cache_miss_rate: ${{ steps.ccache_stats.outputs.miss_rate }}
   # This is run as part of the build job, because it requires the following:
   # - source code
   # - conan packages
@@ -220,14 +220,13 @@ jobs:
       - name: Verify expected version
         if: ${{ inputs.expected_version != '' }}
-        env:
-          INPUT_EXPECTED_VERSION: ${{ inputs.expected_version }}
+        shell: bash
         run: |
           set -e
-          EXPECTED_VERSION="clio-${INPUT_EXPECTED_VERSION}"
+          EXPECTED_VERSION="clio-${{ inputs.expected_version }}"
           actual_version=$(./build/clio_server --version)
           if [[ "$actual_version" != "$EXPECTED_VERSION" ]]; then
-            echo "Expected version '${EXPECTED_VERSION}', but got '${actual_version}'"
+            echo "Expected version '$EXPECTED_VERSION', but got '$actual_version'"
             exit 1
           fi
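The reworked statistics step parses the miss percentage out of the ccache summary so the save step can skip re-saving a cache that was restored but never hit. Fed with a canned summary (format assumed from ccache 4.x output), the extraction pipeline behaves like this:

cat > /tmp/ccache.stats <<'EOF'
Cacheable calls:   100 / 120 (83.33%)
  Hits:             63 / 100 (63.00%)
  Misses:           37 / 100 (37.00%)
EOF
# Take the first "Misses" line and keep only the percentage in parentheses.
miss_rate=$(cat /tmp/ccache.stats | grep 'Misses' | head -n1 | sed 's/.*(\(.*\)%).*/\1/')
echo "miss_rate=${miss_rate}"   # miss_rate=37.00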

@@ -3,10 +3,10 @@ name: Make release
 on:
   workflow_call:
     inputs:
-      delete_pattern:
-        description: "Pattern to delete previous releases"
+      overwrite_release:
+        description: "Overwrite the current release and tag"
         required: true
-        type: string
+        type: boolean
       prerelease:
         description: "Create a prerelease"
@@ -38,15 +38,11 @@ on:
         required: true
         type: boolean
-defaults:
-  run:
-    shell: bash
 jobs:
   release:
     runs-on: heavy
     container:
-      image: ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7
+      image: ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d
     env:
       GH_REPO: ${{ github.repository }}
       GH_TOKEN: ${{ github.token }}
@@ -55,29 +51,29 @@ jobs:
       contents: write
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 0
       - name: Prepare runner
-        uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
+        uses: XRPLF/actions/.github/actions/prepare-runner@7951b682e5a2973b28b0719a72f01fc4b0d0c34f
         with:
           disable_ccache: true
-      - uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
+      - uses: actions/download-artifact@v5
         with:
           path: release_artifacts
           pattern: clio_server_*
       - name: Create release notes
-        env:
-          RELEASE_HEADER: ${{ inputs.header }}
+        shell: bash
         run: |
           echo "# Release notes" > "${RUNNER_TEMP}/release_notes.md"
           echo "" >> "${RUNNER_TEMP}/release_notes.md"
-          printf '%s\n' "${RELEASE_HEADER}" >> "${RUNNER_TEMP}/release_notes.md"
+          printf '%s\n' "${{ inputs.header }}" >> "${RUNNER_TEMP}/release_notes.md"
       - name: Generate changelog
+        shell: bash
         if: ${{ inputs.generate_changelog }}
         run: |
           LAST_TAG="$(gh release view --json tagName -q .tagName --repo XRPLF/clio)"
@@ -87,39 +83,30 @@ jobs:
           cat CHANGELOG.md >> "${RUNNER_TEMP}/release_notes.md"
       - name: Prepare release artifacts
+        shell: bash
         run: .github/scripts/prepare-release-artifacts.sh release_artifacts
       - name: Upload release notes
-        uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
+        uses: actions/upload-artifact@v4
         with:
           name: release_notes_${{ inputs.version }}
           path: "${RUNNER_TEMP}/release_notes.md"
-      - name: Remove previous release with a pattern
-        if: ${{ github.event_name != 'pull_request' && inputs.delete_pattern != '' }}
-        env:
-          DELETE_PATTERN: ${{ inputs.delete_pattern }}
+      - name: Remove current release and tag
+        if: ${{ github.event_name != 'pull_request' && inputs.overwrite_release }}
+        shell: bash
         run: |
-          RELEASES_TO_DELETE=$(gh release list --limit 50 --repo "${GH_REPO}" | grep -E "${DELETE_PATTERN}" | awk -F'\t' '{print $3}' || true)
-          if [ -n "$RELEASES_TO_DELETE" ]; then
-            for RELEASE in $RELEASES_TO_DELETE; do
-              echo "Deleting release: $RELEASE"
-              gh release delete "$RELEASE" --repo "${GH_REPO}" --yes --cleanup-tag
-            done
-          fi
+          gh release delete ${{ inputs.version }} --yes || true
+          git push origin :${{ inputs.version }} || true
       - name: Publish release
         if: ${{ github.event_name != 'pull_request' }}
-        env:
-          RELEASE_VERSION: ${{ inputs.version }}
-          PRERELEASE_OPTION: ${{ inputs.prerelease && '--prerelease' || '' }}
-          RELEASE_TITLE: ${{ inputs.title }}
-          DRAFT_OPTION: ${{ inputs.draft && '--draft' || '' }}
+        shell: bash
         run: |
-          gh release create "${RELEASE_VERSION}" \
-            ${PRERELEASE_OPTION} \
-            --title "${RELEASE_TITLE}" \
+          gh release create "${{ inputs.version }}" \
+            ${{ inputs.prerelease && '--prerelease' || '' }} \
+            --title "${{ inputs.title }}" \
             --target "${GITHUB_SHA}" \
-            ${DRAFT_OPTION} \
+            ${{ inputs.draft && '--draft' || '' }} \
            --notes-file "${RUNNER_TEMP}/release_notes.md" \
            ./release_artifacts/clio_server*
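The pattern-based cleanup treats delete_pattern as an extended regex over the `gh release list` output and assumes the tag sits in the third tab-separated column. A dry-run sketch against a canned listing (the release rows are made up):

printf 'Clio nightly\tPre-release\tnightly-20251018\t2025-10-18\n'  > /tmp/releases.tsv
printf 'Clio 2.5.0\tLatest\t2.5.0\t2025-09-01\n'                   >> /tmp/releases.tsv
DELETE_PATTERN='nightly-'   # used as a grep -E regex, despite the glob-like look of "nightly-*"
grep -E "${DELETE_PATTERN}" /tmp/releases.tsv | awk -F'\t' '{print $3}'
# -> nightly-20251018; each matched tag would then be passed to `gh release delete ... --cleanup-tag`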

@@ -33,10 +33,6 @@ on:
         required: true
         type: boolean
-defaults:
-  run:
-    shell: bash
 jobs:
   unit_tests:
     name: Unit testing
@@ -47,22 +43,23 @@ jobs:
     env:
       # TODO: remove completely when we have fixed all currently existing issues with sanitizers
-      SANITIZER_IGNORE_ERRORS: ${{ endsWith(inputs.conan_profile, '.tsan') }}
+      SANITIZER_IGNORE_ERRORS: ${{ endsWith(inputs.conan_profile, '.tsan') || inputs.conan_profile == 'clang.asan' || (inputs.conan_profile == 'gcc.asan' && inputs.build_type == 'Release') }}
     steps:
       - name: Cleanup workspace
         if: ${{ runner.os == 'macOS' }}
         uses: XRPLF/actions/.github/actions/cleanup-workspace@ea9970b7c211b18f4c8bcdb28c29f5711752029f
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 0
-      - uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
+      - uses: actions/download-artifact@v5
         with:
           name: clio_tests_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
       - name: Make clio_tests executable
+        shell: bash
         run: chmod +x ./clio_tests
       - name: Run clio_tests (regular)
@@ -71,10 +68,11 @@ jobs:
       - name: Run clio_tests (sanitizer errors ignored)
         if: ${{ env.SANITIZER_IGNORE_ERRORS == 'true' }}
-        run: ./.github/scripts/execute-tests-under-sanitizer.sh ./clio_tests
+        run: ./.github/scripts/execute-tests-under-sanitizer ./clio_tests
       - name: Check for sanitizer report
         if: ${{ env.SANITIZER_IGNORE_ERRORS == 'true' }}
+        shell: bash
         id: check_report
         run: |
           if ls .sanitizer-report/* 1> /dev/null 2>&1; then
@@ -85,7 +83,7 @@ jobs:
       - name: Upload sanitizer report
         if: ${{ env.SANITIZER_IGNORE_ERRORS == 'true' && steps.check_report.outputs.found_report == 'true' }}
-        uses: actions/upload-artifact@330a01c490aca151604b8cf639adc76d48f6c5d4 # v5.0.0
+        uses: actions/upload-artifact@v4
         with:
           name: sanitizer_report_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}
           path: .sanitizer-report/*
@@ -146,7 +144,7 @@ jobs:
           sleep 5
         done
-      - uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
+      - uses: actions/download-artifact@v5
         with:
           name: clio_integration_tests_${{ runner.os }}_${{ inputs.build_type }}_${{ inputs.conan_profile }}

@@ -6,22 +6,18 @@ on:
       CODECOV_TOKEN:
         required: true
-defaults:
-  run:
-    shell: bash
 jobs:
   upload_report:
     name: Upload report
     runs-on: ubuntu-latest
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
         with:
           fetch-depth: 0
       - name: Download report artifact
-        uses: actions/download-artifact@018cc2cf5baa6db3ef3c5f8a56943fffe632ef53 # v6.0.0
+        uses: actions/download-artifact@v5
         with:
           name: coverage-report.xml
           path: build

@@ -15,7 +15,7 @@ on:
       - ".github/actions/**"
       - "!.github/actions/build-docker-image/**"
       - "!.github/actions/create-issue/**"
-      - .github/scripts/execute-tests-under-sanitizer.sh
+      - .github/scripts/execute-tests-under-sanitizer
       - CMakeLists.txt
       - conanfile.py
@@ -44,13 +44,14 @@ jobs:
     uses: ./.github/workflows/reusable-build-test.yml
     with:
       runs_on: heavy
-      container: '{ "image": "ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7" }'
+      container: '{ "image": "ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d" }'
       download_ccache: false
       upload_ccache: false
       conan_profile: ${{ matrix.compiler }}${{ matrix.sanitizer_ext }}
       build_type: ${{ matrix.build_type }}
       static: false
-      run_unit_tests: true
+      # Currently, both gcc.tsan and clang.tsan unit tests hang
+      run_unit_tests: ${{ matrix.sanitizer_ext != '.tsan' }}
       run_integration_tests: false
       upload_clio_server: false
       targets: clio_tests clio_integration_tests

@@ -33,10 +33,6 @@ env:
   GCC_MAJOR_VERSION: 15
   GCC_VERSION: 15.2.0
-defaults:
-  run:
-    shell: bash
 jobs:
   repo:
     name: Calculate repo name
@@ -56,7 +52,7 @@ jobs:
     needs: repo
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Get changed files
         id: changed-files
@@ -94,7 +90,7 @@ jobs:
     needs: repo
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Get changed files
         id: changed-files
@@ -132,7 +128,7 @@ jobs:
     needs: [repo, gcc-amd64, gcc-arm64]
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Get changed files
         id: changed-files
@@ -141,7 +137,7 @@ jobs:
           files: "docker/compilers/gcc/**"
       - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
+        uses: docker/setup-buildx-action@v3
       - name: Login to GitHub Container Registry
         if: ${{ github.event_name != 'pull_request' }}
@@ -183,7 +179,7 @@ jobs:
     needs: repo
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Get changed files
         id: changed-files
@@ -219,7 +215,7 @@ jobs:
     needs: [repo, gcc-merge]
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Get changed files
         id: changed-files
@@ -250,7 +246,7 @@ jobs:
     needs: [repo, gcc-merge]
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Get changed files
         id: changed-files
@@ -281,7 +277,7 @@ jobs:
     needs: [repo, tools-amd64, tools-arm64]
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Get changed files
         id: changed-files
@@ -290,7 +286,7 @@ jobs:
           files: "docker/tools/**"
       - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@e468171a9de216ec08956ac3ada2f0791b6bd435 # v3.11.1
+        uses: docker/setup-buildx-action@v3
       - name: Login to GitHub Container Registry
         if: ${{ github.event_name != 'pull_request' }}
@@ -316,7 +312,7 @@ jobs:
     needs: [repo, tools-merge]
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - uses: ./.github/actions/build-docker-image
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@@ -338,7 +334,7 @@ jobs:
     needs: [repo, gcc-merge, clang, tools-merge]
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - uses: ./.github/actions/build-docker-image
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

@@ -40,17 +40,13 @@ concurrency:
   group: ${{ github.workflow }}-${{ github.ref }}
   cancel-in-progress: true
-defaults:
-  run:
-    shell: bash
 jobs:
   generate-matrix:
     runs-on: ubuntu-latest
     outputs:
       matrix: ${{ steps.set-matrix.outputs.matrix }}
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Calculate conan matrix
         id: set-matrix
@@ -73,15 +69,16 @@ jobs:
       CONAN_PROFILE: ${{ matrix.compiler }}${{ matrix.sanitizer_ext }}
     steps:
-      - uses: actions/checkout@08c6903cd8c0fde910a37f88322edcfb5dd907a8 # v5.0.0
+      - uses: actions/checkout@v4
       - name: Prepare runner
-        uses: XRPLF/actions/.github/actions/prepare-runner@8abb0722cbff83a9a2dc7d06c473f7a4964b7382
+        uses: XRPLF/actions/.github/actions/prepare-runner@7951b682e5a2973b28b0719a72f01fc4b0d0c34f
         with:
           disable_ccache: true
       - name: Setup conan on macOS
         if: ${{ runner.os == 'macOS' }}
+        shell: bash
         run: ./.github/scripts/conan/init.sh
       - name: Show conan profile
@@ -102,6 +99,4 @@ jobs:
       - name: Upload Conan packages
         if: ${{ github.repository_owner == 'XRPLF' && github.event_name != 'pull_request' && github.event_name != 'schedule' }}
-        env:
-          FORCE_OPTION: ${{ github.event.inputs.force_upload == 'true' && '--force' || '' }}
-        run: conan upload "*" -r=xrplf --confirm ${FORCE_OPTION}
+        run: conan upload "*" -r=xrplf --confirm ${{ github.event.inputs.force_upload == 'true' && '--force' || '' }}

@@ -11,10 +11,7 @@
 #
 # See https://pre-commit.com for more information
 # See https://pre-commit.com/hooks.html for more hooks
-exclude: |
-  (?x)^(
-    docs/doxygen-awesome-theme/.*
-  )$
+exclude: ^(docs/doxygen-awesome-theme/|conan\.lock$)
 repos:
   # `pre-commit sample-config` default hooks
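The one-line variant folds the verbose (?x) block into a single pattern and additionally excludes the conan.lock file. pre-commit matches these with Python's re module, but for this particular pattern grep -E agrees, so it can be sanity-checked locally (paths are examples):

re='^(docs/doxygen-awesome-theme/|conan\.lock$)'
echo "docs/doxygen-awesome-theme/theme.css" | grep -qE "$re" && echo excluded
echo "conan.lock"                           | grep -qE "$re" && echo excluded
echo "src/main.cpp"                         | grep -qE "$re" || echo checked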

View File

@@ -75,6 +75,10 @@ if (san)
endif () endif ()
target_compile_options(clio_options INTERFACE ${SAN_OPTIMIZATION_FLAG} ${SAN_FLAG} -fno-omit-frame-pointer) target_compile_options(clio_options INTERFACE ${SAN_OPTIMIZATION_FLAG} ${SAN_FLAG} -fno-omit-frame-pointer)
target_compile_definitions(
clio_options INTERFACE $<$<STREQUAL:${san},address>:SANITIZER=ASAN> $<$<STREQUAL:${san},thread>:SANITIZER=TSAN>
$<$<STREQUAL:${san},memory>:SANITIZER=MSAN> $<$<STREQUAL:${san},undefined>:SANITIZER=UBSAN>
)
target_link_libraries(clio_options INTERFACE ${SAN_FLAG} ${SAN_LIB}) target_link_libraries(clio_options INTERFACE ${SAN_FLAG} ${SAN_LIB})
endif () endif ()
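Each `$<$<STREQUAL:${san},...>:...>` generator expression is evaluated at generate time, so exactly one of the `SANITIZER=ASAN|TSAN|MSAN|UBSAN` defines is added for the selected sanitizer and none otherwise.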


@@ -34,6 +34,7 @@ Below are some useful docs to learn more about Clio.
- [How to configure Clio and rippled](./docs/configure-clio.md) - [How to configure Clio and rippled](./docs/configure-clio.md)
- [How to run Clio](./docs/run-clio.md) - [How to run Clio](./docs/run-clio.md)
- [Logging](./docs/logging.md)
- [Troubleshooting guide](./docs/trouble_shooting.md) - [Troubleshooting guide](./docs/trouble_shooting.md)
**General reference material:** **General reference material:**


@@ -3,7 +3,7 @@
"requires": [ "requires": [
"zlib/1.3.1#b8bc2603263cf7eccbd6e17e66b0ed76%1756234269.497", "zlib/1.3.1#b8bc2603263cf7eccbd6e17e66b0ed76%1756234269.497",
"xxhash/0.8.3#681d36a0a6111fc56e5e45ea182c19cc%1756234289.683", "xxhash/0.8.3#681d36a0a6111fc56e5e45ea182c19cc%1756234289.683",
"xrpl/3.0.0-rc1#f5c8ecd42bdf511ad36f57bc702dacd2%1762975621.294", "xrpl/2.6.1#973af2bf9631f239941dd9f5a100bb84%1759275059.342",
"sqlite3/3.49.1#8631739a4c9b93bd3d6b753bac548a63%1756234266.869", "sqlite3/3.49.1#8631739a4c9b93bd3d6b753bac548a63%1756234266.869",
"spdlog/1.15.3#3ca0e9e6b83af4d0151e26541d140c86%1754401846.61", "spdlog/1.15.3#3ca0e9e6b83af4d0151e26541d140c86%1754401846.61",
"soci/4.0.3#a9f8d773cd33e356b5879a4b0564f287%1756234262.318", "soci/4.0.3#a9f8d773cd33e356b5879a4b0564f287%1756234262.318",
@@ -11,7 +11,7 @@
"rapidjson/cci.20220822#1b9d8c2256876a154172dc5cfbe447c6%1754325007.656", "rapidjson/cci.20220822#1b9d8c2256876a154172dc5cfbe447c6%1754325007.656",
"protobuf/3.21.12#d927114e28de9f4691a6bbcdd9a529d1%1756234251.614", "protobuf/3.21.12#d927114e28de9f4691a6bbcdd9a529d1%1756234251.614",
"openssl/1.1.1w#a8f0792d7c5121b954578a7149d23e03%1756223730.729", "openssl/1.1.1w#a8f0792d7c5121b954578a7149d23e03%1756223730.729",
"nudb/2.0.9#fb8dfd1a5557f5e0528114c2da17721e%1763150366.909", "nudb/2.0.9#c62cfd501e57055a7e0d8ee3d5e5427d%1756234237.107",
"minizip/1.2.13#9e87d57804bd372d6d1e32b1871517a3%1754325004.374", "minizip/1.2.13#9e87d57804bd372d6d1e32b1871517a3%1754325004.374",
"lz4/1.10.0#59fc63cac7f10fbe8e05c7e62c2f3504%1756234228.999", "lz4/1.10.0#59fc63cac7f10fbe8e05c7e62c2f3504%1756234228.999",
"libuv/1.46.0#dc28c1f653fa197f00db5b577a6f6011%1754325003.592", "libuv/1.46.0#dc28c1f653fa197f00db5b577a6f6011%1754325003.592",
@@ -45,7 +45,7 @@
], ],
"protobuf/3.21.12": [ "protobuf/3.21.12": [
null, null,
"protobuf/3.21.12#d927114e28de9f4691a6bbcdd9a529d1" "protobuf/3.21.12"
], ],
"lz4/1.9.4": [ "lz4/1.9.4": [
"lz4/1.10.0" "lz4/1.10.0"


@@ -18,7 +18,7 @@ class ClioConan(ConanFile):
'protobuf/3.21.12', 'protobuf/3.21.12',
'grpc/1.50.1', 'grpc/1.50.1',
'openssl/1.1.1w', 'openssl/1.1.1w',
'xrpl/3.0.0-rc1', 'xrpl/2.6.1',
'zlib/1.3.1', 'zlib/1.3.1',
'libbacktrace/cci.20210118', 'libbacktrace/cci.20210118',
'spdlog/1.15.3', 'spdlog/1.15.3',


@@ -55,11 +55,8 @@ RUN pip install -q --no-cache-dir \
# lxml 6.0.0 is not compatible with our image # lxml 6.0.0 is not compatible with our image
'lxml<6.0.0' \ 'lxml<6.0.0' \
cmake \ cmake \
conan==2.22.1 \ conan==2.20.1 \
gcovr \ gcovr
# We're adding pre-commit to this image as well,
# because the clang-tidy workflow requires it
pre-commit
# Install LLVM tools # Install LLVM tools
ARG LLVM_TOOLS_VERSION=20 ARG LLVM_TOOLS_VERSION=20


@@ -5,17 +5,17 @@ It is used in [Clio Github Actions](https://github.com/XRPLF/clio/actions) but c
The image is based on Ubuntu 20.04 and contains: The image is based on Ubuntu 20.04 and contains:
- ccache 4.12.1 - ccache 4.11.3
- Clang 19 - Clang 19
- ClangBuildAnalyzer 1.6.0 - ClangBuildAnalyzer 1.6.0
- Conan 2.22.1 - Conan 2.20.1
- Doxygen 1.15.0 - Doxygen 1.14
- GCC 15.2.0 - GCC 15.2.0
- GDB 16.3 - GDB 16.3
- gh 2.82.1 - gh 2.74
- git-cliff 2.10.1 - git-cliff 2.9.1
- mold 2.40.4 - mold 2.40.1
- Python 3.8 - Python 3.13
- and some other useful tools - and some other useful tools
Conan is set up to build Clio without any additional steps. Conan is set up to build Clio without any additional steps.


@@ -3,13 +3,6 @@
{% set sanitizer_opt_map = {"asan": "address", "tsan": "thread", "ubsan": "undefined"} %} {% set sanitizer_opt_map = {"asan": "address", "tsan": "thread", "ubsan": "undefined"} %}
{% set sanitizer = sanitizer_opt_map[sani] %} {% set sanitizer = sanitizer_opt_map[sani] %}
{% set sanitizer_b2_flags_map = {
"address": "context-impl=ucontext address-sanitizer=norecover",
"thread": "context-impl=ucontext thread-sanitizer=norecover",
"undefined": "undefined-sanitizer=norecover"
} %}
{% set sanitizer_b2_flags_str = sanitizer_b2_flags_map[sanitizer] %}
{% set sanitizer_build_flags_str = "-fsanitize=" ~ sanitizer ~ " -g -O1 -fno-omit-frame-pointer" %} {% set sanitizer_build_flags_str = "-fsanitize=" ~ sanitizer ~ " -g -O1 -fno-omit-frame-pointer" %}
{% set sanitizer_build_flags = sanitizer_build_flags_str.split(' ') %} {% set sanitizer_build_flags = sanitizer_build_flags_str.split(' ') %}
{% set sanitizer_link_flags_str = "-fsanitize=" ~ sanitizer %} {% set sanitizer_link_flags_str = "-fsanitize=" ~ sanitizer %}
@@ -18,8 +11,7 @@
include({{ compiler }}) include({{ compiler }})
[options] [options]
boost/*:extra_b2_flags="{{ sanitizer_b2_flags_str }}" boost/*:extra_b2_flags="cxxflags=\"{{ sanitizer_build_flags_str }}\" linkflags=\"{{ sanitizer_link_flags_str }}\""
boost/*:without_context=False
boost/*:without_stacktrace=True boost/*:without_stacktrace=True
[conf] [conf]
@@ -28,10 +20,4 @@ tools.build:cxxflags+={{ sanitizer_build_flags }}
tools.build:exelinkflags+={{ sanitizer_link_flags }} tools.build:exelinkflags+={{ sanitizer_link_flags }}
tools.build:sharedlinkflags+={{ sanitizer_link_flags }} tools.build:sharedlinkflags+={{ sanitizer_link_flags }}
{% if sanitizer == "address" %} tools.info.package_id:confs+=["tools.build:cflags", "tools.build:cxxflags", "tools.build:exelinkflags", "tools.build:sharedlinkflags"]
tools.build:defines+=["BOOST_USE_ASAN", "BOOST_USE_UCONTEXT"]
{% elif sanitizer == "thread" %}
tools.build:defines+=["BOOST_USE_TSAN", "BOOST_USE_UCONTEXT"]
{% endif %}
tools.info.package_id:confs+=["tools.build:cflags", "tools.build:cxxflags", "tools.build:exelinkflags", "tools.build:sharedlinkflags", "tools.build:defines"]
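For reference, rendering the left-hand template for `asan` substitutes the map entry directly, producing `boost/*:extra_b2_flags="context-impl=ucontext address-sanitizer=norecover"` plus `tools.build:defines+=["BOOST_USE_ASAN", "BOOST_USE_UCONTEXT"]`, whereas the right-hand version folds the sanitizer flags into `extra_b2_flags` as escaped cxxflags/linkflags instead.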


@@ -8,7 +8,7 @@ ARG UBUNTU_VERSION
ARG GCC_MAJOR_VERSION ARG GCC_MAJOR_VERSION
ARG BUILD_VERSION=0 ARG BUILD_VERSION=1
ARG DEBIAN_FRONTEND=noninteractive ARG DEBIAN_FRONTEND=noninteractive
ARG TARGETARCH ARG TARGETARCH
@@ -34,7 +34,6 @@ RUN wget --progress=dot:giga https://gcc.gnu.org/pub/gcc/releases/gcc-$GCC_VERSI
WORKDIR /gcc-$GCC_VERSION WORKDIR /gcc-$GCC_VERSION
RUN ./contrib/download_prerequisites RUN ./contrib/download_prerequisites
# hadolint ignore=DL3059
RUN mkdir /gcc-build RUN mkdir /gcc-build
WORKDIR /gcc-build WORKDIR /gcc-build
RUN /gcc-$GCC_VERSION/configure \ RUN /gcc-$GCC_VERSION/configure \


@@ -1,6 +1,6 @@
services: services:
clio_develop: clio_develop:
image: ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7 image: ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d
volumes: volumes:
- clio_develop_conan_data:/root/.conan2/p - clio_develop_conan_data:/root/.conan2/p
- clio_develop_ccache:/root/.ccache - clio_develop_ccache:/root/.ccache


@@ -8,7 +8,7 @@ ARG TARGETARCH
SHELL ["/bin/bash", "-o", "pipefail", "-c"] SHELL ["/bin/bash", "-o", "pipefail", "-c"]
ARG BUILD_VERSION=0 ARG BUILD_VERSION=2
RUN apt-get update \ RUN apt-get update \
&& apt-get install -y --no-install-recommends --no-install-suggests \ && apt-get install -y --no-install-recommends --no-install-suggests \
@@ -24,7 +24,7 @@ RUN apt-get update \
WORKDIR /tmp WORKDIR /tmp
ARG MOLD_VERSION=2.40.4 ARG MOLD_VERSION=2.40.1
RUN wget --progress=dot:giga "https://github.com/rui314/mold/archive/refs/tags/v${MOLD_VERSION}.tar.gz" \ RUN wget --progress=dot:giga "https://github.com/rui314/mold/archive/refs/tags/v${MOLD_VERSION}.tar.gz" \
&& tar xf "v${MOLD_VERSION}.tar.gz" \ && tar xf "v${MOLD_VERSION}.tar.gz" \
&& cd "mold-${MOLD_VERSION}" \ && cd "mold-${MOLD_VERSION}" \
@@ -34,7 +34,7 @@ RUN wget --progress=dot:giga "https://github.com/rui314/mold/archive/refs/tags/v
&& ninja install \ && ninja install \
&& rm -rf /tmp/* /var/tmp/* && rm -rf /tmp/* /var/tmp/*
ARG CCACHE_VERSION=4.12.1 ARG CCACHE_VERSION=4.11.3
RUN wget --progress=dot:giga "https://github.com/ccache/ccache/releases/download/v${CCACHE_VERSION}/ccache-${CCACHE_VERSION}.tar.gz" \ RUN wget --progress=dot:giga "https://github.com/ccache/ccache/releases/download/v${CCACHE_VERSION}/ccache-${CCACHE_VERSION}.tar.gz" \
&& tar xf "ccache-${CCACHE_VERSION}.tar.gz" \ && tar xf "ccache-${CCACHE_VERSION}.tar.gz" \
&& cd "ccache-${CCACHE_VERSION}" \ && cd "ccache-${CCACHE_VERSION}" \
@@ -51,7 +51,7 @@ RUN apt-get update \
&& apt-get clean \ && apt-get clean \
&& rm -rf /var/lib/apt/lists/* && rm -rf /var/lib/apt/lists/*
ARG DOXYGEN_VERSION=1.15.0 ARG DOXYGEN_VERSION=1.14.0
RUN wget --progress=dot:giga "https://github.com/doxygen/doxygen/releases/download/Release_${DOXYGEN_VERSION//./_}/doxygen-${DOXYGEN_VERSION}.src.tar.gz" \ RUN wget --progress=dot:giga "https://github.com/doxygen/doxygen/releases/download/Release_${DOXYGEN_VERSION//./_}/doxygen-${DOXYGEN_VERSION}.src.tar.gz" \
&& tar xf "doxygen-${DOXYGEN_VERSION}.src.tar.gz" \ && tar xf "doxygen-${DOXYGEN_VERSION}.src.tar.gz" \
&& cd "doxygen-${DOXYGEN_VERSION}" \ && cd "doxygen-${DOXYGEN_VERSION}" \
@@ -71,13 +71,13 @@ RUN wget --progress=dot:giga "https://github.com/aras-p/ClangBuildAnalyzer/archi
&& ninja install \ && ninja install \
&& rm -rf /tmp/* /var/tmp/* && rm -rf /tmp/* /var/tmp/*
ARG GIT_CLIFF_VERSION=2.10.1 ARG GIT_CLIFF_VERSION=2.9.1
RUN wget --progress=dot:giga "https://github.com/orhun/git-cliff/releases/download/v${GIT_CLIFF_VERSION}/git-cliff-${GIT_CLIFF_VERSION}-x86_64-unknown-linux-musl.tar.gz" \ RUN wget --progress=dot:giga "https://github.com/orhun/git-cliff/releases/download/v${GIT_CLIFF_VERSION}/git-cliff-${GIT_CLIFF_VERSION}-x86_64-unknown-linux-musl.tar.gz" \
&& tar xf git-cliff-${GIT_CLIFF_VERSION}-x86_64-unknown-linux-musl.tar.gz \ && tar xf git-cliff-${GIT_CLIFF_VERSION}-x86_64-unknown-linux-musl.tar.gz \
&& mv git-cliff-${GIT_CLIFF_VERSION}/git-cliff /usr/local/bin/git-cliff \ && mv git-cliff-${GIT_CLIFF_VERSION}/git-cliff /usr/local/bin/git-cliff \
&& rm -rf /tmp/* /var/tmp/* && rm -rf /tmp/* /var/tmp/*
ARG GH_VERSION=2.82.1 ARG GH_VERSION=2.74.0
RUN wget --progress=dot:giga "https://github.com/cli/cli/releases/download/v${GH_VERSION}/gh_${GH_VERSION}_linux_${TARGETARCH}.tar.gz" \ RUN wget --progress=dot:giga "https://github.com/cli/cli/releases/download/v${GH_VERSION}/gh_${GH_VERSION}_linux_${TARGETARCH}.tar.gz" \
&& tar xf gh_${GH_VERSION}_linux_${TARGETARCH}.tar.gz \ && tar xf gh_${GH_VERSION}_linux_${TARGETARCH}.tar.gz \
&& mv gh_${GH_VERSION}_linux_${TARGETARCH}/bin/gh /usr/local/bin/gh \ && mv gh_${GH_VERSION}_linux_${TARGETARCH}/bin/gh /usr/local/bin/gh \


@@ -191,7 +191,7 @@ Open the `index.html` file in your browser to see the documentation pages.
It is also possible to build Clio using [Docker](https://www.docker.com/) if you don't want to install all the dependencies on your machine. It is also possible to build Clio using [Docker](https://www.docker.com/) if you don't want to install all the dependencies on your machine.
```sh ```sh
docker run -it ghcr.io/xrplf/clio-ci:77387d8f9f13aea8f23831d221ac3e7683bb69b7 docker run -it ghcr.io/xrplf/clio-ci:b2be4b51d1d81548ca48e2f2b8f67356b880c96d
git clone https://github.com/XRPLF/clio git clone https://github.com/XRPLF/clio
cd clio cd clio
``` ```


@@ -293,7 +293,7 @@ This document provides a list of all available Clio configuration properties in
- **Required**: True - **Required**: True
- **Type**: int - **Type**: int
- **Default value**: `1000` - **Default value**: `1`
- **Constraints**: The minimum value is `1`. The maximum value is `4294967295`. - **Constraints**: The minimum value is `1`. The maximum value is `4294967295`.
- **Description**: The maximum size of the server's request queue. If set to `0`, this means there is no queue size limit. - **Description**: The maximum size of the server's request queue. If set to `0`, this means there is no queue size limit.
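In the example config shown further down, this corresponds to an entry such as `"max_queue_size": 500` under `server`, with `0` meaning an unbounded queue.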
@@ -391,7 +391,7 @@ This document provides a list of all available Clio configuration properties in
- **Type**: double - **Type**: double
- **Default value**: `10` - **Default value**: `10`
- **Constraints**: The value must be a positive double number. - **Constraints**: The value must be a positive double number.
- **Description**: The number of seconds the server waits to shut down gracefully. If Clio does not shut down gracefully within the specified time, it will be killed instead. - **Description**: The number of milliseconds the server waits to shut down gracefully. If Clio does not shut down gracefully within the specified time, it will be killed instead.
### cache.num_diffs ### cache.num_diffs
@@ -441,22 +441,6 @@ This document provides a list of all available Clio configuration properties in
- **Constraints**: The value must be one of the following: `sync`, `async`, `none`. - **Constraints**: The value must be one of the following: `sync`, `async`, `none`.
- **Description**: The strategy used for Cache loading. - **Description**: The strategy used for Cache loading.
### cache.file.path
- **Required**: False
- **Type**: string
- **Default value**: None
- **Constraints**: None
- **Description**: The path to a file where the cache is saved on shutdown and loaded from on startup. If the file can't be read, Clio loads the cache as usual (from the DB or from rippled).
### cache.file.max_sequence_age
- **Required**: True
- **Type**: int
- **Default value**: `5000`
- **Constraints**: None
- **Description**: The maximum allowed difference between the latest sequence in the DB and the one in the cache file. If the cache file is too old (its latest sequence is too low), Clio refuses to use it.
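These removed keys match the `"file": { "path": "./cache.bin", "max_sequence_age": 5000 }` block dropped from the example config further down.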
### log.channels.[].channel ### log.channels.[].channel
- **Required**: False - **Required**: False


@@ -61,7 +61,7 @@
"ip": "0.0.0.0", "ip": "0.0.0.0",
"port": 51233, "port": 51233,
// Max number of requests to queue up before rejecting further requests. // Max number of requests to queue up before rejecting further requests.
// Defaults to 1000 (use 0 to make the queue unbound). // Defaults to 0, which disables the limit.
"max_queue_size": 500, "max_queue_size": 500,
// If request contains header with authorization, Clio will check if it matches the prefix 'Password ' + this value's sha256 hash // If request contains header with authorization, Clio will check if it matches the prefix 'Password ' + this value's sha256 hash
// If matches, the request will be considered as admin request // If matches, the request will be considered as admin request
@@ -137,11 +137,7 @@
// "num_cursors_from_account": 3200, // Read the cursors from the account table until we have enough cursors to partition the ledger to load concurrently. // "num_cursors_from_account": 3200, // Read the cursors from the account table until we have enough cursors to partition the ledger to load concurrently.
"num_markers": 48, // The number of markers is the number of coroutines to load the cache concurrently. "num_markers": 48, // The number of markers is the number of coroutines to load the cache concurrently.
"page_fetch_size": 512, // The number of rows to load for each page. "page_fetch_size": 512, // The number of rows to load for each page.
"load": "async", // "sync" to load cache synchronously or "async" to load cache asynchronously or "none"/"no" to turn off the cache. "load": "async" // "sync" to load cache synchronously or "async" to load cache asynchronously or "none"/"no" to turn off the cache.
"file": {
"path": "./cache.bin",
"max_sequence_age": 5000
}
}, },
"prometheus": { "prometheus": {
"enabled": true, "enabled": true,


@@ -44,13 +44,8 @@ def fix_colon_spacing(cpp_content: str) -> str:
def fix_indentation(cpp_content: str) -> str: def fix_indentation(cpp_content: str) -> str:
if "JSON(" not in cpp_content:
return cpp_content
lines = cpp_content.splitlines() lines = cpp_content.splitlines()
ends_with_newline = cpp_content.endswith('\n')
def find_indentation(line: str) -> int: def find_indentation(line: str) -> int:
return len(line) - len(line.lstrip()) return len(line) - len(line.lstrip())
@@ -71,11 +66,7 @@ def fix_indentation(cpp_content: str) -> str:
break break
lines[i] = lines[i][by_how_much:] if by_how_much > 0 else " " * (-by_how_much) + lines[i] lines[i] = lines[i][by_how_much:] if by_how_much > 0 else " " * (-by_how_much) + lines[i]
result = "\n".join(lines) return "\n".join(lines) + "\n"
if ends_with_newline:
result += "\n"
return result
def process_file(file_path: Path, dry_run: bool) -> bool: def process_file(file_path: Path, dry_run: bool) -> bool:


@@ -2,6 +2,7 @@ add_subdirectory(util)
add_subdirectory(data) add_subdirectory(data)
add_subdirectory(cluster) add_subdirectory(cluster)
add_subdirectory(etl) add_subdirectory(etl)
add_subdirectory(etlng)
add_subdirectory(feed) add_subdirectory(feed)
add_subdirectory(rpc) add_subdirectory(rpc)
add_subdirectory(web) add_subdirectory(web)


@@ -5,9 +5,10 @@ target_link_libraries(
clio_app clio_app
PUBLIC clio_cluster PUBLIC clio_cluster
clio_etl clio_etl
clio_etlng
clio_feed clio_feed
clio_migration
clio_rpc
clio_web clio_web
clio_rpc
clio_migration
PRIVATE Boost::program_options PRIVATE Boost::program_options
) )


@@ -25,10 +25,11 @@
#include "data/AmendmentCenter.hpp" #include "data/AmendmentCenter.hpp"
#include "data/BackendFactory.hpp" #include "data/BackendFactory.hpp"
#include "data/LedgerCache.hpp" #include "data/LedgerCache.hpp"
#include "data/LedgerCacheSaver.hpp"
#include "etl/ETLService.hpp" #include "etl/ETLService.hpp"
#include "etl/LoadBalancer.hpp" #include "etl/LoadBalancer.hpp"
#include "etl/NetworkValidatedLedgers.hpp" #include "etl/NetworkValidatedLedgers.hpp"
#include "etlng/LoadBalancer.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "feed/SubscriptionManager.hpp" #include "feed/SubscriptionManager.hpp"
#include "migration/MigrationInspectorFactory.hpp" #include "migration/MigrationInspectorFactory.hpp"
#include "rpc/Counters.hpp" #include "rpc/Counters.hpp"
@@ -56,7 +57,6 @@
#include <cstdlib> #include <cstdlib>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <string>
#include <thread> #include <thread>
#include <utility> #include <utility>
#include <vector> #include <vector>
@@ -99,23 +99,20 @@ ClioApplication::run(bool const useNgWebServer)
auto const threads = config_.get<uint16_t>("io_threads"); auto const threads = config_.get<uint16_t>("io_threads");
LOG(util::LogService::info()) << "Number of io threads = " << threads; LOG(util::LogService::info()) << "Number of io threads = " << threads;
// Similarly we need a context to run ETL on
// In the future we can remove the raw ioc and use ctx instead
// This context must be declared above ioc because tasks inside ioc hold references to it
util::async::CoroExecutionContext ctx{threads};
// IO context to handle all incoming requests, as well as other things. // IO context to handle all incoming requests, as well as other things.
// This is not the only io context in the application. // This is not the only io context in the application.
boost::asio::io_context ioc{threads}; boost::asio::io_context ioc{threads};
// Similarly we need a context to run ETLng on
// In the future we can remove the raw ioc and use ctx instead
util::async::CoroExecutionContext ctx{threads};
// Rate limiter, to prevent abuse // Rate limiter, to prevent abuse
auto whitelistHandler = web::dosguard::WhitelistHandler{config_}; auto whitelistHandler = web::dosguard::WhitelistHandler{config_};
auto const dosguardWeights = web::dosguard::Weights::make(config_); auto const dosguardWeights = web::dosguard::Weights::make(config_);
auto dosGuard = web::dosguard::DOSGuard{config_, whitelistHandler, dosguardWeights}; auto dosGuard = web::dosguard::DOSGuard{config_, whitelistHandler, dosguardWeights};
auto sweepHandler = web::dosguard::IntervalSweepHandler{config_, ioc, dosGuard}; auto sweepHandler = web::dosguard::IntervalSweepHandler{config_, ioc, dosGuard};
auto cache = data::LedgerCache{}; auto cache = data::LedgerCache{};
auto cacheSaver = data::LedgerCacheSaver{config_, cache};
// Interface to the database // Interface to the database
auto backend = data::makeBackend(config_, cache); auto backend = data::makeBackend(config_, cache);
@@ -145,12 +142,20 @@ ClioApplication::run(bool const useNgWebServer)
// ETL uses the balancer to extract data. // ETL uses the balancer to extract data.
// The server uses the balancer to forward RPCs to a rippled node. // The server uses the balancer to forward RPCs to a rippled node.
// The balancer itself publishes to streams (transactions_proposed and accounts_proposed) // The balancer itself publishes to streams (transactions_proposed and accounts_proposed)
auto balancer = etl::LoadBalancer::makeLoadBalancer( auto balancer = [&] -> std::shared_ptr<etlng::LoadBalancerInterface> {
if (config_.get<bool>("__ng_etl")) {
return etlng::LoadBalancer::makeLoadBalancer(
config_, ioc, backend, subscriptions, std::make_unique<util::MTRandomGenerator>(), ledgers config_, ioc, backend, subscriptions, std::make_unique<util::MTRandomGenerator>(), ledgers
); );
}
return etl::LoadBalancer::makeLoadBalancer(
config_, ioc, backend, subscriptions, std::make_unique<util::MTRandomGenerator>(), ledgers
);
}();
// ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes // ETL is responsible for writing and publishing to streams. In read-only mode, ETL only publishes
auto etl = etl::ETLService::makeETLService(config_, ctx, backend, subscriptions, balancer, ledgers); auto etl = etl::ETLService::makeETLService(config_, ioc, ctx, backend, subscriptions, balancer, ledgers);
auto workQueue = rpc::WorkQueue::makeWorkQueue(config_); auto workQueue = rpc::WorkQueue::makeWorkQueue(config_);
auto counters = rpc::Counters::makeCounters(workQueue); auto counters = rpc::Counters::makeCounters(workQueue);
@@ -196,7 +201,7 @@ ClioApplication::run(bool const useNgWebServer)
} }
appStopper_.setOnStop( appStopper_.setOnStop(
Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, cacheSaver, ioc) Stopper::makeOnStopCallback(httpServer.value(), *balancer, *etl, *subscriptions, *backend, ioc)
); );
// Blocks until stopped. // Blocks until stopped.
@@ -211,9 +216,6 @@ ClioApplication::run(bool const useNgWebServer)
auto handler = std::make_shared<web::RPCServerHandler<RPCEngineType>>(config_, backend, rpcEngine, etl, dosGuard); auto handler = std::make_shared<web::RPCServerHandler<RPCEngineType>>(config_, backend, rpcEngine, etl, dosGuard);
auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler, cache); auto const httpServer = web::makeHttpServer(config_, ioc, dosGuard, handler, cache);
appStopper_.setOnStop(
Stopper::makeOnStopCallback(*httpServer, *balancer, *etl, *subscriptions, *backend, cacheSaver, ioc)
);
// Blocks until stopped. // Blocks until stopped.
// When stopped, shared_ptrs fall out of scope // When stopped, shared_ptrs fall out of scope


@@ -20,13 +20,12 @@
#pragma once #pragma once
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "data/LedgerCacheSaver.hpp" #include "etlng/ETLServiceInterface.hpp"
#include "etl/ETLServiceInterface.hpp" #include "etlng/LoadBalancerInterface.hpp"
#include "etl/LoadBalancerInterface.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "util/CoroutineGroup.hpp" #include "util/CoroutineGroup.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "web/interface/Concepts.hpp" #include "web/ng/Server.hpp"
#include <boost/asio/executor_work_guard.hpp> #include <boost/asio/executor_work_guard.hpp>
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
@@ -72,25 +71,21 @@ public:
* @param etl The ETL service to stop. * @param etl The ETL service to stop.
* @param subscriptions The subscription manager to stop. * @param subscriptions The subscription manager to stop.
* @param backend The backend to stop. * @param backend The backend to stop.
* @param cacheSaver The ledger cache saver
* @param ioc The io_context to stop. * @param ioc The io_context to stop.
* @return The callback to be called on application stop. * @return The callback to be called on application stop.
*/ */
template <web::SomeServer ServerType, data::SomeLedgerCacheSaver LedgerCacheSaverType> template <web::ng::SomeServer ServerType>
static std::function<void(boost::asio::yield_context)> static std::function<void(boost::asio::yield_context)>
makeOnStopCallback( makeOnStopCallback(
ServerType& server, ServerType& server,
etl::LoadBalancerInterface& balancer, etlng::LoadBalancerInterface& balancer,
etl::ETLServiceInterface& etl, etlng::ETLServiceInterface& etl,
feed::SubscriptionManagerInterface& subscriptions, feed::SubscriptionManagerInterface& subscriptions,
data::BackendInterface& backend, data::BackendInterface& backend,
LedgerCacheSaverType& cacheSaver,
boost::asio::io_context& ioc boost::asio::io_context& ioc
) )
{ {
return [&](boost::asio::yield_context yield) { return [&](boost::asio::yield_context yield) {
cacheSaver.save();
util::CoroutineGroup coroutineGroup{yield}; util::CoroutineGroup coroutineGroup{yield};
coroutineGroup.spawn(yield, [&server](auto innerYield) { coroutineGroup.spawn(yield, [&server](auto innerYield) {
server.stop(innerYield); server.stop(innerYield);
@@ -111,8 +106,6 @@ public:
backend.waitForWritesToFinish(); backend.waitForWritesToFinish();
LOG(util::LogService::info()) << "Backend writes finished"; LOG(util::LogService::info()) << "Backend writes finished";
cacheSaver.waitToFinish();
ioc.stop(); ioc.stop();
LOG(util::LogService::info()) << "io_context stopped"; LOG(util::LogService::info()) << "io_context stopped";


@@ -147,11 +147,6 @@ struct Amendments {
REGISTER(fixAMMClawbackRounding); REGISTER(fixAMMClawbackRounding);
REGISTER(fixMPTDeliveredAmount); REGISTER(fixMPTDeliveredAmount);
REGISTER(fixPriceOracleOrder); REGISTER(fixPriceOracleOrder);
REGISTER(DynamicMPT);
REGISTER(fixDelegateV1_1);
REGISTER(fixDirectoryLimit);
REGISTER(fixIncludeKeyletFields);
REGISTER(fixTokenEscrowV1);
// Obsolete but supported by libxrpl // Obsolete but supported by libxrpl
REGISTER(CryptoConditionsSuite); REGISTER(CryptoConditionsSuite);


@@ -270,7 +270,7 @@ BackendInterface::updateRange(uint32_t newMax)
{ {
std::scoped_lock const lck(rngMtx_); std::scoped_lock const lck(rngMtx_);
if (range_.has_value() and newMax < range_->maxSequence) { if (range_.has_value() && newMax < range_->maxSequence) {
ASSERT( ASSERT(
false, false,
"Range shouldn't exist yet or newMax should be at least range->maxSequence. newMax = {}, " "Range shouldn't exist yet or newMax should be at least range->maxSequence. newMax = {}, "
@@ -280,14 +280,11 @@ BackendInterface::updateRange(uint32_t newMax)
); );
} }
updateRangeImpl(newMax); if (!range_.has_value()) {
} range_ = {.minSequence = newMax, .maxSequence = newMax};
} else {
void range_->maxSequence = newMax;
BackendInterface::forceUpdateRange(uint32_t newMax) }
{
std::scoped_lock const lck(rngMtx_);
updateRangeImpl(newMax);
} }
void void
@@ -413,14 +410,4 @@ BackendInterface::fetchFees(std::uint32_t const seq, boost::asio::yield_context
return fees; return fees;
} }
void
BackendInterface::updateRangeImpl(uint32_t newMax)
{
if (!range_.has_value()) {
range_ = {.minSequence = newMax, .maxSequence = newMax};
} else {
range_->maxSequence = newMax;
}
}
} // namespace data } // namespace data


@@ -249,15 +249,6 @@ public:
void void
updateRange(uint32_t newMax); updateRange(uint32_t newMax);
/**
* @brief Updates the range of sequences that are stored in the DB without any checks
* @note In the most cases you should use updateRange() instead
*
* @param newMax The new maximum sequence available
*/
void
forceUpdateRange(uint32_t newMax);
/** /**
* @brief Sets the range of sequences that are stored in the DB. * @brief Sets the range of sequences that are stored in the DB.
* *
@@ -785,9 +776,6 @@ private:
*/ */
virtual bool virtual bool
doFinishWrites() = 0; doFinishWrites() = 0;
void
updateRangeImpl(uint32_t newMax);
}; };
} // namespace data } // namespace data


@@ -5,7 +5,6 @@ target_sources(
BackendCounters.cpp BackendCounters.cpp
BackendInterface.cpp BackendInterface.cpp
LedgerCache.cpp LedgerCache.cpp
LedgerCacheSaver.cpp
LedgerHeaderCache.cpp LedgerHeaderCache.cpp
cassandra/impl/Future.cpp cassandra/impl/Future.cpp
cassandra/impl/Cluster.cpp cassandra/impl/Cluster.cpp
@@ -15,9 +14,6 @@ target_sources(
cassandra/impl/SslContext.cpp cassandra/impl/SslContext.cpp
cassandra/Handle.cpp cassandra/Handle.cpp
cassandra/SettingsProvider.cpp cassandra/SettingsProvider.cpp
impl/InputFile.cpp
impl/LedgerCacheFile.cpp
impl/OutputFile.cpp
) )
target_link_libraries(clio_data PUBLIC cassandra-cpp-driver::cassandra-cpp-driver clio_util) target_link_libraries(clio_data PUBLIC cassandra-cpp-driver::cassandra-cpp-driver clio_util)


@@ -20,22 +20,16 @@
#include "data/LedgerCache.hpp" #include "data/LedgerCache.hpp"
#include "data/Types.hpp" #include "data/Types.hpp"
#include "data/impl/LedgerCacheFile.hpp" #include "etlng/Models.hpp"
#include "etl/Models.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include <xrpl/basics/base_uint.h> #include <xrpl/basics/base_uint.h>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <cstdlib>
#include <cstring>
#include <map>
#include <mutex> #include <mutex>
#include <optional> #include <optional>
#include <shared_mutex> #include <shared_mutex>
#include <string>
#include <utility>
#include <vector> #include <vector>
namespace data { namespace data {
@@ -95,7 +89,7 @@ LedgerCache::update(std::vector<LedgerObject> const& objs, uint32_t seq, bool is
} }
void void
LedgerCache::update(std::vector<etl::model::Object> const& objs, uint32_t seq) LedgerCache::update(std::vector<etlng::model::Object> const& objs, uint32_t seq)
{ {
if (disabled_) if (disabled_)
return; return;
@@ -257,34 +251,4 @@ LedgerCache::getSuccessorHitRate() const
return static_cast<float>(successorHitCounter_.get().value()) / successorReqCounter_.get().value(); return static_cast<float>(successorHitCounter_.get().value()) / successorReqCounter_.get().value();
} }
std::expected<void, std::string>
LedgerCache::saveToFile(std::string const& path) const
{
if (not isFull()) {
return std::unexpected{"Ledger cache is not full"};
}
impl::LedgerCacheFile file{path};
std::shared_lock const lock{mtx_};
impl::LedgerCacheFile::DataView const data{.latestSeq = latestSeq_, .map = map_, .deleted = deleted_};
return file.write(data);
}
std::expected<void, std::string>
LedgerCache::loadFromFile(std::string const& path, uint32_t minLatestSequence)
{
impl::LedgerCacheFile file{path};
auto data = file.read(minLatestSequence);
if (not data.has_value()) {
return std::unexpected(std::move(data).error());
}
auto [latestSeq, map, deleted] = std::move(data).value();
std::unique_lock const lock{mtx_};
latestSeq_ = latestSeq;
map_ = std::move(map);
deleted_ = std::move(deleted);
full_ = true;
return {};
}
} // namespace data } // namespace data


@@ -21,7 +21,7 @@
#include "data/LedgerCacheInterface.hpp" #include "data/LedgerCacheInterface.hpp"
#include "data/Types.hpp" #include "data/Types.hpp"
#include "etl/Models.hpp" #include "etlng/Models.hpp"
#include "util/prometheus/Bool.hpp" #include "util/prometheus/Bool.hpp"
#include "util/prometheus/Counter.hpp" #include "util/prometheus/Counter.hpp"
#include "util/prometheus/Label.hpp" #include "util/prometheus/Label.hpp"
@@ -37,7 +37,6 @@
#include <map> #include <map>
#include <optional> #include <optional>
#include <shared_mutex> #include <shared_mutex>
#include <string>
#include <unordered_set> #include <unordered_set>
#include <vector> #include <vector>
@@ -47,16 +46,11 @@ namespace data {
* @brief Cache for an entire ledger. * @brief Cache for an entire ledger.
*/ */
class LedgerCache : public LedgerCacheInterface { class LedgerCache : public LedgerCacheInterface {
public:
/** @brief An entry of the cache */
struct CacheEntry { struct CacheEntry {
uint32_t seq = 0; uint32_t seq = 0;
Blob blob; Blob blob;
}; };
using CacheMap = std::map<ripple::uint256, CacheEntry>;
private:
// counters for fetchLedgerObject(s) hit rate // counters for fetchLedgerObject(s) hit rate
std::reference_wrapper<util::prometheus::CounterInt> objectReqCounter_{PrometheusService::counterInt( std::reference_wrapper<util::prometheus::CounterInt> objectReqCounter_{PrometheusService::counterInt(
"ledger_cache_counter_total_number", "ledger_cache_counter_total_number",
@@ -79,8 +73,8 @@ private:
util::prometheus::Labels({{"type", "cache_hit"}, {"fetch", "successor_key"}}) util::prometheus::Labels({{"type", "cache_hit"}, {"fetch", "successor_key"}})
)}; )};
CacheMap map_; std::map<ripple::uint256, CacheEntry> map_;
CacheMap deleted_; std::map<ripple::uint256, CacheEntry> deleted_;
mutable std::shared_mutex mtx_; mutable std::shared_mutex mtx_;
std::condition_variable_any cv_; std::condition_variable_any cv_;
@@ -104,7 +98,7 @@ public:
update(std::vector<LedgerObject> const& objs, uint32_t seq, bool isBackground) override; update(std::vector<LedgerObject> const& objs, uint32_t seq, bool isBackground) override;
void void
update(std::vector<etl::model::Object> const& objs, uint32_t seq) override; update(std::vector<etlng::model::Object> const& objs, uint32_t seq) override;
std::optional<Blob> std::optional<Blob>
get(ripple::uint256 const& key, uint32_t seq) const override; get(ripple::uint256 const& key, uint32_t seq) const override;
@@ -144,12 +138,6 @@ public:
void void
waitUntilCacheContainsSeq(uint32_t seq) override; waitUntilCacheContainsSeq(uint32_t seq) override;
std::expected<void, std::string>
saveToFile(std::string const& path) const override;
std::expected<void, std::string>
loadFromFile(std::string const& path, uint32_t minLatestSequence) override;
}; };
} // namespace data } // namespace data


@@ -20,16 +20,14 @@
#pragma once #pragma once
#include "data/Types.hpp" #include "data/Types.hpp"
#include "etl/Models.hpp" #include "etlng/Models.hpp"
#include <xrpl/basics/base_uint.h> #include <xrpl/basics/base_uint.h>
#include <xrpl/basics/hardened_hash.h> #include <xrpl/basics/hardened_hash.h>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <expected>
#include <optional> #include <optional>
#include <string>
#include <vector> #include <vector>
namespace data { namespace data {
@@ -65,7 +63,7 @@ public:
* @param seq The sequence to update cache for * @param seq The sequence to update cache for
*/ */
virtual void virtual void
update(std::vector<etl::model::Object> const& objs, uint32_t seq) = 0; update(std::vector<etlng::model::Object> const& objs, uint32_t seq) = 0;
/** /**
* @brief Fetch a cached object by its key and sequence number. * @brief Fetch a cached object by its key and sequence number.
@@ -170,27 +168,6 @@ public:
*/ */
virtual void virtual void
waitUntilCacheContainsSeq(uint32_t seq) = 0; waitUntilCacheContainsSeq(uint32_t seq) = 0;
/**
* @brief Save the cache to file
* @note This operation takes about 7 seconds and it keeps a shared lock on mtx_
*
* @param path The file path to save the cache to
* @return An error as a string if any
*/
[[nodiscard]] virtual std::expected<void, std::string>
saveToFile(std::string const& path) const = 0;
/**
* @brief Load the cache from file
* @note This operation takes about 7 seconds and it keeps mtx_ exclusively locked
*
* @param path The file path to load data from
* @param minLatestSequence The minimum allowed value of the latestLedgerSequence in cache file
* @return An error as a string if any
*/
[[nodiscard]] virtual std::expected<void, std::string>
loadFromFile(std::string const& path, uint32_t minLatestSequence) = 0;
}; };
} // namespace data } // namespace data
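The removed `saveToFile()`/`loadFromFile()` pair returned `std::expected<void, std::string>`; a minimal sketch of that error-handling convention (the `saveToFile` stub below is a hypothetical stand-in, not the real implementation):

```cpp
#include <expected>
#include <iostream>
#include <string>

// Hypothetical stand-in for the removed LedgerCacheInterface::saveToFile().
std::expected<void, std::string>
saveToFile(std::string const& path)
{
    if (path.empty())
        return std::unexpected{"empty path"};
    return {};  // success carries no value
}

int main()
{
    if (auto const res = saveToFile(""); not res.has_value())
        std::cerr << "Error saving LedgerCache to file: " << res.error() << '\n';
}
```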


@@ -1,70 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/LedgerCacheSaver.hpp"
#include "data/LedgerCacheInterface.hpp"
#include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"
#include <string>
#include <thread>
namespace data {
LedgerCacheSaver::LedgerCacheSaver(util::config::ClioConfigDefinition const& config, LedgerCacheInterface const& cache)
: cacheFilePath_(config.maybeValue<std::string>("cache.file.path")), cache_(cache)
{
}
LedgerCacheSaver::~LedgerCacheSaver()
{
waitToFinish();
}
void
LedgerCacheSaver::save()
{
ASSERT(not savingThread_.has_value(), "Multiple save() calls are not allowed");
savingThread_ = std::thread([this]() {
if (not cacheFilePath_.has_value()) {
return;
}
LOG(util::LogService::info()) << "Saving ledger cache to " << *cacheFilePath_;
if (auto const [success, durationMs] = util::timed([&]() { return cache_.get().saveToFile(*cacheFilePath_); });
success.has_value()) {
LOG(util::LogService::info()) << "Successfully saved ledger cache in " << durationMs << " ms";
} else {
LOG(util::LogService::error()) << "Error saving LedgerCache to file: " << success.error();
}
});
}
void
LedgerCacheSaver::waitToFinish()
{
if (savingThread_.has_value() and savingThread_->joinable()) {
savingThread_->join();
}
savingThread_.reset();
}
} // namespace data


@@ -1,93 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/LedgerCacheInterface.hpp"
#include "util/config/ConfigDefinition.hpp"
#include <concepts>
#include <functional>
#include <optional>
#include <string>
#include <thread>
namespace data {
/**
* @brief A concept for a class that can save ledger cache asynchronously.
*
* This concept defines the interface requirements for any type that manages
* asynchronous saving of ledger cache to persistent storage.
*/
template <typename T>
concept SomeLedgerCacheSaver = requires(T a) {
{ a.save() } -> std::same_as<void>;
{ a.waitToFinish() } -> std::same_as<void>;
};
/**
* @brief Manages asynchronous saving of ledger cache to a file.
*
* This class provides functionality to save the ledger cache to a file in a separate thread,
* allowing the main application to continue without blocking. The file path is configured
* through the application's configuration system.
*/
class LedgerCacheSaver {
std::optional<std::string> cacheFilePath_;
std::reference_wrapper<LedgerCacheInterface const> cache_;
std::optional<std::thread> savingThread_;
public:
/**
* @brief Constructs a LedgerCacheSaver instance.
*
* @param config The configuration object containing the cache file path setting
* @param cache Reference to the ledger cache interface to be saved
*/
LedgerCacheSaver(util::config::ClioConfigDefinition const& config, LedgerCacheInterface const& cache);
/**
* @brief Destructor that ensures the saving thread is properly joined.
*
* Waits for any ongoing save operation to complete before destruction.
*/
~LedgerCacheSaver();
/**
* @brief Initiates an asynchronous save operation of the ledger cache.
*
* Spawns a new thread that saves the ledger cache to the configured file path.
* If no file path is configured, the operation is skipped. Logs the progress
* and result of the save operation.
*/
void
save();
/**
* @brief Waits for the saving thread to complete.
*
* Blocks until the saving operation finishes if a thread is currently active.
* Safe to call multiple times or when no save operation is in progress.
*/
void
waitToFinish();
};
} // namespace data
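The `SomeLedgerCacheSaver` concept above can be exercised in isolation; a compile-time check like the following (the `NoopSaver` stub is hypothetical) shows what the removed template parameter of `Stopper::makeOnStopCallback` required:

```cpp
#include <concepts>

template <typename T>
concept SomeLedgerCacheSaver = requires(T a) {
    { a.save() } -> std::same_as<void>;
    { a.waitToFinish() } -> std::same_as<void>;
};

// Hypothetical minimal type satisfying the concept.
struct NoopSaver {
    void save() {}
    void waitToFinish() {}
};

static_assert(SomeLedgerCacheSaver<NoopSaver>);

int main() {}
```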


@@ -247,9 +247,6 @@ struct MPTHoldersAndCursor {
struct LedgerRange { struct LedgerRange {
std::uint32_t minSequence = 0; std::uint32_t minSequence = 0;
std::uint32_t maxSequence = 0; std::uint32_t maxSequence = 0;
bool
operator==(LedgerRange const&) const = default;
}; };
/** /**


@@ -1,58 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/impl/InputFile.hpp"
#include <xrpl/basics/base_uint.h>
#include <cstddef>
#include <cstring>
#include <ios>
#include <iosfwd>
#include <string>
#include <utility>
namespace data::impl {
InputFile::InputFile(std::string const& path) : file_(path, std::ios::binary | std::ios::in)
{
}
bool
InputFile::isOpen() const
{
return file_.is_open();
}
bool
InputFile::readRaw(char* data, size_t size)
{
file_.read(data, size);
shasum_.update(data, size);
return not file_.fail();
}
ripple::uint256
InputFile::hash() const
{
auto sum = shasum_;
return std::move(sum).finalize();
}
} // namespace data::impl


@@ -1,210 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/impl/LedgerCacheFile.hpp"
#include "data/LedgerCache.hpp"
#include "data/Types.hpp"
#include <fmt/format.h>
#include <xrpl/basics/base_uint.h>
#include <algorithm>
#include <array>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <filesystem>
#include <string>
#include <utility>
namespace data::impl {
using Hash = ripple::uint256;
using Separator = std::array<char, 16>;
static constexpr Separator kSEPARATOR = {0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0};
namespace {
std::expected<std::pair<ripple::uint256, LedgerCache::CacheEntry>, std::string>
readCacheEntry(InputFile& file, size_t i)
{
ripple::uint256 key;
if (not file.readRaw(reinterpret_cast<char*>(key.data()), ripple::base_uint<256>::bytes)) {
return std::unexpected(fmt::format("Failed to read key at index {}", i));
}
uint32_t seq{};
if (not file.read(seq)) {
return std::unexpected(fmt::format("Failed to read sequence at index {}", i));
}
size_t blobSize{};
if (not file.read(blobSize)) {
return std::unexpected(fmt::format("Failed to read blob size at index {}", i));
}
Blob blob(blobSize);
if (not file.readRaw(reinterpret_cast<char*>(blob.data()), blobSize)) {
return std::unexpected(fmt::format("Failed to read blob data at index {}", i));
}
return std::make_pair(key, LedgerCache::CacheEntry{.seq = seq, .blob = std::move(blob)});
}
std::expected<void, std::string>
verifySeparator(Separator const& s)
{
if (not std::ranges::all_of(s, [](char c) { return c == 0; })) {
return std::unexpected{"Separator verification failed - data corruption detected"};
}
return {};
}
} // anonymous namespace
LedgerCacheFile::LedgerCacheFile(std::string path) : path_(std::move(path))
{
}
std::expected<void, std::string>
LedgerCacheFile::write(DataView dataView)
{
auto const newFilePath = fmt::format("{}.new", path_);
auto file = OutputFile{newFilePath};
if (not file.isOpen()) {
return std::unexpected{fmt::format("Couldn't open file: {}", newFilePath)};
}
Header const header{
.latestSeq = dataView.latestSeq, .mapSize = dataView.map.size(), .deletedSize = dataView.deleted.size()
};
file.write(header);
file.write(kSEPARATOR);
for (auto const& [k, v] : dataView.map) {
file.write(k.data(), decltype(k)::bytes);
file.write(v.seq);
file.write(v.blob.size());
file.writeRaw(reinterpret_cast<char const*>(v.blob.data()), v.blob.size());
}
file.write(kSEPARATOR);
for (auto const& [k, v] : dataView.deleted) {
file.write(k.data(), decltype(k)::bytes);
file.write(v.seq);
file.write(v.blob.size());
file.writeRaw(reinterpret_cast<char const*>(v.blob.data()), v.blob.size());
}
file.write(kSEPARATOR);
auto const hash = file.hash();
file.write(hash.data(), decltype(hash)::bytes);
try {
std::filesystem::rename(newFilePath, path_);
} catch (std::exception const& e) {
return std::unexpected{fmt::format("Error moving cache file from {} to {}: {}", newFilePath, path_, e.what())};
}
return {};
}
std::expected<LedgerCacheFile::Data, std::string>
LedgerCacheFile::read(uint32_t minLatestSequence)
{
try {
auto file = InputFile{path_};
if (not file.isOpen()) {
return std::unexpected{fmt::format("Couldn't open file: {}", path_)};
}
Data result;
Header header{};
if (not file.read(header)) {
return std::unexpected{"Error reading cache header"};
}
if (header.version != kVERSION) {
return std::unexpected{
fmt::format("Cache has wrong version: expected {} found {}", kVERSION, header.version)
};
}
if (header.latestSeq < minLatestSequence) {
return std::unexpected{fmt::format("Latest sequence ({}) in the cache file is too low.", header.latestSeq)};
}
result.latestSeq = header.latestSeq;
Separator separator{};
if (not file.readRaw(separator.data(), separator.size())) {
return std::unexpected{"Error reading separator"};
}
if (auto verificationResult = verifySeparator(separator); not verificationResult.has_value()) {
return std::unexpected{std::move(verificationResult).error()};
}
for (size_t i = 0; i < header.mapSize; ++i) {
auto cacheEntryExpected = readCacheEntry(file, i);
if (not cacheEntryExpected.has_value()) {
return std::unexpected{std::move(cacheEntryExpected).error()};
}
// Using insert with hint here to decrease insert operation complexity to the amortized constant instead of
// logN
result.map.insert(result.map.end(), std::move(cacheEntryExpected).value());
}
if (not file.readRaw(separator.data(), separator.size())) {
return std::unexpected{"Error reading separator"};
}
if (auto verificationResult = verifySeparator(separator); not verificationResult.has_value()) {
return std::unexpected{std::move(verificationResult).error()};
}
for (size_t i = 0; i < header.deletedSize; ++i) {
auto cacheEntryExpected = readCacheEntry(file, i);
if (not cacheEntryExpected.has_value()) {
return std::unexpected{std::move(cacheEntryExpected).error()};
}
result.deleted.insert(result.deleted.end(), std::move(cacheEntryExpected).value());
}
if (not file.readRaw(separator.data(), separator.size())) {
return std::unexpected{"Error reading separator"};
}
if (auto verificationResult = verifySeparator(separator); not verificationResult.has_value()) {
return std::unexpected{std::move(verificationResult).error()};
}
auto const dataHash = file.hash();
ripple::uint256 hashFromFile{};
if (not file.readRaw(reinterpret_cast<char*>(hashFromFile.data()), decltype(hashFromFile)::bytes)) {
return std::unexpected{"Error reading hash"};
}
if (dataHash != hashFromFile) {
return std::unexpected{"Hash mismatch - cache file corruption detected"};
}
return result;
} catch (std::exception const& e) {
return std::unexpected{fmt::format("Error reading cache file: {}", e.what())};
} catch (...) {
return std::unexpected{fmt::format("Error reading cache file")};
}
}
} // namespace data::impl
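Pieced together from `write()` and `read()` above, the on-disk format of the removed cache file was:

```cpp
// Layout implied by LedgerCacheFile::write()/read() above:
//
//   Header { uint32 version; uint32 latestSeq; uint64 mapSize; uint64 deletedSize; }
//   separator: 16 zero bytes
//   mapSize entries:     key (32 bytes) | seq (uint32) | blob size (size_t) | blob bytes
//   separator: 16 zero bytes
//   deletedSize entries: same layout as the map entries
//   separator: 16 zero bytes
//   SHA-256 over all preceding bytes (32 bytes), since both file wrappers
//   feed every byte they read or write through util::Sha256sum
```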


@@ -1,70 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/LedgerCache.hpp"
#include "data/impl/InputFile.hpp"
#include "data/impl/OutputFile.hpp"
#include <fmt/format.h>
#include <xrpl/basics/base_uint.h>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <string>
namespace data::impl {
class LedgerCacheFile {
public:
struct Header {
uint32_t version = kVERSION;
uint32_t latestSeq{};
uint64_t mapSize{};
uint64_t deletedSize{};
};
private:
static constexpr uint32_t kVERSION = 1;
std::string path_;
public:
template <typename T>
struct DataBase {
uint32_t latestSeq{0};
T map;
T deleted;
};
using DataView = DataBase<LedgerCache::CacheMap const&>;
using Data = DataBase<LedgerCache::CacheMap>;
LedgerCacheFile(std::string path);
std::expected<void, std::string>
write(DataView dataView);
std::expected<Data, std::string>
read(uint32_t minLatestSequence);
};
} // namespace data::impl


@@ -1,62 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "data/impl/OutputFile.hpp"
#include <xrpl/basics/base_uint.h>
#include <cstddef>
#include <cstring>
#include <ios>
#include <string>
#include <utility>
namespace data::impl {
OutputFile::OutputFile(std::string const& path) : file_(path, std::ios::binary | std::ios::out)
{
}
bool
OutputFile::isOpen() const
{
return file_.is_open();
}
void
OutputFile::writeRaw(char const* data, size_t size)
{
writeToFile(data, size);
}
void
OutputFile::writeToFile(char const* data, size_t size)
{
file_.write(data, size);
shasum_.update(data, size);
}
ripple::uint256
OutputFile::hash() const
{
auto sum = shasum_;
return std::move(sum).finalize();
}
} // namespace data::impl


@@ -1,68 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "util/Shasum.hpp"
#include <xrpl/basics/base_uint.h>
#include <cstddef>
#include <cstring>
#include <fstream>
#include <string>
namespace data::impl {
class OutputFile {
std::ofstream file_;
util::Sha256sum shasum_;
public:
OutputFile(std::string const& path);
bool
isOpen() const;
template <typename T>
void
write(T&& data)
{
writeRaw(reinterpret_cast<char const*>(&data), sizeof(T));
}
template <typename T>
void
write(T const* data, size_t const size)
{
writeRaw(reinterpret_cast<char const*>(data), size);
}
void
writeRaw(char const* data, size_t size);
ripple::uint256
hash() const;
private:
void
writeToFile(char const* data, size_t size);
};
} // namespace data::impl


@@ -7,24 +7,14 @@ target_sources(
ETLService.cpp ETLService.cpp
ETLState.cpp ETLState.cpp
LoadBalancer.cpp LoadBalancer.cpp
MPTHelpers.cpp
NetworkValidatedLedgers.cpp NetworkValidatedLedgers.cpp
NFTHelpers.cpp NFTHelpers.cpp
Source.cpp Source.cpp
MPTHelpers.cpp
impl/AmendmentBlockHandler.cpp impl/AmendmentBlockHandler.cpp
impl/AsyncGrpcCall.cpp
impl/Extraction.cpp
impl/ForwardingSource.cpp impl/ForwardingSource.cpp
impl/GrpcSource.cpp impl/GrpcSource.cpp
impl/Loading.cpp
impl/Monitor.cpp
impl/SubscriptionSource.cpp impl/SubscriptionSource.cpp
impl/TaskManager.cpp
impl/ext/Cache.cpp
impl/ext/Core.cpp
impl/ext/MPT.cpp
impl/ext/NFT.cpp
impl/ext/Successor.cpp
) )
target_link_libraries(clio_etl PUBLIC clio_data) target_link_libraries(clio_etl PUBLIC clio_data)


@@ -21,20 +21,16 @@
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "data/LedgerCacheInterface.hpp" #include "data/LedgerCacheInterface.hpp"
#include "data/Types.hpp"
#include "etl/CacheLoaderInterface.hpp"
#include "etl/CacheLoaderSettings.hpp" #include "etl/CacheLoaderSettings.hpp"
#include "etl/impl/CacheLoader.hpp" #include "etl/impl/CacheLoader.hpp"
#include "etl/impl/CursorFromAccountProvider.hpp" #include "etl/impl/CursorFromAccountProvider.hpp"
#include "etl/impl/CursorFromDiffProvider.hpp" #include "etl/impl/CursorFromDiffProvider.hpp"
#include "etl/impl/CursorFromFixDiffNumProvider.hpp" #include "etl/impl/CursorFromFixDiffNumProvider.hpp"
#include "etlng/CacheLoaderInterface.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/async/context/BasicExecutionContext.hpp" #include "util/async/context/BasicExecutionContext.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <algorithm>
#include <cstdint> #include <cstdint>
#include <functional> #include <functional>
#include <memory> #include <memory>
@@ -52,7 +48,7 @@ namespace etl {
* @tparam ExecutionContextType The type of the execution context to use * @tparam ExecutionContextType The type of the execution context to use
*/ */
template <typename ExecutionContextType = util::async::CoroExecutionContext> template <typename ExecutionContextType = util::async::CoroExecutionContext>
class CacheLoader : public CacheLoaderInterface { class CacheLoader : public etlng::CacheLoaderInterface {
using CacheLoaderType = impl::CacheLoaderImpl<data::LedgerCacheInterface>; using CacheLoaderType = impl::CacheLoaderImpl<data::LedgerCacheInterface>;
util::Logger log_{"ETL"}; util::Logger log_{"ETL"};
@@ -102,10 +98,6 @@ public:
return; return;
} }
if (loadCacheFromFile()) {
return;
}
std::shared_ptr<impl::BaseCursorProvider> provider; std::shared_ptr<impl::BaseCursorProvider> provider;
if (settings_.numCacheCursorsFromDiff != 0) { if (settings_.numCacheCursorsFromDiff != 0) {
LOG(log_.info()) << "Loading cache with cursor from num_cursors_from_diff=" LOG(log_.info()) << "Loading cache with cursor from num_cursors_from_diff="
@@ -157,36 +149,6 @@ public:
if (loader_ != nullptr) if (loader_ != nullptr)
loader_->wait(); loader_->wait();
} }
private:
bool
loadCacheFromFile()
{
if (not settings_.cacheFileSettings.has_value()) {
return false;
}
LOG(log_.info()) << "Loading ledger cache from " << settings_.cacheFileSettings->path;
auto const minLatestSequence =
backend_->fetchLedgerRange()
.transform([this](data::LedgerRange const& range) {
return std::max(range.maxSequence - settings_.cacheFileSettings->maxAge, range.minSequence);
})
.value_or(0);
auto const [success, duration_ms] = util::timed([&]() {
return cache_.get().loadFromFile(settings_.cacheFileSettings->path, minLatestSequence);
});
if (not success.has_value()) {
LOG(log_.warn()) << "Error loading cache from file: " << success.error();
return false;
}
LOG(log_.info()) << "Loaded cache from file in " << duration_ms
<< " ms. Latest sequence: " << cache_.get().latestLedgerSequence();
backend_->forceUpdateRange(cache_.get().latestLedgerSequence());
return true;
}
}; };
} // namespace etl } // namespace etl
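The removed file-loading path accepted a cache file only if its latest sequence was within maxAge of the database tip. A self-contained, hedged restatement of that cutoff, with an explicit guard for the unsigned subtraction the original relied on:

#include <algorithm>
#include <cstdint>

// Lowest "latest ledger sequence" a cache file may carry and still be loaded.
// Mirrors std::max(range.maxSequence - maxAge, range.minSequence) from the removed code.
std::uint32_t
minAcceptableSequence(std::uint32_t minSequence, std::uint32_t maxSequence, std::uint32_t maxAge)
{
    if (maxSequence <= maxAge)
        return minSequence;  // entire range is younger than maxAge; accept any file
    return std::max(maxSequence - maxAge, minSequence);
}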

View File

@@ -26,7 +26,6 @@
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <string> #include <string>
#include <utility>
namespace etl { namespace etl {
@@ -64,12 +63,6 @@ makeCacheLoaderSettings(util::config::ClioConfigDefinition const& config)
settings.numCacheMarkers = cache.get<std::size_t>("num_markers"); settings.numCacheMarkers = cache.get<std::size_t>("num_markers");
settings.cachePageFetchSize = cache.get<std::size_t>("page_fetch_size"); settings.cachePageFetchSize = cache.get<std::size_t>("page_fetch_size");
if (auto filePath = cache.maybeValue<std::string>("file.path"); filePath.has_value()) {
settings.cacheFileSettings = CacheLoaderSettings::CacheFileSettings{
.path = std::move(filePath).value(), .maxAge = cache.get<uint32_t>("file.max_sequence_age")
};
}
auto const entry = cache.get<std::string>("load"); auto const entry = cache.get<std::string>("load");
if (boost::iequals(entry, "sync")) if (boost::iequals(entry, "sync"))
settings.loadStyle = CacheLoaderSettings::LoadStyle::SYNC; settings.loadStyle = CacheLoaderSettings::LoadStyle::SYNC;

View File

@@ -22,9 +22,6 @@
#include "util/config/ConfigDefinition.hpp" #include "util/config/ConfigDefinition.hpp"
#include <cstddef> #include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
namespace etl { namespace etl {
@@ -35,15 +32,6 @@ struct CacheLoaderSettings {
/** @brief Ways to load the cache */ /** @brief Ways to load the cache */
enum class LoadStyle { ASYNC, SYNC, NONE }; enum class LoadStyle { ASYNC, SYNC, NONE };
/** @brief Settings for cache file operations */
struct CacheFileSettings {
std::string path; /**< path to the file to load cache from on start and save cache to on shutdown */
uint32_t maxAge = 5000; /**< max difference between latest sequence in cache file and DB */
auto
operator<=>(CacheFileSettings const&) const = default;
};
size_t numCacheDiffs = 32; /**< number of diffs to use to generate cursors */ size_t numCacheDiffs = 32; /**< number of diffs to use to generate cursors */
size_t numCacheMarkers = 48; /**< number of markers to use at one time to traverse the ledger */ size_t numCacheMarkers = 48; /**< number of markers to use at one time to traverse the ledger */
size_t cachePageFetchSize = 512; /**< number of ledger objects to fetch concurrently per marker */ size_t cachePageFetchSize = 512; /**< number of ledger objects to fetch concurrently per marker */
@@ -52,7 +40,6 @@ struct CacheLoaderSettings {
size_t numCacheCursorsFromAccount = 0; /**< number of cursors to fetch from account_tx */ size_t numCacheCursorsFromAccount = 0; /**< number of cursors to fetch from account_tx */
LoadStyle loadStyle = LoadStyle::ASYNC; /**< how to load the cache */ LoadStyle loadStyle = LoadStyle::ASYNC; /**< how to load the cache */
std::optional<CacheFileSettings> cacheFileSettings; /**< optional settings for cache file operations */
auto auto
operator<=>(CacheLoaderSettings const&) const = default; operator<=>(CacheLoaderSettings const&) const = default;

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
/* /*
This file is part of clio: https://github.com/XRPLF/clio This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers. Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above purpose with or without fee is hereby granted, provided that the above
@@ -20,103 +20,101 @@
#include "etl/ETLService.hpp" #include "etl/ETLService.hpp"
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "data/LedgerCacheInterface.hpp"
#include "data/Types.hpp"
#include "etl/CacheLoader.hpp" #include "etl/CacheLoader.hpp"
#include "etl/CacheLoaderInterface.hpp"
#include "etl/CacheUpdaterInterface.hpp"
#include "etl/CorruptionDetector.hpp" #include "etl/CorruptionDetector.hpp"
#include "etl/ETLServiceInterface.hpp" #include "etl/LoadBalancer.hpp"
#include "etl/ETLState.hpp"
#include "etl/ExtractorInterface.hpp"
#include "etl/InitialLoadObserverInterface.hpp"
#include "etl/LedgerPublisherInterface.hpp"
#include "etl/LoadBalancerInterface.hpp"
#include "etl/LoaderInterface.hpp"
#include "etl/MonitorInterface.hpp"
#include "etl/MonitorProviderInterface.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp" #include "etl/SystemState.hpp"
#include "etl/TaskManagerProviderInterface.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp" #include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/impl/CacheUpdater.hpp" #include "etl/impl/ExtractionDataPipe.hpp"
#include "etl/impl/Extraction.hpp" #include "etl/impl/Extractor.hpp"
#include "etl/impl/LedgerFetcher.hpp" #include "etl/impl/LedgerFetcher.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etl/impl/LedgerPublisher.hpp" #include "etl/impl/LedgerPublisher.hpp"
#include "etl/impl/Loading.hpp" #include "etl/impl/Transformer.hpp"
#include "etl/impl/MonitorProvider.hpp" #include "etlng/ETLService.hpp"
#include "etl/impl/Registry.hpp" #include "etlng/ETLServiceInterface.hpp"
#include "etl/impl/Scheduling.hpp" #include "etlng/LoadBalancer.hpp"
#include "etl/impl/TaskManager.hpp" #include "etlng/LoadBalancerInterface.hpp"
#include "etl/impl/TaskManagerProvider.hpp" #include "etlng/impl/LedgerPublisher.hpp"
#include "etl/impl/ext/Cache.hpp" #include "etlng/impl/MonitorProvider.hpp"
#include "etl/impl/ext/Core.hpp" #include "etlng/impl/TaskManagerProvider.hpp"
#include "etl/impl/ext/MPT.hpp" #include "etlng/impl/ext/Cache.hpp"
#include "etl/impl/ext/NFT.hpp" #include "etlng/impl/ext/Core.hpp"
#include "etl/impl/ext/Successor.hpp" #include "etlng/impl/ext/MPT.hpp"
#include "etlng/impl/ext/NFT.hpp"
#include "etlng/impl/ext/Successor.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include "util/Profiler.hpp" #include "util/Constants.hpp"
#include "util/async/AnyExecutionContext.hpp" #include "util/async/AnyExecutionContext.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <boost/json/object.hpp> #include <boost/asio/io_context.hpp>
#include <boost/signals2/connection.hpp> #include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/protocol/LedgerHeader.h> #include <xrpl/protocol/LedgerHeader.h>
#include <chrono> #include <chrono>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <functional>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <string> #include <stdexcept>
#include <thread>
#include <utility> #include <utility>
#include <vector>
namespace etl { namespace etl {
std::shared_ptr<ETLServiceInterface> std::shared_ptr<etlng::ETLServiceInterface>
ETLService::makeETLService( ETLService::makeETLService(
util::config::ClioConfigDefinition const& config, util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
util::async::AnyExecutionContext ctx, util::async::AnyExecutionContext ctx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions, std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<LoadBalancerInterface> balancer, std::shared_ptr<etlng::LoadBalancerInterface> balancer,
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
) )
{ {
std::shared_ptr<ETLServiceInterface> ret; std::shared_ptr<etlng::ETLServiceInterface> ret;
auto state = std::make_shared<SystemState>(); if (config.get<bool>("__ng_etl")) {
ASSERT(
std::dynamic_pointer_cast<etlng::LoadBalancer>(balancer), "LoadBalancer type must be etlng::LoadBalancer"
);
auto state = std::make_shared<etl::SystemState>();
state->isStrictReadonly = config.get<bool>("read_only"); state->isStrictReadonly = config.get<bool>("read_only");
auto fetcher = std::make_shared<impl::LedgerFetcher>(backend, balancer); auto fetcher = std::make_shared<etl::impl::LedgerFetcher>(backend, balancer);
auto extractor = std::make_shared<impl::Extractor>(fetcher); auto extractor = std::make_shared<etlng::impl::Extractor>(fetcher);
auto publisher = std::make_shared<impl::LedgerPublisher>(ctx, backend, subscriptions, *state); auto publisher = std::make_shared<etlng::impl::LedgerPublisher>(ioc, backend, subscriptions, *state);
auto cacheLoader = std::make_shared<CacheLoader<>>(config, backend, backend->cache()); auto cacheLoader = std::make_shared<etl::CacheLoader<>>(config, backend, backend->cache());
auto cacheUpdater = std::make_shared<impl::CacheUpdater>(backend->cache()); auto cacheUpdater = std::make_shared<etlng::impl::CacheUpdater>(backend->cache());
auto amendmentBlockHandler = std::make_shared<impl::AmendmentBlockHandler>(ctx, *state); auto amendmentBlockHandler = std::make_shared<etlng::impl::AmendmentBlockHandler>(ctx, *state);
auto monitorProvider = std::make_shared<impl::MonitorProvider>(); auto monitorProvider = std::make_shared<etlng::impl::MonitorProvider>();
backend->setCorruptionDetector(CorruptionDetector{*state, backend->cache()}); backend->setCorruptionDetector(CorruptionDetector{*state, backend->cache()});
auto loader = std::make_shared<impl::Loader>( auto loader = std::make_shared<etlng::impl::Loader>(
backend, backend,
impl::makeRegistry( etlng::impl::makeRegistry(
*state, *state,
impl::CacheExt{cacheUpdater}, etlng::impl::CacheExt{cacheUpdater},
impl::CoreExt{backend}, etlng::impl::CoreExt{backend},
impl::SuccessorExt{backend, backend->cache()}, etlng::impl::SuccessorExt{backend, backend->cache()},
impl::NFTExt{backend}, etlng::impl::NFTExt{backend},
impl::MPTExt{backend} etlng::impl::MPTExt{backend}
), ),
amendmentBlockHandler, amendmentBlockHandler,
state state
); );
auto taskManagerProvider = std::make_shared<impl::TaskManagerProvider>(*ledgers, extractor, loader); auto taskManagerProvider = std::make_shared<etlng::impl::TaskManagerProvider>(*ledgers, extractor, loader);
ret = std::make_shared<ETLService>( ret = std::make_shared<etlng::ETLService>(
ctx, ctx,
config, config,
backend, backend,
@@ -132,280 +130,261 @@ ETLService::makeETLService(
monitorProvider, monitorProvider,
state state
); );
} else {
ASSERT(std::dynamic_pointer_cast<etl::LoadBalancer>(balancer), "LoadBalancer type must be etl::LoadBalancer");
ret = std::make_shared<etl::ETLService>(config, ioc, backend, subscriptions, balancer, ledgers);
}
// inject networkID into subscriptions, as transaction feeds require it to inject CTID in response // inject networkID into subscriptions, as transaction feeds require it to inject CTID in response
if (auto const etlState = ret->getETLState(); etlState) if (auto const state = ret->getETLState(); state)
subscriptions->setNetworkID(etlState->networkID); subscriptions->setNetworkID(state->networkID);
ret->run(); ret->run();
return ret; return ret;
} }
ETLService::ETLService( // Database must be populated when this starts
util::async::AnyExecutionContext ctx, std::optional<uint32_t>
std::reference_wrapper<util::config::ClioConfigDefinition const> config, ETLService::runETLPipeline(uint32_t startSequence, uint32_t numExtractors)
std::shared_ptr<data::BackendInterface> backend,
std::shared_ptr<LoadBalancerInterface> balancer,
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers,
std::shared_ptr<LedgerPublisherInterface> publisher,
std::shared_ptr<CacheLoaderInterface> cacheLoader,
std::shared_ptr<CacheUpdaterInterface> cacheUpdater,
std::shared_ptr<ExtractorInterface> extractor,
std::shared_ptr<LoaderInterface> loader,
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
std::shared_ptr<TaskManagerProviderInterface> taskManagerProvider,
std::shared_ptr<MonitorProviderInterface> monitorProvider,
std::shared_ptr<SystemState> state
)
: ctx_(std::move(ctx))
, config_(config)
, backend_(std::move(backend))
, balancer_(std::move(balancer))
, ledgers_(std::move(ledgers))
, publisher_(std::move(publisher))
, cacheLoader_(std::move(cacheLoader))
, cacheUpdater_(std::move(cacheUpdater))
, extractor_(std::move(extractor))
, loader_(std::move(loader))
, initialLoadObserver_(std::move(initialLoadObserver))
, taskManagerProvider_(std::move(taskManagerProvider))
, monitorProvider_(std::move(monitorProvider))
, state_(std::move(state))
, startSequence_(config.get().maybeValue<uint32_t>("start_sequence"))
, finishSequence_(config.get().maybeValue<uint32_t>("finish_sequence"))
{ {
ASSERT(not state_->isWriting, "ETL should never start in writer mode"); if (finishSequence_ && startSequence > *finishSequence_)
return {};
if (startSequence_.has_value()) LOG(log_.debug()) << "Wait for cache containing seq " << startSequence - 1
LOG(log_.info()) << "Start sequence: " << *startSequence_; << " current cache last seq =" << backend_->cache().latestLedgerSequence();
backend_->cache().waitUntilCacheContainsSeq(startSequence - 1);
if (finishSequence_.has_value()) LOG(log_.debug()) << "Starting etl pipeline";
LOG(log_.info()) << "Finish sequence: " << *finishSequence_; state_.isWriting = true;
LOG(log_.info()) << "Starting in " << (state_->isStrictReadonly ? "STRICT READONLY MODE" : "WRITE MODE"); auto const rng = backend_->hardFetchLedgerRangeNoThrow();
ASSERT(rng.has_value(), "Parent ledger range can't be null");
ASSERT(
rng->maxSequence >= startSequence - 1,
"Got not parent ledger. rnd->maxSequence = {}, startSequence = {}",
rng->maxSequence,
startSequence
);
auto const begin = std::chrono::system_clock::now();
auto extractors = std::vector<std::unique_ptr<ExtractorType>>{};
auto pipe = DataPipeType{numExtractors, startSequence};
for (auto i = 0u; i < numExtractors; ++i) {
extractors.push_back(
std::make_unique<ExtractorType>(
pipe, networkValidatedLedgers_, ledgerFetcher_, startSequence + i, finishSequence_, state_
)
);
}
auto transformer =
TransformerType{pipe, backend_, ledgerLoader_, ledgerPublisher_, amendmentBlockHandler_, startSequence, state_};
transformer.waitTillFinished(); // suspend current thread until exit condition is met
pipe.cleanup(); // TODO: this should probably happen automatically using destructor
// wait for all of the extractors to stop
for (auto& t : extractors)
t->waitTillFinished();
auto const end = std::chrono::system_clock::now();
auto const lastPublishedSeq = ledgerPublisher_.getLastPublishedSequence();
static constexpr auto kNANOSECONDS_PER_SECOND = 1'000'000'000.0;
LOG(log_.debug()) << "Extracted and wrote " << lastPublishedSeq.value_or(startSequence) - startSequence << " in "
<< ((end - begin).count()) / kNANOSECONDS_PER_SECOND;
state_.isWriting = false;
LOG(log_.debug()) << "Stopping etl pipeline";
return lastPublishedSeq;
} }
ETLService::~ETLService() // Main loop of ETL.
// The software begins monitoring the ledgers that are validated by the network.
// The member networkValidatedLedgers_ keeps track of the sequences of ledgers validated by the network.
// Whenever a ledger is validated by the network, the software looks for that ledger in the database. Once the ledger is
// found in the database, the software publishes that ledger to the ledgers stream. If a network validated ledger is not
// found in the database after a certain amount of time, then the software attempts to take over responsibility of the
// ETL process, where it writes new ledgers to the database. The software will relinquish control of the ETL process if
// it detects that another process has taken over ETL.
void
ETLService::monitor()
{ {
stop(); auto rng = backend_->hardFetchLedgerRangeNoThrow();
LOG(log_.debug()) << "Destroying ETL"; if (!rng) {
LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
std::optional<ripple::LedgerHeader> ledger;
try {
if (startSequence_) {
LOG(log_.info()) << "ledger sequence specified in config. "
<< "Will begin ETL process starting with ledger " << *startSequence_;
ledger = ledgerLoader_.loadInitialLedger(*startSequence_);
} else {
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
std::optional<uint32_t> mostRecentValidated = networkValidatedLedgers_->getMostRecent();
if (mostRecentValidated) {
LOG(log_.info()) << "Ledger " << *mostRecentValidated << " has been validated. Downloading...";
ledger = ledgerLoader_.loadInitialLedger(*mostRecentValidated);
} else {
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
"Exiting monitor loop";
return;
}
}
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to load initial ledger: " << e.what();
amendmentBlockHandler_.notifyAmendmentBlocked();
return;
}
if (ledger) {
rng = backend_->hardFetchLedgerRangeNoThrow();
} else {
LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
return;
}
} else {
if (startSequence_)
LOG(log_.warn()) << "start sequence specified but db is already populated";
LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
cacheLoader_.load(rng->maxSequence);
}
ASSERT(rng.has_value(), "Ledger range can't be null");
uint32_t nextSequence = rng->maxSequence + 1;
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
while (not isStopping()) {
nextSequence = publishNextSequence(nextSequence);
}
}
uint32_t
ETLService::publishNextSequence(uint32_t nextSequence)
{
if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); rng && rng->maxSequence >= nextSequence) {
ledgerPublisher_.publish(nextSequence, {});
++nextSequence;
} else if (networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence, util::kMILLISECONDS_PER_SECOND)) {
LOG(log_.info()) << "Ledger with sequence = " << nextSequence << " has been validated by the network. "
<< "Attempting to find in database and publish";
// Attempt to take over responsibility of ETL writer after 10 failed
// attempts to publish the ledger. publishLedger() fails if the
// ledger that has been validated by the network is not found in the
// database after the specified number of attempts. publishLedger()
// waits one second between each attempt to read the ledger from the
// database
constexpr size_t kTIMEOUT_SECONDS = 10;
bool const success = ledgerPublisher_.publish(nextSequence, kTIMEOUT_SECONDS);
if (!success) {
LOG(log_.warn()) << "Failed to publish ledger with sequence = " << nextSequence << " . Beginning ETL";
// returns the most recent sequence published. empty optional if no sequence was published
std::optional<uint32_t> lastPublished = runETLPipeline(nextSequence, extractorThreads_);
LOG(log_.info()) << "Aborting ETL. Falling back to publishing";
// if no ledger was published, don't increment nextSequence
if (lastPublished)
nextSequence = *lastPublished + 1;
} else {
++nextSequence;
}
}
return nextSequence;
}
void
ETLService::monitorReadOnly()
{
LOG(log_.debug()) << "Starting reporting in strict read only mode";
auto const latestSequenceOpt = [this]() -> std::optional<uint32_t> {
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (!rng) {
if (auto net = networkValidatedLedgers_->getMostRecent()) {
return net;
}
return std::nullopt;
}
return rng->maxSequence;
}();
if (!latestSequenceOpt.has_value()) {
return;
}
uint32_t latestSequence = *latestSequenceOpt;
cacheLoader_.load(latestSequence);
latestSequence++;
while (not isStopping()) {
if (auto rng = backend_->hardFetchLedgerRangeNoThrow(); rng && rng->maxSequence >= latestSequence) {
ledgerPublisher_.publish(latestSequence, {});
latestSequence = latestSequence + 1;
} else {
// if we can't, wait until it's validated by the network, or 1 second passes, whichever occurs
// first. Even if we don't hear from rippled, if ledgers are being written to the db, we publish
// them.
networkValidatedLedgers_->waitUntilValidatedByNetwork(latestSequence, util::kMILLISECONDS_PER_SECOND);
}
}
} }
void void
ETLService::run() ETLService::run()
{ {
LOG(log_.info()) << "Running ETL..."; LOG(log_.info()) << "Starting reporting etl";
state_.isStopping = false;
mainLoop_.emplace(ctx_.execute([this] { doWork();
auto const rng = loadInitialLedgerIfNeeded();
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
std::optional<uint32_t> const mostRecentValidated = ledgers_->getMostRecent();
if (not mostRecentValidated) {
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
"Exiting monitor loop";
return;
}
if (not rng.has_value()) {
LOG(log_.warn()) << "Initial ledger download got cancelled - stopping ETL service";
return;
}
auto nextSequence = rng->maxSequence + 1;
if (backend_->cache().latestLedgerSequence() != 0) {
nextSequence = backend_->cache().latestLedgerSequence();
}
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
startMonitor(nextSequence);
// If we are a writer as the result of loading the initial ledger - start loading
if (state_->isWriting)
startLoading(nextSequence);
}));
} }
void void
ETLService::stop() ETLService::doWork()
{ {
LOG(log_.info()) << "Stop called"; worker_ = std::thread([this]() {
beast::setCurrentThreadName("ETLService worker");
if (mainLoop_) if (state_.isStrictReadonly) {
mainLoop_->wait(); monitorReadOnly();
if (taskMan_) } else {
taskMan_->stop(); monitor();
if (monitor_)
monitor_->stop();
}
boost::json::object
ETLService::getInfo() const
{
boost::json::object result;
result["etl_sources"] = balancer_->toJson();
result["is_writer"] = static_cast<int>(state_->isWriting);
result["read_only"] = static_cast<int>(state_->isStrictReadonly);
auto last = publisher_->getLastPublish();
if (last.time_since_epoch().count() != 0)
result["last_publish_age_seconds"] = std::to_string(publisher_->lastPublishAgeSeconds());
return result;
}
bool
ETLService::isAmendmentBlocked() const
{
return state_->isAmendmentBlocked;
}
bool
ETLService::isCorruptionDetected() const
{
return state_->isCorruptionDetected;
}
std::optional<ETLState>
ETLService::getETLState() const
{
return balancer_->getETLState();
}
std::uint32_t
ETLService::lastCloseAgeSeconds() const
{
return publisher_->lastCloseAgeSeconds();
}
std::optional<data::LedgerRange>
ETLService::loadInitialLedgerIfNeeded()
{
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (not rng.has_value()) {
ASSERT(
not state_->isStrictReadonly,
"Database is empty but this node is in strict readonly mode. Can't write initial ledger."
);
LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
state_->isWriting = true; // immediately become writer as the db is empty
auto const getMostRecent = [this]() {
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
return ledgers_->getMostRecent();
};
if (auto const maybeSeq = startSequence_.or_else(getMostRecent); maybeSeq.has_value()) {
auto const seq = *maybeSeq;
LOG(log_.info()) << "Starting from sequence " << seq
<< ". Initial ledger download and extraction can take a while...";
auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() {
return extractor_->extractLedgerOnly(seq).and_then(
[this, seq](auto&& data) -> std::optional<ripple::LedgerHeader> {
// TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar
auto res = balancer_->loadInitialLedger(seq, *initialLoadObserver_);
if (not res.has_value() and res.error() == InitialLedgerLoadError::Cancelled) {
LOG(log_.debug()) << "Initial ledger load got cancelled";
return std::nullopt;
} }
ASSERT(res.has_value(), "Initial ledger retry logic failed");
data.edgeKeys = std::move(res).value();
return loader_->loadInitialLedger(data);
}
);
}); });
if (not ledger.has_value()) {
LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
return std::nullopt;
}
LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size();
return backend_->hardFetchLedgerRangeNoThrow();
}
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
"Exiting monitor loop";
return std::nullopt;
}
LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
if (not backend_->cache().isFull()) {
cacheLoader_->load(rng->maxSequence);
}
return rng;
} }
void ETLService::ETLService(
ETLService::startMonitor(uint32_t seq) util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<etlng::LoadBalancerInterface> balancer,
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
)
: backend_(backend)
, loadBalancer_(balancer)
, networkValidatedLedgers_(std::move(ledgers))
, cacheLoader_(config, backend, backend->cache())
, ledgerFetcher_(backend, balancer)
, ledgerLoader_(backend, balancer, ledgerFetcher_, state_)
, ledgerPublisher_(ioc, backend, backend->cache(), subscriptions, state_)
, amendmentBlockHandler_(ioc, state_)
{ {
monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq); startSequence_ = config.maybeValue<uint32_t>("start_sequence");
finishSequence_ = config.maybeValue<uint32_t>("finish_sequence");
state_.isStrictReadonly = config.get<bool>("read_only");
extractorThreads_ = config.get<uint32_t>("extractor_threads");
monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) { // This should probably be done in the backend factory but we don't have state available until here
LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq; backend_->setCorruptionDetector(CorruptionDetector{state_, backend->cache()});
if (state_->writeConflict) {
LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately";
giveUpWriter();
}
if (not state_->isWriting) {
auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
return backend_->fetchLedgerDiff(seq, yield);
});
cacheUpdater_->update(seq, diff);
backend_->updateRange(seq);
}
publisher_->publish(seq, {});
});
monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() {
LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor";
if (not state_->isStrictReadonly and not state_->isWriting)
attemptTakeoverWriter();
});
monitor_->run();
} }
void
ETLService::startLoading(uint32_t seq)
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq, finishSequence_);
// FIXME: this legacy name "extractor_threads" is no longer accurate (we have coroutines now)
taskMan_->run(config_.get().get<std::size_t>("extractor_threads"));
}
void
ETLService::attemptTakeoverWriter()
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
auto rng = backend_->hardFetchLedgerRangeNoThrow();
ASSERT(rng.has_value(), "Ledger range can't be null");
state_->isWriting = true; // switch to writer
LOG(log_.info()) << "Taking over the ETL writer seat";
startLoading(rng->maxSequence + 1);
}
void
ETLService::giveUpWriter()
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
state_->isWriting = false;
state_->writeConflict = false;
taskMan_ = nullptr;
}
} // namespace etl } // namespace etl
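The signals-based ETLService above subscribes to the Monitor through boost::signals2::scoped_connection members (monitorNewSeqSubscription_, monitorDbStalledSubscription_), so handlers disconnect automatically when the service is destroyed. A minimal standalone sketch of that pattern; the signal and handler here are stand-ins, not clio's Monitor API:

#include <boost/signals2.hpp>

#include <cstdint>
#include <iostream>

int
main()
{
    boost::signals2::signal<void(std::uint32_t)> onNewSequence;

    {
        // scoped_connection disconnects in its destructor, like the
        // subscriptions held by ETLService.
        boost::signals2::scoped_connection const sub =
            onNewSequence.connect([](std::uint32_t seq) { std::cout << "got new seq from db: " << seq << '\n'; });

        onNewSequence(42);  // handler runs
    }

    onNewSequence(43);  // no output: the subscription went out of scope
}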

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
/* /*
This file is part of clio: https://github.com/XRPLF/clio This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers. Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above purpose with or without fee is hereby granted, provided that the above
@@ -20,64 +20,57 @@
#pragma once #pragma once
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "data/Types.hpp" #include "etl/CacheLoader.hpp"
#include "etl/CacheLoaderInterface.hpp"
#include "etl/CacheUpdaterInterface.hpp"
#include "etl/ETLServiceInterface.hpp"
#include "etl/ETLState.hpp" #include "etl/ETLState.hpp"
#include "etl/ExtractorInterface.hpp"
#include "etl/InitialLoadObserverInterface.hpp"
#include "etl/LedgerPublisherInterface.hpp"
#include "etl/LoadBalancerInterface.hpp"
#include "etl/LoaderInterface.hpp"
#include "etl/MonitorInterface.hpp"
#include "etl/MonitorProviderInterface.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp" #include "etl/SystemState.hpp"
#include "etl/TaskManagerInterface.hpp"
#include "etl/TaskManagerProviderInterface.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp" #include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/impl/CacheUpdater.hpp" #include "etl/impl/ExtractionDataPipe.hpp"
#include "etl/impl/Extraction.hpp" #include "etl/impl/Extractor.hpp"
#include "etl/impl/LedgerFetcher.hpp" #include "etl/impl/LedgerFetcher.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "etl/impl/LedgerPublisher.hpp" #include "etl/impl/LedgerPublisher.hpp"
#include "etl/impl/Loading.hpp" #include "etl/impl/Transformer.hpp"
#include "etl/impl/Registry.hpp" #include "etlng/ETLServiceInterface.hpp"
#include "etl/impl/Scheduling.hpp" #include "etlng/LoadBalancerInterface.hpp"
#include "etl/impl/TaskManager.hpp" #include "etlng/impl/LedgerPublisher.hpp"
#include "etl/impl/ext/Cache.hpp" #include "etlng/impl/TaskManagerProvider.hpp"
#include "etl/impl/ext/Core.hpp"
#include "etl/impl/ext/NFT.hpp"
#include "etl/impl/ext/Successor.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "util/async/AnyExecutionContext.hpp" #include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/json/object.hpp> #include <boost/json/object.hpp>
#include <boost/signals2/connection.hpp> #include <grpcpp/grpcpp.h>
#include <fmt/format.h> #include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/basics/Blob.h> #include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <functional>
#include <memory> #include <memory>
#include <optional> #include <optional>
#include <string> #include <string>
#include <thread>
struct AccountTransactionsData;
struct NFTTransactionsData;
struct NFTsData;
/**
* @brief This namespace contains everything to do with the ETL and ETL sources.
*/
namespace etl { namespace etl {
/**
* @brief A tag class to help identify ETLService in templated code.
*/
struct ETLServiceTag {
virtual ~ETLServiceTag() = default;
};
template <typename T>
concept SomeETLService = std::derived_from<T, ETLServiceTag>;
/** /**
* @brief This class is responsible for continuously extracting data from a p2p node, and writing that data to the * @brief This class is responsible for continuously extracting data from a p2p node, and writing that data to the
* databases. * databases.
@@ -91,42 +84,71 @@ namespace etl {
* the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring * the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring
* to writing and from writing to monitoring, based on the activity of other processes running on different machines. * to writing and from writing to monitoring, based on the activity of other processes running on different machines.
*/ */
class ETLService : public ETLServiceInterface { class ETLService : public etlng::ETLServiceInterface, ETLServiceTag {
// TODO: make these template parameters in ETLService
using DataPipeType = etl::impl::ExtractionDataPipe<org::xrpl::rpc::v1::GetLedgerResponse>;
using CacheLoaderType = etl::CacheLoader<>;
using LedgerFetcherType = etl::impl::LedgerFetcher;
using ExtractorType = etl::impl::Extractor<DataPipeType, LedgerFetcherType>;
using LedgerLoaderType = etl::impl::LedgerLoader<LedgerFetcherType>;
using LedgerPublisherType = etl::impl::LedgerPublisher;
using AmendmentBlockHandlerType = etl::impl::AmendmentBlockHandler;
using TransformerType =
etl::impl::Transformer<DataPipeType, LedgerLoaderType, LedgerPublisherType, AmendmentBlockHandlerType>;
util::Logger log_{"ETL"}; util::Logger log_{"ETL"};
util::async::AnyExecutionContext ctx_;
std::reference_wrapper<util::config::ClioConfigDefinition const> config_;
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<LoadBalancerInterface> balancer_; std::shared_ptr<etlng::LoadBalancerInterface> loadBalancer_;
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers_; std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
std::shared_ptr<LedgerPublisherInterface> publisher_;
std::shared_ptr<CacheLoaderInterface> cacheLoader_;
std::shared_ptr<CacheUpdaterInterface> cacheUpdater_;
std::shared_ptr<ExtractorInterface> extractor_;
std::shared_ptr<LoaderInterface> loader_;
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver_;
std::shared_ptr<TaskManagerProviderInterface> taskManagerProvider_;
std::shared_ptr<MonitorProviderInterface> monitorProvider_;
std::shared_ptr<SystemState> state_;
std::uint32_t extractorThreads_ = 1;
std::thread worker_;
CacheLoaderType cacheLoader_;
LedgerFetcherType ledgerFetcher_;
LedgerLoaderType ledgerLoader_;
LedgerPublisherType ledgerPublisher_;
AmendmentBlockHandlerType amendmentBlockHandler_;
SystemState state_;
size_t numMarkers_ = 2;
std::optional<uint32_t> startSequence_; std::optional<uint32_t> startSequence_;
std::optional<uint32_t> finishSequence_; std::optional<uint32_t> finishSequence_;
std::unique_ptr<MonitorInterface> monitor_;
std::unique_ptr<TaskManagerInterface> taskMan_;
boost::signals2::scoped_connection monitorNewSeqSubscription_;
boost::signals2::scoped_connection monitorDbStalledSubscription_;
std::optional<util::async::AnyOperation<void>> mainLoop_;
public: public:
/**
* @brief Create an instance of ETLService.
*
* @param config The configuration to use
* @param ioc io context to run on
* @param backend BackendInterface implementation
* @param subscriptions Subscription manager
* @param balancer Load balancer to use
* @param ledgers The network validated ledgers datastructure
*/
ETLService(
util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<etlng::LoadBalancerInterface> balancer,
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
);
/**
* @brief Move constructor is deleted because ETL service shares its fields by reference
*/
ETLService(ETLService&&) = delete;
/** /**
* @brief A factory function to spawn new ETLService instances. * @brief A factory function to spawn new ETLService instances.
* *
* Creates and runs the ETL service. * Creates and runs the ETL service.
* *
* @param config The configuration to use * @param config The configuration to use
* @param ioc io context to run on
* @param ctx Execution context for asynchronous operations * @param ctx Execution context for asynchronous operations
* @param backend BackendInterface implementation * @param backend BackendInterface implementation
* @param subscriptions Subscription manager * @param subscriptions Subscription manager
@@ -134,89 +156,182 @@ public:
* @param ledgers The network validated ledgers datastructure * @param ledgers The network validated ledgers datastructure
* @return A shared pointer to a new instance of ETLService * @return A shared pointer to a new instance of ETLService
*/ */
static std::shared_ptr<ETLServiceInterface> static std::shared_ptr<etlng::ETLServiceInterface>
makeETLService( makeETLService(
util::config::ClioConfigDefinition const& config, util::config::ClioConfigDefinition const& config,
boost::asio::io_context& ioc,
util::async::AnyExecutionContext ctx, util::async::AnyExecutionContext ctx,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions, std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<LoadBalancerInterface> balancer, std::shared_ptr<etlng::LoadBalancerInterface> balancer,
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers std::shared_ptr<NetworkValidatedLedgersInterface> ledgers
); );
/** /**
* @brief Create an instance of ETLService. * @brief Stops components and joins worker thread.
*
* @param ctx The execution context for asynchronous operations
* @param config The Clio configuration definition
* @param backend Interface to the backend database
* @param balancer Load balancer for distributing work
* @param ledgers Interface for accessing network validated ledgers
* @param publisher Interface for publishing ledger data
* @param cacheLoader Interface for loading cache data
* @param cacheUpdater Interface for updating cache data
* @param extractor The extractor to use
* @param loader Interface for loading data
* @param initialLoadObserver The observer for initial data loading
* @param taskManagerProvider The provider of the task manager instance
* @param monitorProvider The provider of the monitor instance
* @param state System state tracking object
*/ */
ETLService( ~ETLService() override
util::async::AnyExecutionContext ctx, {
std::reference_wrapper<util::config::ClioConfigDefinition const> config, if (not state_.isStopping)
std::shared_ptr<data::BackendInterface> backend, stop();
std::shared_ptr<LoadBalancerInterface> balancer, }
std::shared_ptr<NetworkValidatedLedgersInterface> ledgers,
std::shared_ptr<LedgerPublisherInterface> publisher,
std::shared_ptr<CacheLoaderInterface> cacheLoader,
std::shared_ptr<CacheUpdaterInterface> cacheUpdater,
std::shared_ptr<ExtractorInterface> extractor,
std::shared_ptr<LoaderInterface> loader,
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
std::shared_ptr<TaskManagerProviderInterface> taskManagerProvider,
std::shared_ptr<MonitorProviderInterface> monitorProvider,
std::shared_ptr<SystemState> state
);
~ETLService() override; /**
* @brief Stop the ETL service.
* @note This method blocks until the ETL service has stopped.
*/
void
stop() override
{
LOG(log_.info()) << "Stop called";
state_.isStopping = true;
cacheLoader_.stop();
if (worker_.joinable())
worker_.join();
LOG(log_.debug()) << "Joined ETLService worker thread";
}
/**
* @brief Get time passed since last ledger close, in seconds.
*
* @return Time passed since last ledger close
*/
std::uint32_t
lastCloseAgeSeconds() const override
{
return ledgerPublisher_.lastCloseAgeSeconds();
}
/**
* @brief Check for the amendment blocked state.
*
* @return true if currently amendment blocked; false otherwise
*/
bool
isAmendmentBlocked() const override
{
return state_.isAmendmentBlocked;
}
/**
* @brief Check whether Clio detected DB corruptions.
*
* @return true if corruption of DB was detected and cache was stopped.
*/
bool
isCorruptionDetected() const override
{
return state_.isCorruptionDetected;
}
/**
* @brief Get state of ETL as a JSON object
*
* @return The state of ETL as a JSON object
*/
boost::json::object
getInfo() const override
{
boost::json::object result;
result["etl_sources"] = loadBalancer_->toJson();
result["is_writer"] = static_cast<int>(state_.isWriting);
result["read_only"] = static_cast<int>(state_.isStrictReadonly);
auto last = ledgerPublisher_.getLastPublish();
if (last.time_since_epoch().count() != 0)
result["last_publish_age_seconds"] = std::to_string(ledgerPublisher_.lastPublishAgeSeconds());
return result;
}
/**
* @brief Get the etl nodes' state
* @return The etl nodes' state, nullopt if etl nodes are not connected
*/
std::optional<etl::ETLState>
getETLState() const noexcept override
{
return loadBalancer_->getETLState();
}
/**
* @brief Start all components to run ETL service.
*/
void void
run() override; run() override;
void
stop() override;
boost::json::object
getInfo() const override;
bool
isAmendmentBlocked() const override;
bool
isCorruptionDetected() const override;
std::optional<ETLState>
getETLState() const override;
std::uint32_t
lastCloseAgeSeconds() const override;
private: private:
std::optional<data::LedgerRange> /**
loadInitialLedgerIfNeeded(); * @brief Run the ETL pipeline.
*
* Extracts ledgers and writes them to the database, until a write conflict occurs (or the server shuts down).
* @note database must already be populated when this function is called
*
* @param startSequence the first ledger to extract
* @param numExtractors number of extractors to use
* @return The last ledger written to the database, if any
*/
std::optional<uint32_t>
runETLPipeline(uint32_t startSequence, uint32_t numExtractors);
/**
* @brief Monitor the network for newly validated ledgers.
*
* Also monitor the database to see if any process is writing those ledgers.
* This function is called when the application starts, and will only return when the application is shutting down.
* If the software detects the database is empty, this function will call loadInitialLedger(). If the software
* detects ledgers are not being written, this function calls runETLPipeline(). Otherwise, this function publishes
* ledgers as they are written to the database.
*/
void void
startMonitor(uint32_t seq); monitor();
void /**
startLoading(uint32_t seq); * @brief Monitor the network for newly validated ledgers and publish them to the ledgers stream
*
* @param nextSequence the ledger sequence to publish
* @return The next ledger sequence to publish
*/
uint32_t
publishNextSequence(uint32_t nextSequence);
/**
* @brief Monitor the database for newly written ledgers.
*
* Similar to the monitor(), except this function will never call runETLPipeline() or loadInitialLedger().
* This function only publishes ledgers as they are written to the database.
*/
void void
attemptTakeoverWriter(); monitorReadOnly();
/**
* @return true if stopping; false otherwise
*/
bool
isStopping() const
{
return state_.isStopping;
}
/**
* @brief Get the number of markers to use during the initial ledger download.
*
* This is equivalent to the degree of parallelism during the initial ledger download.
*
* @return The number of markers
*/
std::uint32_t
getNumMarkers() const
{
return numMarkers_;
}
/**
* @brief Spawn the worker thread and start monitoring.
*/
void void
giveUpWriter(); doWork();
}; };
} // namespace etl } // namespace etl
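The thread-based ETLService variant in this hunk drives monitoring from a single worker thread; its stop() checks joinable() so it is safe even if run() was never called, and the destructor guards on isStopping before stopping. A minimal, hedged sketch of that start/stop shape, with illustrative names:

#include <atomic>
#include <chrono>
#include <thread>

struct WorkerSketch {
    std::atomic_bool stopping{false};
    std::thread worker;

    void
    run()
    {
        worker = std::thread([this] {
            while (not stopping)  // stand-in for monitor() / monitorReadOnly()
                std::this_thread::sleep_for(std::chrono::milliseconds{50});
        });
    }

    void
    stop()
    {
        stopping = true;
        if (worker.joinable())
            worker.join();
    }

    ~WorkerSketch()
    {
        stop();  // mirrors ~ETLService calling stop() when not already stopping
    }
};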

View File

@@ -21,10 +21,9 @@
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "etl/ETLState.hpp" #include "etl/ETLState.hpp"
#include "etl/InitialLoadObserverInterface.hpp"
#include "etl/LoadBalancerInterface.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/Source.hpp" #include "etl/Source.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
@@ -65,7 +64,7 @@ using util::prometheus::Labels;
namespace etl { namespace etl {
std::shared_ptr<LoadBalancerInterface> std::shared_ptr<etlng::LoadBalancerInterface>
LoadBalancer::makeLoadBalancer( LoadBalancer::makeLoadBalancer(
ClioConfigDefinition const& config, ClioConfigDefinition const& config,
boost::asio::io_context& ioc, boost::asio::io_context& ioc,
@@ -159,6 +158,7 @@ LoadBalancer::LoadBalancer(
auto source = sourceFactory( auto source = sourceFactory(
*it, *it,
ioc, ioc,
backend,
subscriptions, subscriptions,
validatedLedgers, validatedLedgers,
forwardingTimeout, forwardingTimeout,
@@ -212,32 +212,26 @@ LoadBalancer::LoadBalancer(
} }
} }
InitialLedgerLoadResult std::vector<std::string>
LoadBalancer::loadInitialLedger( LoadBalancer::loadInitialLedger(uint32_t sequence, std::chrono::steady_clock::duration retryAfter)
uint32_t sequence,
InitialLoadObserverInterface& loadObserver,
std::chrono::steady_clock::duration retryAfter
)
{ {
InitialLedgerLoadResult response; std::vector<std::string> response;
execute( execute(
[this, &response, &sequence, &loadObserver](auto& source) { [this, &response, &sequence](auto& source) {
auto res = source->loadInitialLedger(sequence, downloadRanges_, loadObserver); auto [data, res] = source->loadInitialLedger(sequence, downloadRanges_);
if (not res.has_value() and res.error() == InitialLedgerLoadError::Errored) { if (!res) {
LOG(log_.error()) << "Failed to download initial ledger." LOG(log_.error()) << "Failed to download initial ledger."
<< " Sequence = " << sequence << " source = " << source->toString(); << " Sequence = " << sequence << " source = " << source->toString();
return false; // should retry on error } else {
response = std::move(data);
} }
response = std::move(res); // cancelled or data received return res;
return true;
}, },
sequence, sequence,
retryAfter retryAfter
); );
return response; return response;
} }
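The execute helper is not shown in this hunk, but the callback contract is visible above: returning false (a download error) asks for a retry against another source, while returning true (data received or cancellation) is terminal. A hedged sketch of that loop under those assumptions; the real helper also tracks which source has the requested ledger:

#include <chrono>
#include <cstddef>
#include <thread>
#include <vector>

// Illustrative only: cycle sources until the callback reports completion.
template <typename Source, typename Callback>
void
executeSketch(std::vector<Source>& sources, Callback&& onSource, std::chrono::steady_clock::duration retryAfter)
{
    for (std::size_t i = 0; not sources.empty(); i = (i + 1) % sources.size()) {
        if (onSource(sources[i]))
            return;  // success or cancellation: stop retrying
        std::this_thread::sleep_for(retryAfter);  // error: wait, then try the next source
    }
}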

View File

@@ -21,12 +21,13 @@
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "etl/ETLState.hpp" #include "etl/ETLState.hpp"
#include "etl/InitialLoadObserverInterface.hpp"
#include "etl/LoadBalancerInterface.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/Source.hpp" #include "etl/Source.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
#include "util/Assert.hpp"
#include "util/Mutex.hpp" #include "util/Mutex.hpp"
#include "util/Random.hpp" #include "util/Random.hpp"
#include "util/ResponseExpirationCache.hpp" #include "util/ResponseExpirationCache.hpp"
@@ -53,6 +54,7 @@
#include <optional> #include <optional>
#include <string> #include <string>
#include <string_view> #include <string_view>
#include <utility>
#include <vector> #include <vector>
namespace etl { namespace etl {
@@ -74,7 +76,7 @@ concept SomeLoadBalancer = std::derived_from<T, LoadBalancerTag>;
* which ledgers have been validated by the network, and the range of ledgers each etl source has). This class also * which ledgers have been validated by the network, and the range of ledgers each etl source has). This class also
* allows requests for ledger data to be load balanced across all possible ETL sources. * allows requests for ledger data to be load balanced across all possible ETL sources.
*/ */
class LoadBalancer : public LoadBalancerInterface, LoadBalancerTag { class LoadBalancer : public etlng::LoadBalancerInterface, LoadBalancerTag {
public: public:
using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject; using RawLedgerObjectType = org::xrpl::rpc::v1::RawLedgerObject;
using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse; using GetLedgerResponseType = org::xrpl::rpc::v1::GetLedgerResponse;
@@ -147,7 +149,7 @@ public:
* @param backend BackendInterface implementation * @param backend BackendInterface implementation
* @param subscriptions Subscription manager * @param subscriptions Subscription manager
* @param randomGenerator A random generator to use for selecting sources * @param randomGenerator A random generator to use for selecting sources
* @param validatedLedgers The network validated ledgers datastructure * @param validatedLedgers The network validated ledgers data structure
* @param sourceFactory A factory function to create a source * @param sourceFactory A factory function to create a source
* @return A shared pointer to a new instance of LoadBalancer * @return A shared pointer to a new instance of LoadBalancer
*/ */
@@ -162,6 +164,20 @@ public:
SourceFactory sourceFactory = makeSource SourceFactory sourceFactory = makeSource
); );
/**
* @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded.
*
* @param sequence Sequence of ledger to download
* @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::vector<std::string> containing the ledger data
*/
std::vector<std::string>
loadInitialLedger(
uint32_t sequence,
std::chrono::steady_clock::duration retryAfter = std::chrono::seconds{2}
) override;
/** /**
* @brief Load the initial ledger, writing data to the queue. * @brief Load the initial ledger, writing data to the queue.
* @note This function will retry indefinitely until the ledger is downloaded or the download is cancelled. * @note This function will retry indefinitely until the ledger is downloaded or the download is cancelled.
@@ -171,12 +187,16 @@ public:
* @param retryAfter Time to wait between retries (2 seconds by default) * @param retryAfter Time to wait between retries (2 seconds by default)
* @return A std::expected with ledger edge keys on success, or InitialLedgerLoadError on failure * @return A std::expected with ledger edge keys on success, or InitialLedgerLoadError on failure
*/ */
InitialLedgerLoadResult etlng::InitialLedgerLoadResult
loadInitialLedger( loadInitialLedger(
uint32_t sequence, [[maybe_unused]] uint32_t sequence,
InitialLoadObserverInterface& observer, [[maybe_unused]] etlng::InitialLoadObserverInterface& observer,
std::chrono::steady_clock::duration retryAfter [[maybe_unused]] std::chrono::steady_clock::duration retryAfter
) override; ) override
{
ASSERT(false, "Not available for old ETL");
std::unreachable();
}
/** /**
* @brief Fetch data for a specific ledger. * @brief Fetch data for a specific ledger.

View File

@@ -6,7 +6,7 @@ A single Clio node has one or more ETL sources specified in the config file. Cli
Upon receiving a message on the stream, Clio fetches the data associated with the newly validated ledger from one of the ETL sources. The fetch is performed via a gRPC request called `GetLedger`. This request returns the ledger header, transactions and metadata blobs, and every ledger object added/modified/deleted as part of this ledger. The ETL subsystem then writes all of this data to the databases, and moves on to the next ledger. Upon receiving a message on the stream, Clio fetches the data associated with the newly validated ledger from one of the ETL sources. The fetch is performed via a gRPC request called `GetLedger`. This request returns the ledger header, transactions and metadata blobs, and every ledger object added/modified/deleted as part of this ledger. The ETL subsystem then writes all of this data to the databases, and moves on to the next ledger.
If the database is not empty, clio will first come up in a "soft" read-only mode. In read-only mode, the server does not perform ETL and simply publishes new ledgers as they are written to the database. If the database is not updated within a certain time period (currently hard coded at 10 seconds), clio will begin the ETL process and start writing to the database. The database will report an error when trying to write a record with a key that already exists. ETL uses this error to determine that another process is writing to the database, and subsequently falls back to a soft read-only mode. clio can also operate in strict read-only mode, in which case it will never write to the database. If the database is not empty, clio will first come up in a "soft" read-only mode. In read-only mode, the server does not perform ETL and simply publishes new ledgers as they are written to the database. If the database is not updated within a certain time period (currently hard coded at 20 seconds), clio will begin the ETL process and start writing to the database. The database will report an error when trying to write a record with a key that already exists. ETL uses this error to determine that another process is writing to the database, and subsequently falls back to a soft read-only mode. clio can also operate in strict read-only mode, in which case it will never write to the database.
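A hedged sketch of the takeover logic this paragraph describes, condensed from the monitor loop; every callback is a hypothetical stand-in for a clio component and shutdown handling is simplified:

#include <chrono>
#include <cstdint>
#include <functional>
#include <optional>

// Illustrative only: soft read-only publishing with ETL takeover on a stalled DB.
void
monitorSketch(
    std::function<bool()> stopping,
    std::function<std::optional<std::uint32_t>()> dbMaxSequence,         // DB tip, if any
    std::function<void(std::uint32_t)> publish,                          // publish to the ledgers stream
    std::function<bool(std::uint32_t, std::chrono::seconds)> validated,  // validated by the network in time?
    std::function<bool(std::uint32_t)> publishWithRetries,               // false => nothing wrote it in time
    std::function<std::optional<std::uint32_t>(std::uint32_t)> runEtl    // write ledgers until a conflict
)
{
    std::uint32_t next = dbMaxSequence().value_or(0) + 1;
    while (not stopping()) {
        if (auto const tip = dbMaxSequence(); tip && *tip >= next) {
            publish(next++);  // another writer keeps the DB fresh
        } else if (validated(next, std::chrono::seconds{1})) {
            if (publishWithRetries(next)) {
                ++next;
            } else if (auto const last = runEtl(next); last) {
                next = *last + 1;  // DB stalled: we took over and wrote up to *last
            }
        }
    }
}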
## Ledger cache ## Ledger cache

View File

@@ -19,6 +19,7 @@
#include "etl/Source.hpp" #include "etl/Source.hpp"
#include "data/BackendInterface.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/impl/ForwardingSource.hpp" #include "etl/impl/ForwardingSource.hpp"
#include "etl/impl/GrpcSource.hpp" #include "etl/impl/GrpcSource.hpp"
@@ -40,6 +41,7 @@ SourcePtr
makeSource( makeSource(
util::config::ObjectView const& config, util::config::ObjectView const& config,
boost::asio::io_context& ioc, boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions, std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers, std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
std::chrono::steady_clock::duration forwardingTimeout, std::chrono::steady_clock::duration forwardingTimeout,
@@ -53,7 +55,7 @@ makeSource(
auto const grpcPort = config.get<std::string>("grpc_port"); auto const grpcPort = config.get<std::string>("grpc_port");
impl::ForwardingSource forwardingSource{ip, wsPort, forwardingTimeout}; impl::ForwardingSource forwardingSource{ip, wsPort, forwardingTimeout};
impl::GrpcSource grpcSource{ip, grpcPort}; impl::GrpcSource grpcSource{ip, grpcPort, std::move(backend)};
auto subscriptionSource = std::make_unique<impl::SubscriptionSource>( auto subscriptionSource = std::make_unique<impl::SubscriptionSource>(
ioc, ioc,
ip, ip,

View File

@@ -19,8 +19,7 @@
#pragma once #pragma once
#include "etl/InitialLoadObserverInterface.hpp" #include "data/BackendInterface.hpp"
#include "etl/LoadBalancerInterface.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp" #include "etl/NetworkValidatedLedgersInterface.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "rpc/Errors.hpp" #include "rpc/Errors.hpp"
@@ -48,6 +47,7 @@ namespace etl {
/** /**
* @brief Provides an implementation of a ETL source * @brief Provides an implementation of a ETL source
*
*/ */
class SourceBase { class SourceBase {
public: public:
@@ -77,7 +77,7 @@ public:
* *
* @return true if source is connected; false otherwise * @return true if source is connected; false otherwise
*/ */
[[nodiscard]] virtual bool virtual bool
isConnected() const = 0; isConnected() const = 0;
/** /**
@@ -93,11 +93,11 @@ public:
* *
* @return JSON representation of the source * @return JSON representation of the source
*/ */
[[nodiscard]] virtual boost::json::object virtual boost::json::object
toJson() const = 0; toJson() const = 0;
/** @return String representation of the source (for debug) */ /** @return String representation of the source (for debug) */
[[nodiscard]] virtual std::string virtual std::string
toString() const = 0; toString() const = 0;
/** /**
@@ -106,7 +106,7 @@ public:
* @param sequence The ledger sequence to check * @param sequence The ledger sequence to check
* @return true if ledger is in the range of this source; false otherwise * @return true if ledger is in the range of this source; false otherwise
*/ */
[[nodiscard]] virtual bool virtual bool
hasLedger(uint32_t sequence) const = 0; hasLedger(uint32_t sequence) const = 0;
/** /**
@@ -120,7 +120,7 @@ public:
* @param getObjectNeighbors Whether to request object neighbors; defaults to false * @param getObjectNeighbors Whether to request object neighbors; defaults to false
* @return A std::pair of the response status and the response itself * @return A std::pair of the response status and the response itself
*/ */
[[nodiscard]] virtual std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse> virtual std::pair<grpc::Status, org::xrpl::rpc::v1::GetLedgerResponse>
fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) = 0; fetchLedger(uint32_t sequence, bool getObjects = true, bool getObjectNeighbors = false) = 0;
/** /**
@@ -128,11 +128,10 @@ public:
* *
* @param sequence Sequence of the ledger to download * @param sequence Sequence of the ledger to download
* @param numMarkers Number of markers to generate for async calls * @param numMarkers Number of markers to generate for async calls
* @param loader InitialLoadObserverInterface implementation
* @return A std::pair of the data and a bool indicating whether the download was successful * @return A std::pair of the data and a bool indicating whether the download was successful
*/ */
virtual InitialLedgerLoadResult virtual std::pair<std::vector<std::string>, bool>
loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, InitialLoadObserverInterface& loader) = 0; loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers) = 0;
/** /**
* @brief Forward a request to rippled. * @brief Forward a request to rippled.
@@ -143,7 +142,7 @@ public:
* @param yield The coroutine context * @param yield The coroutine context
* @return Response on success or error on failure * @return Response on success or error on failure
*/ */
[[nodiscard]] virtual std::expected<boost::json::object, rpc::ClioError> virtual std::expected<boost::json::object, rpc::ClioError>
forwardToRippled( forwardToRippled(
boost::json::object const& request, boost::json::object const& request,
std::optional<std::string> const& forwardToRippledClientIp, std::optional<std::string> const& forwardToRippledClientIp,
@@ -157,6 +156,7 @@ using SourcePtr = std::unique_ptr<SourceBase>;
using SourceFactory = std::function<SourcePtr( using SourceFactory = std::function<SourcePtr(
util::config::ObjectView const& config, util::config::ObjectView const& config,
boost::asio::io_context& ioc, boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions, std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers, std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
std::chrono::steady_clock::duration forwardingTimeout, std::chrono::steady_clock::duration forwardingTimeout,
@@ -170,6 +170,7 @@ using SourceFactory = std::function<SourcePtr(
* *
* @param config The configuration to use * @param config The configuration to use
* @param ioc The io_context to run on * @param ioc The io_context to run on
* @param backend BackendInterface implementation
* @param subscriptions Subscription manager * @param subscriptions Subscription manager
* @param validatedLedgers The network validated ledgers data structure * @param validatedLedgers The network validated ledgers data structure
* @param forwardingTimeout The timeout for forwarding to rippled * @param forwardingTimeout The timeout for forwarding to rippled
@@ -179,10 +180,11 @@ using SourceFactory = std::function<SourcePtr(
* as forwarding. * as forwarding.
* @return The created source * @return The created source
*/ */
[[nodiscard]] SourcePtr SourcePtr
makeSource( makeSource(
util::config::ObjectView const& config, util::config::ObjectView const& config,
boost::asio::io_context& ioc, boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions, std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers, std::shared_ptr<NetworkValidatedLedgersInterface> validatedLedgers,
std::chrono::steady_clock::duration forwardingTimeout, std::chrono::steady_clock::duration forwardingTimeout,

View File

@@ -20,12 +20,12 @@
#include "etl/impl/AmendmentBlockHandler.hpp" #include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/SystemState.hpp" #include "etl/SystemState.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <boost/asio/io_context.hpp>
#include <chrono> #include <chrono>
#include <functional> #include <functional>
#include <optional>
#include <utility> #include <utility>
namespace etl::impl { namespace etl::impl {
@@ -37,35 +37,20 @@ AmendmentBlockHandler::ActionType const AmendmentBlockHandler::kDEFAULT_AMENDMEN
}; };
AmendmentBlockHandler::AmendmentBlockHandler( AmendmentBlockHandler::AmendmentBlockHandler(
util::async::AnyExecutionContext ctx, boost::asio::io_context& ioc,
SystemState& state, SystemState& state,
std::chrono::steady_clock::duration interval, std::chrono::steady_clock::duration interval,
ActionType action ActionType action
) )
: state_{std::ref(state)}, interval_{interval}, ctx_{std::move(ctx)}, action_{std::move(action)} : state_{std::ref(state)}, repeat_{ioc}, interval_{interval}, action_{std::move(action)}
{ {
} }
AmendmentBlockHandler::~AmendmentBlockHandler()
{
stop();
}
void void
AmendmentBlockHandler::notifyAmendmentBlocked() AmendmentBlockHandler::notifyAmendmentBlocked()
{ {
state_.get().isAmendmentBlocked = true; state_.get().isAmendmentBlocked = true;
if (not operation_.has_value()) repeat_.start(interval_, action_);
operation_.emplace(ctx_.executeRepeatedly(interval_, action_));
}
void
AmendmentBlockHandler::stop()
{
if (operation_.has_value()) {
operation_->abort();
operation_.reset();
}
} }
} // namespace etl::impl } // namespace etl::impl

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
/* /*
This file is part of clio: https://github.com/XRPLF/clio This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2024, the clio developers. Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above purpose with or without fee is hereby granted, provided that the above
@@ -19,10 +19,8 @@
#pragma once #pragma once
#include "etl/AmendmentBlockHandlerInterface.hpp"
#include "etl/SystemState.hpp" #include "etl/SystemState.hpp"
#include "util/async/AnyExecutionContext.hpp" #include "util/Repeat.hpp"
#include "util/async/AnyOperation.hpp"
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
@@ -30,19 +28,17 @@
#include <chrono> #include <chrono>
#include <functional> #include <functional>
#include <optional>
namespace etl::impl { namespace etl::impl {
class AmendmentBlockHandler : public AmendmentBlockHandlerInterface { class AmendmentBlockHandler {
public: public:
using ActionType = std::function<void()>; using ActionType = std::function<void()>;
private: private:
std::reference_wrapper<SystemState> state_; std::reference_wrapper<SystemState> state_;
util::Repeat repeat_;
std::chrono::steady_clock::duration interval_; std::chrono::steady_clock::duration interval_;
util::async::AnyExecutionContext ctx_;
std::optional<util::async::AnyOperation<void>> operation_;
ActionType action_; ActionType action_;
@@ -50,19 +46,14 @@ public:
static ActionType const kDEFAULT_AMENDMENT_BLOCK_ACTION; static ActionType const kDEFAULT_AMENDMENT_BLOCK_ACTION;
AmendmentBlockHandler( AmendmentBlockHandler(
util::async::AnyExecutionContext ctx, boost::asio::io_context& ioc,
SystemState& state, SystemState& state,
std::chrono::steady_clock::duration interval = std::chrono::seconds{1}, std::chrono::steady_clock::duration interval = std::chrono::seconds{1},
ActionType action = kDEFAULT_AMENDMENT_BLOCK_ACTION ActionType action = kDEFAULT_AMENDMENT_BLOCK_ACTION
); );
~AmendmentBlockHandler() override;
void void
stop() override; notifyAmendmentBlocked();
void
notifyAmendmentBlocked() override;
}; };
} // namespace etl::impl } // namespace etl::impl
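On both sides of this diff the handler does the same thing: once the system is flagged amendment blocked, it runs a warning action repeatedly at a fixed interval. A standalone sketch of that repeat pattern, using a plain `std::thread` rather than Clio's `util::Repeat` or `AnyExecutionContext`:

```cpp
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <thread>

// Repeatedly invokes `action` every `interval` until stopped.
class Repeater {
    std::atomic_bool running_{false};
    std::thread thread_;

public:
    ~Repeater() { stop(); }

    void start(std::chrono::milliseconds interval, std::function<void()> action) {
        if (running_.exchange(true))
            return;  // already started
        thread_ = std::thread([this, interval, action = std::move(action)] {
            while (running_) {
                action();
                std::this_thread::sleep_for(interval);
            }
        });
    }

    void stop() {
        running_ = false;
        if (thread_.joinable())
            thread_.join();
    }
};

int main() {
    Repeater repeat;
    repeat.start(std::chrono::milliseconds{200},
                 [] { std::cout << "Can't process new ledgers: amendment blocked!\n"; });
    std::this_thread::sleep_for(std::chrono::milliseconds{700});
}  // Repeater stops and joins on destruction
```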

src/etl/impl/AsyncData.hpp Normal file (221 lines)
View File

@@ -0,0 +1,221 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "data/Types.hpp"
#include "etl/ETLHelpers.hpp"
#include "etl/MPTHelpers.hpp"
#include "etl/NFTHelpers.hpp"
#include "util/Assert.hpp"
#include "util/log/Logger.hpp"
#include <grpcpp/client_context.h>
#include <grpcpp/grpcpp.h>
#include <grpcpp/support/status.h>
#include <org/xrpl/rpc/v1/get_ledger_data.pb.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
namespace etl::impl {
class AsyncCallData {
util::Logger log_{"ETL"};
std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> cur_;
std::unique_ptr<org::xrpl::rpc::v1::GetLedgerDataResponse> next_;
org::xrpl::rpc::v1::GetLedgerDataRequest request_;
std::unique_ptr<grpc::ClientContext> context_;
grpc::Status status_;
unsigned char nextPrefix_;
std::string lastKey_;
public:
AsyncCallData(uint32_t seq, ripple::uint256 const& marker, std::optional<ripple::uint256> const& nextMarker)
{
request_.mutable_ledger()->set_sequence(seq);
if (marker.isNonZero()) {
request_.set_marker(marker.data(), ripple::uint256::size());
}
request_.set_user("ETL");
nextPrefix_ = 0x00;
if (nextMarker)
nextPrefix_ = nextMarker->data()[0];
unsigned char const prefix = marker.data()[0];
LOG(log_.debug()) << "Setting up AsyncCallData. marker = " << ripple::strHex(marker)
<< " . prefix = " << ripple::strHex(std::string(1, prefix))
<< " . nextPrefix_ = " << ripple::strHex(std::string(1, nextPrefix_));
ASSERT(
nextPrefix_ > prefix || nextPrefix_ == 0x00,
"Next prefix must be greater than current prefix. Got: nextPrefix_ = {}, prefix = {}",
nextPrefix_,
prefix
);
cur_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
next_ = std::make_unique<org::xrpl::rpc::v1::GetLedgerDataResponse>();
context_ = std::make_unique<grpc::ClientContext>();
}
enum class CallStatus { MORE, DONE, ERRORED };
CallStatus
process(
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub,
grpc::CompletionQueue& cq,
BackendInterface& backend,
bool abort,
bool cacheOnly = false
)
{
LOG(log_.trace()) << "Processing response. "
<< "Marker prefix = " << getMarkerPrefix();
if (abort) {
LOG(log_.error()) << "AsyncCallData aborted";
return CallStatus::ERRORED;
}
if (!status_.ok()) {
LOG(log_.error()) << "AsyncCallData status_ not ok: code = " << status_.error_code()
<< " message = " << status_.error_message();
return CallStatus::ERRORED;
}
if (!next_->is_unlimited()) {
LOG(log_.warn()) << "AsyncCallData is_unlimited is false. "
<< "Make sure secure_gateway is set correctly at the ETL source";
}
std::swap(cur_, next_);
bool more = true;
// if no marker returned, we are done
if (cur_->marker().empty())
more = false;
// if returned marker is greater than our end, we are done
unsigned char const prefix = cur_->marker()[0];
if (nextPrefix_ != 0x00 && prefix >= nextPrefix_)
more = false;
// if we are not done, make the next async call
if (more) {
request_.set_marker(cur_->marker());
call(stub, cq);
}
auto const numObjects = cur_->ledger_objects().objects_size();
LOG(log_.debug()) << "Writing " << numObjects << " objects";
std::vector<data::LedgerObject> cacheUpdates;
cacheUpdates.reserve(numObjects);
for (int i = 0; i < numObjects; ++i) {
auto& obj = *(cur_->mutable_ledger_objects()->mutable_objects(i));
if (!more && nextPrefix_ != 0x00) {
if (static_cast<unsigned char>(obj.key()[0]) >= nextPrefix_)
continue;
}
cacheUpdates.push_back(
{*ripple::uint256::fromVoidChecked(obj.key()), {obj.data().begin(), obj.data().end()}}
);
if (!cacheOnly) {
if (!lastKey_.empty())
backend.writeSuccessor(std::move(lastKey_), request_.ledger().sequence(), std::string{obj.key()});
lastKey_ = obj.key();
backend.writeNFTs(getNFTDataFromObj(request_.ledger().sequence(), obj.key(), obj.data()));
auto const maybeMPTHolder = getMPTHolderFromObj(obj.key(), obj.data());
if (maybeMPTHolder)
backend.writeMPTHolders({*maybeMPTHolder});
backend.writeLedgerObject(
std::move(*obj.mutable_key()), request_.ledger().sequence(), std::move(*obj.mutable_data())
);
}
}
backend.cache().update(cacheUpdates, request_.ledger().sequence(), cacheOnly);
LOG(log_.debug()) << "Wrote " << numObjects << " objects. Got more: " << (more ? "YES" : "NO");
return more ? CallStatus::MORE : CallStatus::DONE;
}
void
call(std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub>& stub, grpc::CompletionQueue& cq)
{
context_ = std::make_unique<grpc::ClientContext>();
std::unique_ptr<grpc::ClientAsyncResponseReader<org::xrpl::rpc::v1::GetLedgerDataResponse>> rpc(
stub->PrepareAsyncGetLedgerData(context_.get(), request_, &cq)
);
rpc->StartCall();
rpc->Finish(next_.get(), &status_, this);
}
std::string
getMarkerPrefix()
{
if (next_->marker().empty()) {
return "";
}
return ripple::strHex(std::string{next_->marker().data()[0]});
}
std::string
getLastKey()
{
return lastKey_;
}
};
inline std::vector<AsyncCallData>
makeAsyncCallData(uint32_t const sequence, uint32_t const numMarkers)
{
auto const markers = getMarkers(numMarkers);
std::vector<AsyncCallData> result;
result.reserve(markers.size());
for (size_t i = 0; i + 1 < markers.size(); ++i) {
result.emplace_back(sequence, markers[i], markers[i + 1]);
}
if (not markers.empty()) {
result.emplace_back(sequence, markers.back(), std::nullopt);
}
return result;
}
} // namespace etl::impl
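`makeAsyncCallData` pairs each marker with the next one so every async call covers a half-open slice of the key space. A toy sketch of that partitioning (Clio's real `getMarkers` lives in `etl/ETLHelpers`; this standalone version only spreads the first byte and assumes `numMarkers` divides 256):

```cpp
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <vector>

// Evenly spaced first-byte prefixes partitioning the 256-bit key space.
std::vector<uint8_t> getMarkerPrefixes(uint32_t numMarkers) {
    std::vector<uint8_t> prefixes;
    uint32_t const incr = 256 / numMarkers;  // assumption: numMarkers divides 256
    for (uint32_t i = 0; i < numMarkers; ++i)
        prefixes.push_back(static_cast<uint8_t>(i * incr));
    return prefixes;
}

int main() {
    auto const prefixes = getMarkerPrefixes(4);  // 0x00, 0x40, 0x80, 0xC0
    // Pair each marker with the next; the last call runs to the end of the key
    // space, mirroring the std::nullopt case in makeAsyncCallData above.
    for (std::size_t i = 0; i < prefixes.size(); ++i) {
        std::cout << "call " << i << ": starts at prefix " << int(prefixes[i]);
        if (i + 1 < prefixes.size())
            std::cout << ", stops before prefix " << int(prefixes[i + 1]);
        std::cout << "\n";
    }
}
```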

View File

@@ -53,7 +53,7 @@ class CacheLoaderImpl {
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface> backend_;
std::reference_wrapper<CacheType> cache_; std::reference_wrapper<CacheType> cache_;
ThreadSafeQueue<CursorPair> queue_; etl::ThreadSafeQueue<CursorPair> queue_;
std::atomic_int16_t remaining_; std::atomic_int16_t remaining_;
std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now(); std::chrono::steady_clock::time_point startTime_ = std::chrono::steady_clock::now();

View File

@@ -0,0 +1,136 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etl/ETLHelpers.hpp"
#include "util/log/Logger.hpp"
#include <cstddef>
#include <cstdint>
#include <memory>
#include <optional>
#include <vector>
namespace etl::impl {
/**
* @brief A collection of thread safe async queues used by Extractor and Transformer to communicate
*/
template <typename RawDataType>
class ExtractionDataPipe {
public:
using DataType = std::optional<RawDataType>;
using QueueType = ThreadSafeQueue<DataType>; // TODO: probably should use boost::lockfree::queue instead?
static constexpr auto kTOTAL_MAX_IN_QUEUE = 1000u;
private:
util::Logger log_{"ETL"};
uint32_t stride_;
uint32_t startSequence_;
std::vector<std::shared_ptr<QueueType>> queues_;
public:
/**
* @brief Create a new instance of the extraction data pipe
*
* @param stride
* @param startSequence
*/
ExtractionDataPipe(uint32_t stride, uint32_t startSequence) : stride_{stride}, startSequence_{startSequence}
{
auto const maxQueueSize = kTOTAL_MAX_IN_QUEUE / stride;
for (size_t i = 0; i < stride_; ++i)
queues_.push_back(std::make_unique<QueueType>(maxQueueSize));
}
/**
* @brief Push new data package for the specified sequence.
*
* Note: Potentially blocks until the underlying queue can accommodate another entry.
*
* @param sequence The sequence for which to enqueue the data package
* @param data The data to store
*/
void
push(uint32_t sequence, DataType&& data)
{
getQueue(sequence)->push(std::move(data));
}
/**
* @brief Get data package for the given sequence
*
* Note: Potentially blocks until data is available.
*
* @param sequence The sequence for which data is required
* @return The data wrapped in an optional; nullopt means that there is no more data to expect
*/
DataType
popNext(uint32_t sequence)
{
return getQueue(sequence)->pop();
}
/**
* @return Get the stride
*/
uint32_t
getStride() const
{
return stride_;
}
/**
* @brief Hint the Transformer that the queue is done sending data
* @param sequence The sequence for which the extractor queue is to be hinted
*/
void
finish(uint32_t sequence)
{
// empty optional hints the Transformer to shut down
push(sequence, std::nullopt);
}
/**
* @brief Unblocks internal queues
*
* Note: For now this must be called by the ETL when Transformer exits.
*/
void
cleanup()
{
// TODO: this should not have to be called by hand. it should be done via RAII
for (auto i = 0u; i < stride_; ++i)
getQueue(i)->tryPop(); // pop from each queue that might be blocked on a push
}
private:
std::shared_ptr<QueueType>
getQueue(uint32_t sequence)
{
LOG(log_.debug()) << "Grabbing extraction queue for " << sequence << "; start was " << startSequence_;
return queues_[(sequence - startSequence_) % stride_];
}
};
} // namespace etl::impl
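The queue-selection rule in `getQueue` spreads ledgers round-robin over `stride` queues, so extractor `i` produces sequences `start+i`, `start+i+stride`, and so on. A quick standalone illustration:

```cpp
#include <cstdint>
#include <iostream>

int main() {
    std::uint32_t const stride = 4;  // number of extractors/queues
    std::uint32_t const startSequence = 1000;
    for (std::uint32_t seq = startSequence; seq < startSequence + 8; ++seq)
        std::cout << "ledger " << seq << " -> queue " << (seq - startSequence) % stride << "\n";
    // Extractor i then pushes sequences start+i, start+i+stride, start+i+2*stride, ...
}
```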

src/etl/impl/Extractor.hpp Normal file (147 lines)
View File

@@ -0,0 +1,147 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"
#include <xrpl/beast/core/CurrentThreadName.h>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <thread>
#include <utility>
namespace etl::impl {
/**
* @brief Extractor thread that fetches gRPC data and enqueues it on the DataPipeType
*/
template <typename DataPipeType, typename LedgerFetcherType>
class Extractor {
util::Logger log_{"ETL"};
std::reference_wrapper<DataPipeType> pipe_;
std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers_;
std::reference_wrapper<LedgerFetcherType> ledgerFetcher_;
uint32_t startSequence_;
std::optional<uint32_t> finishSequence_;
std::reference_wrapper<SystemState const> state_; // shared state for ETL
std::thread thread_;
public:
Extractor(
DataPipeType& pipe,
std::shared_ptr<NetworkValidatedLedgersInterface> networkValidatedLedgers,
LedgerFetcherType& ledgerFetcher,
uint32_t startSequence,
std::optional<uint32_t> finishSequence,
SystemState const& state
)
: pipe_(std::ref(pipe))
, networkValidatedLedgers_{std::move(networkValidatedLedgers)}
, ledgerFetcher_{std::ref(ledgerFetcher)}
, startSequence_{startSequence}
, finishSequence_{finishSequence}
, state_{std::cref(state)}
{
thread_ = std::thread([this]() { process(); });
}
~Extractor()
{
if (thread_.joinable())
thread_.join();
}
void
waitTillFinished()
{
ASSERT(thread_.joinable(), "Extractor thread must be joinable");
thread_.join();
}
private:
void
process()
{
beast::setCurrentThreadName("ETLService extract");
double totalTime = 0.0;
auto currentSequence = startSequence_;
while (!shouldFinish(currentSequence) &&
networkValidatedLedgers_->waitUntilValidatedByNetwork(currentSequence)) {
auto [fetchResponse, time] = ::util::timed<std::chrono::duration<double>>([this, currentSequence]() {
return ledgerFetcher_.get().fetchDataAndDiff(currentSequence);
});
totalTime += time;
// if the fetch is unsuccessful, stop. fetchLedger only returns false if the server is shutting down, or
// if the ledger was found in the database (which means another process already wrote the ledger that
// this process was trying to extract; this is a form of a write conflict). Otherwise, fetchDataAndDiff
// will keep trying to fetch the specified ledger until successful.
if (!fetchResponse)
break;
// TODO: extract this part into a strategy perhaps
auto const tps = fetchResponse->transactions_list().transactions_size() / time;
LOG(log_.info()) << "Extract phase time = " << time << "; Extract phase tps = " << tps
<< "; Avg extract time = " << totalTime / (currentSequence - startSequence_ + 1)
<< "; seq = " << currentSequence;
pipe_.get().push(currentSequence, std::move(fetchResponse));
currentSequence += pipe_.get().getStride();
}
pipe_.get().finish(startSequence_);
}
[[nodiscard]] bool
isStopping() const
{
return state_.get().isStopping;
}
[[nodiscard]] bool
hasWriteConflict() const
{
return state_.get().writeConflict;
}
[[nodiscard]] bool
shouldFinish(uint32_t seq) const
{
// Stopping conditions:
// - if there is a write conflict in the load thread, the ETL mechanism should stop.
// - if the entire server is shutting down - this can be detected in a variety of ways.
// - when the given sequence is past the finishSequence in case one is specified
return hasWriteConflict() || isStopping() || (finishSequence_ && seq > *finishSequence_);
}
};
} // namespace etl::impl

View File

@@ -19,14 +19,13 @@
#include "etl/impl/GrpcSource.hpp" #include "etl/impl/GrpcSource.hpp"
#include "etl/InitialLoadObserverInterface.hpp" #include "data/BackendInterface.hpp"
#include "etl/LoadBalancerInterface.hpp" #include "etl/impl/AsyncData.hpp"
#include "etl/impl/AsyncGrpcCall.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "web/Resolver.hpp"
#include <boost/asio/spawn.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <fmt/format.h> #include <fmt/format.h>
#include <grpc/grpc.h> #include <grpc/grpc.h>
#include <grpcpp/client_context.h> #include <grpcpp/client_context.h>
@@ -36,41 +35,33 @@
#include <org/xrpl/rpc/v1/get_ledger.pb.h> #include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h> #include <org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <atomic>
#include <chrono> #include <chrono>
#include <cstddef> #include <cstddef>
#include <cstdint> #include <cstdint>
#include <exception> #include <exception>
#include <expected>
#include <memory> #include <memory>
#include <sstream>
#include <stdexcept> #include <stdexcept>
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector> #include <vector>
namespace {
std::string
resolve(std::string const& ip, std::string const& port)
{
web::Resolver resolver;
if (auto const results = resolver.resolve(ip, port); not results.empty())
return results.at(0);
throw std::runtime_error("Failed to resolve " + ip + ":" + port);
}
} // namespace
namespace etl::impl { namespace etl::impl {
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::chrono::system_clock::duration deadline) GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend)
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)) : log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
, initialLoadShouldStop_(std::make_unique<std::atomic_bool>(false))
, deadline_{deadline}
{ {
try { try {
boost::asio::io_context ctx;
boost::asio::ip::tcp::resolver resolver{ctx};
auto const resolverResult = resolver.resolve(ip, grpcPort);
if (resolverResult.empty())
throw std::runtime_error("Failed to resolve " + ip + ":" + grpcPort);
std::stringstream ss;
ss << resolverResult.begin()->endpoint();
grpc::ChannelArguments chArgs; grpc::ChannelArguments chArgs;
chArgs.SetMaxReceiveMessageSize(-1); chArgs.SetMaxReceiveMessageSize(-1);
chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKEEPALIVE_PING_INTERVAL_MS); chArgs.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, kKEEPALIVE_PING_INTERVAL_MS);
@@ -79,7 +70,7 @@ GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::
chArgs.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, kMAX_PINGS_WITHOUT_DATA); chArgs.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, kMAX_PINGS_WITHOUT_DATA);
stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub( stub_ = org::xrpl::rpc::v1::XRPLedgerAPIService::NewStub(
grpc::CreateCustomChannel(resolve(ip, grpcPort), grpc::InsecureChannelCredentials(), chArgs) grpc::CreateCustomChannel(ss.str(), grpc::InsecureChannelCredentials(), chArgs)
); );
LOG(log_.debug()) << "Made stub for remote."; LOG(log_.debug()) << "Made stub for remote.";
@@ -98,7 +89,7 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb
org::xrpl::rpc::v1::GetLedgerRequest request; org::xrpl::rpc::v1::GetLedgerRequest request;
grpc::ClientContext context; grpc::ClientContext context;
context.set_deadline(std::chrono::system_clock::now() + deadline_); // Prevent indefinite blocking context.set_deadline(std::chrono::system_clock::now() + kDEADLINE); // Prevent indefinite blocking
request.mutable_ledger()->set_sequence(sequence); request.mutable_ledger()->set_sequence(sequence);
request.set_transactions(true); request.set_transactions(true);
@@ -109,7 +100,7 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb
grpc::Status const status = stub_->GetLedger(&context, request, &response); grpc::Status const status = stub_->GetLedger(&context, request, &response);
if (status.ok() and not response.is_unlimited()) { if (status.ok() && !response.is_unlimited()) {
log_.warn() << "is_unlimited is false. Make sure secure_gateway is set correctly on the ETL source. Status = " log_.warn() << "is_unlimited is false. Make sure secure_gateway is set correctly on the ETL source. Status = "
<< status.error_message(); << status.error_message();
} }
@@ -117,46 +108,41 @@ GrpcSource::fetchLedger(uint32_t sequence, bool getObjects, bool getObjectNeighb
return {status, std::move(response)}; return {status, std::move(response)};
} }
InitialLedgerLoadResult std::pair<std::vector<std::string>, bool>
GrpcSource::loadInitialLedger( GrpcSource::loadInitialLedger(uint32_t const sequence, uint32_t const numMarkers)
uint32_t const sequence,
uint32_t const numMarkers,
InitialLoadObserverInterface& observer
)
{ {
if (*initialLoadShouldStop_)
return std::unexpected{InitialLedgerLoadError::Cancelled};
if (!stub_) if (!stub_)
return std::unexpected{InitialLedgerLoadError::Errored}; return {{}, false};
std::vector<AsyncGrpcCall> calls = AsyncGrpcCall::makeAsyncCalls(sequence, numMarkers); std::vector<etl::impl::AsyncCallData> calls = impl::makeAsyncCallData(sequence, numMarkers);
LOG(log_.debug()) << "Starting data download for ledger " << sequence << "."; LOG(log_.debug()) << "Starting data download for ledger " << sequence << ".";
grpc::CompletionQueue queue; grpc::CompletionQueue cq;
for (auto& call : calls) for (auto& c : calls)
call.call(stub_, queue); c.call(stub_, cq);
std::vector<std::string> edgeKeys;
void* tag = nullptr; void* tag = nullptr;
bool ok = false; bool ok = false;
bool abort = false;
size_t numFinished = 0; size_t numFinished = 0;
bool abort = false;
size_t const incr = 500000;
size_t progress = incr;
std::vector<std::string> edgeKeys;
while (numFinished < calls.size() && queue.Next(&tag, &ok)) { while (numFinished < calls.size() && cq.Next(&tag, &ok)) {
ASSERT(tag != nullptr, "Tag can't be null."); ASSERT(tag != nullptr, "Tag can't be null.");
auto ptr = static_cast<AsyncGrpcCall*>(tag); auto ptr = static_cast<etl::impl::AsyncCallData*>(tag);
if (not ok or *initialLoadShouldStop_) { if (!ok) {
LOG(log_.error()) << "loadInitialLedger cancelled"; LOG(log_.error()) << "loadInitialLedger - ok is false";
return std::unexpected{InitialLedgerLoadError::Cancelled}; return {{}, false}; // handle cancelled
} }
LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix(); LOG(log_.trace()) << "Marker prefix = " << ptr->getMarkerPrefix();
auto result = ptr->process(stub_, queue, observer, abort); auto result = ptr->process(stub_, cq, *backend_, abort);
if (result != AsyncGrpcCall::CallStatus::More) { if (result != etl::impl::AsyncCallData::CallStatus::MORE) {
++numFinished; ++numFinished;
LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished; LOG(log_.debug()) << "Finished a marker. Current number of finished = " << numFinished;
@@ -164,20 +150,18 @@ GrpcSource::loadInitialLedger(
edgeKeys.push_back(std::move(lastKey)); edgeKeys.push_back(std::move(lastKey));
} }
if (result == AsyncGrpcCall::CallStatus::Errored) if (result == etl::impl::AsyncCallData::CallStatus::ERRORED)
abort = true; abort = true;
if (backend_->cache().size() > progress) {
LOG(log_.info()) << "Downloaded " << backend_->cache().size() << " records from rippled";
progress += incr;
}
} }
if (abort) LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size() << ", abort = " << abort
return std::unexpected{InitialLedgerLoadError::Errored}; << ".";
return {std::move(edgeKeys), !abort};
return edgeKeys;
}
void
GrpcSource::stop(boost::asio::yield_context)
{
initialLoadShouldStop_->store(true);
} }
} // namespace etl::impl } // namespace etl::impl
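The keepalive tuning above maps onto plain gRPC channel arguments. A minimal sketch of such a channel setup, assuming gRPC C++ is installed; the endpoint is a placeholder, and the pings-without-data value is an assumption since the constant's value is not shown here:

```cpp
#include <grpcpp/grpcpp.h>

int main() {
    grpc::ChannelArguments args;
    args.SetMaxReceiveMessageSize(-1);                 // ledgers can be large: no cap
    args.SetInt(GRPC_ARG_KEEPALIVE_TIME_MS, 10000);    // as kKEEPALIVE_PING_INTERVAL_MS
    args.SetInt(GRPC_ARG_KEEPALIVE_TIMEOUT_MS, 5000);  // as kKEEPALIVE_TIMEOUT_MS
    args.SetInt(GRPC_ARG_HTTP2_MAX_PINGS_WITHOUT_DATA, 0);  // assumed: 0 = unlimited
    auto const channel = grpc::CreateCustomChannel(
        "127.0.0.1:50051", grpc::InsecureChannelCredentials(), args);  // placeholder endpoint
    return channel ? 0 : 1;
}
```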

View File

@@ -19,29 +19,25 @@
#pragma once #pragma once
#include "etl/InitialLoadObserverInterface.hpp" #include "data/BackendInterface.hpp"
#include "etl/LoadBalancerInterface.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <boost/asio/spawn.hpp>
#include <grpcpp/support/status.h> #include <grpcpp/support/status.h>
#include <org/xrpl/rpc/v1/get_ledger.pb.h> #include <org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h> #include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <atomic>
#include <chrono> #include <chrono>
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility> #include <utility>
#include <vector>
namespace etl::impl { namespace etl::impl {
class GrpcSource { class GrpcSource {
util::Logger log_; util::Logger log_;
std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_; std::unique_ptr<org::xrpl::rpc::v1::XRPLedgerAPIService::Stub> stub_;
std::unique_ptr<std::atomic_bool> initialLoadShouldStop_; std::shared_ptr<BackendInterface> backend_;
std::chrono::system_clock::duration deadline_;
static constexpr auto kKEEPALIVE_PING_INTERVAL_MS = 10000; static constexpr auto kKEEPALIVE_PING_INTERVAL_MS = 10000;
static constexpr auto kKEEPALIVE_TIMEOUT_MS = 5000; static constexpr auto kKEEPALIVE_TIMEOUT_MS = 5000;
@@ -50,11 +46,7 @@ class GrpcSource {
static constexpr auto kDEADLINE = std::chrono::seconds(30); static constexpr auto kDEADLINE = std::chrono::seconds(30);
public: public:
GrpcSource( GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend);
std::string const& ip,
std::string const& grpcPort,
std::chrono::system_clock::duration deadline = kDEADLINE
);
/** /**
* @brief Fetch data for a specific ledger. * @brief Fetch data for a specific ledger.
@@ -75,19 +67,10 @@ public:
* *
* @param sequence Sequence of the ledger to download * @param sequence Sequence of the ledger to download
* @param numMarkers Number of markers to generate for async calls * @param numMarkers Number of markers to generate for async calls
* @param observer InitialLoadObserverInterface implementation * @return A std::pair of the data and a bool indicating whether the download was successful
* @return Downloaded data or an indication of error or cancellation
*/ */
InitialLedgerLoadResult std::pair<std::vector<std::string>, bool>
loadInitialLedger(uint32_t sequence, uint32_t numMarkers, InitialLoadObserverInterface& observer); loadInitialLedger(uint32_t sequence, uint32_t numMarkers);
/**
* @brief Stop any ongoing operations
* @note This is used to cancel any ongoing initial ledger downloads
* @param yield The coroutine context
*/
void
stop(boost::asio::yield_context yield);
}; };
} // namespace etl::impl } // namespace etl::impl

View File

@@ -21,7 +21,7 @@
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "etl/LedgerFetcherInterface.hpp" #include "etl/LedgerFetcherInterface.hpp"
#include "etl/LoadBalancerInterface.hpp" #include "etlng/LoadBalancerInterface.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <grpcpp/grpcpp.h> #include <grpcpp/grpcpp.h>
@@ -41,13 +41,13 @@ private:
util::Logger log_{"ETL"}; util::Logger log_{"ETL"};
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<LoadBalancerInterface> loadBalancer_; std::shared_ptr<etlng::LoadBalancerInterface> loadBalancer_;
public: public:
/** /**
* @brief Create an instance of the fetcher * @brief Create an instance of the fetcher
*/ */
LedgerFetcher(std::shared_ptr<BackendInterface> backend, std::shared_ptr<LoadBalancerInterface> balancer) LedgerFetcher(std::shared_ptr<BackendInterface> backend, std::shared_ptr<etlng::LoadBalancerInterface> balancer)
: backend_(std::move(backend)), loadBalancer_(std::move(balancer)) : backend_(std::move(backend)), loadBalancer_(std::move(balancer))
{ {
} }

View File

@@ -0,0 +1,276 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "data/Types.hpp"
#include "etl/MPTHelpers.hpp"
#include "etl/NFTHelpers.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/LedgerFetcher.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "util/Assert.hpp"
#include "util/LedgerUtils.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/Serializer.h>
#include <xrpl/protocol/TxMeta.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
#include <vector>
/**
* @brief Account transactions, NFT transactions and NFT data bundled together.
*/
struct FormattedTransactionsData {
std::vector<AccountTransactionsData> accountTxData;
std::vector<NFTTransactionsData> nfTokenTxData;
std::vector<NFTsData> nfTokensData;
std::vector<MPTHolderData> mptHoldersData;
std::vector<NFTsData> nfTokenURIChanges;
};
namespace etl::impl {
/**
* @brief Loads ledger data into the DB
*/
template <typename LedgerFetcherType>
class LedgerLoader {
public:
using GetLedgerResponseType = etlng::LoadBalancerInterface::GetLedgerResponseType;
using OptionalGetLedgerResponseType = etlng::LoadBalancerInterface::OptionalGetLedgerResponseType;
using RawLedgerObjectType = etlng::LoadBalancerInterface::RawLedgerObjectType;
private:
util::Logger log_{"ETL"};
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<etlng::LoadBalancerInterface> loadBalancer_;
std::reference_wrapper<LedgerFetcherType> fetcher_;
std::reference_wrapper<SystemState const> state_; // shared state for ETL
public:
/**
* @brief Create an instance of the loader
*/
LedgerLoader(
std::shared_ptr<BackendInterface> backend,
std::shared_ptr<etlng::LoadBalancerInterface> balancer,
LedgerFetcherType& fetcher,
SystemState const& state
)
: backend_{std::move(backend)}
, loadBalancer_{std::move(balancer)}
, fetcher_{std::ref(fetcher)}
, state_{std::cref(state)}
{
}
/**
* @brief Insert extracted transaction into the ledger
*
* Insert all of the extracted transactions into the ledger, returning transactions related to accounts,
* transactions related to NFTs, and NFTs themselves for later processing.
*
* @param ledger ledger to insert transactions into
* @param data data extracted from an ETL source
* @return The necessary info to write the account_transactions/account_tx and nft_token_transactions tables
*/
FormattedTransactionsData
insertTransactions(ripple::LedgerHeader const& ledger, GetLedgerResponseType& data)
{
FormattedTransactionsData result;
std::vector<NFTsData> nfTokenURIChanges;
for (auto& txn : *(data.mutable_transactions_list()->mutable_transactions())) {
std::string* raw = txn.mutable_transaction_blob();
ripple::SerialIter it{raw->data(), raw->size()};
ripple::STTx const sttx{it};
LOG(log_.trace()) << "Inserting transaction = " << sttx.getTransactionID();
ripple::TxMeta const txMeta{sttx.getTransactionID(), ledger.seq, txn.metadata_blob()};
auto const [nftTxs, maybeNFT] = getNFTDataFromTx(txMeta, sttx);
result.nfTokenTxData.insert(result.nfTokenTxData.end(), nftTxs.begin(), nftTxs.end());
// We need to unique the URI changes separately, in case the URI changes are discarded
if (maybeNFT) {
if (maybeNFT->onlyUriChanged) {
nfTokenURIChanges.push_back(*maybeNFT);
} else {
result.nfTokensData.push_back(*maybeNFT);
}
}
auto const maybeMPTHolder = getMPTHolderFromTx(txMeta, sttx);
if (maybeMPTHolder)
result.mptHoldersData.push_back(*maybeMPTHolder);
result.accountTxData.emplace_back(txMeta, sttx.getTransactionID());
static constexpr std::size_t kEY_SIZE = 32;
std::string keyStr{reinterpret_cast<char const*>(sttx.getTransactionID().data()), kEY_SIZE};
backend_->writeTransaction(
std::move(keyStr),
ledger.seq,
ledger.closeTime.time_since_epoch().count(),
std::move(*raw),
std::move(*txn.mutable_metadata_blob())
);
}
result.nfTokensData = getUniqueNFTsDatas(result.nfTokensData);
nfTokenURIChanges = getUniqueNFTsDatas(nfTokenURIChanges);
// Put URI changes at the end to ensure the URI is not overwritten
result.nfTokensData.insert(result.nfTokensData.end(), nfTokenURIChanges.begin(), nfTokenURIChanges.end());
return result;
}
/**
* @brief Download a ledger with specified sequence in full
*
* Note: This takes several minutes or longer.
*
* @param sequence the sequence of the ledger to download
* @return The ledger downloaded, with a full transaction and account state map
*/
std::optional<ripple::LedgerHeader>
loadInitialLedger(uint32_t sequence)
{
// check that database is actually empty
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (rng) {
ASSERT(false, "Database is not empty");
return {};
}
// Fetch the ledger from the network. This function will not return until either the fetch is successful, or the
// server is being shutdown. This only fetches the ledger header and the transactions+metadata
OptionalGetLedgerResponseType ledgerData{fetcher_.get().fetchData(sequence)};
if (!ledgerData)
return {};
ripple::LedgerHeader lgrInfo = ::util::deserializeHeader(ripple::makeSlice(ledgerData->ledger_header()));
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(lgrInfo);
auto timeDiff = ::util::timed<std::chrono::duration<double>>([this, sequence, &lgrInfo, &ledgerData]() {
backend_->startWrites();
LOG(log_.debug()) << "Started writes";
backend_->writeLedger(lgrInfo, std::move(*ledgerData->mutable_ledger_header()));
LOG(log_.debug()) << "Wrote ledger";
FormattedTransactionsData insertTxResult = insertTransactions(lgrInfo, *ledgerData);
LOG(log_.debug()) << "Inserted txns";
// download the full account state map. This function downloads full
// ledger data and pushes the downloaded data into the writeQueue.
// asyncWriter consumes from the queue and inserts the data into the
// Ledger object. Once the below call returns, all data has been pushed
// into the queue
auto edgeKeys = loadBalancer_->loadInitialLedger(sequence);
size_t numWrites = 0;
backend_->cache().setFull();
auto seconds =
::util::timed<std::chrono::seconds>([this, keys = std::move(edgeKeys), sequence, &numWrites]() mutable {
for (auto& key : keys) {
LOG(log_.debug()) << "Writing edge key = " << ripple::strHex(key);
auto succ = backend_->cache().getSuccessor(*ripple::uint256::fromVoidChecked(key), sequence);
if (succ)
backend_->writeSuccessor(std::move(key), sequence, uint256ToString(succ->key));
}
ripple::uint256 prev = data::kFIRST_KEY;
while (auto cur = backend_->cache().getSuccessor(prev, sequence)) {
ASSERT(cur.has_value(), "Successor for key {} must exist", ripple::strHex(prev));
if (prev == data::kFIRST_KEY)
backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(cur->key));
if (isBookDir(cur->key, cur->blob)) {
auto base = getBookBase(cur->key);
// make sure the base is not an actual object
if (!backend_->cache().get(base, sequence)) {
auto succ = backend_->cache().getSuccessor(base, sequence);
ASSERT(succ.has_value(), "Book base {} must have a successor", ripple::strHex(base));
if (succ->key == cur->key) {
LOG(log_.debug()) << "Writing book successor = " << ripple::strHex(base) << " - "
<< ripple::strHex(cur->key);
backend_->writeSuccessor(
uint256ToString(base), sequence, uint256ToString(cur->key)
);
}
}
++numWrites;
}
prev = cur->key;
static constexpr std::size_t kLOG_STRIDE = 100000;
if (numWrites % kLOG_STRIDE == 0 && numWrites != 0)
LOG(log_.info()) << "Wrote " << numWrites << " book successors";
}
backend_->writeSuccessor(uint256ToString(prev), sequence, uint256ToString(data::kLAST_KEY));
++numWrites;
});
LOG(log_.info()) << "Looping through cache and submitting all writes took " << seconds
<< " seconds. numWrites = " << std::to_string(numWrites);
LOG(log_.debug()) << "Loaded initial ledger";
if (not state_.get().isStopping) {
backend_->writeAccountTransactions(std::move(insertTxResult.accountTxData));
backend_->writeNFTs(insertTxResult.nfTokensData);
backend_->writeNFTTransactions(insertTxResult.nfTokenTxData);
backend_->writeMPTHolders(insertTxResult.mptHoldersData);
}
backend_->finishWrites(sequence);
});
LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
return lgrInfo;
}
};
} // namespace etl::impl
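The successor walk above effectively links every key to the next one in sorted order, bracketed by first/last sentinels. A toy model of that chain on an ordered map (not Clio's code):

```cpp
#include <iostream>
#include <map>
#include <string>

int main() {
    // Three fake ledger objects, keyed in sorted order, standing in for the cache.
    std::map<std::string, std::string> const cache{
        {"0A...", "objA"}, {"2B...", "objB"}, {"7C...", "objC"}};

    std::string prev = "FIRST_KEY";  // sentinel, as data::kFIRST_KEY above
    for (auto const& entry : cache) {
        // Equivalent to backend->writeSuccessor(prev, sequence, entry.first)
        std::cout << prev << " -> " << entry.first << "\n";
        prev = entry.first;
    }
    std::cout << prev << " -> LAST_KEY\n";  // sentinel, as data::kLAST_KEY above
}
```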

View File

@@ -1,7 +1,7 @@
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
/* /*
This file is part of clio: https://github.com/XRPLF/clio This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers. Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above purpose with or without fee is hereby granted, provided that the above
@@ -21,14 +21,12 @@
#include "data/BackendInterface.hpp" #include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp" #include "data/DBHelpers.hpp"
#include "etl/LedgerPublisherInterface.hpp" #include "data/LedgerCacheInterface.hpp"
#include "data/Types.hpp"
#include "etl/SystemState.hpp" #include "etl/SystemState.hpp"
#include "etl/impl/Loading.hpp" #include "etlng/LedgerPublisherInterface.hpp"
#include "feed/SubscriptionManagerInterface.hpp" #include "feed/SubscriptionManagerInterface.hpp"
#include "util/Assert.hpp" #include "util/Assert.hpp"
#include "util/Mutex.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyStrand.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp" #include "util/prometheus/Counter.hpp"
#include "util/prometheus/Prometheus.hpp" #include "util/prometheus/Prometheus.hpp"
@@ -36,7 +34,6 @@
#include <boost/asio/io_context.hpp> #include <boost/asio/io_context.hpp>
#include <boost/asio/post.hpp> #include <boost/asio/post.hpp>
#include <boost/asio/strand.hpp> #include <boost/asio/strand.hpp>
#include <fmt/format.h>
#include <xrpl/basics/chrono.h> #include <xrpl/basics/chrono.h>
#include <xrpl/protocol/Fees.h> #include <xrpl/protocol/Fees.h>
#include <xrpl/protocol/LedgerHeader.h> #include <xrpl/protocol/LedgerHeader.h>
@@ -71,16 +68,18 @@ namespace etl::impl {
* includes reading all of the transactions from the database) is done from the application wide asio io_service, and a * includes reading all of the transactions from the database) is done from the application wide asio io_service, and a
* strand is used to ensure ledgers are published in order. * strand is used to ensure ledgers are published in order.
*/ */
class LedgerPublisher : public LedgerPublisherInterface { class LedgerPublisher : public etlng::LedgerPublisherInterface {
util::Logger log_{"ETL"}; util::Logger log_{"ETL"};
util::async::AnyStrand publishStrand_; boost::asio::strand<boost::asio::io_context::executor_type> publishStrand_;
std::shared_ptr<BackendInterface> backend_; std::shared_ptr<BackendInterface> backend_;
std::reference_wrapper<data::LedgerCacheInterface> cache_;
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_; std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions_;
std::reference_wrapper<SystemState const> state_; // shared state for ETL std::reference_wrapper<SystemState const> state_; // shared state for ETL
util::Mutex<std::chrono::time_point<ripple::NetClock>, std::shared_mutex> lastCloseTime_; std::chrono::time_point<ripple::NetClock> lastCloseTime_;
mutable std::shared_mutex closeTimeMtx_;
std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt( std::reference_wrapper<util::prometheus::CounterInt> lastPublishSeconds_ = PrometheusService::counterInt(
"etl_last_publish_seconds", "etl_last_publish_seconds",
@@ -88,20 +87,23 @@ class LedgerPublisher : public LedgerPublisherInterface {
"Seconds since epoch of the last published ledger" "Seconds since epoch of the last published ledger"
); );
util::Mutex<std::optional<uint32_t>, std::shared_mutex> lastPublishedSequence_; std::optional<uint32_t> lastPublishedSequence_;
mutable std::shared_mutex lastPublishedSeqMtx_;
public: public:
/** /**
* @brief Create an instance of the publisher * @brief Create an instance of the publisher
*/ */
LedgerPublisher( LedgerPublisher(
util::async::AnyExecutionContext ctx, boost::asio::io_context& ioc,
std::shared_ptr<BackendInterface> backend, std::shared_ptr<BackendInterface> backend,
data::LedgerCacheInterface& cache,
std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions, std::shared_ptr<feed::SubscriptionManagerInterface> subscriptions,
SystemState const& state SystemState const& state
) )
: publishStrand_{ctx.makeStrand()} : publishStrand_{boost::asio::make_strand(ioc)}
, backend_{std::move(backend)} , backend_{std::move(backend)}
, cache_{cache}
, subscriptions_{std::move(subscriptions)} , subscriptions_{std::move(subscriptions)}
, state_{std::cref(state)} , state_{std::cref(state)}
{ {
@@ -163,13 +165,28 @@ public:
void void
publish(ripple::LedgerHeader const& lgrInfo) publish(ripple::LedgerHeader const& lgrInfo)
{ {
publishStrand_.submit([this, lgrInfo = lgrInfo] { boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq); LOG(log_.info()) << "Publishing ledger " << std::to_string(lgrInfo.seq);
if (!state_.get().isWriting) {
LOG(log_.info()) << "Updating ledger range for read node.";
if (!cache_.get().isDisabled()) {
std::vector<data::LedgerObject> const diff = data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchLedgerDiff(lgrInfo.seq, yield);
});
cache_.get().update(diff, lgrInfo.seq);
}
backend_->updateRange(lgrInfo.seq);
}
setLastClose(lgrInfo.closeTime); setLastClose(lgrInfo.closeTime);
auto age = lastCloseAgeSeconds(); auto age = lastCloseAgeSeconds();
// if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish // if the ledger closed over MAX_LEDGER_AGE_SECONDS ago, assume we are still catching up and don't publish
// TODO: this probably should be a strategy
static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600; static constexpr std::uint32_t kMAX_LEDGER_AGE_SECONDS = 600;
if (age < kMAX_LEDGER_AGE_SECONDS) { if (age < kMAX_LEDGER_AGE_SECONDS) {
std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) { std::optional<ripple::Fees> fees = data::synchronousAndRetryOnTimeout([&](auto yield) {
@@ -177,14 +194,17 @@ public:
}); });
ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq); ASSERT(fees.has_value(), "Fees must exist for ledger {}", lgrInfo.seq);
auto transactions = data::synchronousAndRetryOnTimeout([&](auto yield) { std::vector<data::TransactionAndMetadata> transactions =
data::synchronousAndRetryOnTimeout([&](auto yield) {
return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield); return backend_->fetchAllTransactionsInLedger(lgrInfo.seq, yield);
}); });
auto const ledgerRange = backend_->fetchLedgerRange(); auto const ledgerRange = backend_->fetchLedgerRange();
ASSERT(ledgerRange.has_value(), "Ledger range must exist"); ASSERT(ledgerRange.has_value(), "Ledger range must exist");
auto const range = fmt::format("{}-{}", ledgerRange->minSequence, ledgerRange->maxSequence); std::string const range =
std::to_string(ledgerRange->minSequence) + "-" + std::to_string(ledgerRange->maxSequence);
subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size()); subscriptions_->pubLedger(lgrInfo, *fees, range, transactions.size());
// order with transaction index // order with transaction index
@@ -197,15 +217,15 @@ public:
object2.getFieldU32(ripple::sfTransactionIndex); object2.getFieldU32(ripple::sfTransactionIndex);
}); });
for (auto const& txAndMeta : transactions) for (auto& txAndMeta : transactions)
subscriptions_->pubTransaction(txAndMeta, lgrInfo); subscriptions_->pubTransaction(txAndMeta, lgrInfo);
subscriptions_->pubBookChanges(lgrInfo, transactions); subscriptions_->pubBookChanges(lgrInfo, transactions);
setLastPublishTime(); setLastPublishTime();
LOG(log_.info()) << "Published ledger " << lgrInfo.seq; LOG(log_.info()) << "Published ledger " << std::to_string(lgrInfo.seq);
} else { } else {
LOG(log_.info()) << "Skipping publishing ledger " << lgrInfo.seq; LOG(log_.info()) << "Skipping publishing ledger " << std::to_string(lgrInfo.seq);
} }
}); });
@@ -240,30 +260,32 @@ public:
std::uint32_t
lastCloseAgeSeconds() const override
{
- auto closeTime = lastCloseTime_.lock()->time_since_epoch().count();
+ std::shared_lock const lck(closeTimeMtx_);
auto now = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
.count();
+ auto closeTime = lastCloseTime_.time_since_epoch().count();
if (now < (kRIPPLE_EPOCH_START + closeTime))
return 0;
return now - (kRIPPLE_EPOCH_START + closeTime);
}
/**
- * @brief Get the sequence of the last schueduled ledger to publish, Be aware that the ledger may not have been
+ * @brief Get the sequence of the last scheduled ledger to publish. Be aware that the ledger may not have been
* published to network
*/
std::optional<uint32_t>
getLastPublishedSequence() const
{
- return *lastPublishedSequence_.lock();
+ std::scoped_lock const lck(lastPublishedSeqMtx_);
+ return lastPublishedSequence_;
}
private:
void
setLastClose(std::chrono::time_point<ripple::NetClock> lastCloseTime)
{
- auto closeTime = lastCloseTime_.lock<std::scoped_lock>();
- *closeTime = lastCloseTime;
+ std::scoped_lock const lck(closeTimeMtx_);
+ lastCloseTime_ = lastCloseTime;
}
void
@@ -277,8 +299,8 @@ private:
void
setLastPublishedSequence(std::optional<uint32_t> lastPublishedSequence)
{
- auto lastPublishSeq = lastPublishedSequence_.lock();
- *lastPublishSeq = lastPublishedSequence;
+ std::scoped_lock const lck(lastPublishedSeqMtx_);
+ lastPublishedSequence_ = lastPublishedSequence;
}
};
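The right-hand side of these hunks swaps a lock-wrapping helper (the old lastCloseTime_.lock() style) for plain members guarded explicitly by a mutex. A minimal self-contained sketch of that guarded-member pattern, with hypothetical names, could look like this:

#include <chrono>
#include <mutex>
#include <shared_mutex>

// Illustration only: a member guarded by a shared mutex, taken exclusively
// by writers and shared by concurrent readers.
class CloseTimeTracker {
    mutable std::shared_mutex mtx_;
    std::chrono::system_clock::time_point lastClose_;

public:
    void
    setLastClose(std::chrono::system_clock::time_point t)
    {
        std::scoped_lock const lck(mtx_);  // exclusive: writers serialize
        lastClose_ = t;
    }

    std::chrono::system_clock::time_point
    lastClose() const
    {
        std::shared_lock const lck(mtx_);  // shared: readers may overlap
        return lastClose_;
    }
};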

View File

@@ -19,8 +19,6 @@
#pragma once
- #include "etl/InitialLoadObserverInterface.hpp"
- #include "etl/LoadBalancerInterface.hpp"
#include "etl/Source.hpp"
#include "etl/impl/ForwardingSource.hpp"
#include "etl/impl/GrpcSource.hpp"
@@ -53,8 +51,8 @@ namespace etl::impl {
*/
template <
typename GrpcSourceType = GrpcSource,
- typename SubscriptionSourceTypePtr = std::unique_ptr<impl::SubscriptionSource>,
- typename ForwardingSourceType = impl::ForwardingSource>
+ typename SubscriptionSourceTypePtr = std::unique_ptr<SubscriptionSource>,
+ typename ForwardingSourceType = ForwardingSource>
class SourceImpl : public SourceBase {
std::string ip_;
std::string wsPort_;
@@ -108,7 +106,6 @@ public:
stop(boost::asio::yield_context yield) final
{
subscriptionSource_->stop(yield);
- grpcSource_.stop(yield);
}
/**
@@ -201,13 +198,12 @@ public:
*
* @param sequence Sequence of the ledger to download
* @param numMarkers Number of markers to generate for async calls
- * @param loader InitialLoadObserverInterface implementation
* @return A std::pair of the data and a bool indicating whether the download was successful
*/
- InitialLedgerLoadResult
- loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers, InitialLoadObserverInterface& loader) final
+ std::pair<std::vector<std::string>, bool>
+ loadInitialLedger(uint32_t sequence, std::uint32_t numMarkers) final
{
- return grpcSource_.loadInitialLedger(sequence, numMarkers, loader);
+ return grpcSource_.loadInitialLedger(sequence, numMarkers);
}
/**

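The last hunk above drops the InitialLoadObserverInterface parameter, so initial-load results come back directly as a pair of edge keys and a success flag. A self-contained sketch of consuming the new signature (the stub below is illustrative, not the real GrpcSource):

#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Stand-in with the same shape as the new loadInitialLedger() signature.
std::pair<std::vector<std::string>, bool>
loadInitialLedger(uint32_t /*sequence*/, uint32_t /*numMarkers*/)
{
    return {{"edge-key-1", "edge-key-2"}, true};
}

int
main()
{
    auto const [edgeKeys, ok] = loadInitialLedger(32570, 4);
    if (ok) {
        for (auto const& key : edgeKeys)
            std::cout << key << '\n';  // downstream code would seed cursors from these
    }
}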
View File

@@ -0,0 +1,424 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2023, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "data/DBHelpers.hpp"
#include "data/Types.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/impl/LedgerLoader.hpp"
#include "util/Assert.hpp"
#include "util/LedgerUtils.hpp"
#include "util/Profiler.hpp"
#include "util/log/Logger.hpp"
#include <grpcpp/grpcpp.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <chrono>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <set>
#include <stdexcept>
#include <string>
#include <thread>
#include <utility>
#include <vector>
namespace etl::impl {
/*
* TODO:
*
* 1) loading of data into db should not really be part of transform right?
* 2) can we just prepare the data and give it to the loader afterwards?
* 3) how to deal with cache update that is needed to write successors if neighbours not included?
*/
/**
* @brief Transformer thread that prepares a new ledger out of raw data received via gRPC.
*/
template <
typename DataPipeType,
typename LedgerLoaderType,
typename LedgerPublisherType,
typename AmendmentBlockHandlerType>
class Transformer {
using GetLedgerResponseType = typename LedgerLoaderType::GetLedgerResponseType;
using RawLedgerObjectType = typename LedgerLoaderType::RawLedgerObjectType;
util::Logger log_{"ETL"};
std::reference_wrapper<DataPipeType> pipe_;
std::shared_ptr<BackendInterface> backend_;
std::reference_wrapper<LedgerLoaderType> loader_;
std::reference_wrapper<LedgerPublisherType> publisher_;
std::reference_wrapper<AmendmentBlockHandlerType> amendmentBlockHandler_;
uint32_t startSequence_;
std::reference_wrapper<SystemState> state_; // shared state for ETL
std::thread thread_;
public:
/**
* @brief Create an instance of the transformer.
*
* This spawns a new thread that reads from the data pipe and writes ledgers to the DB using LedgerLoader and
* LedgerPublisher.
*/
Transformer(
DataPipeType& pipe,
std::shared_ptr<BackendInterface> backend,
LedgerLoaderType& loader,
LedgerPublisherType& publisher,
AmendmentBlockHandlerType& amendmentBlockHandler,
uint32_t startSequence,
SystemState& state
)
: pipe_{std::ref(pipe)}
, backend_{std::move(backend)}
, loader_{std::ref(loader)}
, publisher_{std::ref(publisher)}
, amendmentBlockHandler_{std::ref(amendmentBlockHandler)}
, startSequence_{startSequence}
, state_{std::ref(state)}
{
thread_ = std::thread([this]() { process(); });
}
/**
* @brief Joins the transformer thread.
*/
~Transformer()
{
if (thread_.joinable())
thread_.join();
}
/**
* @brief Block calling thread until transformer thread exits.
*/
void
waitTillFinished()
{
ASSERT(thread_.joinable(), "Transformer thread must be joinable");
thread_.join();
}
private:
void
process()
{
beast::setCurrentThreadName("ETLService transform");
uint32_t currentSequence = startSequence_;
while (not hasWriteConflict()) {
auto fetchResponse = pipe_.get().popNext(currentSequence);
++currentSequence;
// if fetchResponse is an empty optional, the extractor thread has stopped and the transformer should
// stop as well
if (!fetchResponse)
break;
if (isStopping())
continue;
auto const start = std::chrono::system_clock::now();
auto [lgrInfo, success] = buildNextLedger(*fetchResponse);
if (success) {
auto const numTxns = fetchResponse->transactions_list().transactions_size();
auto const numObjects = fetchResponse->ledger_objects().objects_size();
auto const end = std::chrono::system_clock::now();
auto const duration = std::chrono::duration<double>(end - start).count();  // seconds, portable across clock resolutions
LOG(log_.info()) << "Load phase of ETL. Successfully wrote ledger! Ledger info: "
<< util::toString(lgrInfo) << ". txn count = " << numTxns
<< ". object count = " << numObjects << ". load time = " << duration
<< ". load txns per second = " << numTxns / duration
<< ". load objs per second = " << numObjects / duration;
publisher_.get().publish(lgrInfo);
} else {
LOG(log_.error()) << "Error writing ledger. " << util::toString(lgrInfo);
}
// success is false if the ledger was already written
setWriteConflict(not success);
}
}
/**
* @brief Build the next ledger using the previous ledger and the extracted data.
* @note rawData should correspond to the ledger immediately following the previously processed sequence.
*
* @param rawData Data extracted from an ETL source
* @return The newly built ledger and data to write to the database
*/
std::pair<ripple::LedgerHeader, bool>
buildNextLedger(GetLedgerResponseType& rawData)
{
LOG(log_.debug()) << "Beginning ledger update";
ripple::LedgerHeader lgrInfo = ::util::deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
LOG(log_.debug()) << "Deserialized ledger header. " << ::util::toString(lgrInfo);
backend_->startWrites();
backend_->writeLedger(lgrInfo, std::move(*rawData.mutable_ledger_header()));
writeSuccessors(lgrInfo, rawData);
std::optional<FormattedTransactionsData> insertTxResultOp;
try {
updateCache(lgrInfo, rawData);
LOG(log_.debug()) << "Inserted/modified/deleted all objects. Number of objects = "
<< rawData.ledger_objects().objects_size();
insertTxResultOp.emplace(loader_.get().insertTransactions(lgrInfo, rawData));
} catch (std::runtime_error const& e) {
LOG(log_.fatal()) << "Failed to build next ledger: " << e.what();
amendmentBlockHandler_.get().notifyAmendmentBlocked();
return {ripple::LedgerHeader{}, false};
}
LOG(log_.debug()) << "Inserted all transactions. Number of transactions = "
<< rawData.transactions_list().transactions_size();
backend_->writeAccountTransactions(std::move(insertTxResultOp->accountTxData));
backend_->writeNFTs(insertTxResultOp->nfTokensData);
backend_->writeNFTTransactions(insertTxResultOp->nfTokenTxData);
backend_->writeMPTHolders(insertTxResultOp->mptHoldersData);
auto [success, duration] =
::util::timed<std::chrono::duration<double>>([&]() { return backend_->finishWrites(lgrInfo.seq); });
LOG(log_.debug()) << "Finished writes. Total time: " << std::to_string(duration);
LOG(log_.debug()) << "Finished ledger update: " << ::util::toString(lgrInfo);
return {lgrInfo, success};
}
/**
* @brief Update cache from new ledger data.
*
* @param lgrInfo Ledger info
* @param rawData Ledger data from GRPC
*/
void
updateCache(ripple::LedgerHeader const& lgrInfo, GetLedgerResponseType& rawData)
{
std::vector<data::LedgerObject> cacheUpdates;
cacheUpdates.reserve(rawData.ledger_objects().objects_size());
// TODO change these to unordered_set
std::set<ripple::uint256> bookSuccessorsToCalculate;
std::set<ripple::uint256> modified;
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) {
auto key = ripple::uint256::fromVoidChecked(obj.key());
ASSERT(key.has_value(), "Failed to deserialize key from void");
cacheUpdates.push_back({*key, {obj.mutable_data()->begin(), obj.mutable_data()->end()}});
LOG(log_.debug()) << "key = " << ripple::strHex(*key) << " - mod type = " << obj.mod_type();
if (obj.mod_type() != RawLedgerObjectType::MODIFIED && !rawData.object_neighbors_included()) {
LOG(log_.debug()) << "object neighbors not included. using cache";
if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq - 1)
throw std::logic_error("Cache is not full, but object neighbors were not included");
auto const blob = obj.mutable_data();
auto checkBookBase = false;
auto const isDeleted = (blob->size() == 0);
if (isDeleted) {
auto const old = backend_->cache().get(*key, lgrInfo.seq - 1);
ASSERT(old.has_value(), "Deleted object {} must be in cache", ripple::strHex(*key));
checkBookBase = isBookDir(*key, *old);
} else {
checkBookBase = isBookDir(*key, *blob);
}
if (checkBookBase) {
LOG(log_.debug()) << "Is book dir. Key = " << ripple::strHex(*key);
auto const bookBase = getBookBase(*key);
auto const oldFirstDir = backend_->cache().getSuccessor(bookBase, lgrInfo.seq - 1);
ASSERT(
oldFirstDir.has_value(),
"Book base must have a successor for lgrInfo.seq - 1 = {}",
lgrInfo.seq - 1
);
// We deleted the first directory, or we added a directory prior to the old first
// directory
if ((isDeleted && key == oldFirstDir->key) || (!isDeleted && key < oldFirstDir->key)) {
LOG(log_.debug())
<< "Need to recalculate book base successor. base = " << ripple::strHex(bookBase)
<< " - key = " << ripple::strHex(*key) << " - isDeleted = " << isDeleted
<< " - seq = " << lgrInfo.seq;
bookSuccessorsToCalculate.insert(bookBase);
}
}
}
if (obj.mod_type() == RawLedgerObjectType::MODIFIED)
modified.insert(*key);
backend_->writeLedgerObject(std::move(*obj.mutable_key()), lgrInfo.seq, std::move(*obj.mutable_data()));
}
backend_->cache().update(cacheUpdates, lgrInfo.seq);
// rippled didn't send successor information, so use our cache
if (!rawData.object_neighbors_included()) {
LOG(log_.debug()) << "object neighbors not included. using cache";
if (!backend_->cache().isFull() || backend_->cache().latestLedgerSequence() != lgrInfo.seq)
throw std::logic_error("Cache is not full, but object neighbors were not included");
for (auto const& obj : cacheUpdates) {
if (modified.contains(obj.key))
continue;
auto lb = backend_->cache().getPredecessor(obj.key, lgrInfo.seq);
if (!lb)
lb = {.key = data::kFIRST_KEY, .blob = {}};
auto ub = backend_->cache().getSuccessor(obj.key, lgrInfo.seq);
if (!ub)
ub = {.key = data::kLAST_KEY, .blob = {}};
if (obj.blob.empty()) {
LOG(log_.debug()) << "writing successor for deleted object " << ripple::strHex(obj.key) << " - "
<< ripple::strHex(lb->key) << " - " << ripple::strHex(ub->key);
backend_->writeSuccessor(uint256ToString(lb->key), lgrInfo.seq, uint256ToString(ub->key));
} else {
backend_->writeSuccessor(uint256ToString(lb->key), lgrInfo.seq, uint256ToString(obj.key));
backend_->writeSuccessor(uint256ToString(obj.key), lgrInfo.seq, uint256ToString(ub->key));
LOG(log_.debug()) << "writing successor for new object " << ripple::strHex(lb->key) << " - "
<< ripple::strHex(obj.key) << " - " << ripple::strHex(ub->key);
}
}
for (auto const& base : bookSuccessorsToCalculate) {
auto succ = backend_->cache().getSuccessor(base, lgrInfo.seq);
if (succ) {
backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(succ->key));
LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - "
<< ripple::strHex(succ->key);
} else {
backend_->writeSuccessor(uint256ToString(base), lgrInfo.seq, uint256ToString(data::kLAST_KEY));
LOG(log_.debug()) << "Updating book successor " << ripple::strHex(base) << " - "
<< ripple::strHex(data::kLAST_KEY);
}
}
}
}
/**
* @brief Write successors info into DB.
*
* @param lgrInfo Ledger info
* @param rawData Ledger data from GRPC
*/
void
writeSuccessors(ripple::LedgerHeader const& lgrInfo, GetLedgerResponseType& rawData)
{
// Write successor info, if included from rippled
if (rawData.object_neighbors_included()) {
LOG(log_.debug()) << "object neighbors included";
for (auto& obj : *(rawData.mutable_book_successors())) {
auto firstBook = std::move(*obj.mutable_first_book());
if (!firstBook.size())
firstBook = uint256ToString(data::kLAST_KEY);
LOG(log_.debug()) << "writing book successor " << ripple::strHex(obj.book_base()) << " - "
<< ripple::strHex(firstBook);
backend_->writeSuccessor(std::move(*obj.mutable_book_base()), lgrInfo.seq, std::move(firstBook));
}
for (auto& obj : *(rawData.mutable_ledger_objects()->mutable_objects())) {
if (obj.mod_type() != RawLedgerObjectType::MODIFIED) {
std::string* predPtr = obj.mutable_predecessor();
if (predPtr->empty())
*predPtr = uint256ToString(data::kFIRST_KEY);
std::string* succPtr = obj.mutable_successor();
if (succPtr->empty())
*succPtr = uint256ToString(data::kLAST_KEY);
if (obj.mod_type() == RawLedgerObjectType::DELETED) {
LOG(log_.debug()) << "Modifying successors for deleted object " << ripple::strHex(obj.key())
<< " - " << ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr);
backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::move(*succPtr));
} else {
LOG(log_.debug()) << "adding successor for new object " << ripple::strHex(obj.key()) << " - "
<< ripple::strHex(*predPtr) << " - " << ripple::strHex(*succPtr);
backend_->writeSuccessor(std::move(*predPtr), lgrInfo.seq, std::string{obj.key()});
backend_->writeSuccessor(std::string{obj.key()}, lgrInfo.seq, std::move(*succPtr));
}
} else
LOG(log_.debug()) << "object modified " << ripple::strHex(obj.key());
}
}
}
/** @return true if the transformer is stopping; false otherwise */
bool
isStopping() const
{
return state_.get().isStopping;
}
/** @return true if there was a write conflict; false otherwise */
bool
hasWriteConflict() const
{
return state_.get().writeConflict;
}
/**
* @brief Sets the write conflict flag.
*
* @param conflict The value to set
*/
void
setWriteConflict(bool conflict)
{
state_.get().writeConflict = conflict;
}
};
} // namespace etl::impl
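The successor bookkeeping in updateCache() and writeSuccessors() above maintains, per ledger, a linked list over the sorted set of ledger-object keys, bracketed by the kFIRST_KEY and kLAST_KEY sentinels. A standalone toy sketch of what an insertion and a deletion do to that list, with std::set standing in for the cache and printed pairs standing in for backend_->writeSuccessor() calls:

#include <iostream>
#include <iterator>
#include <set>
#include <string>

// Print the (predecessor -> successor) pairs implied by a sorted key set.
static void
dumpSuccessors(std::set<std::string> const& keys)
{
    for (auto it = keys.begin(); std::next(it) != keys.end(); ++it)
        std::cout << *it << " -> " << *std::next(it) << '\n';
}

int
main()
{
    // "0000" and "FFFF" play the role of the kFIRST_KEY/kLAST_KEY sentinels.
    std::set<std::string> keys{"0000", "3A7C", "9B01", "FFFF"};
    dumpSuccessors(keys);

    keys.insert("6D42");  // new object: 3A7C now points to 6D42, 6D42 to 9B01
    keys.erase("9B01");   // deleted object: 6D42 now points straight to FFFF
    std::cout << "--\n";
    dumpSuccessors(keys);
}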

View File

@@ -19,7 +19,7 @@
#pragma once
- namespace etl {
+ namespace etlng {
/**
* @brief The interface of a handler for amendment blocking
@@ -40,4 +40,4 @@ struct AmendmentBlockHandlerInterface {
stop() = 0;
};
- } // namespace etl
+ } // namespace etlng

src/etlng/CMakeLists.txt (new file, 23 lines)
View File

@@ -0,0 +1,23 @@
add_library(clio_etlng)
target_sources(
clio_etlng
PRIVATE ETLService.cpp
LoadBalancer.cpp
Source.cpp
impl/AmendmentBlockHandler.cpp
impl/AsyncGrpcCall.cpp
impl/Extraction.cpp
impl/GrpcSource.cpp
impl/ForwardingSource.cpp
impl/Loading.cpp
impl/Monitor.cpp
impl/TaskManager.cpp
impl/ext/Cache.cpp
impl/ext/Core.cpp
impl/ext/MPT.cpp
impl/ext/NFT.cpp
impl/ext/Successor.cpp
)
target_link_libraries(clio_etlng PUBLIC clio_data)

View File

@@ -21,7 +21,7 @@
#include <cstdint>
- namespace etl {
+ namespace etlng {
/**
* @brief An interface for the Cache Loader
@@ -50,4 +50,4 @@ struct CacheLoaderInterface {
wait() noexcept = 0;
};
- } // namespace etl
+ } // namespace etlng

View File

@@ -20,12 +20,12 @@
#pragma once
#include "data/Types.hpp"
- #include "etl/Models.hpp"
+ #include "etlng/Models.hpp"
#include <cstdint>
#include <vector>
- namespace etl {
+ namespace etlng {
/**
* @brief An interface for the Cache Updater
@@ -63,4 +63,4 @@ struct CacheUpdaterInterface {
setFull() = 0;
};
- } // namespace etl
+ } // namespace etlng

src/etlng/ETLService.cpp (new file, 334 lines)
View File

@@ -0,0 +1,334 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "etlng/ETLService.hpp"
#include "data/BackendInterface.hpp"
#include "data/LedgerCacheInterface.hpp"
#include "data/Types.hpp"
#include "etl/ETLState.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/impl/LedgerFetcher.hpp"
#include "etlng/CacheLoaderInterface.hpp"
#include "etlng/CacheUpdaterInterface.hpp"
#include "etlng/ExtractorInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LedgerPublisherInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/MonitorProviderInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp"
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "etlng/impl/CacheUpdater.hpp"
#include "etlng/impl/Extraction.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "etlng/impl/Loading.hpp"
#include "etlng/impl/Registry.hpp"
#include "etlng/impl/Scheduling.hpp"
#include "etlng/impl/TaskManager.hpp"
#include "etlng/impl/ext/Cache.hpp"
#include "etlng/impl/ext/Core.hpp"
#include "etlng/impl/ext/NFT.hpp"
#include "etlng/impl/ext/Successor.hpp"
#include "util/Assert.hpp"
#include "util/Profiler.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/log/Logger.hpp"
#include <boost/json/object.hpp>
#include <boost/signals2/connection.hpp>
#include <xrpl/protocol/LedgerHeader.h>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
#include <utility>
namespace etlng {
ETLService::ETLService(
util::async::AnyExecutionContext ctx,
std::reference_wrapper<util::config::ClioConfigDefinition const> config,
std::shared_ptr<data::BackendInterface> backend,
std::shared_ptr<LoadBalancerInterface> balancer,
std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers,
std::shared_ptr<LedgerPublisherInterface> publisher,
std::shared_ptr<CacheLoaderInterface> cacheLoader,
std::shared_ptr<CacheUpdaterInterface> cacheUpdater,
std::shared_ptr<ExtractorInterface> extractor,
std::shared_ptr<LoaderInterface> loader,
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
std::shared_ptr<etlng::TaskManagerProviderInterface> taskManagerProvider,
std::shared_ptr<etlng::MonitorProviderInterface> monitorProvider,
std::shared_ptr<etl::SystemState> state
)
: ctx_(std::move(ctx))
, config_(config)
, backend_(std::move(backend))
, balancer_(std::move(balancer))
, ledgers_(std::move(ledgers))
, publisher_(std::move(publisher))
, cacheLoader_(std::move(cacheLoader))
, cacheUpdater_(std::move(cacheUpdater))
, extractor_(std::move(extractor))
, loader_(std::move(loader))
, initialLoadObserver_(std::move(initialLoadObserver))
, taskManagerProvider_(std::move(taskManagerProvider))
, monitorProvider_(std::move(monitorProvider))
, state_(std::move(state))
, startSequence_(config.get().maybeValue<uint32_t>("start_sequence"))
, finishSequence_(config.get().maybeValue<uint32_t>("finish_sequence"))
{
ASSERT(not state_->isWriting, "ETL should never start in writer mode");
if (startSequence_.has_value())
LOG(log_.info()) << "Start sequence: " << *startSequence_;
if (finishSequence_.has_value())
LOG(log_.info()) << "Finish sequence: " << *finishSequence_;
LOG(log_.info()) << "Starting in " << (state_->isStrictReadonly ? "STRICT READONLY MODE" : "WRITE MODE");
}
ETLService::~ETLService()
{
stop();
LOG(log_.debug()) << "Destroying ETLng";
}
void
ETLService::run()
{
LOG(log_.info()) << "Running ETLng...";
mainLoop_.emplace(ctx_.execute([this] {
auto const rng = loadInitialLedgerIfNeeded();
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
std::optional<uint32_t> const mostRecentValidated = ledgers_->getMostRecent();
if (not mostRecentValidated) {
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
"Exiting monitor loop";
return;
}
if (not rng.has_value()) {
LOG(log_.warn()) << "Initial ledger download got cancelled - stopping ETL service";
return;
}
auto const nextSequence = rng->maxSequence + 1;
LOG(log_.debug()) << "Database is populated. Starting monitor loop. sequence = " << nextSequence;
startMonitor(nextSequence);
// If we are a writer as the result of loading the initial ledger - start loading
if (state_->isWriting)
startLoading(nextSequence);
}));
}
void
ETLService::stop()
{
LOG(log_.info()) << "Stop called";
if (mainLoop_)
mainLoop_->wait();
if (taskMan_)
taskMan_->stop();
if (monitor_)
monitor_->stop();
}
boost::json::object
ETLService::getInfo() const
{
boost::json::object result;
result["etl_sources"] = balancer_->toJson();
result["is_writer"] = static_cast<int>(state_->isWriting);
result["read_only"] = static_cast<int>(state_->isStrictReadonly);
auto last = publisher_->getLastPublish();
if (last.time_since_epoch().count() != 0)
result["last_publish_age_seconds"] = std::to_string(publisher_->lastPublishAgeSeconds());
return result;
}
bool
ETLService::isAmendmentBlocked() const
{
return state_->isAmendmentBlocked;
}
bool
ETLService::isCorruptionDetected() const
{
return state_->isCorruptionDetected;
}
std::optional<etl::ETLState>
ETLService::getETLState() const
{
return balancer_->getETLState();
}
std::uint32_t
ETLService::lastCloseAgeSeconds() const
{
return publisher_->lastCloseAgeSeconds();
}
std::optional<data::LedgerRange>
ETLService::loadInitialLedgerIfNeeded()
{
auto rng = backend_->hardFetchLedgerRangeNoThrow();
if (not rng.has_value()) {
ASSERT(
not state_->isStrictReadonly,
"Database is empty but this node is in strict readonly mode. Can't write initial ledger."
);
LOG(log_.info()) << "Database is empty. Will download a ledger from the network.";
state_->isWriting = true; // immediately become writer as the db is empty
auto const getMostRecent = [this]() {
LOG(log_.info()) << "Waiting for next ledger to be validated by network...";
return ledgers_->getMostRecent();
};
if (auto const maybeSeq = startSequence_.or_else(getMostRecent); maybeSeq.has_value()) {
auto const seq = *maybeSeq;
LOG(log_.info()) << "Starting from sequence " << seq
<< ". Initial ledger download and extraction can take a while...";
auto [ledger, timeDiff] = ::util::timed<std::chrono::duration<double>>([this, seq]() {
return extractor_->extractLedgerOnly(seq).and_then(
[this, seq](auto&& data) -> std::optional<ripple::LedgerHeader> {
// TODO: loadInitialLedger in balancer should be called fetchEdgeKeys or similar
auto res = balancer_->loadInitialLedger(seq, *initialLoadObserver_);
if (not res.has_value() and res.error() == InitialLedgerLoadError::Cancelled) {
LOG(log_.debug()) << "Initial ledger load got cancelled";
return std::nullopt;
}
ASSERT(res.has_value(), "Initial ledger retry logic failed");
data.edgeKeys = std::move(res).value();
return loader_->loadInitialLedger(data);
}
);
});
if (not ledger.has_value()) {
LOG(log_.error()) << "Failed to load initial ledger. Exiting monitor loop";
return std::nullopt;
}
LOG(log_.debug()) << "Time to download and store ledger = " << timeDiff;
LOG(log_.info()) << "Finished loadInitialLedger. cache size = " << backend_->cache().size();
return backend_->hardFetchLedgerRangeNoThrow();
}
LOG(log_.info()) << "The wait for the next validated ledger has been aborted. "
"Exiting monitor loop";
return std::nullopt;
}
LOG(log_.info()) << "Database already populated. Picking up from the tip of history";
cacheLoader_->load(rng->maxSequence);
return rng;
}
void
ETLService::startMonitor(uint32_t seq)
{
monitor_ = monitorProvider_->make(ctx_, backend_, ledgers_, seq);
monitorNewSeqSubscription_ = monitor_->subscribeToNewSequence([this](uint32_t seq) {
LOG(log_.info()) << "ETLService (via Monitor) got new seq from db: " << seq;
if (state_->writeConflict) {
LOG(log_.info()) << "Got a write conflict; Giving up writer seat immediately";
giveUpWriter();
}
if (not state_->isWriting) {
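// Monitoring (non-writer) nodes do not build ledgers themselves; they apply the
// diff another writer persisted so the local cache and ledger range stay current.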
auto const diff = data::synchronousAndRetryOnTimeout([this, seq](auto yield) {
return backend_->fetchLedgerDiff(seq, yield);
});
cacheUpdater_->update(seq, diff);
backend_->updateRange(seq);
}
publisher_->publish(seq, {});
});
monitorDbStalledSubscription_ = monitor_->subscribeToDbStalled([this]() {
LOG(log_.warn()) << "ETLService received DbStalled signal from Monitor";
if (not state_->isStrictReadonly and not state_->isWriting)
attemptTakeoverWriter();
});
monitor_->run();
}
void
ETLService::startLoading(uint32_t seq)
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
taskMan_ = taskManagerProvider_->make(ctx_, *monitor_, seq, finishSequence_);
// FIXME: this legacy name "extractor_threads" is no longer accurate (we have coroutines now)
taskMan_->run(config_.get().get<std::size_t>("extractor_threads"));
}
void
ETLService::attemptTakeoverWriter()
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
auto rng = backend_->hardFetchLedgerRangeNoThrow();
ASSERT(rng.has_value(), "Ledger range can't be null");
state_->isWriting = true; // switch to writer
LOG(log_.info()) << "Taking over the ETL writer seat";
startLoading(rng->maxSequence + 1);
}
void
ETLService::giveUpWriter()
{
ASSERT(not state_->isStrictReadonly, "This should only happen on writer nodes");
state_->isWriting = false;
state_->writeConflict = false;
taskMan_ = nullptr;
}
} // namespace etlng
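loadInitialLedgerIfNeeded() above leans on C++23 monadic optionals: or_else() supplies a fallback producer for the start sequence and and_then() chains a step that may itself fail. A minimal sketch of the same style, using illustrative stand-in functions rather than the real interfaces:

#include <cstdint>
#include <iostream>
#include <optional>
#include <string>

// Stand-in for ledgers_->getMostRecent(): ask the network for a sequence.
std::optional<uint32_t>
mostRecentValidated()
{
    return 42;
}

// Stand-in for a step that can fail, as extractLedgerOnly() can.
std::optional<std::string>
loadLedger(uint32_t seq)
{
    return "ledger " + std::to_string(seq);
}

int
main()
{
    std::optional<uint32_t> const configured{};  // "start_sequence" not set

    // Prefer the configured value, fall back to the network, then chain the load.
    auto const result = configured.or_else(mostRecentValidated).and_then(loadLedger);
    if (result.has_value())
        std::cout << *result << '\n';  // prints "ledger 42"
}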

src/etlng/ETLService.hpp (new file, 199 lines)
View File

@@ -0,0 +1,199 @@
//------------------------------------------------------------------------------
/*
This file is part of clio: https://github.com/XRPLF/clio
Copyright (c) 2025, the clio developers.
Permission to use, copy, modify, and distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#pragma once
#include "data/BackendInterface.hpp"
#include "data/Types.hpp"
#include "etl/ETLState.hpp"
#include "etl/NetworkValidatedLedgersInterface.hpp"
#include "etl/SystemState.hpp"
#include "etl/impl/AmendmentBlockHandler.hpp"
#include "etl/impl/LedgerFetcher.hpp"
#include "etlng/CacheLoaderInterface.hpp"
#include "etlng/CacheUpdaterInterface.hpp"
#include "etlng/ETLServiceInterface.hpp"
#include "etlng/ExtractorInterface.hpp"
#include "etlng/InitialLoadObserverInterface.hpp"
#include "etlng/LedgerPublisherInterface.hpp"
#include "etlng/LoadBalancerInterface.hpp"
#include "etlng/LoaderInterface.hpp"
#include "etlng/MonitorInterface.hpp"
#include "etlng/MonitorProviderInterface.hpp"
#include "etlng/TaskManagerInterface.hpp"
#include "etlng/TaskManagerProviderInterface.hpp"
#include "etlng/impl/AmendmentBlockHandler.hpp"
#include "etlng/impl/CacheUpdater.hpp"
#include "etlng/impl/Extraction.hpp"
#include "etlng/impl/LedgerPublisher.hpp"
#include "etlng/impl/Loading.hpp"
#include "etlng/impl/Registry.hpp"
#include "etlng/impl/Scheduling.hpp"
#include "etlng/impl/TaskManager.hpp"
#include "etlng/impl/ext/Cache.hpp"
#include "etlng/impl/ext/Core.hpp"
#include "etlng/impl/ext/NFT.hpp"
#include "etlng/impl/ext/Successor.hpp"
#include "util/async/AnyExecutionContext.hpp"
#include "util/async/AnyOperation.hpp"
#include "util/config/ConfigDefinition.hpp"
#include "util/log/Logger.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/json/object.hpp>
#include <boost/signals2/connection.hpp>
#include <fmt/format.h>
#include <xrpl/basics/Blob.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/basics/strHex.h>
#include <xrpl/proto/org/xrpl/rpc/v1/get_ledger.pb.h>
#include <xrpl/proto/org/xrpl/rpc/v1/ledger.pb.h>
#include <xrpl/protocol/LedgerHeader.h>
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/TxFormats.h>
#include <xrpl/protocol/TxMeta.h>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <memory>
#include <optional>
#include <string>
namespace etlng {
/**
* @brief This class is responsible for continuously extracting data from a p2p node, and writing that data to the
* databases.
*
* Usually, multiple processes share access to the same network-accessible databases, in which case only one
* such process is performing ETL and writing to the database. The other processes simply monitor the database for new
* ledgers, and publish those ledgers to the various subscription streams. If a monitoring process determines that the
* ETL writer has failed (no new ledgers written for some time), the process will attempt to become the ETL writer.
*
* If there are multiple monitoring processes that try to become the ETL writer at the same time, one will win out, and
* the others will fall back to monitoring/publishing. In this sense, this class dynamically transitions from monitoring
* to writing and from writing to monitoring, based on the activity of other processes running on different machines.
*/
class ETLService : public ETLServiceInterface {
util::Logger log_{"ETL"};
util::async::AnyExecutionContext ctx_;
std::reference_wrapper<util::config::ClioConfigDefinition const> config_;
std::shared_ptr<BackendInterface> backend_;
std::shared_ptr<LoadBalancerInterface> balancer_;
std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers_;
std::shared_ptr<LedgerPublisherInterface> publisher_;
std::shared_ptr<CacheLoaderInterface> cacheLoader_;
std::shared_ptr<CacheUpdaterInterface> cacheUpdater_;
std::shared_ptr<ExtractorInterface> extractor_;
std::shared_ptr<LoaderInterface> loader_;
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver_;
std::shared_ptr<etlng::TaskManagerProviderInterface> taskManagerProvider_;
std::shared_ptr<etlng::MonitorProviderInterface> monitorProvider_;
std::shared_ptr<etl::SystemState> state_;
std::optional<uint32_t> startSequence_;
std::optional<uint32_t> finishSequence_;
std::unique_ptr<MonitorInterface> monitor_;
std::unique_ptr<TaskManagerInterface> taskMan_;
boost::signals2::scoped_connection monitorNewSeqSubscription_;
boost::signals2::scoped_connection monitorDbStalledSubscription_;
std::optional<util::async::AnyOperation<void>> mainLoop_;
public:
/**
* @brief Create an instance of ETLService.
*
* @param ctx The execution context for asynchronous operations
* @param config The Clio configuration definition
* @param backend Interface to the backend database
* @param balancer Load balancer for distributing work
* @param ledgers Interface for accessing network validated ledgers
* @param publisher Interface for publishing ledger data
* @param cacheLoader Interface for loading cache data
* @param cacheUpdater Interface for updating cache data
* @param extractor The extractor to use
* @param loader Interface for loading data
* @param initialLoadObserver The observer for initial data loading
* @param taskManagerProvider The provider of the task manager instance
* @param monitorProvider The provider of the monitor instance
* @param state System state tracking object
*/
ETLService(
util::async::AnyExecutionContext ctx,
std::reference_wrapper<util::config::ClioConfigDefinition const> config,
std::shared_ptr<data::BackendInterface> backend,
std::shared_ptr<LoadBalancerInterface> balancer,
std::shared_ptr<etl::NetworkValidatedLedgersInterface> ledgers,
std::shared_ptr<LedgerPublisherInterface> publisher,
std::shared_ptr<CacheLoaderInterface> cacheLoader,
std::shared_ptr<CacheUpdaterInterface> cacheUpdater,
std::shared_ptr<ExtractorInterface> extractor,
std::shared_ptr<LoaderInterface> loader,
std::shared_ptr<InitialLoadObserverInterface> initialLoadObserver,
std::shared_ptr<etlng::TaskManagerProviderInterface> taskManagerProvider,
std::shared_ptr<etlng::MonitorProviderInterface> monitorProvider,
std::shared_ptr<etl::SystemState> state
);
~ETLService() override;
void
run() override;
void
stop() override;
boost::json::object
getInfo() const override;
bool
isAmendmentBlocked() const override;
bool
isCorruptionDetected() const override;
std::optional<etl::ETLState>
getETLState() const override;
std::uint32_t
lastCloseAgeSeconds() const override;
private:
std::optional<data::LedgerRange>
loadInitialLedgerIfNeeded();
void
startMonitor(uint32_t seq);
void
startLoading(uint32_t seq);
void
attemptTakeoverWriter();
void
giveUpWriter();
};
} // namespace etlng
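The class comment above describes nodes dynamically trading the writer seat based on what other processes do. A toy model of those transitions (names hypothetical; in the real service the coordination happens through the shared database and SystemState):

#include <cstdio>

// Toy model of the writer-seat transitions described in the class comment.
struct NodeState {
    bool isStrictReadonly = false;
    bool isWriting = false;
    bool writeConflict = false;
};

// A monitoring node may try to take the writer seat once the DB looks stalled.
bool
tryTakeoverWriter(NodeState& s)
{
    if (s.isStrictReadonly or s.isWriting)
        return false;
    s.isWriting = true;
    return true;
}

// A writer that lost a race to another writer gives the seat back up.
void
giveUpWriter(NodeState& s)
{
    s.isWriting = false;
    s.writeConflict = false;
}

int
main()
{
    NodeState node;
    std::printf("takeover: %d\n", tryTakeoverWriter(node));  // 1: became writer
    node.writeConflict = true;  // another process wrote the same ledger first
    giveUpWriter(node);
    std::printf("writing after conflict: %d\n", node.isWriting);  // 0: monitoring again
}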

View File

@@ -26,7 +26,7 @@
#include <cstdint>
#include <optional>
- namespace etl {
+ namespace etlng {
/**
* @brief This is a base class for any ETL service implementations.
@@ -77,7 +77,7 @@ struct ETLServiceInterface {
* @brief Get the etl nodes' state
* @return The etl nodes' state, nullopt if etl nodes are not connected
*/
- [[nodiscard]] virtual std::optional<ETLState>
+ [[nodiscard]] virtual std::optional<etl::ETLState>
getETLState() const = 0;
/**
@@ -89,4 +89,4 @@ struct ETLServiceInterface {
lastCloseAgeSeconds() const = 0;
};
- } // namespace etl
+ } // namespace etlng

View File

@@ -19,12 +19,12 @@
#pragma once
- #include "etl/Models.hpp"
+ #include "etlng/Models.hpp"
#include <cstdint>
#include <optional>
- namespace etl {
+ namespace etlng {
/**
* @brief An interface for the Extractor
@@ -51,4 +51,4 @@ struct ExtractorInterface {
extractLedgerOnly(uint32_t seq) = 0;
};
- } // namespace etl
+ } // namespace etlng

View File

@@ -19,7 +19,7 @@
#pragma once
- #include "etl/Models.hpp"
+ #include "etlng/Models.hpp"
#include <xrpl/protocol/LedgerHeader.h>
@@ -28,7 +28,7 @@
#include <string>
#include <vector>
- namespace etl {
+ namespace etlng {
/**
* @brief The interface for observing the initial ledger load
@@ -51,4 +51,4 @@ struct InitialLoadObserverInterface {
) = 0;
};
- } // namespace etl
+ } // namespace etlng

Some files were not shown because too many files have changed in this diff.