Compare commits

...

52 Commits

Author SHA1 Message Date
Ed Hennis
75e134a1fd Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-03-03 20:46:56 -04:00
Ayaz Salikhov
fcec31ed20 chore: Update pre-commit hooks (#6460) 2026-03-03 20:23:22 +00:00
Ed Hennis
828bcb3a7d Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-03-03 15:54:51 -04:00
dependabot[bot]
0abd762781 ci: [DEPENDABOT] bump actions/upload-artifact from 6.0.0 to 7.0.0 (#6450) 2026-03-03 17:17:08 +00:00
Sergey Kuznetsov
5300e65686 tests: Improve stability of Subscribe tests (#6420)
The `Subscribe` tests were flaky, because each test performs some operations (e.g. sends transactions) and waits for messages to appear in subscription with a 100ms timeout. If tests are slow (e.g. compiled in debug mode or a slow machine) then some of them could fail. This change adds an attempt to synchronize the background Env's thread and the test's thread by ensuring that all the scheduled operations are started before the test's thread starts to wait for a websocket message. This is done by limiting I/O threads of the app inside Env to 1 and adding a synchronization barrier after closing the ledger.
2026-03-03 08:46:55 -05:00
Ed Hennis
6fc972746d Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-24 17:35:00 -04:00
Ed Hennis
930afbdea8 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-20 18:50:00 -04:00
Ed Hennis
95c5bef48b Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-20 18:26:19 -04:00
Ed Hennis
b489b6c3ce Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-20 17:32:02 -04:00
Ed Hennis
70765acef4 Merge remote-tracking branch 'upstream/develop' into ximinez/acquireAsyncDispatch
* upstream/develop:
  ci: [DEPENDABOT] bump tj-actions/changed-files from 46.0.5 to 47.0.4 (6394)
  ci: [DEPENDABOT] bump codecov/codecov-action from 5.4.3 to 5.5.2 (6398)
  ci: Build docs in PRs and in private repos (6400)
  ci: Add dependabot config (6379)
  Fix tautological assertion (6393)
  chore: Apply clang-format width 100 (6387)
2026-02-20 16:30:42 -05:00
Ed Hennis
77aa90bd0e Update formatting 2026-02-20 16:27:59 -05:00
Ed Hennis
4758bb6dc9 Merge commit '25cca465538a56cce501477f9e5e2c1c7ea2d84c' into ximinez/acquireAsyncDispatch
* commit '25cca465538a56cce501477f9e5e2c1c7ea2d84c':
  chore: Set clang-format width to 100 in config file (6387)
2026-02-20 16:27:39 -05:00
Ed Hennis
46e3dcb5fb Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-19 16:24:54 -05:00
Ed Hennis
d57579f10b Merge remote-tracking branch 'XRPLF/develop' into ximinez/acquireAsyncDispatch
* XRPLF/develop:
  refactor: Modularize app/tx (6228)
  refactor: Decouple app/tx from `Application` and `Config` (6227)
  chore: Update clang-format to 21.1.8 (6352)
  refactor: Modularize `HashRouter`, `Conditions`, and `OrderBookDB` (6226)
  chore: Fix minor issues in comments (6346)
  refactor: Modularize the NetworkOPs interface (6225)
  chore: Fix `gcov` lib coverage build failure on macOS (6350)
  refactor: Modularize RelationalDB (6224)
  refactor: Modularize WalletDB and Manifest (6223)
  fix: Update invariant checks for Permissioned Domains (6134)
  refactor: Change main thread name to `xrpld-main` (6336)
  refactor: Fix spelling issues in tests (6199)
  test: Add file and line location to Env (6276)
  chore: Remove CODEOWNERS (6337)
  perf: Remove unnecessary caches (5439)
  chore: Restore unity builds (6328)
  refactor: Update secp256k1 to 0.7.1 (6331)
  fix: Increment sequence when accepting new manifests (6059)
  fix typo in LendingHelpers unit-test (6215)
2026-02-18 19:52:18 -05:00
Ed Hennis
d8dd376d1c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-04 16:30:10 -04:00
Ed Hennis
a8c03e2e6c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-02-03 16:08:06 -04:00
Ed Hennis
2167a66bc7 Fix formatting 2026-01-28 19:39:15 -05:00
Ed Hennis
ed948a858c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-28 18:49:15 -04:00
Ed Hennis
608c102743 Merge commit '5f638f55536def0d88b970d1018a465a238e55f4' into ximinez/acquireAsyncDispatch
* commit '5f638f55536def0d88b970d1018a465a238e55f4':
  chore: Set ColumnLimit to 120 in clang-format (6288)
2026-01-28 17:47:53 -05:00
Ed Hennis
36d1607a4e Merge commit '92046785d1fea5f9efe5a770d636792ea6cab78b' into ximinez/acquireAsyncDispatch
* commit '92046785d1fea5f9efe5a770d636792ea6cab78b':
  test: Fix the `xrpl.net` unit test using async read (6241)
  ci: Upload Conan recipes for develop, release candidates, and releases (6286)
  fix: Stop embedded tests from hanging on ARM by using `atomic_flag` (6248)
  fix:  Remove DEFAULT fields that change to the default in associateAsset (6259) (6273)
  refactor: Update Boost to 1.90 (6280)
  refactor: clean up uses of `std::source_location` (6272)
  ci: Pass missing sanitizers input to actions (6266)
  ci: Properly propagate Conan credentials (6265)
  ci: Explicitly set version when exporting the Conan recipe (6264)
  ci: Use plus instead of hyphen for Conan recipe version suffix (6261)
  chore: Detect uninitialized variables in CMake files (6247)
  ci: Run on-trigger and on-pr when generate-version is modified (6257)
  refactor: Enforce 15-char limit and simplify labels for thread naming (6212)
  docs: Update Ripple Bug Bounty public key (6258)
  ci: Add missing commit hash to Conan recipe version (6256)
  fix: Include `<functional>` header in `Number.h` (6254)
  ci: Upload Conan recipe for merges into develop and commits to release (6235)
  Limit reply size on `TMGetObjectByHash` queries (6110)
  ci: remove 'master' branch as a trigger (6234)
  Improve ledger_entry lookups for fee, amendments, NUNL, and hashes (5644)
2026-01-28 17:47:47 -05:00
Ed Hennis
53ebb86d60 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-15 13:03:36 -04:00
Ed Hennis
1d989bc6de Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-15 12:06:00 -04:00
Ed Hennis
64c0cb8c7e Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-13 18:19:11 -04:00
Ed Hennis
c77cfef41c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-13 15:28:01 -04:00
Ed Hennis
08aa8c06d1 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-12 14:52:16 -04:00
Ed Hennis
9498672f8e Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-11 00:50:43 -04:00
Ed Hennis
e91d55a0e0 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-08 17:06:11 -04:00
Ed Hennis
afdc452cfc Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-08 13:04:20 -04:00
Ed Hennis
a0d4ef1a54 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2026-01-06 14:02:15 -05:00
Ed Hennis
8bc384f8bf Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-22 17:39:59 -05:00
Ed Hennis
bd961c484b Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-18 19:59:52 -05:00
Ed Hennis
aee242a8d4 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-12 20:34:59 -05:00
Ed Hennis
fcae74de58 Merge remote-tracking branch 'XRPLF/develop' into ximinez/acquireAsyncDispatch
* XRPLF/develop:
  refactor: Rename `ripple` namespace to `xrpl` (5982)
  refactor: Move JobQueue and related classes into xrpl.core module (6121)
  refactor: Rename `rippled` binary to `xrpld` (5983)
  refactor: rename info() to header() (6138)
  refactor: rename `LedgerInfo` to `LedgerHeader` (6136)
  refactor: clean up `RPCHelpers` (5684)
  chore: Fix docs readme and cmake (6122)
  chore: Clean up .gitignore and .gitattributes (6001)
  chore: Use updated secp256k1 recipe (6118)
2025-12-11 15:33:12 -05:00
Ed Hennis
a56effcb00 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-05 21:13:10 -05:00
Ed Hennis
64c2eca465 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-02 17:37:29 -05:00
Ed Hennis
e56f750e1d Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-12-01 14:40:45 -05:00
Ed Hennis
fde000f3eb Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-28 15:46:44 -05:00
Ed Hennis
d0a62229da Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-27 01:48:56 -05:00
Ed Hennis
d5932cc7d4 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-26 00:25:17 -05:00
Ed Hennis
0b534da781 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-25 14:55:06 -05:00
Ed Hennis
71a70d343b Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-24 21:49:11 -05:00
Ed Hennis
0899e65030 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-24 21:30:22 -05:00
Ed Hennis
31ba529761 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-21 12:47:58 -05:00
Ed Hennis
e2c6e5ebb6 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-18 22:39:29 -05:00
Ed Hennis
9d807fce48 Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-15 03:08:41 -05:00
Ed Hennis
9ef160765c Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-13 12:19:21 -05:00
Ed Hennis
d6c0eb243b Merge branch 'develop' into ximinez/acquireAsyncDispatch 2025-11-12 14:12:55 -05:00
Ed Hennis
84c9fc123c Fix formatting 2025-11-10 19:53:05 -05:00
Ed Hennis
00a2a58cfa Add missing header 2025-11-10 19:53:05 -05:00
Ed Hennis
bb2098d873 Add a unit test for CanProcess
- Delete the copy ctor & operator
2025-11-10 19:53:05 -05:00
Ed Hennis
46a5bc74db refactor: acquireAsync will dispatch the job, not the other way around 2025-11-10 19:53:05 -05:00
Ed Hennis
7b72b9cc82 Improve job queue collision checks and logging
- Improve logging related to ledger acquisition and operating mode
  changes
- Class "CanProcess" to keep track of processing of distinct items
2025-11-10 19:53:05 -05:00
31 changed files with 575 additions and 162 deletions

View File

@@ -177,7 +177,7 @@ jobs:
- name: Upload the binary (Linux)
if: ${{ github.repository_owner == 'XRPLF' && runner.os == 'Linux' }}
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: xrpld-${{ inputs.config_name }}
path: ${{ env.BUILD_DIR }}/xrpld

View File

@@ -84,7 +84,7 @@ jobs:
- name: Upload clang-tidy output
if: steps.run_clang_tidy.outcome != 'success'
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
with:
name: clang-tidy-results
path: clang-tidy-output.txt

View File

@@ -20,7 +20,7 @@ repos:
args: [--assume-in-merge]
- repo: https://github.com/pre-commit/mirrors-clang-format
rev: 75ca4ad908dc4a99f57921f29b7e6c1521e10b26 # frozen: v21.1.8
rev: cd481d7b0bfb5c7b3090c21846317f9a8262e891 # frozen: v22.1.0
hooks:
- id: clang-format
args: [--style=file]
@@ -33,17 +33,17 @@ repos:
additional_dependencies: [PyYAML]
- repo: https://github.com/rbubley/mirrors-prettier
rev: 5ba47274f9b181bce26a5150a725577f3c336011 # frozen: v3.6.2
rev: c2bc67fe8f8f549cc489e00ba8b45aa18ee713b1 # frozen: v3.8.1
hooks:
- id: prettier
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 831207fd435b47aeffdf6af853097e64322b4d44 # frozen: v25.12.0
rev: ea488cebbfd88a5f50b8bd95d5c829d0bb76feb8 # frozen: 26.1.0
hooks:
- id: black
- repo: https://github.com/streetsidesoftware/cspell-cli
rev: 1cfa010f078c354f3ffb8413616280cc28f5ba21 # frozen: v9.4.0
rev: a42085ade523f591dca134379a595e7859986445 # frozen: v9.7.0
hooks:
- id: cspell # Spell check changed files
exclude: .config/cspell.config.yaml

View File

@@ -0,0 +1,139 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#include <functional>
#include <mutex>
#include <set>
/** RAII class to check if an Item is already being processed on another thread,
* as indicated by its presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
/** RAII guard that claims an Item for processing by inserting it into a
    Collection under lock.

    If the insert succeeds, the guard is "usable" (evaluates to true) and
    the item is erased, again under lock, when the guard is destroyed. If
    the item was already present, the guard is "unusable" (evaluates to
    false) and the collection is left untouched.
*/
class CanProcess
{
public:
    /** Attempt to claim `item`.

        @param mtx        Mutex guarding `collection`. Must outlive this
                          object.
        @param collection Set-like container of in-flight items. Must
                          outlive this object.
        @param item       The item to claim. For non-std::set collections
                          a copy is captured for cleanup, so the caller's
                          object need not outlive the guard.
    */
    template <class Mutex, class Collection, class Item>
    CanProcess(Mutex& mtx, Collection& collection, Item const& item)
        : cleanup_(insert(mtx, collection, item))
    {
    }

    ~CanProcess()
    {
        // Only a "usable" guard holds a cleanup callback; it erases the
        // item under lock.
        if (cleanup_)
            cleanup_();
    }

    // Non-copyable (and consequently non-movable): the cleanup callback
    // must run exactly once.
    CanProcess(CanProcess const&) = delete;
    CanProcess&
    operator=(CanProcess const&) = delete;

    /// True if this guard successfully claimed the item.
    explicit
    operator bool() const
    {
        return static_cast<bool>(cleanup_);
    }

private:
    template <bool useIterator, class Mutex, class Collection, class Item>
    std::function<void()>
    doInsert(Mutex& mtx, Collection& collection, Item const& item)
    {
        std::unique_lock<Mutex> lock(mtx);
        // TODO: Use structured binding once LLVM 16 is the minimum supported
        // version. See also: https://github.com/llvm/llvm-project/issues/48582
        // https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
        auto const insertResult = collection.insert(item);
        auto const it = insertResult.first;
        if (!insertResult.second)
            // Already present: another thread is processing this item.
            return {};
        if constexpr (useIterator)
            // std::set iterators stay valid across insert/erase of other
            // elements, so the iterator itself can be captured.
            return [&, it]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(it);
            };
        else
            // Capture `item` BY VALUE: `item` is a reference to a
            // caller-owned object that may not outlive this guard, and
            // capturing it by reference would leave the cleanup callback
            // holding a dangling reference.
            return [&mtx, &collection, item]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(item);
            };
    }

    // Generic insert() function doesn't use iterators because they may get
    // invalidated
    template <class Mutex, class Collection, class Item>
    std::function<void()>
    insert(Mutex& mtx, Collection& collection, Item const& item)
    {
        return doInsert<false>(mtx, collection, item);
    }

    // Specialize insert() for std::set, which does not invalidate iterators
    // for insert and erase
    template <class Mutex, class Item>
    std::function<void()>
    insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
    {
        return doInsert<true>(mtx, collection, item);
    }

    // If set, then the item is "usable"
    std::function<void()> cleanup_;
};
#endif

View File

@@ -199,7 +199,7 @@ public:
/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the peer is added.
* element 1: true if the key is added.
* element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>

View File

@@ -23,13 +23,13 @@ public:
static constexpr size_t initialBufferSize = kilobytes(256);
RawStateTable()
: monotonic_resource_{std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
initialBufferSize)}
: monotonic_resource_{
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
, items_{monotonic_resource_.get()} {};
RawStateTable(RawStateTable const& rhs)
: monotonic_resource_{std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
initialBufferSize)}
: monotonic_resource_{
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
, items_{rhs.items_, monotonic_resource_.get()}
, dropsDestroyed_{rhs.dropsDestroyed_} {};

View File

@@ -35,6 +35,8 @@ struct LedgerHeader
// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;

View File

@@ -185,7 +185,7 @@ public:
virtual bool
isFull() = 0;
virtual void
setMode(OperatingMode om) = 0;
setMode(OperatingMode om, char const* reason) = 0;
virtual bool
isBlocked() = 0;
virtual bool

View File

@@ -72,8 +72,8 @@ OpenView::OpenView(
ReadView const* base,
Rules const& rules,
std::shared_ptr<void const> hold)
: monotonic_resource_{std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
initialBufferSize)}
: monotonic_resource_{
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
, txs_{monotonic_resource_.get()}
, rules_(rules)
, header_(base->header())
@@ -88,8 +88,8 @@ OpenView::OpenView(
}
OpenView::OpenView(ReadView const* base, std::shared_ptr<void const> hold)
: monotonic_resource_{std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
initialBufferSize)}
: monotonic_resource_{
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
, txs_{monotonic_resource_.get()}
, rules_(base->rules())
, header_(base->header())

View File

@@ -133,9 +133,9 @@ STVar::constructST(SerializedTypeID id, int depth, Args&&... args)
{
construct<T>(std::forward<Args>(args)...);
}
else if constexpr (std::is_same_v<
std::tuple<std::remove_cvref_t<Args>...>,
std::tuple<SerialIter, SField>>)
else if constexpr (
std::
is_same_v<std::tuple<std::remove_cvref_t<Args>...>, std::tuple<SerialIter, SField>>)
{
construct<T>(std::forward<Args>(args)..., depth);
}

View File

@@ -180,8 +180,9 @@ ammAccountHolds(ReadView const& view, AccountID const& ammAccountID, Issue const
if (auto const sle = view.read(keylet::account(ammAccountID)))
return (*sle)[sfBalance];
}
else if (auto const sle = view.read(keylet::line(ammAccountID, issue.account, issue.currency));
sle && !isFrozen(view, ammAccountID, issue.currency, issue.account))
else if (
auto const sle = view.read(keylet::line(ammAccountID, issue.account, issue.currency));
sle && !isFrozen(view, ammAccountID, issue.currency, issue.account))
{
auto amount = (*sle)[sfBalance];
if (ammAccountID > issue.account)

View File

@@ -42,8 +42,9 @@ AMMVote::preclaim(PreclaimContext const& ctx)
}
else if (ammSle->getFieldAmount(sfLPTokenBalance) == beast::zero)
return tecAMM_EMPTY;
else if (auto const lpTokensNew = ammLPHolds(ctx.view, *ammSle, ctx.tx[sfAccount], ctx.j);
lpTokensNew == beast::zero)
else if (
auto const lpTokensNew = ammLPHolds(ctx.view, *ammSle, ctx.tx[sfAccount], ctx.j);
lpTokensNew == beast::zero)
{
JLOG(ctx.j.debug()) << "AMM Vote: account is not LP.";
return tecAMM_INVALID_TOKENS;

View File

@@ -84,11 +84,12 @@ LoanSet::preflight(PreflightContext const& ctx)
!validNumericMinimum(paymentInterval, LoanSet::minPaymentInterval))
return temINVALID;
// Grace period is between min default value and payment interval
else if (auto const gracePeriod = tx[~sfGracePeriod]; //
!validNumericRange(
gracePeriod,
paymentInterval.value_or(LoanSet::defaultPaymentInterval),
defaultGracePeriod))
else if (
auto const gracePeriod = tx[~sfGracePeriod]; //
!validNumericRange(
gracePeriod,
paymentInterval.value_or(LoanSet::defaultPaymentInterval),
defaultGracePeriod))
return temINVALID;
// Copied from preflight2

View File

@@ -85,7 +85,12 @@ public:
}
virtual void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
}

View File

@@ -0,0 +1,165 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2016 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/CanProcess.h>
#include <xrpl/beast/unit_test.h>
#include <memory>
namespace ripple {
namespace test {
struct CanProcess_test : beast::unit_test::suite
{
// Generic driver: for a given mutex/collection pair, checks that the
// first CanProcess created for each item is "good" (inserts the item,
// evaluates true) and a second one for the same item is "bad"
// (evaluates false), and that destroying a "good" guard removes the
// item from the collection while destroying a "bad" one does not.
template <class Mutex, class Collection, class Item>
void
test(
std::string const& name,
Mutex& mtx,
Collection& collection,
std::vector<Item> const& items)
{
testcase(name);
// Preconditions: need at least one item and an empty collection.
if (!BEAST_EXPECT(!items.empty()))
return;
if (!BEAST_EXPECT(collection.empty()))
return;
// CanProcess objects can't be copied or moved. To make that easier,
// store shared_ptrs
std::vector<std::shared_ptr<CanProcess>> trackers;
// Fill up the vector with two CanProcess for each Item. The first
// inserts the item into the collection and is "good". The second does
// not and is "bad".
for (int i = 0; i < items.size(); ++i)
{
{
auto const& good =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(*good);
}
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
BEAST_EXPECT(collection.size() == i + 1);
{
auto const& bad =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(!*bad);
}
BEAST_EXPECT(trackers.size() == 2 * (i + 1));
BEAST_EXPECT(collection.size() == i + 1);
}
BEAST_EXPECT(collection.size() == items.size());
// Now remove the items from the vector<CanProcess> two at a time, and
// try to get another CanProcess for that item.
// Invariant throughout this loop: `trackers` alternates good/bad pairs,
// and the collection holds exactly one entry per item.
for (int i = 0; i < items.size(); ++i)
{
// Remove the "bad" one in the second position
// This will have no effect on the collection
{
auto const iter = trackers.begin() + 1;
BEAST_EXPECT(!**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
BEAST_EXPECT(collection.size() == items.size());
{
// Append a new "bad" one
auto const& bad =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(!*bad);
}
BEAST_EXPECT(trackers.size() == 2 * items.size());
BEAST_EXPECT(collection.size() == items.size());
// Remove the "good" one from the front
{
auto const iter = trackers.begin();
BEAST_EXPECT(**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
// Destroying the "good" guard released its item.
BEAST_EXPECT(collection.size() == items.size() - 1);
{
// Append a new "good" one
auto const& good =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(*good);
}
BEAST_EXPECT(trackers.size() == 2 * items.size());
BEAST_EXPECT(collection.size() == items.size());
}
// Now remove them all two at a time
for (int i = items.size() - 1; i >= 0; --i)
{
// Remove the "bad" one from the front
{
auto const iter = trackers.begin();
BEAST_EXPECT(!**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
BEAST_EXPECT(collection.size() == i + 1);
// Remove the "good" one now in front
{
auto const iter = trackers.begin();
BEAST_EXPECT(**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == 2 * i);
BEAST_EXPECT(collection.size() == i);
}
// Everything released: both the trackers and the collection drain fully.
BEAST_EXPECT(trackers.empty());
BEAST_EXPECT(collection.empty());
}
// Runs the driver over ordered (std::set) and unordered
// (std::unordered_set) collections with several element types, covering
// both the iterator-based and value-based cleanup paths of CanProcess.
void
run() override
{
{
std::mutex m;
std::set<int> collection;
std::vector<int> const items{1, 2, 3, 4, 5};
test("set of int", m, collection, items);
}
{
std::mutex m;
std::set<std::string> collection;
std::vector<std::string> const items{"one", "two", "three", "four", "five"};
test("set of string", m, collection, items);
}
{
std::mutex m;
std::unordered_set<char> collection;
std::vector<char> const items{'1', '2', '3', '4', '5'};
test("unorderd_set of char", m, collection, items);
}
{
std::mutex m;
std::unordered_set<std::uint64_t> collection;
std::vector<std::uint64_t> const items{100u, 1000u, 150u, 4u, 0u};
test("unordered_set of uint64_t", m, collection, items);
}
}
};
BEAST_DEFINE_TESTSUITE(CanProcess, ripple_basics, ripple);
} // namespace test
} // namespace ripple

View File

@@ -31,6 +31,7 @@
#include <xrpl/protocol/STTx.h>
#include <functional>
#include <future>
#include <source_location>
#include <string>
#include <tuple>
@@ -393,6 +394,48 @@ public:
return close(std::chrono::seconds(5));
}
/** Close and advance the ledger, then synchronize with the server's
io_context to ensure all async operations initiated by the close have
been started.
This function performs the same ledger close as close(), but additionally
ensures that all tasks posted to the server's io_context (such as
WebSocket subscription message sends) have been initiated before returning.
What it guarantees:
- All async operations posted before syncClose() have been STARTED
- For WebSocket sends: async_write_some() has been called
- The actual I/O completion may still be pending (async)
What it does NOT guarantee:
- Async operations have COMPLETED
- WebSocket messages have been received by clients
- However, for localhost connections, the remaining latency is typically
microseconds, making tests reliable
Use this instead of close() when:
- Test code immediately checks for subscription messages
- Race conditions between test and worker threads must be avoided
- Deterministic test behavior is required
@param timeout Maximum time to wait for the barrier task to execute
@return true if close succeeded and barrier executed within timeout,
false otherwise
*/
// Close the ledger, then post a barrier task to the server's io_context and
// wait for it to run. Because the app uses a single I/O thread (asserted
// below), the barrier cannot execute until every task queued before it —
// including async work triggered by the close — has at least been started.
[[nodiscard]] bool
syncClose(std::chrono::steady_clock::duration timeout = std::chrono::seconds{1})
{
// With more than one I/O thread the barrier could run on a different
// thread than earlier tasks, voiding the ordering guarantee.
XRPL_ASSERT(
app().getNumberOfThreads() == 1,
"syncClose() is only useful on an application with a single thread");
auto const result = close();
// shared_ptr keeps the promise alive until the posted task runs, even
// if this function times out and returns first.
auto serverBarrier = std::make_shared<std::promise<void>>();
auto future = serverBarrier->get_future();
boost::asio::post(app().getIOContext(), [serverBarrier]() { serverBarrier->set_value(); });
auto const status = future.wait_for(timeout);
// Succeed only if both the close worked and the barrier ran in time.
return result && status == std::future_status::ready;
}
/** Turn on JSON tracing.
With no arguments, trace all
*/

View File

@@ -73,6 +73,8 @@ std::unique_ptr<Config> admin_localnet(std::unique_ptr<Config>);
std::unique_ptr<Config> secure_gateway_localnet(std::unique_ptr<Config>);
std::unique_ptr<Config> single_thread_io(std::unique_ptr<Config>);
/// @brief adjust configuration with params needed to be a validator
///
/// this is intended for use with envconfig, as in

View File

@@ -87,6 +87,12 @@ secure_gateway_localnet(std::unique_ptr<Config> cfg)
(*cfg)[PORT_WS].set("secure_gateway", "127.0.0.0/8");
return cfg;
}
// Restrict the app to a single I/O worker thread (IO_WORKERS = 1).
// Intended for use with envconfig; single-threaded I/O makes task ordering
// on the io_context deterministic (e.g. for Env::syncClose-style barriers).
std::unique_ptr<Config>
single_thread_io(std::unique_ptr<Config> cfg)
{
cfg->IO_WORKERS = 1;
return cfg;
}
auto constexpr defaultseed = "shUwVw52ofnCUX5m7kPTKzJdr4HEH";

View File

@@ -26,7 +26,7 @@ public:
{
using namespace std::chrono_literals;
using namespace jtx;
Env env(*this);
Env env{*this, single_thread_io(envconfig())};
auto wsc = makeWSClient(env.app().config());
Json::Value stream;
@@ -92,7 +92,7 @@ public:
{
using namespace std::chrono_literals;
using namespace jtx;
Env env(*this);
Env env{*this, single_thread_io(envconfig())};
auto wsc = makeWSClient(env.app().config());
Json::Value stream;
@@ -114,7 +114,7 @@ public:
{
// Accept a ledger
env.close();
BEAST_EXPECT(env.syncClose());
// Check stream update
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
@@ -125,7 +125,7 @@ public:
{
// Accept another ledger
env.close();
BEAST_EXPECT(env.syncClose());
// Check stream update
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
@@ -150,7 +150,7 @@ public:
{
using namespace std::chrono_literals;
using namespace jtx;
Env env(*this);
Env env(*this, single_thread_io(envconfig()));
auto baseFee = env.current()->fees().base.drops();
auto wsc = makeWSClient(env.app().config());
Json::Value stream;
@@ -171,7 +171,7 @@ public:
{
env.fund(XRP(10000), "alice");
env.close();
BEAST_EXPECT(env.syncClose());
// Check stream update for payment transaction
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
@@ -195,7 +195,7 @@ public:
}));
env.fund(XRP(10000), "bob");
env.close();
BEAST_EXPECT(env.syncClose());
// Check stream update for payment transaction
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
@@ -249,12 +249,12 @@ public:
{
// Transaction that does not affect stream
env.fund(XRP(10000), "carol");
env.close();
BEAST_EXPECT(env.syncClose());
BEAST_EXPECT(!wsc->getMsg(10ms));
// Transactions concerning alice
env.trust(Account("bob")["USD"](100), "alice");
env.close();
BEAST_EXPECT(env.syncClose());
// Check stream updates
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
@@ -288,6 +288,7 @@ public:
using namespace jtx;
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
cfg->FEES.reference_fee = 10;
cfg = single_thread_io(std::move(cfg));
return cfg;
}));
auto wsc = makeWSClient(env.app().config());
@@ -310,7 +311,7 @@ public:
{
env.fund(XRP(10000), "alice");
env.close();
BEAST_EXPECT(env.syncClose());
// Check stream update for payment transaction
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
@@ -360,7 +361,7 @@ public:
testManifests()
{
using namespace jtx;
Env env(*this);
Env env(*this, single_thread_io(envconfig()));
auto wsc = makeWSClient(env.app().config());
Json::Value stream;
@@ -394,7 +395,7 @@ public:
{
using namespace jtx;
Env env{*this, envconfig(validator, ""), features};
Env env{*this, single_thread_io(envconfig(validator, "")), features};
auto& cfg = env.app().config();
if (!BEAST_EXPECT(cfg.section(SECTION_VALIDATION_SEED).empty()))
return;
@@ -483,7 +484,7 @@ public:
// at least one flag ledger.
while (env.closed()->header().seq < 300)
{
env.close();
BEAST_EXPECT(env.syncClose());
using namespace std::chrono_literals;
BEAST_EXPECT(wsc->findMsg(5s, validValidationFields));
}
@@ -505,7 +506,7 @@ public:
{
using namespace jtx;
testcase("Subscribe by url");
Env env{*this};
Env env{*this, single_thread_io(envconfig())};
Json::Value jv;
jv[jss::url] = "http://localhost/events";
@@ -536,7 +537,7 @@ public:
auto const method = subscribe ? "subscribe" : "unsubscribe";
testcase << "Error cases for " << method;
Env env{*this};
Env env{*this, single_thread_io(envconfig())};
auto wsc = makeWSClient(env.app().config());
{
@@ -572,7 +573,7 @@ public:
}
{
Env env_nonadmin{*this, no_admin(envconfig())};
Env env_nonadmin{*this, single_thread_io(no_admin(envconfig()))};
Json::Value jv;
jv[jss::url] = "no-url";
auto jr = env_nonadmin.rpc("json", method, to_string(jv))[jss::result];
@@ -834,12 +835,13 @@ public:
* send payments between the two accounts a and b,
* and close ledgersToClose ledgers
*/
auto sendPayments = [](Env& env,
Account const& a,
Account const& b,
int newTxns,
std::uint32_t ledgersToClose,
int numXRP = 10) {
auto sendPayments = [this](
Env& env,
Account const& a,
Account const& b,
int newTxns,
std::uint32_t ledgersToClose,
int numXRP = 10) {
env.memoize(a);
env.memoize(b);
for (int i = 0; i < newTxns; ++i)
@@ -852,7 +854,7 @@ public:
jtx::sig(jtx::autofill));
}
for (int i = 0; i < ledgersToClose; ++i)
env.close();
BEAST_EXPECT(env.syncClose());
return newTxns;
};
@@ -945,7 +947,7 @@ public:
*
* also test subscribe to the account before it is created
*/
Env env(*this);
Env env(*this, single_thread_io(envconfig()));
auto wscTxHistory = makeWSClient(env.app().config());
Json::Value request;
request[jss::account_history_tx_stream] = Json::objectValue;
@@ -988,7 +990,7 @@ public:
* subscribe genesis account tx history without txns
* subscribe to bob's account after it is created
*/
Env env(*this);
Env env(*this, single_thread_io(envconfig()));
auto wscTxHistory = makeWSClient(env.app().config());
Json::Value request;
request[jss::account_history_tx_stream] = Json::objectValue;
@@ -998,6 +1000,7 @@ public:
if (!BEAST_EXPECT(goodSubRPC(jv)))
return;
IdxHashVec genesisFullHistoryVec;
BEAST_EXPECT(env.syncClose());
if (!BEAST_EXPECT(!getTxHash(*wscTxHistory, genesisFullHistoryVec, 1).first))
return;
@@ -1016,6 +1019,7 @@ public:
if (!BEAST_EXPECT(goodSubRPC(jv)))
return;
IdxHashVec bobFullHistoryVec;
BEAST_EXPECT(env.syncClose());
r = getTxHash(*wscTxHistory, bobFullHistoryVec, 1);
if (!BEAST_EXPECT(r.first && r.second))
return;
@@ -1050,6 +1054,7 @@ public:
"rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh";
jv = wscTxHistory->invoke("subscribe", request);
genesisFullHistoryVec.clear();
BEAST_EXPECT(env.syncClose());
BEAST_EXPECT(getTxHash(*wscTxHistory, genesisFullHistoryVec, 31).second);
jv = wscTxHistory->invoke("unsubscribe", request);
@@ -1062,13 +1067,13 @@ public:
* subscribe account and subscribe account tx history
* and compare txns streamed
*/
Env env(*this);
Env env(*this, single_thread_io(envconfig()));
auto wscAccount = makeWSClient(env.app().config());
auto wscTxHistory = makeWSClient(env.app().config());
std::array<Account, 2> accounts = {alice, bob};
env.fund(XRP(222222), accounts);
env.close();
BEAST_EXPECT(env.syncClose());
// subscribe account
Json::Value stream = Json::objectValue;
@@ -1131,18 +1136,18 @@ public:
* alice issues USD to carol
* mix USD and XRP payments
*/
Env env(*this);
Env env(*this, single_thread_io(envconfig()));
auto const USD_a = alice["USD"];
std::array<Account, 2> accounts = {alice, carol};
env.fund(XRP(333333), accounts);
env.trust(USD_a(20000), carol);
env.close();
BEAST_EXPECT(env.syncClose());
auto mixedPayments = [&]() -> int {
sendPayments(env, alice, carol, 1, 0);
env(pay(alice, carol, USD_a(100)));
env.close();
BEAST_EXPECT(env.syncClose());
return 2;
};
@@ -1152,6 +1157,7 @@ public:
request[jss::account_history_tx_stream][jss::account] = carol.human();
auto ws = makeWSClient(env.app().config());
auto jv = ws->invoke("subscribe", request);
BEAST_EXPECT(env.syncClose());
{
// take out existing txns from the stream
IdxHashVec tempVec;
@@ -1169,10 +1175,10 @@ public:
/*
* long transaction history
*/
Env env(*this);
Env env(*this, single_thread_io(envconfig()));
std::array<Account, 2> accounts = {alice, carol};
env.fund(XRP(444444), accounts);
env.close();
BEAST_EXPECT(env.syncClose());
// many payments, and close lots of ledgers
auto oneRound = [&](int numPayments) {
@@ -1185,6 +1191,7 @@ public:
request[jss::account_history_tx_stream][jss::account] = carol.human();
auto wscLong = makeWSClient(env.app().config());
auto jv = wscLong->invoke("subscribe", request);
BEAST_EXPECT(env.syncClose());
{
// take out existing txns from the stream
IdxHashVec tempVec;
@@ -1222,7 +1229,7 @@ public:
jtx::testable_amendments() | featurePermissionedDomains | featureCredentials |
featurePermissionedDEX};
Env env(*this, all);
Env env(*this, single_thread_io(envconfig()), all);
PermissionedDEX permDex(env);
auto const alice = permDex.alice;
auto const bob = permDex.bob;
@@ -1241,10 +1248,10 @@ public:
if (!BEAST_EXPECT(jv[jss::status] == "success"))
return;
env(offer(alice, XRP(10), USD(10)), domain(domainID), txflags(tfHybrid));
env.close();
BEAST_EXPECT(env.syncClose());
env(pay(bob, carol, USD(5)), path(~USD), sendmax(XRP(5)), domain(domainID));
env.close();
BEAST_EXPECT(env.syncClose());
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
if (jv[jss::changes].size() != 1)
@@ -1284,9 +1291,9 @@ public:
Account const bob{"bob"};
Account const broker{"broker"};
Env env{*this, features};
Env env{*this, single_thread_io(envconfig()), features};
env.fund(XRP(10000), alice, bob, broker);
env.close();
BEAST_EXPECT(env.syncClose());
auto wsc = test::makeWSClient(env.app().config());
Json::Value stream;
@@ -1350,12 +1357,12 @@ public:
// Verify the NFTokenIDs are correct in the NFTokenMint tx meta
uint256 const nftId1{token::getNextID(env, alice, 0u, tfTransferable)};
env(token::mint(alice, 0u), txflags(tfTransferable));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId1);
uint256 const nftId2{token::getNextID(env, alice, 0u, tfTransferable)};
env(token::mint(alice, 0u), txflags(tfTransferable));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId2);
// Alice creates one sell offer for each NFT
@@ -1363,32 +1370,32 @@ public:
// meta
uint256 const aliceOfferIndex1 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::createOffer(alice, nftId1, drops(1)), txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceOfferIndex1);
uint256 const aliceOfferIndex2 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::createOffer(alice, nftId2, drops(1)), txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceOfferIndex2);
// Alice cancels two offers she created
// Verify the NFTokenIDs are correct in the NFTokenCancelOffer tx
// meta
env(token::cancelOffer(alice, {aliceOfferIndex1, aliceOfferIndex2}));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenIDsInCancelOffer({nftId1, nftId2});
// Bobs creates a buy offer for nftId1
// Verify the offer id is correct in the NFTokenCreateOffer tx meta
auto const bobBuyOfferIndex = keylet::nftoffer(bob, env.seq(bob)).key;
env(token::createOffer(bob, nftId1, drops(1)), token::owner(alice));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(bobBuyOfferIndex);
// Alice accepts bob's buy offer
// Verify the NFTokenID is correct in the NFTokenAcceptOffer tx meta
env(token::acceptBuyOffer(alice, bobBuyOfferIndex));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId1);
}
@@ -1397,7 +1404,7 @@ public:
// Alice mints a NFT
uint256 const nftId{token::getNextID(env, alice, 0u, tfTransferable)};
env(token::mint(alice, 0u), txflags(tfTransferable));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId);
// Alice creates sell offer and set broker as destination
@@ -1405,18 +1412,18 @@ public:
env(token::createOffer(alice, nftId, drops(1)),
token::destination(broker),
txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(offerAliceToBroker);
// Bob creates buy offer
uint256 const offerBobToBroker = keylet::nftoffer(bob, env.seq(bob)).key;
env(token::createOffer(bob, nftId, drops(1)), token::owner(alice));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(offerBobToBroker);
// Check NFTokenID meta for NFTokenAcceptOffer in brokered mode
env(token::brokerOffers(broker, offerBobToBroker, offerAliceToBroker));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId);
}
@@ -1426,24 +1433,24 @@ public:
// Alice mints a NFT
uint256 const nftId{token::getNextID(env, alice, 0u, tfTransferable)};
env(token::mint(alice, 0u), txflags(tfTransferable));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenID(nftId);
// Alice creates 2 sell offers for the same NFT
uint256 const aliceOfferIndex1 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::createOffer(alice, nftId, drops(1)), txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceOfferIndex1);
uint256 const aliceOfferIndex2 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::createOffer(alice, nftId, drops(1)), txflags(tfSellNFToken));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceOfferIndex2);
// Make sure the metadata only has 1 nft id, since both offers are
// for the same nft
env(token::cancelOffer(alice, {aliceOfferIndex1, aliceOfferIndex2}));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenIDsInCancelOffer({nftId});
}
@@ -1451,7 +1458,7 @@ public:
{
uint256 const aliceMintWithOfferIndex1 = keylet::nftoffer(alice, env.seq(alice)).key;
env(token::mint(alice), token::amount(XRP(0)));
env.close();
BEAST_EXPECT(env.syncClose());
verifyNFTokenOfferID(aliceMintWithOfferIndex1);
}
}

View File

@@ -107,10 +107,8 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
// Tell the ledger acquire system that we need the consensus ledger
acquiringLedger_ = hash;
app_.getJobQueue().addJob(jtADVANCE, "GetConsL1", [id = hash, &app = app_, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(id, 0, InboundLedger::Reason::CONSENSUS);
});
app_.getInboundLedgers().acquireAsync(
jtADVANCE, "GetConsL1", hash, 0, InboundLedger::Reason::CONSENSUS);
}
return std::nullopt;
}
@@ -985,7 +983,7 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if (!positions && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
app_.getOPs().setMode(OperatingMode::CONNECTED, "updateOperatingMode: no positions");
}
void

View File

@@ -117,12 +117,8 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
{
JLOG(j_.warn()) << "Need validated ledger for preferred ledger analysis " << hash;
Application* pApp = &app_;
app_.getJobQueue().addJob(jtADVANCE, "GetConsL2", [pApp, hash, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(hash, 0, InboundLedger::Reason::CONSENSUS);
});
app_.getInboundLedgers().acquireAsync(
jtADVANCE, "GetConsL2", hash, 0, InboundLedger::Reason::CONSENSUS);
return std::nullopt;
}

View File

@@ -26,7 +26,12 @@ public:
// Queue. TODO review whether all callers of acquire() can use this
// instead. Inbound ledger acquisition is asynchronous anyway.
virtual void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) = 0;
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) = 0;
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;

View File

@@ -353,7 +353,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}
mByHash = true;

View File

@@ -2,9 +2,9 @@
#include <xrpld/app/ledger/LedgerMaster.h>
#include <xrpld/app/main/Application.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/container/aged_map.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/core/PerfLog.h>
@@ -59,12 +59,15 @@ public:
(reason != InboundLedger::Reason::CONSENSUS))
return {};
std::stringstream ss;
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
if (stopping_)
{
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {};
}
@@ -83,47 +86,61 @@ public:
++mCounter;
}
}
ss << " IsNew: " << (isNew ? "true" : "false");
if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {};
}
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
{
JLOG(j_.debug()) << "InProgress: " << ss.str();
return {};
}
JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger();
};
using namespace std::chrono_literals;
std::shared_ptr<Ledger const> ledger =
perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
return perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
}
void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
std::unique_lock lock(acquiresMutex_);
try
if (auto check = std::make_shared<CanProcess const>(acquiresMutex_, pendingAcquires_, hash);
*check)
{
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
scope_unlock unlock(lock);
acquire(hash, seq, reason);
app_.getJobQueue().addJob(type, name, [check, name, hash, seq, reason, this]() {
JLOG(j_.debug()) << "JOB acquireAsync " << name << " started ";
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new "
"inbound ledger "
<< hash << ": " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
});
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": "
<< e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
}
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger>

View File

@@ -907,8 +907,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
return;
}
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq
<< " with >= " << minVal << " validations";
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq << " ("
<< to_short_string(ledger->header().hash) << ") with >= " << minVal
<< " validations";
ledger->setValidated();
ledger->setFull();

View File

@@ -13,7 +13,8 @@ TimeoutCounter::TimeoutCounter(
QueueJobParameter&& jobParameter,
beast::Journal journal)
: app_(app)
, journal_(journal)
, sink_(journal, to_short_string(hash) + " ")
, journal_(sink_)
, hash_(hash)
, timeouts_(0)
, complete_(false)
@@ -33,6 +34,7 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() << "ms";
timer_.expires_after(timerInterval_);
timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
if (ec == boost::asio::error::operation_aborted)
@@ -40,6 +42,10 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock())
{
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec
<< " (operation_aborted: " << boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted" : "other") << ")";
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
}

View File

@@ -3,6 +3,7 @@
#include <xrpld/app/main/Application.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/core/Job.h>
#include <boost/asio/basic_waitable_timer.hpp>
@@ -103,6 +104,7 @@ protected:
// Used in this class for access to boost::asio::io_context and
// xrpl::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::WrappedSink sink_;
beast::Journal journal_;
mutable std::recursive_mutex mtx_;

View File

@@ -1072,6 +1072,12 @@ public:
return trapTxID_;
}
// Report the number of io_context (I/O worker) threads the application
// runs, satisfying the new Application interface method added in this
// change set.
// NOTE(review): delegates to get_number_of_threads(), which appears to be
// supplied by a base/helper class owning the thread pool (see the
// threads_.size() accessor added alongside this) -- confirm the delegate's
// definition before relying on thread-safety of this call.
size_t
getNumberOfThreads() const override
{
return get_number_of_threads();
}
private:
// For a newly-started validator, this is the greatest persisted ledger
// and new validations must be greater than this.

View File

@@ -157,6 +157,10 @@ public:
* than the last ledger it persisted. */
virtual LedgerIndex
getMaxDisallowedLedger() = 0;
/** Returns the number of io_context (I/O worker) threads used by the application. */
virtual size_t
getNumberOfThreads() const = 0;
};
std::unique_ptr<Application>

View File

@@ -23,4 +23,10 @@ public:
{
return io_context_;
}
// Return the size of the owned worker-thread container (threads_), i.e.
// how many threads service the io_context exposed by this class.
// NOTE(review): assumes threads_ is populated once during startup and not
// mutated concurrently; verify before calling from arbitrary threads.
size_t
get_number_of_threads() const
{
return threads_.size();
}
};

View File

@@ -30,10 +30,10 @@
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/mulDiv.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/utility/rngfill.h>
#include <xrpl/core/HashRouter.h>
#include <xrpl/core/NetworkIDService.h>
@@ -396,7 +396,7 @@ public:
isFull() override;
void
setMode(OperatingMode om) override;
setMode(OperatingMode om, char const* reason) override;
bool
isBlocked() override;
@@ -841,7 +841,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
inline void
NetworkOPsImp::setStandAlone()
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "setStandAlone");
}
inline void
@@ -984,7 +984,7 @@ NetworkOPsImp::processHeartbeatTimer()
{
if (mMode != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED);
setMode(OperatingMode::DISCONNECTED, "Heartbeat: insufficient peers");
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
@@ -1008,7 +1008,7 @@ NetworkOPsImp::processHeartbeatTimer()
if (mMode == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
}
@@ -1018,9 +1018,9 @@ NetworkOPsImp::processHeartbeatTimer()
auto origMode = mMode.load();
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mMode == OperatingMode::SYNCING)
setMode(OperatingMode::SYNCING);
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
else if (mMode == OperatingMode::CONNECTED)
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
auto newMode = mMode.load();
if (origMode != newMode)
{
@@ -1710,7 +1710,7 @@ void
NetworkOPsImp::setAmendmentBlocked()
{
amendmentBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
}
inline bool
@@ -1741,7 +1741,7 @@ void
NetworkOPsImp::setUNLBlocked()
{
unlBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
}
inline void
@@ -1837,7 +1837,7 @@ NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
}
if (consensus)
@@ -1922,8 +1922,8 @@ NetworkOPsImp::beginConsensus(
// this shouldn't happen unless we jump ledgers
if (mMode == OperatingMode::FULL)
{
JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING);
JLOG(m_journal.warn()) << "beginConsensus Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
}
@@ -2052,7 +2052,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// validations we have for LCL. If the ledger is good enough, go to
// TRACKING - TODO
if (!needNetworkLedger_)
setMode(OperatingMode::TRACKING);
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
}
if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::TRACKING)) &&
@@ -2065,7 +2065,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
if (registry_.timeKeeper().now() <
(current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "endConsensus: check full");
}
}
@@ -2077,7 +2077,7 @@ NetworkOPsImp::consensusViewChange()
{
if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "consensusViewChange");
}
}
@@ -2379,7 +2379,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
}
void
NetworkOPsImp::setMode(OperatingMode om)
NetworkOPsImp::setMode(OperatingMode om, char const* reason)
{
using namespace std::chrono_literals;
if (om == OperatingMode::CONNECTED)
@@ -2399,11 +2399,12 @@ NetworkOPsImp::setMode(OperatingMode om)
if (mMode == om)
return;
auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
mMode = om;
accounting_.mode(om);
JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
pubServer();
}
@@ -2412,32 +2413,24 @@ NetworkOPsImp::recvValidation(std::shared_ptr<STValidation> const& val, std::str
{
JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::no;
try
{
if (pendingValidations_.contains(val->getLedgerHash()))
bypassAccept = BypassAccept::yes;
else
pendingValidations_.insert(val->getLedgerHash());
scope_unlock unlock(lock);
handleNewValidation(registry_.app(), val, source, bypassAccept, m_journal);
CanProcess const check(validationsMutex_, pendingValidations_, val->getLedgerHash());
try
{
BypassAccept bypassAccept = check ? BypassAccept::no : BypassAccept::yes;
handleNewValidation(registry_.app(), val, source, bypassAccept, m_journal);
}
catch (std::exception const& e)
{
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation " << val->getLedgerHash();
}
}
catch (std::exception const& e)
{
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::no)
{
pendingValidations_.erase(val->getLedgerHash());
}
lock.unlock();
pubValidation(val);