Compare commits

..

47 Commits

Author SHA1 Message Date
Ed Hennis
6fc972746d Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-24 17:35:00 -04:00
Ed Hennis
930afbdea8 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-20 18:50:00 -04:00
Ed Hennis
95c5bef48b Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-20 18:26:19 -04:00
Ed Hennis
b489b6c3ce Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-20 17:32:02 -04:00
Ed Hennis
70765acef4 Merge remote-tracking branch 'upstream/develop' into ximinez/acquireAsyncDispatch
* upstream/develop:
  ci: [DEPENDABOT] bump tj-actions/changed-files from 46.0.5 to 47.0.4 (6394)
  ci: [DEPENDABOT] bump codecov/codecov-action from 5.4.3 to 5.5.2 (6398)
  ci: Build docs in PRs and in private repos (6400)
  ci: Add dependabot config (6379)
  Fix tautological assertion (6393)
  chore: Apply clang-format width 100 (6387)
2026-02-20 16:30:42 -05:00
Ed Hennis
77aa90bd0e Update formatting 2026-02-20 16:27:59 -05:00
Ed Hennis
4758bb6dc9 Merge commit '25cca465538a56cce501477f9e5e2c1c7ea2d84c' into ximinez/acquireAsyncDispatch
* commit '25cca465538a56cce501477f9e5e2c1c7ea2d84c':
  chore: Set clang-format width to 100 in config file (6387)
2026-02-20 16:27:39 -05:00
Ed Hennis
46e3dcb5fb Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-19 16:24:54 -05:00
Ed Hennis
d57579f10b Merge remote-tracking branch 'XRPLF/develop' into ximinez/acquireAsyncDispatch
* XRPLF/develop:
  refactor: Modularize app/tx (6228)
  refactor: Decouple app/tx from `Application` and `Config` (6227)
  chore: Update clang-format to 21.1.8 (6352)
  refactor: Modularize `HashRouter`, `Conditions`, and `OrderBookDB` (6226)
  chore: Fix minor issues in comments (6346)
  refactor: Modularize the NetworkOPs interface (6225)
  chore: Fix `gcov` lib coverage build failure on macOS (6350)
  refactor: Modularize RelationalDB (6224)
  refactor: Modularize WalletDB and Manifest (6223)
  fix: Update invariant checks for Permissioned Domains (6134)
  refactor: Change main thread name to `xrpld-main` (6336)
  refactor: Fix spelling issues in tests (6199)
  test: Add file and line location to Env (6276)
  chore: Remove CODEOWNERS (6337)
  perf: Remove unnecessary caches (5439)
  chore: Restore unity builds (6328)
  refactor: Update secp256k1 to 0.7.1 (6331)
  fix: Increment sequence when accepting new manifests (6059)
  fix typo in LendingHelpers unit-test (6215)
2026-02-18 19:52:18 -05:00
Ed Hennis
d8dd376d1c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-04 16:30:10 -04:00
Ed Hennis
a8c03e2e6c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-03 16:08:06 -04:00
Ed Hennis
2167a66bc7 Fix formatting 2026-01-28 19:39:15 -05:00
Ed Hennis
ed948a858c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-28 18:49:15 -04:00
Ed Hennis
608c102743 Merge commit '5f638f55536def0d88b970d1018a465a238e55f4' into ximinez/acquireAsyncDispatch
* commit '5f638f55536def0d88b970d1018a465a238e55f4':
  chore: Set ColumnLimit to 120 in clang-format (6288)
2026-01-28 17:47:53 -05:00
Ed Hennis
36d1607a4e Merge commit '92046785d1fea5f9efe5a770d636792ea6cab78b' into ximinez/acquireAsyncDispatch
* commit '92046785d1fea5f9efe5a770d636792ea6cab78b':
  test: Fix the `xrpl.net` unit test using async read (6241)
  ci: Upload Conan recipes for develop, release candidates, and releases (6286)
  fix: Stop embedded tests from hanging on ARM by using `atomic_flag` (6248)
  fix:  Remove DEFAULT fields that change to the default in associateAsset (6259) (6273)
  refactor: Update Boost to 1.90 (6280)
  refactor: clean up uses of `std::source_location` (6272)
  ci: Pass missing sanitizers input to actions (6266)
  ci: Properly propagate Conan credentials (6265)
  ci: Explicitly set version when exporting the Conan recipe (6264)
  ci: Use plus instead of hyphen for Conan recipe version suffix (6261)
  chore: Detect uninitialized variables in CMake files (6247)
  ci: Run on-trigger and on-pr when generate-version is modified (6257)
  refactor: Enforce 15-char limit and simplify labels for thread naming (6212)
  docs: Update Ripple Bug Bounty public key (6258)
  ci: Add missing commit hash to Conan recipe version (6256)
  fix: Include `<functional>` header in `Number.h` (6254)
  ci: Upload Conan recipe for merges into develop and commits to release (6235)
  Limit reply size on `TMGetObjectByHash` queries (6110)
  ci: remove 'master' branch as a trigger (6234)
  Improve ledger_entry lookups for fee, amendments, NUNL, and hashes (5644)
2026-01-28 17:47:47 -05:00
Ed Hennis
53ebb86d60 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-15 13:03:36 -04:00
Ed Hennis
1d989bc6de Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-15 12:06:00 -04:00
Ed Hennis
64c0cb8c7e Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-13 18:19:11 -04:00
Ed Hennis
c77cfef41c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-13 15:28:01 -04:00
Ed Hennis
08aa8c06d1 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-12 14:52:16 -04:00
Ed Hennis
9498672f8e Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-11 00:50:43 -04:00
Ed Hennis
e91d55a0e0 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-08 17:06:11 -04:00
Ed Hennis
afdc452cfc Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-08 13:04:20 -04:00
Ed Hennis
a0d4ef1a54 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-06 14:02:15 -05:00
Ed Hennis
8bc384f8bf Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-22 17:39:59 -05:00
Ed Hennis
bd961c484b Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-18 19:59:52 -05:00
Ed Hennis
aee242a8d4 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-12 20:34:59 -05:00
Ed Hennis
fcae74de58 Merge remote-tracking branch 'XRPLF/develop' into ximinez/acquireAsyncDispatch
* XRPLF/develop:
  refactor: Rename `ripple` namespace to `xrpl` (5982)
  refactor: Move JobQueue and related classes into xrpl.core module (6121)
  refactor: Rename `rippled` binary to `xrpld` (5983)
  refactor: rename info() to header() (6138)
  refactor: rename `LedgerInfo` to `LedgerHeader` (6136)
  refactor: clean up `RPCHelpers` (5684)
  chore: Fix docs readme and cmake (6122)
  chore: Clean up .gitignore and .gitattributes (6001)
  chore: Use updated secp256k1 recipe (6118)
2025-12-11 15:33:12 -05:00
Ed Hennis
a56effcb00 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-05 21:13:10 -05:00
Ed Hennis
64c2eca465 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-02 17:37:29 -05:00
Ed Hennis
e56f750e1d Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-01 14:40:45 -05:00
Ed Hennis
fde000f3eb Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-28 15:46:44 -05:00
Ed Hennis
d0a62229da Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-27 01:48:56 -05:00
Ed Hennis
d5932cc7d4 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-26 00:25:17 -05:00
Ed Hennis
0b534da781 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-25 14:55:06 -05:00
Ed Hennis
71a70d343b Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-24 21:49:11 -05:00
Ed Hennis
0899e65030 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-24 21:30:22 -05:00
Ed Hennis
31ba529761 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-21 12:47:58 -05:00
Ed Hennis
e2c6e5ebb6 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-18 22:39:29 -05:00
Ed Hennis
9d807fce48 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-15 03:08:41 -05:00
Ed Hennis
9ef160765c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-13 12:19:21 -05:00
Ed Hennis
d6c0eb243b Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-12 14:12:55 -05:00
Ed Hennis
84c9fc123c Fix formatting 2025-11-10 19:53:05 -05:00
Ed Hennis
00a2a58cfa Add missing header 2025-11-10 19:53:05 -05:00
Ed Hennis
bb2098d873 Add a unit test for CanProcess
- Delete the copy ctor & operator
2025-11-10 19:53:05 -05:00
Ed Hennis
46a5bc74db refactor: acquireAsync will dispatch the job, not the other way around 2025-11-10 19:53:05 -05:00
Ed Hennis
7b72b9cc82 Improve job queue collision checks and logging
- Improve logging related to ledger acquisition and operating mode
  changes
- Class "CanProcess" to keep track of processing of distinct items
2025-11-10 19:53:05 -05:00
38 changed files with 626 additions and 4259 deletions

View File

@@ -1,143 +1,105 @@
---
Checks: "-*,
bugprone-argument-comment,
bugprone-assert-side-effect,
bugprone-bad-signal-to-kill-thread,
bugprone-bool-pointer-implicit-conversion,
bugprone-casting-through-void,
bugprone-chained-comparison,
bugprone-compare-pointer-to-member-virtual-function,
bugprone-copy-constructor-init,
bugprone-dangling-handle,
bugprone-dynamic-static-initializers,
bugprone-fold-init-type,
bugprone-forward-declaration-namespace,
bugprone-inaccurate-erase,
bugprone-incorrect-enable-if,
bugprone-incorrect-roundings,
bugprone-infinite-loop,
bugprone-integer-division,
bugprone-lambda-function-name,
bugprone-macro-parentheses,
bugprone-macro-repeated-side-effects,
bugprone-misplaced-operator-in-strlen-in-alloc,
bugprone-misplaced-pointer-arithmetic-in-alloc,
bugprone-misplaced-widening-cast,
bugprone-multi-level-implicit-pointer-conversion,
bugprone-multiple-new-in-one-expression,
bugprone-multiple-statement-macro,
bugprone-no-escape,
bugprone-non-zero-enum-to-bool-conversion,
bugprone-parent-virtual-call,
bugprone-posix-return,
bugprone-redundant-branch-condition,
bugprone-shared-ptr-array-mismatch,
bugprone-signal-handler,
bugprone-signed-char-misuse,
bugprone-sizeof-container,
bugprone-spuriously-wake-up-functions,
bugprone-standalone-empty,
bugprone-string-constructor,
bugprone-string-integer-assignment,
bugprone-string-literal-with-embedded-nul,
bugprone-stringview-nullptr,
bugprone-suspicious-enum-usage,
bugprone-suspicious-include,
bugprone-suspicious-memory-comparison,
bugprone-suspicious-memset-usage,
bugprone-suspicious-realloc-usage,
bugprone-suspicious-semicolon,
bugprone-suspicious-string-compare,
bugprone-swapped-arguments,
bugprone-terminating-continue,
bugprone-throw-keyword-missing,
bugprone-undefined-memory-manipulation,
bugprone-undelegated-constructor,
bugprone-unhandled-exception-at-new,
bugprone-unique-ptr-array-mismatch,
bugprone-unsafe-functions,
bugprone-virtual-near-miss,
cppcoreguidelines-no-suspend-with-lock,
cppcoreguidelines-virtual-class-destructor,
hicpp-ignored-remove-result,
misc-definitions-in-headers,
misc-header-include-cycle,
misc-misplaced-const,
misc-static-assert,
misc-throw-by-value-catch-by-reference,
misc-unused-alias-decls,
misc-unused-using-decls,
readability-duplicate-include,
readability-enum-initial-value,
readability-misleading-indentation,
readability-non-const-parameter,
readability-redundant-declaration,
readability-reference-to-constructed-temporary,
modernize-deprecated-headers,
modernize-make-shared,
modernize-make-unique,
performance-implicit-conversion-in-loop,
performance-move-constructor-init,
performance-trivially-destructible
bugprone-argument-comment
"
# ---
# checks that have some issues that need to be resolved:
#
# bugprone-empty-catch,
# bugprone-assert-side-effect,
# bugprone-bad-signal-to-kill-thread,
# bugprone-bool-pointer-implicit-conversion,
# bugprone-casting-through-void,
# bugprone-chained-comparison,
# bugprone-compare-pointer-to-member-virtual-function,
# bugprone-copy-constructor-init,
# bugprone-crtp-constructor-accessibility,
# bugprone-dangling-handle,
# bugprone-dynamic-static-initializers,
# bugprone-empty-catch,
# bugprone-fold-init-type,
# bugprone-forward-declaration-namespace,
# bugprone-inaccurate-erase,
# bugprone-inc-dec-in-conditions,
# bugprone-reserved-identifier,
# bugprone-incorrect-enable-if,
# bugprone-incorrect-roundings,
# bugprone-infinite-loop,
# bugprone-integer-division,
# bugprone-lambda-function-name,
# bugprone-macro-parentheses,
# bugprone-macro-repeated-side-effects,
# bugprone-misplaced-operator-in-strlen-in-alloc,
# bugprone-misplaced-pointer-arithmetic-in-alloc,
# bugprone-misplaced-widening-cast,
# bugprone-move-forwarding-reference,
# bugprone-unused-local-non-trivial-variable,
# bugprone-return-const-ref-from-parameter,
# bugprone-switch-missing-default-case,
# bugprone-sizeof-expression,
# bugprone-suspicious-stringview-data-usage,
# bugprone-suspicious-missing-comma,
# bugprone-pointer-arithmetic-on-polymorphic-object,
# bugprone-multi-level-implicit-pointer-conversion,
# bugprone-multiple-new-in-one-expression,
# bugprone-multiple-statement-macro,
# bugprone-no-escape,
# bugprone-non-zero-enum-to-bool-conversion,
# bugprone-optional-value-conversion,
# bugprone-parent-virtual-call,
# bugprone-pointer-arithmetic-on-polymorphic-object,
# bugprone-posix-return,
# bugprone-redundant-branch-condition,
# bugprone-reserved-identifier,
# bugprone-return-const-ref-from-parameter,
# bugprone-shared-ptr-array-mismatch,
# bugprone-signal-handler,
# bugprone-signed-char-misuse,
# bugprone-sizeof-container,
# bugprone-sizeof-expression,
# bugprone-spuriously-wake-up-functions,
# bugprone-standalone-empty,
# bugprone-string-constructor,
# bugprone-string-integer-assignment,
# bugprone-string-literal-with-embedded-nul,
# bugprone-stringview-nullptr,
# bugprone-suspicious-enum-usage,
# bugprone-suspicious-include,
# bugprone-suspicious-memory-comparison,
# bugprone-suspicious-memset-usage,
# bugprone-suspicious-missing-comma,
# bugprone-suspicious-realloc-usage,
# bugprone-suspicious-semicolon,
# bugprone-suspicious-string-compare,
# bugprone-suspicious-stringview-data-usage,
# bugprone-swapped-arguments,
# bugprone-switch-missing-default-case,
# bugprone-terminating-continue,
# bugprone-throw-keyword-missing,
# bugprone-too-small-loop-variable,
# bugprone-undefined-memory-manipulation,
# bugprone-undelegated-constructor,
# bugprone-unhandled-exception-at-new,
# bugprone-unhandled-self-assignment,
# bugprone-unique-ptr-array-mismatch,
# bugprone-unsafe-functions,
# bugprone-unused-local-non-trivial-variable,
# bugprone-unused-raii,
# bugprone-unused-return-value,
# bugprone-use-after-move,
# bugprone-unhandled-self-assignment,
# bugprone-unused-raii,
#
# cppcoreguidelines-misleading-capture-default-by-value,
# bugprone-virtual-near-miss,
# cppcoreguidelines-init-variables,
# cppcoreguidelines-misleading-capture-default-by-value,
# cppcoreguidelines-no-suspend-with-lock,
# cppcoreguidelines-pro-type-member-init,
# cppcoreguidelines-pro-type-static-cast-downcast,
# cppcoreguidelines-use-default-member-init,
# cppcoreguidelines-rvalue-reference-param-not-moved,
#
# cppcoreguidelines-use-default-member-init,
# cppcoreguidelines-virtual-class-destructor,
# hicpp-ignored-remove-result,
# llvm-namespace-comment,
# misc-const-correctness,
# misc-definitions-in-headers,
# misc-header-include-cycle,
# misc-include-cleaner,
# misc-misplaced-const,
# misc-redundant-expression,
#
# readability-avoid-nested-conditional-operator,
# readability-avoid-return-with-void-value,
# readability-braces-around-statements,
# readability-container-contains,
# readability-container-size-empty,
# readability-convert-member-functions-to-static,
# readability-const-return-type,
# readability-else-after-return,
# readability-implicit-bool-conversion,
# readability-inconsistent-declaration-parameter-name,
# readability-identifier-naming,
# readability-make-member-function-const,
# readability-math-missing-parentheses,
# readability-redundant-inline-specifier,
# readability-redundant-member-init,
# readability-redundant-casting,
# readability-redundant-string-init,
# readability-simplify-boolean-expr,
# readability-static-definition-in-anonymous-namespace,
# readability-suspicious-call-argument,
# readability-use-std-min-max,
# readability-static-accessed-through-instance,
#
# misc-static-assert,
# misc-throw-by-value-catch-by-reference,
# misc-unused-alias-decls,
# misc-unused-using-decls,
# modernize-concat-nested-namespaces,
# modernize-deprecated-headers,
# modernize-make-shared,
# modernize-make-unique,
# modernize-pass-by-value,
# modernize-type-traits,
# modernize-use-designated-initializers,
@@ -149,50 +111,79 @@ Checks: "-*,
# modernize-use-starts-ends-with,
# modernize-use-std-numbers,
# modernize-use-using,
#
# performance-faster-string-find,
# performance-for-range-copy,
# performance-implicit-conversion-in-loop,
# performance-inefficient-vector-operation,
# performance-move-const-arg,
# performance-move-constructor-init,
# performance-no-automatic-move,
# ---
# performance-trivially-destructible,
# readability-avoid-nested-conditional-operator,
# readability-avoid-return-with-void-value,
# readability-braces-around-statements,
# readability-const-return-type,
# readability-container-contains,
# readability-container-size-empty,
# readability-convert-member-functions-to-static,
# readability-duplicate-include,
# readability-else-after-return,
# readability-enum-initial-value,
# readability-implicit-bool-conversion,
# readability-inconsistent-declaration-parameter-name,
# readability-identifier-naming,
# readability-make-member-function-const,
# readability-math-missing-parentheses,
# readability-misleading-indentation,
# readability-non-const-parameter,
# readability-redundant-casting,
# readability-redundant-declaration,
# readability-redundant-inline-specifier,
# readability-redundant-member-init,
# readability-redundant-string-init,
# readability-reference-to-constructed-temporary,
# readability-simplify-boolean-expr,
# readability-static-accessed-through-instance,
# readability-static-definition-in-anonymous-namespace,
# readability-suspicious-call-argument,
# readability-use-std-min-max
#
CheckOptions:
# readability-braces-around-statements.ShortStatementLines: 2
# readability-identifier-naming.MacroDefinitionCase: UPPER_CASE
# readability-identifier-naming.ClassCase: CamelCase
# readability-identifier-naming.StructCase: CamelCase
# readability-identifier-naming.UnionCase: CamelCase
# readability-identifier-naming.EnumCase: CamelCase
# readability-identifier-naming.EnumConstantCase: CamelCase
# readability-identifier-naming.ScopedEnumConstantCase: CamelCase
# readability-identifier-naming.GlobalConstantCase: UPPER_CASE
# readability-identifier-naming.GlobalConstantPrefix: "k"
# readability-identifier-naming.GlobalVariableCase: CamelCase
# readability-identifier-naming.GlobalVariablePrefix: "g"
# readability-identifier-naming.ConstexprFunctionCase: camelBack
# readability-identifier-naming.ConstexprMethodCase: camelBack
# readability-identifier-naming.ClassMethodCase: camelBack
# readability-identifier-naming.ClassMemberCase: camelBack
# readability-identifier-naming.ClassConstantCase: UPPER_CASE
# readability-identifier-naming.ClassConstantPrefix: "k"
# readability-identifier-naming.StaticConstantCase: UPPER_CASE
# readability-identifier-naming.StaticConstantPrefix: "k"
# readability-identifier-naming.StaticVariableCase: UPPER_CASE
# readability-identifier-naming.StaticVariablePrefix: "k"
# readability-identifier-naming.ConstexprVariableCase: UPPER_CASE
# readability-identifier-naming.ConstexprVariablePrefix: "k"
# readability-identifier-naming.LocalConstantCase: camelBack
# readability-identifier-naming.LocalVariableCase: camelBack
# readability-identifier-naming.TemplateParameterCase: CamelCase
# readability-identifier-naming.ParameterCase: camelBack
# readability-identifier-naming.FunctionCase: camelBack
# readability-identifier-naming.MemberCase: camelBack
# readability-identifier-naming.PrivateMemberSuffix: _
# readability-identifier-naming.ProtectedMemberSuffix: _
# readability-identifier-naming.PublicMemberSuffix: ""
# readability-identifier-naming.FunctionIgnoredRegexp: ".*tag_invoke.*"
bugprone-unsafe-functions.ReportMoreUnsafeFunctions: true
# CheckOptions:
# readability-braces-around-statements.ShortStatementLines: 2
# readability-identifier-naming.MacroDefinitionCase: UPPER_CASE
# readability-identifier-naming.ClassCase: CamelCase
# readability-identifier-naming.StructCase: CamelCase
# readability-identifier-naming.UnionCase: CamelCase
# readability-identifier-naming.EnumCase: CamelCase
# readability-identifier-naming.EnumConstantCase: CamelCase
# readability-identifier-naming.ScopedEnumConstantCase: CamelCase
# readability-identifier-naming.GlobalConstantCase: UPPER_CASE
# readability-identifier-naming.GlobalConstantPrefix: "k"
# readability-identifier-naming.GlobalVariableCase: CamelCase
# readability-identifier-naming.GlobalVariablePrefix: "g"
# readability-identifier-naming.ConstexprFunctionCase: camelBack
# readability-identifier-naming.ConstexprMethodCase: camelBack
# readability-identifier-naming.ClassMethodCase: camelBack
# readability-identifier-naming.ClassMemberCase: camelBack
# readability-identifier-naming.ClassConstantCase: UPPER_CASE
# readability-identifier-naming.ClassConstantPrefix: "k"
# readability-identifier-naming.StaticConstantCase: UPPER_CASE
# readability-identifier-naming.StaticConstantPrefix: "k"
# readability-identifier-naming.StaticVariableCase: UPPER_CASE
# readability-identifier-naming.StaticVariablePrefix: "k"
# readability-identifier-naming.ConstexprVariableCase: UPPER_CASE
# readability-identifier-naming.ConstexprVariablePrefix: "k"
# readability-identifier-naming.LocalConstantCase: camelBack
# readability-identifier-naming.LocalVariableCase: camelBack
# readability-identifier-naming.TemplateParameterCase: CamelCase
# readability-identifier-naming.ParameterCase: camelBack
# readability-identifier-naming.FunctionCase: camelBack
# readability-identifier-naming.MemberCase: camelBack
# readability-identifier-naming.PrivateMemberSuffix: _
# readability-identifier-naming.ProtectedMemberSuffix: _
# readability-identifier-naming.PublicMemberSuffix: ""
# readability-identifier-naming.FunctionIgnoredRegexp: ".*tag_invoke.*"
# bugprone-unsafe-functions.ReportMoreUnsafeFunctions: true
# bugprone-unused-return-value.CheckedReturnTypes: ::std::error_code;::std::error_condition;::std::errc
# misc-include-cleaner.IgnoreHeaders: '.*/(detail|impl)/.*;.*(expected|unexpected).*;.*ranges_lower_bound\.h;time.h;stdlib.h;__chrono/.*;fmt/chrono.h;boost/uuid/uuid_hash.hpp'
#

View File

@@ -101,7 +101,7 @@ jobs:
steps:
- name: Cleanup workspace (macOS and Windows)
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2
@@ -229,21 +229,8 @@ jobs:
env:
BUILD_NPROC: ${{ steps.nproc.outputs.nproc }}
run: |
set -o pipefail
./xrpld --unittest --unittest-jobs "${BUILD_NPROC}" 2>&1 | tee unittest.log
./xrpld --unittest --unittest-jobs "${BUILD_NPROC}"
- name: Show test failure summary
if: ${{ failure() && !inputs.build_only }}
working-directory: ${{ runner.os == 'Windows' && format('{0}/{1}', env.BUILD_DIR, inputs.build_type) || env.BUILD_DIR }}
run: |
if [ ! -f unittest.log ]; then
echo "unittest.log not found; embedded tests may not have run."
exit 0
fi
if ! grep -E "failed" unittest.log; then
echo "Log present but no failure lines found in unittest.log."
fi
- name: Debug failure (Linux)
if: ${{ failure() && runner.os == 'Linux' && !inputs.build_only }}
run: |

View File

@@ -78,9 +78,9 @@ jobs:
id: run_clang_tidy
continue-on-error: true
env:
TARGETS: ${{ inputs.files != '' && inputs.files || 'src tests' }}
FILES: ${{ inputs.files }}
run: |
run-clang-tidy -j ${{ steps.nproc.outputs.nproc }} -p "${BUILD_DIR}" ${TARGETS} 2>&1 | tee clang-tidy-output.txt
run-clang-tidy -j ${{ steps.nproc.outputs.nproc }} -p "$BUILD_DIR" $FILES 2>&1 | tee clang-tidy-output.txt
- name: Upload clang-tidy output
if: steps.run_clang_tidy.outcome != 'success'

View File

@@ -22,8 +22,7 @@ jobs:
if: ${{ inputs.check_only_changed }}
runs-on: ubuntu-latest
outputs:
clang_tidy_config_changed: ${{ steps.changed_clang_tidy.outputs.any_changed }}
any_cpp_changed: ${{ steps.changed_files.outputs.any_changed }}
any_changed: ${{ steps.changed_files.outputs.any_changed }}
all_changed_files: ${{ steps.changed_files.outputs.all_changed_files }}
steps:
- name: Checkout repository
@@ -39,17 +38,10 @@ jobs:
**/*.ipp
separator: " "
- name: Get changed clang-tidy configuration
id: changed_clang_tidy
uses: tj-actions/changed-files@7dee1b0c1557f278e5c7dc244927139d78c0e22a # v47.0.4
with:
files: |
.clang-tidy
run-clang-tidy:
needs: [determine-files]
if: ${{ always() && !cancelled() && (!inputs.check_only_changed || needs.determine-files.outputs.any_cpp_changed == 'true' || needs.determine-files.outputs.clang_tidy_config_changed == 'true') }}
if: ${{ always() && !cancelled() && (!inputs.check_only_changed || needs.determine-files.outputs.any_changed == 'true') }}
uses: ./.github/workflows/reusable-clang-tidy-files.yml
with:
files: ${{ (needs.determine-files.outputs.clang_tidy_config_changed == 'true' && '') || (inputs.check_only_changed && needs.determine-files.outputs.all_changed_files || '') }}
files: ${{ inputs.check_only_changed && needs.determine-files.outputs.all_changed_files || '' }}
create_issue_on_failure: ${{ inputs.create_issue_on_failure }}

View File

@@ -64,7 +64,7 @@ jobs:
steps:
- name: Cleanup workspace (macOS and Windows)
if: ${{ runner.os == 'macOS' || runner.os == 'Windows' }}
uses: XRPLF/actions/cleanup-workspace@c7d9ce5ebb03c752a354889ecd870cadfc2b1cd4
uses: XRPLF/actions/cleanup-workspace@cf0433aa74563aead044a1e395610c96d65a37cf
- name: Checkout repository
uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2

File diff suppressed because it is too large Load Diff

View File

@@ -251,29 +251,6 @@ pip3 install pre-commit
pre-commit install
```
## Clang-tidy
All code must pass `clang-tidy` checks according to the settings in [`.clang-tidy`](./.clang-tidy).
There is a Continuous Integration job that runs clang-tidy on pull requests. The CI will check:
- All changed C++ files (`.cpp`, `.h`, `.ipp`) when only code files are modified
- **All files in the repository** when the `.clang-tidy` configuration file is changed
This ensures that configuration changes don't introduce new warnings across the codebase.
### Running clang-tidy locally
Before running clang-tidy, you must build the project to generate required files (particularly protobuf headers). Refer to [`BUILD.md`](./BUILD.md) for build instructions.
Then run clang-tidy on your local changes:
```
run-clang-tidy -p build src tests
```
This will check all source files in the `src` and `tests` directories using the compile commands from your `build` directory.
## Contracts and instrumentation
We are using [Antithesis](https://antithesis.com/) for continuous fuzzing,

View File

@@ -71,7 +71,6 @@ words:
- coldwallet
- compr
- conanfile
- cppcoro
- conanrun
- confs
- connectability
@@ -102,11 +101,9 @@ words:
- Falco
- finalizers
- firewalled
- fcontext
- fmtdur
- fsanitize
- funclets
- gantt
- gcov
- gcovr
- ghead
@@ -188,7 +185,6 @@ words:
- ostr
- pargs
- partitioner
- pratik
- paychan
- paychans
- permdex
@@ -210,7 +206,6 @@ words:
- queuable
- Raphson
- replayer
- repost
- rerere
- retriable
- RIPD
@@ -241,7 +236,6 @@ words:
- soci
- socidb
- sslws
- stackful
- statsd
- STATSDCOLLECTOR
- stissue

View File

@@ -0,0 +1,139 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#include <functional>
#include <mutex>
#include <set>
/** RAII class to check if an Item is already being processed on another thread,
* as indicated by it's presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
class CanProcess
{
public:
template <class Mutex, class Collection, class Item>
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: cleanup_(insert(mtx, collection, item))
{
}
~CanProcess()
{
if (cleanup_)
cleanup_();
}
CanProcess(CanProcess const&) = delete;
CanProcess&
operator=(CanProcess const&) = delete;
explicit
operator bool() const
{
return static_cast<bool>(cleanup_);
}
private:
template <bool useIterator, class Mutex, class Collection, class Item>
std::function<void()>
doInsert(Mutex& mtx, Collection& collection, Item const& item)
{
std::unique_lock<Mutex> lock(mtx);
// TODO: Use structured binding once LLVM 16 is the minimum supported
// version. See also: https://github.com/llvm/llvm-project/issues/48582
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
auto const insertResult = collection.insert(item);
auto const it = insertResult.first;
if (!insertResult.second)
return {};
if constexpr (useIterator)
return [&, it]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(it);
};
else
return [&]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(item);
};
}
// Generic insert() function doesn't use iterators because they may get
// invalidated
template <class Mutex, class Collection, class Item>
std::function<void()>
insert(Mutex& mtx, Collection& collection, Item const& item)
{
return doInsert<false>(mtx, collection, item);
}
// Specialize insert() for std::set, which does not invalidate iterators for
// insert and erase
template <class Mutex, class Item>
std::function<void()>
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
{
return doInsert<true>(mtx, collection, item);
}
// If set, then the item is "usable"
std::function<void()> cleanup_;
};
#endif

View File

@@ -1,687 +0,0 @@
#pragma once
#include <coroutine>
#include <exception>
#include <utility>
#include <variant>
namespace xrpl {
template <typename T = void>
class CoroTask;
/**
* CoroTask<void> -- coroutine return type for void-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<void>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - exception_ : std::exception_ptr |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_void() |
* | + unhandled_exception() |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter
* +-----------------------------------------------+
* | await_suspend(h): |
* | if continuation_ set -> symmetric transfer |
* | else -> noop_coroutine |
* +-----------------------------------------------+
*
* Design Notes
* ------------
* - Lazy start: initial_suspend returns suspend_always, so the coroutine
* body does not execute until the handle is explicitly resumed.
* - Symmetric transfer: await_suspend returns a coroutine_handle instead
* of void/bool, allowing the scheduler to jump directly to the next
* coroutine without growing the call stack.
* - Continuation chaining: when one CoroTask is co_await-ed inside
* another, the caller's handle is stored as continuation_ so
* FinalAwaiter can resume it when this task finishes.
* - Move-only: the handle is exclusively owned; copy is deleted.
*
* Usage Examples
* ==============
*
* 1. Basic void coroutine (the most common case in rippled):
*
* CoroTask<void> doWork(std::shared_ptr<CoroTaskRunner> runner) {
* // do something
* co_await runner->suspend(); // yield control
* // resumed later via runner->post() or runner->resume()
* co_return;
* }
*
* 2. co_await-ing one CoroTask<void> from another (chaining):
*
* CoroTask<void> inner() {
* // ...
* co_return;
* }
* CoroTask<void> outer() {
* co_await inner(); // continuation_ links outer -> inner
* co_return; // FinalAwaiter resumes outer
* }
*
* 3. Exceptions propagate through co_await:
*
* CoroTask<void> failing() {
* throw std::runtime_error("oops");
* co_return;
* }
* CoroTask<void> caller() {
* try { co_await failing(); }
* catch (std::runtime_error const&) { // caught here }
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Dangling references in coroutine parameters.
* Coroutine parameters are copied into the frame, but references
* are NOT -- they are stored as-is. If the referent goes out of scope
* before the coroutine finishes, you get use-after-free.
*
* // BROKEN -- local dies before coroutine runs:
* CoroTask<void> bad(int& ref) { co_return; }
* void launch() {
* int local = 42;
* auto task = bad(local); // frame stores &local
* } // local destroyed; frame holds dangling ref
*
* // FIX -- pass by value, or ensure lifetime via shared_ptr.
*
* BUG-RISK: GCC 14 corrupts reference captures in coroutine lambdas.
* When a lambda that returns CoroTask captures by reference ([&]),
* GCC 14 may generate a corrupted coroutine frame. Always capture
* by explicit pointer-to-value instead:
*
* // BROKEN on GCC 14:
* jq.postCoroTask(t, n, [&](auto) -> CoroTask<void> { ... });
*
* // FIX -- capture pointers explicitly:
* jq.postCoroTask(t, n, [ptr = &val](auto) -> CoroTask<void> { ... });
*
* BUG-RISK: Resuming a destroyed or completed CoroTask.
* Calling handle().resume() after the coroutine has already run to
* completion (done() == true) is undefined behavior. The CoroTaskRunner
* guards against this with an XRPL_ASSERT, but standalone usage of
* CoroTask must check done() before resuming.
*
* BUG-RISK: Moving a CoroTask that is being awaited.
* If task A is co_await-ed by task B (so A.continuation_ == B), moving
* or destroying A will invalidate the continuation link. Never move
* or reassign a CoroTask while it is mid-execution or being awaited.
*
* LIMITATION: CoroTask is fire-and-forget for the top-level owner.
* There is no built-in notification when the coroutine finishes.
* The caller must use external synchronization (e.g. CoroTaskRunner::join
* or a gate/condition_variable) to know when it is done.
*
* LIMITATION: No cancellation token.
* There is no way to cancel a suspended CoroTask from outside. The
* coroutine body must cooperatively check a flag (e.g. jq_.isStopping())
* after each co_await and co_return early if needed.
*
* LIMITATION: Stackless -- cannot suspend from nested non-coroutine calls.
* If a coroutine calls a regular function that wants to "yield", it
* cannot. Only the immediate coroutine body can use co_await.
* This is acceptable for rippled because all yield() sites are shallow.
*/
template <>
class CoroTask<void>
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise. Compiler uses this to manage coroutine state.
* Stores the exception (if any) and the continuation handle for
* symmetric transfer back to the awaiting coroutine.
*/
struct promise_type
{
// Captured exception from the coroutine body, rethrown in
// await_resume() when this task is co_await-ed by a caller.
std::exception_ptr exception_;
// Handle to the coroutine that is co_await-ing this task.
// Set by await_suspend(). FinalAwaiter uses it for symmetric
// transfer back to the caller. Null if this is a top-level task.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. The coroutine body does not execute until the
* handle is explicitly resumed (e.g. by CoroTaskRunner::resume).
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Awaiter returned by final_suspend(). Uses symmetric transfer:
* if a continuation exists, transfers control directly to it
* (tail-call, no stack growth). Otherwise returns noop_coroutine
* so the coroutine frame stays alive for the owner to destroy.
*/
struct FinalAwaiter
{
/**
* Always false. We need await_suspend to run for
* symmetric transfer.
*/
bool
await_ready() noexcept
{
return false;
}
/**
* Symmetric transfer: returns the continuation handle so
* the compiler emits a tail-call instead of a nested resume.
* If no continuation is set, returns noop_coroutine to
* suspend at final_suspend without destroying the frame.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
/**
* Returns FinalAwaiter for symmetric transfer at coroutine end.
*/
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return;` (void coroutine).
*/
void
return_void()
{
}
/**
* Called by the compiler when an exception escapes the coroutine
* body. Captures it for later rethrowing in await_resume().
*/
void
unhandled_exception()
{
exception_ = std::current_exception();
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `co_await someCoroTask;` --
/**
* Always false. This task is lazy, so co_await always suspends
* the caller to set up the continuation link.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores the caller's handle as our continuation, then returns
* our handle for symmetric transfer (caller suspends, we resume).
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_; // Symmetric transfer
}
/**
* Called when the caller resumes after co_await. Rethrows any
* exception captured by unhandled_exception().
*/
void
await_resume()
{
if (auto& ep = handle_.promise().exception_)
std::rethrow_exception(ep);
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
/**
* CoroTask<T> -- coroutine return type for value-returning coroutines.
*
* Class / Dependency Diagram
* ==========================
*
* CoroTask<T>
* +-----------------------------------------------+
* | - handle_ : Handle (coroutine_handle<promise>) |
* +-----------------------------------------------+
* | + handle(), done() |
* | + await_ready/suspend/resume (Awaiter iface) |
* +-----------------------------------------------+
* | owns
* v
* promise_type
* +-----------------------------------------------+
* | - result_ : variant<monostate, T, |
* | exception_ptr> |
* | - continuation_ : std::coroutine_handle<> |
* +-----------------------------------------------+
* | + get_return_object() -> CoroTask |
* | + initial_suspend() -> suspend_always (lazy) |
* | + final_suspend() -> FinalAwaiter |
* | + return_value(T) -> stores in result_[1] |
* | + unhandled_exception -> stores in result_[2] |
* +-----------------------------------------------+
* | returns at final_suspend
* v
* FinalAwaiter (same symmetric-transfer pattern as CoroTask<void>)
*
* Value Extraction
* ----------------
* await_resume() inspects the variant:
* - index 2 (exception_ptr) -> rethrow
* - index 1 (T) -> return value via move
*
* Usage Examples
* ==============
*
* 1. Simple value return:
*
* CoroTask<int> computeAnswer() { co_return 42; }
*
* CoroTask<void> caller() {
* int v = co_await computeAnswer(); // v == 42
* }
*
* 2. Chaining value-returning coroutines:
*
* CoroTask<int> add(int a, int b) { co_return a + b; }
* CoroTask<int> doubleSum(int a, int b) {
* int s = co_await add(a, b);
* co_return s * 2;
* }
*
* 3. Exception propagation from inner to outer:
*
* CoroTask<int> failing() {
* throw std::runtime_error("bad");
* co_return 0; // never reached
* }
* CoroTask<void> caller() {
* try {
* int v = co_await failing(); // throws here
* } catch (std::runtime_error const& e) {
* // e.what() == "bad"
* }
* }
*
* Caveats / Pitfalls (in addition to CoroTask<void> caveats above)
* ================================================================
*
* BUG-RISK: await_resume() moves the value out of the variant.
* Calling co_await on the same CoroTask<T> instance twice is undefined
* behavior -- the second call will see a moved-from T. CoroTask is
* single-shot: one co_return, one co_await.
*
* BUG-RISK: T must be move-constructible.
* return_value(T) takes by value and moves into the variant.
* Types that are not movable cannot be used as T.
*
* LIMITATION: No co_yield support.
* CoroTask<T> only supports a single co_return. It does not implement
* yield_value(), so using co_yield inside a CoroTask<T> coroutine is a
* compile error. For streaming values, a different return type
* (e.g. Generator<T>) would be needed.
*
* LIMITATION: Result is only accessible via co_await.
* There is no .get() or .result() method. The value can only be
* extracted by co_await-ing the CoroTask<T> from inside another
* coroutine. For extracting results in non-coroutine code, pass a
* pointer to the caller and write through it (as the tests do).
*/
template <typename T>
class CoroTask
{
public:
struct promise_type;
using Handle = std::coroutine_handle<promise_type>;
/**
* Coroutine promise for value-returning coroutines.
* Stores the result as a variant: monostate (not yet set),
* T (co_return value), or exception_ptr (unhandled exception).
*/
struct promise_type
{
// Tri-state result:
// index 0 (monostate) -- coroutine has not yet completed
// index 1 (T) -- co_return value stored here
// index 2 (exception) -- unhandled exception captured here
std::variant<std::monostate, T, std::exception_ptr> result_;
// Handle to the coroutine co_await-ing this task. Used by
// FinalAwaiter for symmetric transfer. Null for top-level tasks.
std::coroutine_handle<> continuation_;
/**
* Create the CoroTask return object.
* Called by the compiler at coroutine creation.
*/
CoroTask
get_return_object()
{
return CoroTask{Handle::from_promise(*this)};
}
/**
* Lazy start. Coroutine body does not run until explicitly resumed.
*/
std::suspend_always
initial_suspend() noexcept
{
return {};
}
/**
* Symmetric-transfer awaiter at coroutine completion.
* Same pattern as CoroTask<void>::FinalAwaiter.
*/
struct FinalAwaiter
{
bool
await_ready() noexcept
{
return false;
}
/**
* Returns continuation for symmetric transfer, or
* noop_coroutine if this is a top-level task.
*
* @param h Handle to this completing coroutine
*
* @return Continuation handle, or noop_coroutine
*/
std::coroutine_handle<>
await_suspend(Handle h) noexcept
{
if (auto cont = h.promise().continuation_)
return cont;
return std::noop_coroutine();
}
void
await_resume() noexcept
{
}
};
FinalAwaiter
final_suspend() noexcept
{
return {};
}
/**
* Called by the compiler for `co_return value;`.
* Moves the value into result_ at index 1.
*
* @param value The value to store
*/
void
return_value(T value)
{
result_.template emplace<1>(std::move(value));
}
/**
* Captures unhandled exceptions at index 2 of result_.
* Rethrown later in await_resume().
*/
void
unhandled_exception()
{
result_.template emplace<2>(std::current_exception());
}
};
/**
* Default constructor. Creates an empty (null handle) task.
*/
CoroTask() = default;
/**
* Takes ownership of a compiler-generated coroutine handle.
*
* @param h Coroutine handle to own
*/
explicit CoroTask(Handle h) : handle_(h)
{
}
/**
* Destroys the coroutine frame if this task owns one.
*/
~CoroTask()
{
if (handle_)
handle_.destroy();
}
/**
* Move constructor. Transfers handle ownership, leaves other empty.
*/
CoroTask(CoroTask&& other) noexcept : handle_(std::exchange(other.handle_, {}))
{
}
/**
* Move assignment. Destroys current frame (if any), takes other's.
*/
CoroTask&
operator=(CoroTask&& other) noexcept
{
if (this != &other)
{
if (handle_)
handle_.destroy();
handle_ = std::exchange(other.handle_, {});
}
return *this;
}
CoroTask(CoroTask const&) = delete;
CoroTask&
operator=(CoroTask const&) = delete;
/**
* @return The underlying coroutine_handle
*/
Handle
handle() const
{
return handle_;
}
/**
* @return true if the coroutine has run to completion (or thrown)
*/
bool
done() const
{
return handle_ && handle_.done();
}
// -- Awaiter interface: allows `T val = co_await someCoroTask;` --
/**
* Always false. co_await always suspends to set up continuation.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Stores caller as continuation, returns our handle for
* symmetric transfer.
*
* @param caller Handle of the coroutine doing co_await on us
*
* @return Our handle for symmetric transfer
*/
std::coroutine_handle<>
await_suspend(std::coroutine_handle<> caller) noexcept
{
handle_.promise().continuation_ = caller;
return handle_;
}
/**
* Extracts the result: rethrows if exception, otherwise moves
* the T value out of the variant. Single-shot: calling twice
* on the same task is undefined (moved-from T).
*
* @return The co_return-ed value
*/
T
await_resume()
{
auto& result = handle_.promise().result_;
if (auto* ep = std::get_if<2>(&result))
std::rethrow_exception(*ep);
return std::get<1>(std::move(result));
}
private:
// Exclusively-owned coroutine handle. Null after move or default
// construction. Destroyed in the destructor.
Handle handle_;
};
} // namespace xrpl

View File

@@ -1,329 +0,0 @@
#pragma once
/**
* @file CoroTaskRunner.ipp
*
* CoroTaskRunner inline implementation.
*
* This file contains the business logic for managing C++20 coroutines
* on the JobQueue. It is included at the bottom of JobQueue.h.
*
* Data Flow: suspend / post / resume cycle
* =========================================
*
* coroutine body CoroTaskRunner JobQueue
* -------------- -------------- --------
* |
* co_await runner->suspend()
* |
* +--- await_suspend ------> onSuspend()
* | ++nSuspend_ ------------> nSuspend_
* | [coroutine is now suspended]
* |
* . (externally or by JobQueueAwaiter)
* .
* +--- (caller calls) -----> post()
* | ++runCount_
* | addJob(resume) ----------> job enqueued
* | |
* | [worker picks up]
* | |
* +--- <----- resume() <-----------------------------------+
* | --nSuspend_ ------> nSuspend_
* | swap in LocalValues (lvs_)
* | task_.handle().resume()
* | |
* | [coroutine body continues here]
* | |
* | swap out LocalValues
* | --runCount_
* | cv_.notify_all()
* v
*
* Thread Safety
* =============
* - mutex_ : guards task_.handle().resume() so that post()-before-suspend
* races cannot resume the coroutine while it is still running.
* (See the race condition discussion in JobQueue.h)
* - mutex_run_ : guards runCount_ counter; used by join() to wait until
* all in-flight resume operations complete.
* - jq_.m_mutex: guards nSuspend_ increments/decrements.
*
* Common Mistakes When Modifying This File
* =========================================
*
* 1. Changing lock ordering.
* resume() acquires locks in this order: jq_.m_mutex -> mutex_ -> mutex_run_.
* post() acquires only mutex_run_. Any new code path that touches these
* mutexes must follow the same order to avoid deadlocks.
*
* 2. Removing the shared_from_this() capture in post().
* The lambda passed to addJob captures [this, sp = shared_from_this()].
* If you remove sp, 'this' can be destroyed before the job runs,
* causing use-after-free. The sp capture is load-bearing.
*
* 3. Forgetting to decrement nSuspend_ on a new code path.
* Every ++nSuspend_ must have a matching --nSuspend_. If you add a new
* suspension path (e.g. a new awaiter) and forget to decrement on resume
* or on failure, JobQueue::stop() will hang.
*
* 4. Calling task_.handle().resume() without holding mutex_.
* This allows a race where the coroutine runs on two threads
* simultaneously. Always hold mutex_ around resume().
*
* 5. Swapping LocalValues outside of the mutex_ critical section.
* The swap-in and swap-out of LocalValues must bracket the resume()
* call. If you move the swap-out before the lock_guard(mutex_) is
* released, you break LocalValue isolation for any code that runs
* after the coroutine suspends but before the lock is dropped.
*/
//
namespace xrpl {
/**
* Construct a CoroTaskRunner. Sets runCount_ to 0; does not
* create the coroutine. Call init() afterwards.
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
inline JobQueue::CoroTaskRunner::CoroTaskRunner(
create_t,
JobQueue& jq,
JobType type,
std::string const& name)
: jq_(jq), type_(type), name_(name), runCount_(0)
{
}
/**
* Initialize with a coroutine-returning callable.
* Stores the callable on the heap (FuncStore) so it outlives the
* coroutine frame. Coroutine frames store a reference to the
* callable's implicit object parameter (the lambda). If the callable
* is a temporary, that reference dangles after the caller returns.
* Keeping the callable alive here ensures the coroutine's captures
* remain valid.
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
template <class F>
void
JobQueue::CoroTaskRunner::init(F&& f)
{
using Fn = std::decay_t<F>;
auto store = std::make_unique<FuncStore<Fn>>(std::forward<F>(f));
task_ = store->func(shared_from_this());
storedFunc_ = std::move(store);
}
/**
* Destructor. Waits for any in-flight resume() to complete, then
* asserts (debug) that the coroutine has finished or
* expectEarlyExit() was called.
*
* The join() call is necessary because with async dispatch the
* coroutine runs on a worker thread. The gate signal (which wakes
* the test thread) can arrive before resume() has set finished_.
* join() synchronizes via mutex_run_, establishing a happens-before
* edge: finished_ = true → unlock(mutex_run_) in resume() →
* lock(mutex_run_) in join() → read finished_.
*/
inline JobQueue::CoroTaskRunner::~CoroTaskRunner()
{
#ifndef NDEBUG
join();
XRPL_ASSERT(finished_, "xrpl::JobQueue::CoroTaskRunner::~CoroTaskRunner : is finished");
#endif
}
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
*/
inline void
JobQueue::CoroTaskRunner::onSuspend()
{
std::lock_guard lock(jq_.m_mutex);
++jq_.nSuspend_;
}
/**
* Decrement nSuspend_ without resuming.
*/
inline void
JobQueue::CoroTaskRunner::onUndoSuspend()
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
/**
* Return a SuspendAwaiter whose await_suspend() increments nSuspend_
* before the coroutine actually suspends. The caller must later call
* post() or resume() to continue execution.
*
* @return Awaiter for use with `co_await runner->suspend()`
*/
inline auto
JobQueue::CoroTaskRunner::suspend()
{
/**
* Custom awaiter for suspend(). Always suspends (await_ready
* returns false) and increments nSuspend_ in await_suspend().
*/
struct SuspendAwaiter
{
CoroTaskRunner& runner_; // The runner that owns this coroutine.
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Called when the coroutine suspends. Increments nSuspend_
* so the JobQueue knows a coroutine is waiting.
*/
void
await_suspend(std::coroutine_handle<>) const
{
runner_.onSuspend();
}
void
await_resume() const noexcept
{
}
};
return SuspendAwaiter{*this};
}
/**
* Schedule coroutine resumption as a job on the JobQueue.
* A shared_ptr capture (sp) prevents this CoroTaskRunner from being
* destroyed while the job is queued but not yet executed.
*
* @return false if the JobQueue rejected the job (shutting down)
*/
inline bool
JobQueue::CoroTaskRunner::post()
{
{
std::lock_guard lk(mutex_run_);
++runCount_;
}
// sp prevents 'this' from being destroyed while the job is pending
if (jq_.addJob(type_, name_, [this, sp = shared_from_this()]() { resume(); }))
{
return true;
}
// The coroutine will not run. Undo the runCount_ increment.
std::lock_guard lk(mutex_run_);
--runCount_;
cv_.notify_all();
return false;
}
/**
* Resume the coroutine on the current thread.
*
* Steps:
* 1. Decrement nSuspend_ (under jq_.m_mutex)
* 2. Swap in this coroutine's LocalValues for thread-local isolation
* 3. Resume the coroutine handle (under mutex_)
* 4. Swap out LocalValues, restoring the thread's previous state
* 5. Decrement runCount_ and notify join() waiters
*
* Note: runCount_ is NOT incremented here — post() already did that.
* This ensures join() stays blocked for the entire post→resume lifetime.
*/
inline void
JobQueue::CoroTaskRunner::resume()
{
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
}
auto saved = detail::getLocalValues().release();
detail::getLocalValues().reset(&lvs_);
std::lock_guard lock(mutex_);
XRPL_ASSERT(!task_.done(), "xrpl::JobQueue::CoroTaskRunner::resume : task is not done");
task_.handle().resume();
detail::getLocalValues().release();
detail::getLocalValues().reset(saved);
if (task_.done())
{
#ifndef NDEBUG
finished_ = true;
#endif
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
// Use std::move (not task_ = {}) so task_.handle_ is null BEFORE the
// frame is destroyed. operator= would destroy the frame while handle_
// still holds the old value -- a re-entrancy hazard on GCC-12 if
// frame destruction triggers runner cleanup.
[[maybe_unused]] auto completed = std::move(task_);
}
std::lock_guard lk(mutex_run_);
--runCount_;
cv_.notify_all();
}
/**
* @return true if the coroutine has not yet run to completion
*/
inline bool
JobQueue::CoroTaskRunner::runnable() const
{
// After normal completion, task_ is reset to break the shared_ptr cycle
// (handle_ becomes null). A null handle means the coroutine is done.
return task_.handle() && !task_.done();
}
/**
* Handle early termination when the coroutine never ran (e.g. JobQueue
* is stopping). Decrements nSuspend_ and destroys the coroutine frame
* to break the shared_ptr cycle: frame -> lambda -> runner -> frame.
*/
inline void
JobQueue::CoroTaskRunner::expectEarlyExit()
{
#ifndef NDEBUG
if (!finished_)
#endif
{
std::lock_guard lock(jq_.m_mutex);
--jq_.nSuspend_;
#ifndef NDEBUG
finished_ = true;
#endif
}
// Break the shared_ptr cycle: frame -> shared_ptr<runner> -> this.
// The coroutine is at initial_suspend and never ran user code, so
// destroying it is safe. Use std::move (not task_ = {}) so
// task_.handle_ is null before the frame is destroyed.
{
[[maybe_unused]] auto completed = std::move(task_);
}
storedFunc_.reset();
}
/**
* Block until all pending/active resume operations complete.
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0.
*/
inline void
JobQueue::CoroTaskRunner::join()
{
std::unique_lock<std::mutex> lk(mutex_run_);
cv_.wait(lk, [this]() { return runCount_ == 0; });
}
} // namespace xrpl

View File

@@ -199,7 +199,7 @@ public:
/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the peer is added.
* element 1: true if the key is added.
* element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>

View File

@@ -2,7 +2,6 @@
#include <xrpl/basics/LocalValue.h>
#include <xrpl/core/ClosureCounter.h>
#include <xrpl/core/CoroTask.h>
#include <xrpl/core/JobTypeData.h>
#include <xrpl/core/JobTypes.h>
#include <xrpl/core/detail/Workers.h>
@@ -10,7 +9,6 @@
#include <boost/coroutine/all.hpp>
#include <coroutine>
#include <set>
namespace xrpl {
@@ -121,395 +119,6 @@ public:
join();
};
/** C++20 coroutine lifecycle manager. Replaces Coro for new code.
*
* Class / Inheritance / Dependency Diagram
* =========================================
*
* std::enable_shared_from_this<CoroTaskRunner>
* ^
* | (public inheritance)
* |
* CoroTaskRunner
* +---------------------------------------------------+
* | - lvs_ : detail::LocalValues |
* | - jq_ : JobQueue& |
* | - type_ : JobType |
* | - name_ : std::string |
* | - runCount_ : int (in-flight resumes) |
* | - mutex_ : std::mutex (coroutine guard) |
* | - mutex_run_ : std::mutex (join guard) |
* | - cv_ : condition_variable |
* | - task_ : CoroTask<void> |
* | - storedFunc_ : unique_ptr<FuncBase> (type-erased)|
* +---------------------------------------------------+
* | + init(F&&) : set up coroutine callable |
* | + onSuspend() : ++jq_.nSuspend_ |
* | + onUndoSuspend() : --jq_.nSuspend_ |
* | + suspend() : returns SuspendAwaiter |
* | + post() : schedule resume on JobQueue |
* | + resume() : resume coroutine on caller |
* | + runnable() : !task_.done() |
* | + expectEarlyExit() : teardown for failed post |
* | + join() : block until not running |
* +---------------------------------------------------+
* | |
* | owns | references
* v v
* CoroTask<void> JobQueue
* (coroutine frame) (thread pool + nSuspend_)
*
* FuncBase / FuncStore<F> (type-erased heap storage
* for the coroutine lambda)
*
* Coroutine Lifecycle (Control Flow)
* ===================================
*
* Caller thread JobQueue worker thread
* ------------- ----------------------
* postCoroTask(f)
* |
* +-- check stopping_ (reject if JQ shutting down)
* +-- ++nSuspend_ (lazy start counts as suspended)
* +-- make_shared<CoroTaskRunner>
* +-- init(f)
* | +-- store lambda on heap (FuncStore)
* | +-- task_ = f(shared_from_this())
* | [coroutine created, suspended at initial_suspend]
* +-- post()
* | +-- ++runCount_
* | +-- addJob(type_, [resume]{})
* | resume()
* | |
* | +-- --nSuspend_
* | +-- swap in LocalValues
* | +-- task_.handle().resume()
* | | [coroutine body runs]
* | | ...
* | | co_await suspend()
* | | +-- ++nSuspend_
* | | [coroutine suspends]
* | +-- swap out LocalValues
* | +-- --runCount_
* | +-- cv_.notify_all()
* |
* post() <-- called externally or by JobQueueAwaiter
* +-- ++runCount_
* +-- addJob(type_, [resume]{})
* resume()
* |
* +-- [coroutine body continues]
* +-- co_return
* +-- --runCount_
* +-- cv_.notify_all()
* join()
* +-- cv_.wait([]{runCount_ == 0})
* +-- [done]
*
* Usage Examples
* ==============
*
* 1. Fire-and-forget coroutine (most common pattern):
*
* jq.postCoroTask(jtCLIENT, "MyWork",
* [](auto runner) -> CoroTask<void> {
* doSomeWork();
* co_await runner->suspend(); // yield to other jobs
* doMoreWork();
* co_return;
* });
*
* 2. Manually controlling suspend / resume (external trigger):
*
* auto runner = jq.postCoroTask(jtCLIENT, "ExtTrigger",
* [&result](auto runner) -> CoroTask<void> {
* startAsyncOperation(callback);
* co_await runner->suspend();
* // callback called runner->post() to get here
* result = collectResult();
* co_return;
* });
* // ... later, from the callback:
* runner->post(); // reschedule the coroutine on the JobQueue
*
* 3. Using JobQueueAwaiter for automatic suspend + repost:
*
* jq.postCoroTask(jtCLIENT, "AutoRepost",
* [](auto runner) -> CoroTask<void> {
* step1();
* co_await JobQueueAwaiter{runner}; // yield + auto-repost
* step2();
* co_await JobQueueAwaiter{runner};
* step3();
* co_return;
* });
*
* 4. Checking shutdown after co_await (cooperative cancellation):
*
* jq.postCoroTask(jtCLIENT, "Cancellable",
* [&jq](auto runner) -> CoroTask<void> {
* while (moreWork()) {
* co_await JobQueueAwaiter{runner};
* if (jq.isStopping())
* co_return; // bail out cleanly
* processNextItem();
* }
* co_return;
* });
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Calling suspend() without a matching post()/resume().
* After co_await runner->suspend(), the coroutine is parked and
* nSuspend_ is incremented. If nothing ever calls post() or
* resume(), the coroutine is leaked and JobQueue::stop() will
* hang forever waiting for nSuspend_ to reach zero.
*
* BUG-RISK: Calling post() on an already-running coroutine.
* post() schedules a resume() job. If the coroutine has not
* actually suspended yet (no co_await executed), the resume job
* will try to call handle().resume() while the coroutine is still
* running on another thread. This is UB. The mutex_ prevents
* data corruption but the logic is wrong — always co_await
* suspend() before calling post(). (The test testIncorrectOrder
* shows this works only because mutex_ serializes the calls.)
*
* BUG-RISK: Dropping the shared_ptr<CoroTaskRunner> before join().
* The CoroTaskRunner destructor asserts (!finished_ is false).
* If you let the last shared_ptr die while the coroutine is still
* running or suspended, you get an assertion failure in debug and
* UB in release. Always call join() or expectEarlyExit() first.
*
* BUG-RISK: Lambda captures outliving the coroutine frame.
* The lambda passed to postCoroTask is heap-allocated (FuncStore)
* to prevent dangling. But objects captured by pointer still need
* their own lifetime management. If you capture a raw pointer to
* a stack variable, and the stack frame exits before the coroutine
* finishes, the pointer dangles. Use shared_ptr or ensure the
* pointed-to object outlives the coroutine.
*
* BUG-RISK: Forgetting co_return in a void coroutine.
* If the coroutine body falls off the end without co_return,
* the compiler may silently treat it as co_return (per standard),
* but some compilers warn. Always write explicit co_return.
*
* LIMITATION: CoroTaskRunner only supports CoroTask<void>.
* The task_ member is CoroTask<void>. To return values from
* the top-level coroutine, write through a captured pointer
* (as the tests demonstrate), or co_await inner CoroTask<T>
* coroutines that return values.
*
* LIMITATION: One coroutine per CoroTaskRunner.
* init() must be called exactly once. You cannot reuse a
* CoroTaskRunner to run a second coroutine. Create a new one
* via postCoroTask() instead.
*
* LIMITATION: No timeout on join().
* join() blocks indefinitely. If the coroutine is suspended
* and never posted, join() will deadlock. Use timed waits
* on the gate pattern (condition_variable + wait_for) in tests.
*/
class CoroTaskRunner : public std::enable_shared_from_this<CoroTaskRunner>
{
private:
// Per-coroutine thread-local storage. Swapped in before resume()
// and swapped out after, so each coroutine sees its own LocalValue
// state regardless of which worker thread executes it.
detail::LocalValues lvs_;
// Back-reference to the owning JobQueue. Used to post jobs,
// increment/decrement nSuspend_, and acquire jq_.m_mutex.
JobQueue& jq_;
// Job type passed to addJob() when posting this coroutine.
JobType type_;
// Human-readable name for this coroutine job (for logging).
std::string name_;
// Number of in-flight resume operations (pending + active).
// Incremented by post(), decremented when resume() finishes.
// Guarded by mutex_run_. join() blocks until this reaches 0.
//
// A counter (not a bool) is needed because post() can be called
// from within the coroutine body (e.g. via JobQueueAwaiter),
// enqueuing a second resume while the first is still running.
// A bool would be clobbered: R2.post() sets true, then R1's
// cleanup sets false — losing the fact that R2 is still pending.
int runCount_;
// Guards task_.handle().resume() to prevent the coroutine from
// running on two threads simultaneously. Handles the race where
// post() enqueues a resume before the coroutine has actually
// suspended (post-before-suspend pattern).
std::mutex mutex_;
// Guards runCount_. Used with cv_ for join() to wait
// until all pending/active resume operations complete.
std::mutex mutex_run_;
// Notified when runCount_ reaches zero, allowing
// join() waiters to wake up.
std::condition_variable cv_;
// The coroutine handle wrapper. Owns the coroutine frame.
// Set by init(), reset to empty by expectEarlyExit() on
// early termination.
CoroTask<void> task_;
/**
* Type-erased base for heap-stored callables.
* Prevents the coroutine lambda from being destroyed before
* the coroutine frame is done with it.
*
* @see FuncStore
*/
struct FuncBase
{
virtual ~FuncBase() = default;
};
/**
* Concrete type-erased storage for a callable of type F.
* The coroutine frame stores a reference to the lambda's implicit
* object parameter. If the lambda is a temporary, that reference
* dangles after the call returns. FuncStore keeps it alive on
* the heap for the lifetime of the CoroTaskRunner.
*/
template <class F>
struct FuncStore : FuncBase
{
F func; // The stored callable (coroutine lambda).
explicit FuncStore(F&& f) : func(std::move(f))
{
}
};
// Heap-allocated callable storage. Set by init(), ensures the
// lambda outlives the coroutine frame that references it.
std::unique_ptr<FuncBase> storedFunc_;
#ifndef NDEBUG
// Debug-only flag. True once the coroutine has completed or
// expectEarlyExit() was called. Asserted in the destructor
// to catch leaked runners.
bool finished_ = false;
#endif
public:
/**
* Tag type for private construction. Prevents external code
* from constructing CoroTaskRunner directly. Use postCoroTask().
*/
struct create_t
{
explicit create_t() = default;
};
/**
* Construct a CoroTaskRunner. Private by convention (create_t tag).
*
* @param jq The JobQueue this coroutine will run on
* @param type Job type for scheduling priority
* @param name Human-readable name for logging
*/
CoroTaskRunner(create_t, JobQueue&, JobType, std::string const&);
CoroTaskRunner(CoroTaskRunner const&) = delete;
CoroTaskRunner&
operator=(CoroTaskRunner const&) = delete;
/**
* Destructor. Asserts (debug) that the coroutine has finished
* or expectEarlyExit() was called.
*/
~CoroTaskRunner();
/**
* Initialize with a coroutine-returning callable.
* Must be called exactly once, after the object is managed by
* shared_ptr (because init uses shared_from_this internally).
* This is handled automatically by postCoroTask().
*
* @param f Callable: CoroTask<void>(shared_ptr<CoroTaskRunner>)
*/
template <class F>
void
init(F&& f);
/**
* Increment the JobQueue's suspended-coroutine count (nSuspend_).
* Called when the coroutine is about to suspend. Every call
* must be balanced by a corresponding decrement (via resume()
* or onUndoSuspend()), or JobQueue::stop() will hang.
*/
void
onSuspend();
/**
* Decrement nSuspend_ without resuming.
* Used to undo onSuspend() when a scheduled post() fails
* (e.g. JobQueue is stopping).
*/
void
onUndoSuspend();
/**
* Suspend the coroutine.
* The awaiter's await_suspend() increments nSuspend_ before the
* coroutine actually suspends. The caller must later call post()
* or resume() to continue execution.
*
* @return An awaiter for use with `co_await runner->suspend()`
*/
auto
suspend();
/**
* Schedule coroutine resumption as a job on the JobQueue.
* Captures shared_from_this() to prevent this runner from being
* destroyed while the job is queued.
*
* @return true if the job was accepted; false if the JobQueue
* is stopping (caller must handle cleanup)
*/
bool
post();
/**
* Resume the coroutine on the current thread.
* Decrements nSuspend_, swaps in LocalValues, resumes the
* coroutine handle, swaps out LocalValues, and notifies join()
* waiters. Lock ordering: mutex_run_ -> jq_.m_mutex -> mutex_.
*/
void
resume();
/**
* @return true if the coroutine has not yet run to completion
*/
bool
runnable() const;
/**
* Handle early termination when the coroutine never ran.
* Decrements nSuspend_ and destroys the coroutine frame to
* break the shared_ptr cycle (frame -> lambda -> runner -> frame).
* Called by postCoroTask() when post() fails.
*/
void
expectEarlyExit();
/**
* Block until all pending/active resume operations complete.
* Uses cv_ + mutex_run_ to wait until runCount_ reaches 0.
* Warning: deadlocks if the coroutine is suspended and never posted.
*/
void
join();
};
using JobFunction = std::function<void()>;
JobQueue(
@@ -556,19 +165,6 @@ public:
std::shared_ptr<Coro>
postCoro(JobType t, std::string const& name, F&& f);
/** Creates a C++20 coroutine and adds a job to the queue to run it.
@param t The type of job.
@param name Name of the job.
@param f Callable with signature
CoroTask<void>(std::shared_ptr<CoroTaskRunner>).
@return shared_ptr to posted CoroTaskRunner. nullptr if not successful.
*/
template <class F>
std::shared_ptr<CoroTaskRunner>
postCoroTask(JobType t, std::string const& name, F&& f);
/** Jobs waiting at this priority.
*/
int
@@ -783,7 +379,6 @@ private:
} // namespace xrpl
#include <xrpl/core/Coro.ipp>
#include <xrpl/core/CoroTaskRunner.ipp>
namespace xrpl {
@@ -806,69 +401,4 @@ JobQueue::postCoro(JobType t, std::string const& name, F&& f)
return coro;
}
// postCoroTask — entry point for launching a C++20 coroutine on the JobQueue.
//
// Control Flow
// ============
//
// postCoroTask(t, name, f)
// |
// +-- 1. Check stopping_ — reject if JQ shutting down
// |
// +-- 2. ++nSuspend_ (mirrors Boost Coro ctor's implicit yield)
// | The coroutine is "suspended" from the JobQueue's perspective
// | even though it hasn't run yet — this keeps the JQ shutdown
// | logic correct (it waits for nSuspend_ to reach 0).
// |
// +-- 3. Create CoroTaskRunner (shared_ptr, ref-counted)
// |
// +-- 4. runner->init(f)
// | +-- Heap-allocate the lambda (FuncStore) to prevent
// | | dangling captures in the coroutine frame
// | +-- task_ = f(shared_from_this())
// | [coroutine created but NOT started — lazy initial_suspend]
// |
// +-- 5. runner->post()
// | +-- addJob(type_, [resume]{}) → resume on worker thread
// | +-- failure (JQ stopping):
// | +-- runner->expectEarlyExit()
// | | --nSuspend_, destroy coroutine frame
// | +-- return nullptr
//
// Why async post() instead of synchronous resume()?
// ==================================================
// The initial dispatch MUST use async post() so the coroutine body runs on
// a JobQueue worker thread, not the caller's thread. resume() swaps the
// caller's thread-local LocalValues with the coroutine's private copy.
// If the coroutine mutates LocalValues (e.g. thread_specific_storage test),
// those mutations bleed back into the caller's thread-local state after the
// swap-out, corrupting subsequent tests that share the same thread pool.
// Async post() avoids this by running the coroutine on a worker thread whose
// LocalValues are managed by the thread pool, not by the caller.
//
template <class F>
std::shared_ptr<JobQueue::CoroTaskRunner>
JobQueue::postCoroTask(JobType t, std::string const& name, F&& f)
{
// Reject if the JQ is shutting down — matches addJob()'s stopping_ check.
// Must check before incrementing nSuspend_ to avoid leaving an orphan
// count that would cause stop() to hang.
if (stopping_)
return nullptr;
{
std::lock_guard lock(m_mutex);
++nSuspend_;
}
auto runner = std::make_shared<CoroTaskRunner>(CoroTaskRunner::create_t{}, *this, t, name);
runner->init(std::forward<F>(f));
if (!runner->post())
{
runner->expectEarlyExit();
runner.reset();
}
return runner;
}
} // namespace xrpl

View File

@@ -1,174 +0,0 @@
#pragma once
#include <xrpl/core/JobQueue.h>
#include <coroutine>
#include <memory>
namespace xrpl {
/**
* Awaiter that suspends and immediately reschedules on the JobQueue.
* Equivalent to calling yield() followed by post() in the old Coro API.
*
* Usage:
* co_await JobQueueAwaiter{runner};
*
* What it waits for: The coroutine is re-queued as a job and resumes
* when a worker thread picks it up.
*
* Which thread resumes: A JobQueue worker thread.
*
* What await_resume() returns: void.
*
* Dependency Diagram
* ==================
*
* JobQueueAwaiter
* +----------------------------------------------+
* | + runner : shared_ptr<CoroTaskRunner> |
* +----------------------------------------------+
* | + await_ready() -> false (always suspend) |
* | + await_suspend() -> bool (suspend or cancel) |
* | + await_resume() -> void |
* +----------------------------------------------+
* | |
* | uses | uses
* v v
* CoroTaskRunner JobQueue
* .onSuspend() (via runner->post() -> addJob)
* .onUndoSuspend()
* .post()
*
* Control Flow (await_suspend)
* ============================
*
* co_await JobQueueAwaiter{runner}
* |
* +-- await_ready() -> false
* +-- await_suspend(handle)
* |
* +-- runner->onSuspend() // ++nSuspend_
* +-- runner->post() // addJob to JobQueue
* | |
* | +-- success? return true // coroutine stays suspended
* | | // worker thread will call resume()
* | +-- failure? (JQ stopping)
* | +-- runner->onUndoSuspend() // --nSuspend_
* | +-- return false // coroutine continues immediately
* | // so it can clean up and co_return
*
* Usage Examples
* ==============
*
* 1. Yield and auto-repost (most common -- replaces yield() + post()):
*
* CoroTask<void> handler(auto runner) {
* doPartA();
* co_await JobQueueAwaiter{runner}; // yield + repost
* doPartB(); // runs on a worker thread
* co_return;
* }
*
* 2. Multiple yield points in a loop:
*
* CoroTask<void> batchProcessor(auto runner) {
* for (auto& item : items) {
* process(item);
* co_await JobQueueAwaiter{runner}; // let other jobs run
* }
* co_return;
* }
*
* 3. Graceful shutdown -- checking after resume:
*
* CoroTask<void> longTask(auto runner, JobQueue& jq) {
* while (hasWork()) {
* co_await JobQueueAwaiter{runner};
* // If JQ is stopping, await_suspend returns false and
* // the coroutine continues immediately without re-queuing.
* // Always check isStopping() to decide whether to proceed:
* if (jq.isStopping())
* co_return;
* doNextChunk();
* }
* co_return;
* }
*
* Caveats / Pitfalls
* ==================
*
* BUG-RISK: Using a stale or null runner.
* The runner shared_ptr must be valid and point to the CoroTaskRunner
* that owns the coroutine currently executing. Passing a runner from
* a different coroutine, or a default-constructed shared_ptr, is UB.
*
* BUG-RISK: Assuming resume happens on the same thread.
* After co_await JobQueueAwaiter, the coroutine resumes on whatever
* worker thread picks up the job. Do not rely on thread-local state
* unless it is managed through LocalValue (which CoroTaskRunner
* automatically swaps in/out).
*
* BUG-RISK: Ignoring the shutdown path.
* When the JobQueue is stopping, post() fails and await_suspend()
* returns false (coroutine does NOT actually suspend). The coroutine
* body continues immediately on the same thread. If your code after
* co_await assumes it was re-queued and is running on a worker thread,
* that assumption breaks during shutdown. Always handle the "JQ is
* stopping" case, either by checking jq.isStopping() or by letting
* the coroutine fall through to co_return naturally.
*
* DIFFERENCE from runner->suspend() + runner->post():
* JobQueueAwaiter combines both in one atomic operation. With the
* manual suspend()/post() pattern, there is a window between the
* two calls where an external event could race. JobQueueAwaiter
* removes that window -- onSuspend() and post() happen within the
* same await_suspend() call while the coroutine is guaranteed to
* be suspended. Prefer JobQueueAwaiter unless you need an external
* party to decide *when* to call post().
*/
struct JobQueueAwaiter
{
// The CoroTaskRunner that owns the currently executing coroutine.
std::shared_ptr<JobQueue::CoroTaskRunner> runner;
/**
* Always returns false so the coroutine suspends.
*/
bool
await_ready() const noexcept
{
return false;
}
/**
* Increment nSuspend (equivalent to yield()) and schedule resume
* on the JobQueue (equivalent to post()). If the JobQueue is
* stopping, undoes the suspend count and returns false so the
* coroutine continues immediately and can clean up.
*
* @return true if coroutine should stay suspended (job posted);
* false if coroutine should continue (JQ stopping)
*/
bool
await_suspend(std::coroutine_handle<>)
{
runner->onSuspend();
if (!runner->post())
{
// JobQueue is stopping. Undo the suspend count and
// don't actually suspend — the coroutine continues
// immediately so it can clean up and co_return.
runner->onUndoSuspend();
return false;
}
return true;
}
void
await_resume() const noexcept
{
}
};
} // namespace xrpl

View File

@@ -77,16 +77,16 @@ public:
If the object is not found or an error is encountered, the
result will indicate the condition.
@note This will be called concurrently.
@param hash The hash of the object.
@param key A pointer to the key data.
@param pObject [out] The created object if successful.
@return The result of the operation.
*/
virtual Status
fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pObject) = 0;
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) = 0;
/** Fetch a batch synchronously. */
virtual std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256> const& hashes) = 0;
fetchBatch(std::vector<uint256 const*> const& hashes) = 0;
/** Store a single object.
Depending on the implementation this may happen immediately

View File

@@ -35,6 +35,8 @@ struct LedgerHeader
// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;

View File

@@ -185,7 +185,7 @@ public:
virtual bool
isFull() = 0;
virtual void
setMode(OperatingMode om) = 0;
setMode(OperatingMode om, char const* reason) = 0;
virtual bool
isBlocked() = 0;
virtual bool

View File

@@ -33,7 +33,7 @@ DatabaseNodeImp::fetchNodeObject(
try
{
status = backend_->fetch(hash, &nodeObject);
status = backend_->fetch(hash.data(), &nodeObject);
}
catch (std::exception const& e)
{
@@ -68,10 +68,18 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
using namespace std::chrono;
auto const before = steady_clock::now();
std::vector<uint256 const*> batch{};
batch.reserve(hashes.size());
for (size_t i = 0; i < hashes.size(); ++i)
{
auto const& hash = hashes[i];
batch.push_back(&hash);
}
// Get the node objects that match the hashes from the backend. To protect
// against the backends returning fewer or more results than expected, the
// container is resized to the number of hashes.
auto results = backend_->fetchBatch(hashes).first;
auto results = backend_->fetchBatch(batch).first;
XRPL_ASSERT(
results.size() == hashes.size() || results.empty(),
"number of output objects either matches number of input hashes or is empty");

View File

@@ -105,7 +105,7 @@ DatabaseRotatingImp::fetchNodeObject(
std::shared_ptr<NodeObject> nodeObject;
try
{
status = backend->fetch(hash, &nodeObject);
status = backend->fetch(hash.data(), &nodeObject);
}
catch (std::exception const& e)
{

View File

@@ -116,9 +116,10 @@ public:
//--------------------------------------------------------------------------
Status
fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pObject) override
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
{
XRPL_ASSERT(db_, "xrpl::NodeStore::MemoryBackend::fetch : non-null database");
uint256 const hash(uint256::fromVoid(key));
std::lock_guard _(db_->mutex);
@@ -133,14 +134,14 @@ public:
}
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256> const& hashes) override
fetchBatch(std::vector<uint256 const*> const& hashes) override
{
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
for (auto const& h : hashes)
{
std::shared_ptr<NodeObject> nObj;
Status status = fetch(h, &nObj);
Status status = fetch(h->begin(), &nObj);
if (status != ok)
results.push_back({});
else

View File

@@ -179,17 +179,17 @@ public:
}
Status
fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pno) override
fetch(void const* key, std::shared_ptr<NodeObject>* pno) override
{
Status status;
pno->reset();
nudb::error_code ec;
db_.fetch(
hash.data(),
[&hash, pno, &status](void const* data, std::size_t size) {
key,
[key, pno, &status](void const* data, std::size_t size) {
nudb::detail::buffer bf;
auto const result = nodeobject_decompress(data, size, bf);
DecodedBlob decoded(hash.data(), result.first, result.second);
DecodedBlob decoded(key, result.first, result.second);
if (!decoded.wasOk())
{
status = dataCorrupt;
@@ -207,14 +207,14 @@ public:
}
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256> const& hashes) override
fetchBatch(std::vector<uint256 const*> const& hashes) override
{
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
for (auto const& h : hashes)
{
std::shared_ptr<NodeObject> nObj;
Status status = fetch(h, &nObj);
Status status = fetch(h->begin(), &nObj);
if (status != ok)
results.push_back({});
else

View File

@@ -36,13 +36,13 @@ public:
}
Status
fetch(uint256 const&, std::shared_ptr<NodeObject>*) override
fetch(void const*, std::shared_ptr<NodeObject>*) override
{
return notFound;
}
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256> const& hashes) override
fetchBatch(std::vector<uint256 const*> const& hashes) override
{
return {};
}

View File

@@ -244,7 +244,7 @@ public:
//--------------------------------------------------------------------------
Status
fetch(uint256 const& hash, std::shared_ptr<NodeObject>* pObject) override
fetch(void const* key, std::shared_ptr<NodeObject>* pObject) override
{
XRPL_ASSERT(m_db, "xrpl::NodeStore::RocksDBBackend::fetch : non-null database");
pObject->reset();
@@ -252,7 +252,7 @@ public:
Status status(ok);
rocksdb::ReadOptions const options;
rocksdb::Slice const slice(std::bit_cast<char const*>(hash.data()), m_keyBytes);
rocksdb::Slice const slice(static_cast<char const*>(key), m_keyBytes);
std::string string;
@@ -260,7 +260,7 @@ public:
if (getStatus.ok())
{
DecodedBlob decoded(hash.data(), string.data(), string.size());
DecodedBlob decoded(key, string.data(), string.size());
if (decoded.wasOk())
{
@@ -295,14 +295,14 @@ public:
}
std::pair<std::vector<std::shared_ptr<NodeObject>>, Status>
fetchBatch(std::vector<uint256> const& hashes) override
fetchBatch(std::vector<uint256 const*> const& hashes) override
{
std::vector<std::shared_ptr<NodeObject>> results;
results.reserve(hashes.size());
for (auto const& h : hashes)
{
std::shared_ptr<NodeObject> nObj;
Status status = fetch(h, &nObj);
Status status = fetch(h->begin(), &nObj);
if (status != ok)
results.push_back({});
else
@@ -332,8 +332,9 @@ public:
EncodedBlob encoded(e);
wb.Put(
rocksdb::Slice(std::bit_cast<char const*>(encoded.getKey()), m_keyBytes),
rocksdb::Slice(std::bit_cast<char const*>(encoded.getData()), encoded.getSize()));
rocksdb::Slice(reinterpret_cast<char const*>(encoded.getKey()), m_keyBytes),
rocksdb::Slice(
reinterpret_cast<char const*>(encoded.getData()), encoded.getSize()));
}
rocksdb::WriteOptions const options;

View File

@@ -85,7 +85,12 @@ public:
}
virtual void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
}

View File

@@ -5340,20 +5340,20 @@ class Vault_test : public beast::unit_test::suite
env.close();
// 2. Mantissa larger than uint64 max
env.set_parse_failure_expected(true);
try
{
tx[sfAssetsMaximum] = "18446744073709551617e5"; // uint64 max + 1
env(tx, THISLINE);
BEAST_EXPECTS(false, "Expected parse_error for mantissa larger than uint64 max");
BEAST_EXPECT(false);
}
catch (parse_error const& e)
{
using namespace std::string_literals;
BEAST_EXPECT(
e.what() == "invalidParamsField 'tx_json.AssetsMaximum' has invalid data."s);
e.what() ==
"invalidParamsField 'tx_json.AssetsMaximum' has invalid "
"data."s);
}
env.set_parse_failure_expected(false);
}
}

View File

@@ -0,0 +1,165 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2016 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/CanProcess.h>
#include <xrpl/beast/unit_test.h>
#include <memory>
namespace ripple {
namespace test {
struct CanProcess_test : beast::unit_test::suite
{
template <class Mutex, class Collection, class Item>
void
test(
std::string const& name,
Mutex& mtx,
Collection& collection,
std::vector<Item> const& items)
{
testcase(name);
if (!BEAST_EXPECT(!items.empty()))
return;
if (!BEAST_EXPECT(collection.empty()))
return;
// CanProcess objects can't be copied or moved. To make that easier,
// store shared_ptrs
std::vector<std::shared_ptr<CanProcess>> trackers;
// Fill up the vector with two CanProcess for each Item. The first
// inserts the item into the collection and is "good". The second does
// not and is "bad".
for (int i = 0; i < items.size(); ++i)
{
{
auto const& good =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(*good);
}
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
BEAST_EXPECT(collection.size() == i + 1);
{
auto const& bad =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(!*bad);
}
BEAST_EXPECT(trackers.size() == 2 * (i + 1));
BEAST_EXPECT(collection.size() == i + 1);
}
BEAST_EXPECT(collection.size() == items.size());
// Now remove the items from the vector<CanProcess> two at a time, and
// try to get another CanProcess for that item.
for (int i = 0; i < items.size(); ++i)
{
// Remove the "bad" one in the second position
// This will have no effect on the collection
{
auto const iter = trackers.begin() + 1;
BEAST_EXPECT(!**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
BEAST_EXPECT(collection.size() == items.size());
{
// Append a new "bad" one
auto const& bad =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(!*bad);
}
BEAST_EXPECT(trackers.size() == 2 * items.size());
BEAST_EXPECT(collection.size() == items.size());
// Remove the "good" one from the front
{
auto const iter = trackers.begin();
BEAST_EXPECT(**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
BEAST_EXPECT(collection.size() == items.size() - 1);
{
// Append a new "good" one
auto const& good =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(*good);
}
BEAST_EXPECT(trackers.size() == 2 * items.size());
BEAST_EXPECT(collection.size() == items.size());
}
// Now remove them all two at a time
for (int i = items.size() - 1; i >= 0; --i)
{
// Remove the "bad" one from the front
{
auto const iter = trackers.begin();
BEAST_EXPECT(!**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
BEAST_EXPECT(collection.size() == i + 1);
// Remove the "good" one now in front
{
auto const iter = trackers.begin();
BEAST_EXPECT(**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == 2 * i);
BEAST_EXPECT(collection.size() == i);
}
BEAST_EXPECT(trackers.empty());
BEAST_EXPECT(collection.empty());
}
void
run() override
{
{
std::mutex m;
std::set<int> collection;
std::vector<int> const items{1, 2, 3, 4, 5};
test("set of int", m, collection, items);
}
{
std::mutex m;
std::set<std::string> collection;
std::vector<std::string> const items{"one", "two", "three", "four", "five"};
test("set of string", m, collection, items);
}
{
std::mutex m;
std::unordered_set<char> collection;
std::vector<char> const items{'1', '2', '3', '4', '5'};
test("unorderd_set of char", m, collection, items);
}
{
std::mutex m;
std::unordered_set<std::uint64_t> collection;
std::vector<std::uint64_t> const items{100u, 1000u, 150u, 4u, 0u};
test("unordered_set of uint64_t", m, collection, items);
}
}
};
BEAST_DEFINE_TESTSUITE(CanProcess, ripple_basics, ripple);
} // namespace test
} // namespace ripple

View File

@@ -1,537 +0,0 @@
#include <test/jtx.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/JobQueueAwaiter.h>
#include <chrono>
#include <mutex>
namespace xrpl {
namespace test {
/**
* Test suite for the C++20 coroutine primitives: CoroTask, CoroTaskRunner,
* and JobQueueAwaiter.
*
* Dependency Diagram
* ==================
*
* CoroTask_test
* +-------------------------------------------------+
* | + gate (inner class) : condition_variable helper |
* +-------------------------------------------------+
* | uses
* v
* jtx::Env --> JobQueue::postCoroTask()
* |
* +-- CoroTaskRunner (suspend / post / resume)
* +-- CoroTask<void> / CoroTask<T>
* +-- JobQueueAwaiter
*
* Test Coverage Matrix
* ====================
*
* Test | Primitives exercised
* --------------------------+----------------------------------------------
* testVoidCompletion | CoroTask<void> basic lifecycle
* testCorrectOrder | suspend() -> join() -> post() -> complete
* testIncorrectOrder | post() before suspend() (race-safe path)
* testJobQueueAwaiter | JobQueueAwaiter suspend + auto-repost
* testThreadSpecificStorage | LocalValue isolation across coroutines
* testExceptionPropagation | unhandled_exception() in promise_type
* testMultipleYields | N sequential suspend/resume cycles
* testValueReturn | CoroTask<T> co_return value
* testValueException | CoroTask<T> exception via co_await
* testValueChaining | nested CoroTask<T> -> CoroTask<T>
* testShutdownRejection | postCoroTask returns nullptr when stopping
*/
class CoroTask_test : public beast::unit_test::suite
{
public:
/**
* Simple one-shot gate for synchronizing between test thread
* and coroutine worker threads. signal() sets the flag;
* wait_for() blocks until signaled or timeout.
*/
class gate
{
private:
std::condition_variable cv_;
std::mutex mutex_;
bool signaled_ = false;
public:
/**
* Block until signaled or timeout expires.
*
* @param rel_time Maximum duration to wait
*
* @return true if signaled before timeout
*/
template <class Rep, class Period>
bool
wait_for(std::chrono::duration<Rep, Period> const& rel_time)
{
std::unique_lock<std::mutex> lk(mutex_);
auto b = cv_.wait_for(lk, rel_time, [this] { return signaled_; });
signaled_ = false;
return b;
}
/**
* Signal the gate, waking any waiting thread.
*/
void
signal()
{
std::lock_guard lk(mutex_);
signaled_ = true;
cv_.notify_all();
}
};
// NOTE: All coroutine lambdas passed to postCoroTask use explicit
// pointer-by-value captures instead of [&] to work around a GCC 14
// bug where reference captures in coroutine lambdas are corrupted
// in the coroutine frame.
/**
* CoroTask<void> runs to completion and runner becomes non-runnable.
*/
void
testVoidCompletion()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("void completion");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* Correct order: suspend, join, post, complete.
* Mirrors existing Coroutine_test::correct_order.
*/
void
testCorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("correct order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g1, g2;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, g1p = &g1, g2p = &g2](auto runner) -> CoroTask<void> {
*rp = runner;
g1p->signal();
co_await runner->suspend();
g2p->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g1.wait_for(5s));
runner->join();
runner->post();
BEAST_EXPECT(g2.wait_for(5s));
runner->join();
}
/**
* Incorrect order: post() before suspend(). Verifies the
* race-safe path. Mirrors Coroutine_test::incorrect_order.
*/
void
testIncorrectOrder()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("incorrect order");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto runner) -> CoroTask<void> {
runner->post();
co_await runner->suspend();
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
}
/**
* JobQueueAwaiter suspend + auto-repost across multiple yield points.
*/
void
testJobQueueAwaiter()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("JobQueueAwaiter");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int step = 0;
env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [sp = &step, gp = &g](auto runner) -> CoroTask<void> {
*sp = 1;
co_await JobQueueAwaiter{runner};
*sp = 2;
co_await JobQueueAwaiter{runner};
*sp = 3;
gp->signal();
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(step == 3);
}
/**
* Per-coroutine LocalValue isolation. Each coroutine sees its own
* copy of thread-local state. Mirrors Coroutine_test::thread_specific_storage.
*/
void
testThreadSpecificStorage()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("thread specific storage");
Env env(*this);
auto& jq = env.app().getJobQueue();
static int const N = 4;
std::array<std::shared_ptr<JobQueue::CoroTaskRunner>, N> a;
LocalValue<int> lv(-1);
BEAST_EXPECT(*lv == -1);
gate g;
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -1);
*lv = -2;
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
for (int i = 0; i < N; ++i)
{
jq.postCoroTask(
jtCLIENT,
"CoroTaskTest",
[this, ap = &a, gp = &g, lvp = &lv, id = i](auto runner) -> CoroTask<void> {
(*ap)[id] = runner;
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == -1);
**lvp = id;
this->BEAST_EXPECT(**lvp == id);
gp->signal();
co_await runner->suspend();
this->BEAST_EXPECT(**lvp == id);
co_return;
});
BEAST_EXPECT(g.wait_for(5s));
a[i]->join();
}
for (auto const& r : a)
{
r->post();
BEAST_EXPECT(g.wait_for(5s));
r->join();
}
for (auto const& r : a)
{
r->post();
r->join();
}
jq.addJob(jtCLIENT, "LocalValTest", [&]() {
this->BEAST_EXPECT(*lv == -2);
g.signal();
});
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(*lv == -1);
}
/**
* Exception thrown in coroutine body is caught by
* promise_type::unhandled_exception(). Coroutine completes.
*/
void
testExceptionPropagation()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("exception propagation");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [gp = &g](auto) -> CoroTask<void> {
gp->signal();
throw std::runtime_error("test exception");
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
// The exception is caught by promise_type::unhandled_exception()
// and the coroutine is considered done
BEAST_EXPECT(!runner->runnable());
}
/**
* Multiple sequential suspend/resume cycles via co_await.
*/
void
testMultipleYields()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("multiple yields");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int counter = 0;
std::shared_ptr<JobQueue::CoroTaskRunner> r;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT,
"CoroTaskTest",
[rp = &r, cp = &counter, gp = &g](auto runner) -> CoroTask<void> {
*rp = runner;
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_await runner->suspend();
++(*cp);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 1);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 2);
runner->join();
runner->post();
BEAST_EXPECT(g.wait_for(5s));
BEAST_EXPECT(counter == 3);
runner->join();
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> returns a value via co_return. Outer coroutine
* extracts it with co_await.
*/
void
testValueReturn()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value return");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> { co_return 42; };
*rp = co_await inner();
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 42);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> propagates exceptions from inner coroutines.
* Outer coroutine catches via try/catch around co_await.
*/
void
testValueException()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value exception");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
bool caught = false;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [cp = &caught, gp = &g](auto) -> CoroTask<void> {
auto inner = []() -> CoroTask<int> {
throw std::runtime_error("inner error");
co_return 0;
};
try
{
co_await inner();
}
catch (std::runtime_error const& e)
{
*cp = true;
}
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(caught);
BEAST_EXPECT(!runner->runnable());
}
/**
* CoroTask<T> chaining. Nested value-returning coroutines
* compose via co_await.
*/
void
testValueChaining()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("value chaining");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
gate g;
int result = 0;
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [rp = &result, gp = &g](auto) -> CoroTask<void> {
auto add = [](int a, int b) -> CoroTask<int> { co_return a + b; };
auto mul = [add](int a, int b) -> CoroTask<int> {
int sum = co_await add(a, b);
co_return sum * 2;
};
*rp = co_await mul(3, 4);
gp->signal();
co_return;
});
BEAST_EXPECT(runner);
BEAST_EXPECT(g.wait_for(5s));
runner->join();
BEAST_EXPECT(result == 14); // (3 + 4) * 2
BEAST_EXPECT(!runner->runnable());
}
/**
* postCoroTask returns nullptr when JobQueue is stopping.
*/
void
testShutdownRejection()
{
using namespace std::chrono_literals;
using namespace jtx;
testcase("shutdown rejection");
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FORCE_MULTI_THREAD = true;
return cfg;
}));
// Stop the JobQueue
env.app().getJobQueue().stop();
auto runner = env.app().getJobQueue().postCoroTask(
jtCLIENT, "CoroTaskTest", [](auto) -> CoroTask<void> { co_return; });
BEAST_EXPECT(!runner);
}
void
run() override
{
testVoidCompletion();
testCorrectOrder();
testIncorrectOrder();
testJobQueueAwaiter();
testThreadSpecificStorage();
testExceptionPropagation();
testMultipleYields();
testValueReturn();
testValueException();
testValueChaining();
testShutdownRejection();
}
};
BEAST_DEFINE_TESTSUITE(CoroTask, core, xrpl);
} // namespace test
} // namespace xrpl

View File

@@ -138,7 +138,7 @@ public:
{
std::shared_ptr<NodeObject> object;
Status const status = backend.fetch(batch[i]->getHash(), &object);
Status const status = backend.fetch(batch[i]->getHash().cbegin(), &object);
BEAST_EXPECT(status == ok);
@@ -158,7 +158,7 @@ public:
{
std::shared_ptr<NodeObject> object;
Status const status = backend.fetch(batch[i]->getHash(), &object);
Status const status = backend.fetch(batch[i]->getHash().cbegin(), &object);
BEAST_EXPECT(status == notFound);
}

View File

@@ -314,7 +314,7 @@ public:
std::shared_ptr<NodeObject> obj;
std::shared_ptr<NodeObject> result;
obj = seq1_.obj(dist_(gen_));
backend_.fetch(obj->getHash(), &result);
backend_.fetch(obj->getHash().data(), &result);
suite_.expect(result && isSame(result, obj));
}
catch (std::exception const& e)
@@ -377,9 +377,9 @@ public:
{
try
{
auto const hash = seq2_.key(i);
auto const key = seq2_.key(i);
std::shared_ptr<NodeObject> result;
backend_.fetch(hash, &result);
backend_.fetch(key.data(), &result);
suite_.expect(!result);
}
catch (std::exception const& e)
@@ -449,9 +449,9 @@ public:
{
if (rand_(gen_) < missingNodePercent)
{
auto const hash = seq2_.key(dist_(gen_));
auto const key = seq2_.key(dist_(gen_));
std::shared_ptr<NodeObject> result;
backend_.fetch(hash, &result);
backend_.fetch(key.data(), &result);
suite_.expect(!result);
}
else
@@ -459,7 +459,7 @@ public:
std::shared_ptr<NodeObject> obj;
std::shared_ptr<NodeObject> result;
obj = seq1_.obj(dist_(gen_));
backend_.fetch(obj->getHash(), &result);
backend_.fetch(obj->getHash().data(), &result);
suite_.expect(result && isSame(result, obj));
}
}
@@ -540,7 +540,8 @@ public:
std::shared_ptr<NodeObject> result;
auto const j = older_(gen_);
obj = seq1_.obj(j);
backend_.fetch(obj->getHash(), &result);
std::shared_ptr<NodeObject> result1;
backend_.fetch(obj->getHash().data(), &result);
suite_.expect(result != nullptr);
suite_.expect(isSame(result, obj));
}
@@ -558,7 +559,7 @@ public:
std::shared_ptr<NodeObject> result;
auto const j = recent_(gen_);
obj = seq1_.obj(j);
backend_.fetch(obj->getHash(), &result);
backend_.fetch(obj->getHash().data(), &result);
suite_.expect(!result || isSame(result, obj));
break;
}

View File

@@ -107,10 +107,8 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
// Tell the ledger acquire system that we need the consensus ledger
acquiringLedger_ = hash;
app_.getJobQueue().addJob(jtADVANCE, "GetConsL1", [id = hash, &app = app_, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(id, 0, InboundLedger::Reason::CONSENSUS);
});
app_.getInboundLedgers().acquireAsync(
jtADVANCE, "GetConsL1", hash, 0, InboundLedger::Reason::CONSENSUS);
}
return std::nullopt;
}
@@ -985,7 +983,7 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if (!positions && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
app_.getOPs().setMode(OperatingMode::CONNECTED, "updateOperatingMode: no positions");
}
void

View File

@@ -117,12 +117,8 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
{
JLOG(j_.warn()) << "Need validated ledger for preferred ledger analysis " << hash;
Application* pApp = &app_;
app_.getJobQueue().addJob(jtADVANCE, "GetConsL2", [pApp, hash, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(hash, 0, InboundLedger::Reason::CONSENSUS);
});
app_.getInboundLedgers().acquireAsync(
jtADVANCE, "GetConsL2", hash, 0, InboundLedger::Reason::CONSENSUS);
return std::nullopt;
}

View File

@@ -26,7 +26,12 @@ public:
// Queue. TODO review whether all callers of acquire() can use this
// instead. Inbound ledger acquisition is asynchronous anyway.
virtual void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) = 0;
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) = 0;
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;

View File

@@ -353,7 +353,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}
mByHash = true;

View File

@@ -2,9 +2,9 @@
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/main/Application.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/container/aged_map.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/PerfLog.h>
@@ -59,12 +59,15 @@ public:
(reason != InboundLedger::Reason::CONSENSUS))
return {};
std::stringstream ss;
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
if (stopping_)
{
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {};
}
@@ -83,47 +86,61 @@ public:
++mCounter;
}
}
ss << " IsNew: " << (isNew ? "true" : "false");
if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {};
}
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
{
JLOG(j_.debug()) << "InProgress: " << ss.str();
return {};
}
JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger();
};
using namespace std::chrono_literals;
std::shared_ptr<Ledger const> ledger =
perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
return perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
}
void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
std::unique_lock lock(acquiresMutex_);
try
if (auto check = std::make_shared<CanProcess const>(acquiresMutex_, pendingAcquires_, hash);
*check)
{
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
scope_unlock unlock(lock);
acquire(hash, seq, reason);
app_.getJobQueue().addJob(type, name, [check, name, hash, seq, reason, this]() {
JLOG(j_.debug()) << "JOB acquireAsync " << name << " started ";
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new "
"inbound ledger "
<< hash << ": " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
});
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": "
<< e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
}
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger>

View File

@@ -907,8 +907,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
return;
}
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq
<< " with >= " << minVal << " validations";
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq << " ("
<< to_short_string(ledger->header().hash) << ") with >= " << minVal
<< " validations";
ledger->setValidated();
ledger->setFull();

View File

@@ -13,7 +13,8 @@ TimeoutCounter::TimeoutCounter(
QueueJobParameter&& jobParameter,
beast::Journal journal)
: app_(app)
, journal_(journal)
, sink_(journal, to_short_string(hash) + " ")
, journal_(sink_)
, hash_(hash)
, timeouts_(0)
, complete_(false)
@@ -33,6 +34,7 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() << "ms";
timer_.expires_after(timerInterval_);
timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
if (ec == boost::asio::error::operation_aborted)
@@ -40,6 +42,10 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock())
{
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec
<< " (operation_aborted: " << boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted" : "other") << ")";
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
}

View File

@@ -3,6 +3,7 @@
#include <xrpld/app/main/Application.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/core/Job.h>
#include <boost/asio/basic_waitable_timer.hpp>
@@ -103,6 +104,7 @@ protected:
// Used in this class for access to boost::asio::io_context and
// xrpl::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::WrappedSink sink_;
beast::Journal journal_;
mutable std::recursive_mutex mtx_;

View File

@@ -30,10 +30,10 @@
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/mulDiv.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/utility/rngfill.h>
#include <xrpl/core/HashRouter.h>
#include <xrpl/core/NetworkIDService.h>
@@ -396,7 +396,7 @@ public:
isFull() override;
void
setMode(OperatingMode om) override;
setMode(OperatingMode om, char const* reason) override;
bool
isBlocked() override;
@@ -841,7 +841,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
inline void
NetworkOPsImp::setStandAlone()
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "setStandAlone");
}
inline void
@@ -984,7 +984,7 @@ NetworkOPsImp::processHeartbeatTimer()
{
if (mMode != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED);
setMode(OperatingMode::DISCONNECTED, "Heartbeat: insufficient peers");
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
@@ -1008,7 +1008,7 @@ NetworkOPsImp::processHeartbeatTimer()
if (mMode == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
}
@@ -1018,9 +1018,9 @@ NetworkOPsImp::processHeartbeatTimer()
auto origMode = mMode.load();
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mMode == OperatingMode::SYNCING)
setMode(OperatingMode::SYNCING);
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
else if (mMode == OperatingMode::CONNECTED)
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
auto newMode = mMode.load();
if (origMode != newMode)
{
@@ -1710,7 +1710,7 @@ void
NetworkOPsImp::setAmendmentBlocked()
{
amendmentBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
}
inline bool
@@ -1741,7 +1741,7 @@ void
NetworkOPsImp::setUNLBlocked()
{
unlBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
}
inline void
@@ -1837,7 +1837,7 @@ NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
}
if (consensus)
@@ -1922,8 +1922,8 @@ NetworkOPsImp::beginConsensus(
// this shouldn't happen unless we jump ledgers
if (mMode == OperatingMode::FULL)
{
JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING);
JLOG(m_journal.warn()) << "beginConsensus Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
}
@@ -2052,7 +2052,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// validations we have for LCL. If the ledger is good enough, go to
// TRACKING - TODO
if (!needNetworkLedger_)
setMode(OperatingMode::TRACKING);
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
}
if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::TRACKING)) &&
@@ -2065,7 +2065,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
if (registry_.timeKeeper().now() <
(current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "endConsensus: check full");
}
}
@@ -2077,7 +2077,7 @@ NetworkOPsImp::consensusViewChange()
{
if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "consensusViewChange");
}
}
@@ -2379,7 +2379,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
}
void
NetworkOPsImp::setMode(OperatingMode om)
NetworkOPsImp::setMode(OperatingMode om, char const* reason)
{
using namespace std::chrono_literals;
if (om == OperatingMode::CONNECTED)
@@ -2399,11 +2399,12 @@ NetworkOPsImp::setMode(OperatingMode om)
if (mMode == om)
return;
auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
mMode = om;
accounting_.mode(om);
JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
pubServer();
}
@@ -2412,32 +2413,24 @@ NetworkOPsImp::recvValidation(std::shared_ptr<STValidation> const& val, std::str
{
JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::no;
try
{
if (pendingValidations_.contains(val->getLedgerHash()))
bypassAccept = BypassAccept::yes;
else
pendingValidations_.insert(val->getLedgerHash());
scope_unlock unlock(lock);
handleNewValidation(registry_.app(), val, source, bypassAccept, m_journal);
CanProcess const check(validationsMutex_, pendingValidations_, val->getLedgerHash());
try
{
BypassAccept bypassAccept = check ? BypassAccept::no : BypassAccept::yes;
handleNewValidation(registry_.app(), val, source, bypassAccept, m_journal);
}
catch (std::exception const& e)
{
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation " << val->getLedgerHash();
}
}
catch (std::exception const& e)
{
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::no)
{
pendingValidations_.erase(val->getLedgerHash());
}
lock.unlock();
pubValidation(val);