mirror of
https://github.com/XRPLF/rippled.git
synced 2026-03-04 20:02:29 +00:00
Compare commits
52 Commits
legleux/te
...
ximinez/ac
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
75e134a1fd | ||
|
|
fcec31ed20 | ||
|
|
828bcb3a7d | ||
|
|
0abd762781 | ||
|
|
5300e65686 | ||
|
|
6fc972746d | ||
|
|
930afbdea8 | ||
|
|
95c5bef48b | ||
|
|
b489b6c3ce | ||
|
|
70765acef4 | ||
|
|
77aa90bd0e | ||
|
|
4758bb6dc9 | ||
|
|
46e3dcb5fb | ||
|
|
d57579f10b | ||
|
|
d8dd376d1c | ||
|
|
a8c03e2e6c | ||
|
|
2167a66bc7 | ||
|
|
ed948a858c | ||
|
|
608c102743 | ||
|
|
36d1607a4e | ||
|
|
53ebb86d60 | ||
|
|
1d989bc6de | ||
|
|
64c0cb8c7e | ||
|
|
c77cfef41c | ||
|
|
08aa8c06d1 | ||
|
|
9498672f8e | ||
|
|
e91d55a0e0 | ||
|
|
afdc452cfc | ||
|
|
a0d4ef1a54 | ||
|
|
8bc384f8bf | ||
|
|
bd961c484b | ||
|
|
aee242a8d4 | ||
|
|
fcae74de58 | ||
|
|
a56effcb00 | ||
|
|
64c2eca465 | ||
|
|
e56f750e1d | ||
|
|
fde000f3eb | ||
|
|
d0a62229da | ||
|
|
d5932cc7d4 | ||
|
|
0b534da781 | ||
|
|
71a70d343b | ||
|
|
0899e65030 | ||
|
|
31ba529761 | ||
|
|
e2c6e5ebb6 | ||
|
|
9d807fce48 | ||
|
|
9ef160765c | ||
|
|
d6c0eb243b | ||
|
|
84c9fc123c | ||
|
|
00a2a58cfa | ||
|
|
bb2098d873 | ||
|
|
46a5bc74db | ||
|
|
7b72b9cc82 |
@@ -177,7 +177,7 @@ jobs:
|
||||
|
||||
- name: Upload the binary (Linux)
|
||||
if: ${{ github.repository_owner == 'XRPLF' && runner.os == 'Linux' }}
|
||||
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
|
||||
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
|
||||
with:
|
||||
name: xrpld-${{ inputs.config_name }}
|
||||
path: ${{ env.BUILD_DIR }}/xrpld
|
||||
|
||||
@@ -84,7 +84,7 @@ jobs:
|
||||
|
||||
- name: Upload clang-tidy output
|
||||
if: steps.run_clang_tidy.outcome != 'success'
|
||||
uses: actions/upload-artifact@b7c566a772e6b6bfb58ed0dc250532a479d7789f # v6.0.0
|
||||
uses: actions/upload-artifact@bbbca2ddaa5d8feaa63e36b76fdaad77386f024f # v7.0.0
|
||||
with:
|
||||
name: clang-tidy-results
|
||||
path: clang-tidy-output.txt
|
||||
|
||||
@@ -20,7 +20,7 @@ repos:
|
||||
args: [--assume-in-merge]
|
||||
|
||||
- repo: https://github.com/pre-commit/mirrors-clang-format
|
||||
rev: 75ca4ad908dc4a99f57921f29b7e6c1521e10b26 # frozen: v21.1.8
|
||||
rev: cd481d7b0bfb5c7b3090c21846317f9a8262e891 # frozen: v22.1.0
|
||||
hooks:
|
||||
- id: clang-format
|
||||
args: [--style=file]
|
||||
@@ -33,17 +33,17 @@ repos:
|
||||
additional_dependencies: [PyYAML]
|
||||
|
||||
- repo: https://github.com/rbubley/mirrors-prettier
|
||||
rev: 5ba47274f9b181bce26a5150a725577f3c336011 # frozen: v3.6.2
|
||||
rev: c2bc67fe8f8f549cc489e00ba8b45aa18ee713b1 # frozen: v3.8.1
|
||||
hooks:
|
||||
- id: prettier
|
||||
|
||||
- repo: https://github.com/psf/black-pre-commit-mirror
|
||||
rev: 831207fd435b47aeffdf6af853097e64322b4d44 # frozen: v25.12.0
|
||||
rev: ea488cebbfd88a5f50b8bd95d5c829d0bb76feb8 # frozen: 26.1.0
|
||||
hooks:
|
||||
- id: black
|
||||
|
||||
- repo: https://github.com/streetsidesoftware/cspell-cli
|
||||
rev: 1cfa010f078c354f3ffb8413616280cc28f5ba21 # frozen: v9.4.0
|
||||
rev: a42085ade523f591dca134379a595e7859986445 # frozen: v9.7.0
|
||||
hooks:
|
||||
- id: cspell # Spell check changed files
|
||||
exclude: .config/cspell.config.yaml
|
||||
|
||||
139
include/xrpl/basics/CanProcess.h
Normal file
139
include/xrpl/basics/CanProcess.h
Normal file
@@ -0,0 +1,139 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2024 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
|
||||
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
|
||||
|
||||
#include <functional>
|
||||
#include <mutex>
|
||||
#include <set>
|
||||
|
||||
/** RAII class to check if an Item is already being processed on another thread,
|
||||
* as indicated by it's presence in a Collection.
|
||||
*
|
||||
* If the Item is not in the Collection, it will be added under lock in the
|
||||
* ctor, and removed under lock in the dtor. The object will be considered
|
||||
* "usable" and evaluate to `true`.
|
||||
*
|
||||
* If the Item is in the Collection, no changes will be made to the collection,
|
||||
* and the CanProcess object will be considered "unusable".
|
||||
*
|
||||
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
|
||||
* Process or skip a block of code, or set a flag.)
|
||||
*
|
||||
* The current use is to avoid lock contention that would be involved in
|
||||
* processing something associated with the Item.
|
||||
*
|
||||
* Examples:
|
||||
*
|
||||
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
|
||||
* {
|
||||
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
|
||||
* {
|
||||
* acquire(hash, ...);
|
||||
* }
|
||||
* }
|
||||
*
|
||||
* bool
|
||||
* NetworkOPsImp::recvValidation(
|
||||
* std::shared_ptr<STValidation> const& val,
|
||||
* std::string const& source)
|
||||
* {
|
||||
* CanProcess check(
|
||||
* validationsMutex_, pendingValidations_, val->getLedgerHash());
|
||||
* BypassAccept bypassAccept =
|
||||
* check ? BypassAccept::no : BypassAccept::yes;
|
||||
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
|
||||
* }
|
||||
*
|
||||
*/
|
||||
class CanProcess
|
||||
{
|
||||
public:
|
||||
template <class Mutex, class Collection, class Item>
|
||||
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
|
||||
: cleanup_(insert(mtx, collection, item))
|
||||
{
|
||||
}
|
||||
|
||||
~CanProcess()
|
||||
{
|
||||
if (cleanup_)
|
||||
cleanup_();
|
||||
}
|
||||
|
||||
CanProcess(CanProcess const&) = delete;
|
||||
|
||||
CanProcess&
|
||||
operator=(CanProcess const&) = delete;
|
||||
|
||||
explicit
|
||||
operator bool() const
|
||||
{
|
||||
return static_cast<bool>(cleanup_);
|
||||
}
|
||||
|
||||
private:
|
||||
template <bool useIterator, class Mutex, class Collection, class Item>
|
||||
std::function<void()>
|
||||
doInsert(Mutex& mtx, Collection& collection, Item const& item)
|
||||
{
|
||||
std::unique_lock<Mutex> lock(mtx);
|
||||
// TODO: Use structured binding once LLVM 16 is the minimum supported
|
||||
// version. See also: https://github.com/llvm/llvm-project/issues/48582
|
||||
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
|
||||
auto const insertResult = collection.insert(item);
|
||||
auto const it = insertResult.first;
|
||||
if (!insertResult.second)
|
||||
return {};
|
||||
if constexpr (useIterator)
|
||||
return [&, it]() {
|
||||
std::unique_lock<Mutex> lock(mtx);
|
||||
collection.erase(it);
|
||||
};
|
||||
else
|
||||
return [&]() {
|
||||
std::unique_lock<Mutex> lock(mtx);
|
||||
collection.erase(item);
|
||||
};
|
||||
}
|
||||
|
||||
// Generic insert() function doesn't use iterators because they may get
|
||||
// invalidated
|
||||
template <class Mutex, class Collection, class Item>
|
||||
std::function<void()>
|
||||
insert(Mutex& mtx, Collection& collection, Item const& item)
|
||||
{
|
||||
return doInsert<false>(mtx, collection, item);
|
||||
}
|
||||
|
||||
// Specialize insert() for std::set, which does not invalidate iterators for
|
||||
// insert and erase
|
||||
template <class Mutex, class Item>
|
||||
std::function<void()>
|
||||
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
|
||||
{
|
||||
return doInsert<true>(mtx, collection, item);
|
||||
}
|
||||
|
||||
// If set, then the item is "usable"
|
||||
std::function<void()> cleanup_;
|
||||
};
|
||||
|
||||
#endif
|
||||
@@ -199,7 +199,7 @@ public:
|
||||
|
||||
/** Add a suppression peer and get message's relay status.
|
||||
* Return pair:
|
||||
* element 1: true if the peer is added.
|
||||
* element 1: true if the key is added.
|
||||
* element 2: optional is seated to the relay time point or
|
||||
* is unseated if has not relayed yet. */
|
||||
std::pair<bool, std::optional<Stopwatch::time_point>>
|
||||
|
||||
@@ -23,13 +23,13 @@ public:
|
||||
static constexpr size_t initialBufferSize = kilobytes(256);
|
||||
|
||||
RawStateTable()
|
||||
: monotonic_resource_{std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
|
||||
initialBufferSize)}
|
||||
: monotonic_resource_{
|
||||
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
|
||||
, items_{monotonic_resource_.get()} {};
|
||||
|
||||
RawStateTable(RawStateTable const& rhs)
|
||||
: monotonic_resource_{std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
|
||||
initialBufferSize)}
|
||||
: monotonic_resource_{
|
||||
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
|
||||
, items_{rhs.items_, monotonic_resource_.get()}
|
||||
, dropsDestroyed_{rhs.dropsDestroyed_} {};
|
||||
|
||||
|
||||
@@ -35,6 +35,8 @@ struct LedgerHeader
|
||||
|
||||
// If validated is false, it means "not yet validated."
|
||||
// Once validated is true, it will never be set false at a later time.
|
||||
// NOTE: If you are accessing this directly, you are probably doing it
|
||||
// wrong. Use LedgerMaster::isValidated().
|
||||
// VFALCO TODO Make this not mutable
|
||||
bool mutable validated = false;
|
||||
bool accepted = false;
|
||||
|
||||
@@ -185,7 +185,7 @@ public:
|
||||
virtual bool
|
||||
isFull() = 0;
|
||||
virtual void
|
||||
setMode(OperatingMode om) = 0;
|
||||
setMode(OperatingMode om, char const* reason) = 0;
|
||||
virtual bool
|
||||
isBlocked() = 0;
|
||||
virtual bool
|
||||
|
||||
@@ -72,8 +72,8 @@ OpenView::OpenView(
|
||||
ReadView const* base,
|
||||
Rules const& rules,
|
||||
std::shared_ptr<void const> hold)
|
||||
: monotonic_resource_{std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
|
||||
initialBufferSize)}
|
||||
: monotonic_resource_{
|
||||
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
|
||||
, txs_{monotonic_resource_.get()}
|
||||
, rules_(rules)
|
||||
, header_(base->header())
|
||||
@@ -88,8 +88,8 @@ OpenView::OpenView(
|
||||
}
|
||||
|
||||
OpenView::OpenView(ReadView const* base, std::shared_ptr<void const> hold)
|
||||
: monotonic_resource_{std::make_unique<boost::container::pmr::monotonic_buffer_resource>(
|
||||
initialBufferSize)}
|
||||
: monotonic_resource_{
|
||||
std::make_unique<boost::container::pmr::monotonic_buffer_resource>(initialBufferSize)}
|
||||
, txs_{monotonic_resource_.get()}
|
||||
, rules_(base->rules())
|
||||
, header_(base->header())
|
||||
|
||||
@@ -133,9 +133,9 @@ STVar::constructST(SerializedTypeID id, int depth, Args&&... args)
|
||||
{
|
||||
construct<T>(std::forward<Args>(args)...);
|
||||
}
|
||||
else if constexpr (std::is_same_v<
|
||||
std::tuple<std::remove_cvref_t<Args>...>,
|
||||
std::tuple<SerialIter, SField>>)
|
||||
else if constexpr (
|
||||
std::
|
||||
is_same_v<std::tuple<std::remove_cvref_t<Args>...>, std::tuple<SerialIter, SField>>)
|
||||
{
|
||||
construct<T>(std::forward<Args>(args)..., depth);
|
||||
}
|
||||
|
||||
@@ -180,8 +180,9 @@ ammAccountHolds(ReadView const& view, AccountID const& ammAccountID, Issue const
|
||||
if (auto const sle = view.read(keylet::account(ammAccountID)))
|
||||
return (*sle)[sfBalance];
|
||||
}
|
||||
else if (auto const sle = view.read(keylet::line(ammAccountID, issue.account, issue.currency));
|
||||
sle && !isFrozen(view, ammAccountID, issue.currency, issue.account))
|
||||
else if (
|
||||
auto const sle = view.read(keylet::line(ammAccountID, issue.account, issue.currency));
|
||||
sle && !isFrozen(view, ammAccountID, issue.currency, issue.account))
|
||||
{
|
||||
auto amount = (*sle)[sfBalance];
|
||||
if (ammAccountID > issue.account)
|
||||
|
||||
@@ -42,8 +42,9 @@ AMMVote::preclaim(PreclaimContext const& ctx)
|
||||
}
|
||||
else if (ammSle->getFieldAmount(sfLPTokenBalance) == beast::zero)
|
||||
return tecAMM_EMPTY;
|
||||
else if (auto const lpTokensNew = ammLPHolds(ctx.view, *ammSle, ctx.tx[sfAccount], ctx.j);
|
||||
lpTokensNew == beast::zero)
|
||||
else if (
|
||||
auto const lpTokensNew = ammLPHolds(ctx.view, *ammSle, ctx.tx[sfAccount], ctx.j);
|
||||
lpTokensNew == beast::zero)
|
||||
{
|
||||
JLOG(ctx.j.debug()) << "AMM Vote: account is not LP.";
|
||||
return tecAMM_INVALID_TOKENS;
|
||||
|
||||
@@ -84,11 +84,12 @@ LoanSet::preflight(PreflightContext const& ctx)
|
||||
!validNumericMinimum(paymentInterval, LoanSet::minPaymentInterval))
|
||||
return temINVALID;
|
||||
// Grace period is between min default value and payment interval
|
||||
else if (auto const gracePeriod = tx[~sfGracePeriod]; //
|
||||
!validNumericRange(
|
||||
gracePeriod,
|
||||
paymentInterval.value_or(LoanSet::defaultPaymentInterval),
|
||||
defaultGracePeriod))
|
||||
else if (
|
||||
auto const gracePeriod = tx[~sfGracePeriod]; //
|
||||
!validNumericRange(
|
||||
gracePeriod,
|
||||
paymentInterval.value_or(LoanSet::defaultPaymentInterval),
|
||||
defaultGracePeriod))
|
||||
return temINVALID;
|
||||
|
||||
// Copied from preflight2
|
||||
|
||||
@@ -85,7 +85,12 @@ public:
|
||||
}
|
||||
|
||||
virtual void
|
||||
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
|
||||
acquireAsync(
|
||||
JobType type,
|
||||
std::string const& name,
|
||||
uint256 const& hash,
|
||||
std::uint32_t seq,
|
||||
InboundLedger::Reason reason) override
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
165
src/test/basics/CanProcess_test.cpp
Normal file
165
src/test/basics/CanProcess_test.cpp
Normal file
@@ -0,0 +1,165 @@
|
||||
//------------------------------------------------------------------------------
|
||||
/*
|
||||
This file is part of rippled: https://github.com/ripple/rippled
|
||||
Copyright (c) 2012-2016 Ripple Labs Inc.
|
||||
|
||||
Permission to use, copy, modify, and/or distribute this software for any
|
||||
purpose with or without fee is hereby granted, provided that the above
|
||||
copyright notice and this permission notice appear in all copies.
|
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
|
||||
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
|
||||
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
|
||||
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
|
||||
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
|
||||
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
|
||||
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
|
||||
*/
|
||||
//==============================================================================
|
||||
|
||||
#include <xrpl/basics/CanProcess.h>
|
||||
#include <xrpl/beast/unit_test.h>
|
||||
|
||||
#include <memory>
|
||||
|
||||
namespace ripple {
|
||||
namespace test {
|
||||
|
||||
struct CanProcess_test : beast::unit_test::suite
|
||||
{
|
||||
template <class Mutex, class Collection, class Item>
|
||||
void
|
||||
test(
|
||||
std::string const& name,
|
||||
Mutex& mtx,
|
||||
Collection& collection,
|
||||
std::vector<Item> const& items)
|
||||
{
|
||||
testcase(name);
|
||||
|
||||
if (!BEAST_EXPECT(!items.empty()))
|
||||
return;
|
||||
if (!BEAST_EXPECT(collection.empty()))
|
||||
return;
|
||||
|
||||
// CanProcess objects can't be copied or moved. To make that easier,
|
||||
// store shared_ptrs
|
||||
std::vector<std::shared_ptr<CanProcess>> trackers;
|
||||
// Fill up the vector with two CanProcess for each Item. The first
|
||||
// inserts the item into the collection and is "good". The second does
|
||||
// not and is "bad".
|
||||
for (int i = 0; i < items.size(); ++i)
|
||||
{
|
||||
{
|
||||
auto const& good =
|
||||
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
|
||||
BEAST_EXPECT(*good);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
|
||||
BEAST_EXPECT(collection.size() == i + 1);
|
||||
{
|
||||
auto const& bad =
|
||||
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
|
||||
BEAST_EXPECT(!*bad);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == 2 * (i + 1));
|
||||
BEAST_EXPECT(collection.size() == i + 1);
|
||||
}
|
||||
BEAST_EXPECT(collection.size() == items.size());
|
||||
// Now remove the items from the vector<CanProcess> two at a time, and
|
||||
// try to get another CanProcess for that item.
|
||||
for (int i = 0; i < items.size(); ++i)
|
||||
{
|
||||
// Remove the "bad" one in the second position
|
||||
// This will have no effect on the collection
|
||||
{
|
||||
auto const iter = trackers.begin() + 1;
|
||||
BEAST_EXPECT(!**iter);
|
||||
trackers.erase(iter);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
|
||||
BEAST_EXPECT(collection.size() == items.size());
|
||||
{
|
||||
// Append a new "bad" one
|
||||
auto const& bad =
|
||||
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
|
||||
BEAST_EXPECT(!*bad);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == 2 * items.size());
|
||||
BEAST_EXPECT(collection.size() == items.size());
|
||||
|
||||
// Remove the "good" one from the front
|
||||
{
|
||||
auto const iter = trackers.begin();
|
||||
BEAST_EXPECT(**iter);
|
||||
trackers.erase(iter);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
|
||||
BEAST_EXPECT(collection.size() == items.size() - 1);
|
||||
{
|
||||
// Append a new "good" one
|
||||
auto const& good =
|
||||
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
|
||||
BEAST_EXPECT(*good);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == 2 * items.size());
|
||||
BEAST_EXPECT(collection.size() == items.size());
|
||||
}
|
||||
// Now remove them all two at a time
|
||||
for (int i = items.size() - 1; i >= 0; --i)
|
||||
{
|
||||
// Remove the "bad" one from the front
|
||||
{
|
||||
auto const iter = trackers.begin();
|
||||
BEAST_EXPECT(!**iter);
|
||||
trackers.erase(iter);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
|
||||
BEAST_EXPECT(collection.size() == i + 1);
|
||||
// Remove the "good" one now in front
|
||||
{
|
||||
auto const iter = trackers.begin();
|
||||
BEAST_EXPECT(**iter);
|
||||
trackers.erase(iter);
|
||||
}
|
||||
BEAST_EXPECT(trackers.size() == 2 * i);
|
||||
BEAST_EXPECT(collection.size() == i);
|
||||
}
|
||||
BEAST_EXPECT(trackers.empty());
|
||||
BEAST_EXPECT(collection.empty());
|
||||
}
|
||||
|
||||
void
|
||||
run() override
|
||||
{
|
||||
{
|
||||
std::mutex m;
|
||||
std::set<int> collection;
|
||||
std::vector<int> const items{1, 2, 3, 4, 5};
|
||||
test("set of int", m, collection, items);
|
||||
}
|
||||
{
|
||||
std::mutex m;
|
||||
std::set<std::string> collection;
|
||||
std::vector<std::string> const items{"one", "two", "three", "four", "five"};
|
||||
test("set of string", m, collection, items);
|
||||
}
|
||||
{
|
||||
std::mutex m;
|
||||
std::unordered_set<char> collection;
|
||||
std::vector<char> const items{'1', '2', '3', '4', '5'};
|
||||
test("unorderd_set of char", m, collection, items);
|
||||
}
|
||||
{
|
||||
std::mutex m;
|
||||
std::unordered_set<std::uint64_t> collection;
|
||||
std::vector<std::uint64_t> const items{100u, 1000u, 150u, 4u, 0u};
|
||||
test("unordered_set of uint64_t", m, collection, items);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
BEAST_DEFINE_TESTSUITE(CanProcess, ripple_basics, ripple);
|
||||
|
||||
} // namespace test
|
||||
} // namespace ripple
|
||||
@@ -31,6 +31,7 @@
|
||||
#include <xrpl/protocol/STTx.h>
|
||||
|
||||
#include <functional>
|
||||
#include <future>
|
||||
#include <source_location>
|
||||
#include <string>
|
||||
#include <tuple>
|
||||
@@ -393,6 +394,48 @@ public:
|
||||
return close(std::chrono::seconds(5));
|
||||
}
|
||||
|
||||
/** Close and advance the ledger, then synchronize with the server's
|
||||
io_context to ensure all async operations initiated by the close have
|
||||
been started.
|
||||
|
||||
This function performs the same ledger close as close(), but additionally
|
||||
ensures that all tasks posted to the server's io_context (such as
|
||||
WebSocket subscription message sends) have been initiated before returning.
|
||||
|
||||
What it guarantees:
|
||||
- All async operations posted before syncClose() have been STARTED
|
||||
- For WebSocket sends: async_write_some() has been called
|
||||
- The actual I/O completion may still be pending (async)
|
||||
|
||||
What it does NOT guarantee:
|
||||
- Async operations have COMPLETED
|
||||
- WebSocket messages have been received by clients
|
||||
- However, for localhost connections, the remaining latency is typically
|
||||
microseconds, making tests reliable
|
||||
|
||||
Use this instead of close() when:
|
||||
- Test code immediately checks for subscription messages
|
||||
- Race conditions between test and worker threads must be avoided
|
||||
- Deterministic test behavior is required
|
||||
|
||||
@param timeout Maximum time to wait for the barrier task to execute
|
||||
@return true if close succeeded and barrier executed within timeout,
|
||||
false otherwise
|
||||
*/
|
||||
[[nodiscard]] bool
|
||||
syncClose(std::chrono::steady_clock::duration timeout = std::chrono::seconds{1})
|
||||
{
|
||||
XRPL_ASSERT(
|
||||
app().getNumberOfThreads() == 1,
|
||||
"syncClose() is only useful on an application with a single thread");
|
||||
auto const result = close();
|
||||
auto serverBarrier = std::make_shared<std::promise<void>>();
|
||||
auto future = serverBarrier->get_future();
|
||||
boost::asio::post(app().getIOContext(), [serverBarrier]() { serverBarrier->set_value(); });
|
||||
auto const status = future.wait_for(timeout);
|
||||
return result && status == std::future_status::ready;
|
||||
}
|
||||
|
||||
/** Turn on JSON tracing.
|
||||
With no arguments, trace all
|
||||
*/
|
||||
|
||||
@@ -73,6 +73,8 @@ std::unique_ptr<Config> admin_localnet(std::unique_ptr<Config>);
|
||||
|
||||
std::unique_ptr<Config> secure_gateway_localnet(std::unique_ptr<Config>);
|
||||
|
||||
std::unique_ptr<Config> single_thread_io(std::unique_ptr<Config>);
|
||||
|
||||
/// @brief adjust configuration with params needed to be a validator
|
||||
///
|
||||
/// this is intended for use with envconfig, as in
|
||||
|
||||
@@ -87,6 +87,12 @@ secure_gateway_localnet(std::unique_ptr<Config> cfg)
|
||||
(*cfg)[PORT_WS].set("secure_gateway", "127.0.0.0/8");
|
||||
return cfg;
|
||||
}
|
||||
std::unique_ptr<Config>
|
||||
single_thread_io(std::unique_ptr<Config> cfg)
|
||||
{
|
||||
cfg->IO_WORKERS = 1;
|
||||
return cfg;
|
||||
}
|
||||
|
||||
auto constexpr defaultseed = "shUwVw52ofnCUX5m7kPTKzJdr4HEH";
|
||||
|
||||
|
||||
@@ -26,7 +26,7 @@ public:
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
Env env(*this);
|
||||
Env env{*this, single_thread_io(envconfig())};
|
||||
auto wsc = makeWSClient(env.app().config());
|
||||
Json::Value stream;
|
||||
|
||||
@@ -92,7 +92,7 @@ public:
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
Env env(*this);
|
||||
Env env{*this, single_thread_io(envconfig())};
|
||||
auto wsc = makeWSClient(env.app().config());
|
||||
Json::Value stream;
|
||||
|
||||
@@ -114,7 +114,7 @@ public:
|
||||
|
||||
{
|
||||
// Accept a ledger
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
// Check stream update
|
||||
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
|
||||
@@ -125,7 +125,7 @@ public:
|
||||
|
||||
{
|
||||
// Accept another ledger
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
// Check stream update
|
||||
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
|
||||
@@ -150,7 +150,7 @@ public:
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
using namespace jtx;
|
||||
Env env(*this);
|
||||
Env env(*this, single_thread_io(envconfig()));
|
||||
auto baseFee = env.current()->fees().base.drops();
|
||||
auto wsc = makeWSClient(env.app().config());
|
||||
Json::Value stream;
|
||||
@@ -171,7 +171,7 @@ public:
|
||||
|
||||
{
|
||||
env.fund(XRP(10000), "alice");
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
// Check stream update for payment transaction
|
||||
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
|
||||
@@ -195,7 +195,7 @@ public:
|
||||
}));
|
||||
|
||||
env.fund(XRP(10000), "bob");
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
// Check stream update for payment transaction
|
||||
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
|
||||
@@ -249,12 +249,12 @@ public:
|
||||
{
|
||||
// Transaction that does not affect stream
|
||||
env.fund(XRP(10000), "carol");
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
BEAST_EXPECT(!wsc->getMsg(10ms));
|
||||
|
||||
// Transactions concerning alice
|
||||
env.trust(Account("bob")["USD"](100), "alice");
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
// Check stream updates
|
||||
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
|
||||
@@ -288,6 +288,7 @@ public:
|
||||
using namespace jtx;
|
||||
Env env(*this, envconfig([](std::unique_ptr<Config> cfg) {
|
||||
cfg->FEES.reference_fee = 10;
|
||||
cfg = single_thread_io(std::move(cfg));
|
||||
return cfg;
|
||||
}));
|
||||
auto wsc = makeWSClient(env.app().config());
|
||||
@@ -310,7 +311,7 @@ public:
|
||||
|
||||
{
|
||||
env.fund(XRP(10000), "alice");
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
// Check stream update for payment transaction
|
||||
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
|
||||
@@ -360,7 +361,7 @@ public:
|
||||
testManifests()
|
||||
{
|
||||
using namespace jtx;
|
||||
Env env(*this);
|
||||
Env env(*this, single_thread_io(envconfig()));
|
||||
auto wsc = makeWSClient(env.app().config());
|
||||
Json::Value stream;
|
||||
|
||||
@@ -394,7 +395,7 @@ public:
|
||||
{
|
||||
using namespace jtx;
|
||||
|
||||
Env env{*this, envconfig(validator, ""), features};
|
||||
Env env{*this, single_thread_io(envconfig(validator, "")), features};
|
||||
auto& cfg = env.app().config();
|
||||
if (!BEAST_EXPECT(cfg.section(SECTION_VALIDATION_SEED).empty()))
|
||||
return;
|
||||
@@ -483,7 +484,7 @@ public:
|
||||
// at least one flag ledger.
|
||||
while (env.closed()->header().seq < 300)
|
||||
{
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
using namespace std::chrono_literals;
|
||||
BEAST_EXPECT(wsc->findMsg(5s, validValidationFields));
|
||||
}
|
||||
@@ -505,7 +506,7 @@ public:
|
||||
{
|
||||
using namespace jtx;
|
||||
testcase("Subscribe by url");
|
||||
Env env{*this};
|
||||
Env env{*this, single_thread_io(envconfig())};
|
||||
|
||||
Json::Value jv;
|
||||
jv[jss::url] = "http://localhost/events";
|
||||
@@ -536,7 +537,7 @@ public:
|
||||
auto const method = subscribe ? "subscribe" : "unsubscribe";
|
||||
testcase << "Error cases for " << method;
|
||||
|
||||
Env env{*this};
|
||||
Env env{*this, single_thread_io(envconfig())};
|
||||
auto wsc = makeWSClient(env.app().config());
|
||||
|
||||
{
|
||||
@@ -572,7 +573,7 @@ public:
|
||||
}
|
||||
|
||||
{
|
||||
Env env_nonadmin{*this, no_admin(envconfig())};
|
||||
Env env_nonadmin{*this, single_thread_io(no_admin(envconfig()))};
|
||||
Json::Value jv;
|
||||
jv[jss::url] = "no-url";
|
||||
auto jr = env_nonadmin.rpc("json", method, to_string(jv))[jss::result];
|
||||
@@ -834,12 +835,13 @@ public:
|
||||
* send payments between the two accounts a and b,
|
||||
* and close ledgersToClose ledgers
|
||||
*/
|
||||
auto sendPayments = [](Env& env,
|
||||
Account const& a,
|
||||
Account const& b,
|
||||
int newTxns,
|
||||
std::uint32_t ledgersToClose,
|
||||
int numXRP = 10) {
|
||||
auto sendPayments = [this](
|
||||
Env& env,
|
||||
Account const& a,
|
||||
Account const& b,
|
||||
int newTxns,
|
||||
std::uint32_t ledgersToClose,
|
||||
int numXRP = 10) {
|
||||
env.memoize(a);
|
||||
env.memoize(b);
|
||||
for (int i = 0; i < newTxns; ++i)
|
||||
@@ -852,7 +854,7 @@ public:
|
||||
jtx::sig(jtx::autofill));
|
||||
}
|
||||
for (int i = 0; i < ledgersToClose; ++i)
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
return newTxns;
|
||||
};
|
||||
|
||||
@@ -945,7 +947,7 @@ public:
|
||||
*
|
||||
* also test subscribe to the account before it is created
|
||||
*/
|
||||
Env env(*this);
|
||||
Env env(*this, single_thread_io(envconfig()));
|
||||
auto wscTxHistory = makeWSClient(env.app().config());
|
||||
Json::Value request;
|
||||
request[jss::account_history_tx_stream] = Json::objectValue;
|
||||
@@ -988,7 +990,7 @@ public:
|
||||
* subscribe genesis account tx history without txns
|
||||
* subscribe to bob's account after it is created
|
||||
*/
|
||||
Env env(*this);
|
||||
Env env(*this, single_thread_io(envconfig()));
|
||||
auto wscTxHistory = makeWSClient(env.app().config());
|
||||
Json::Value request;
|
||||
request[jss::account_history_tx_stream] = Json::objectValue;
|
||||
@@ -998,6 +1000,7 @@ public:
|
||||
if (!BEAST_EXPECT(goodSubRPC(jv)))
|
||||
return;
|
||||
IdxHashVec genesisFullHistoryVec;
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
if (!BEAST_EXPECT(!getTxHash(*wscTxHistory, genesisFullHistoryVec, 1).first))
|
||||
return;
|
||||
|
||||
@@ -1016,6 +1019,7 @@ public:
|
||||
if (!BEAST_EXPECT(goodSubRPC(jv)))
|
||||
return;
|
||||
IdxHashVec bobFullHistoryVec;
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
r = getTxHash(*wscTxHistory, bobFullHistoryVec, 1);
|
||||
if (!BEAST_EXPECT(r.first && r.second))
|
||||
return;
|
||||
@@ -1050,6 +1054,7 @@ public:
|
||||
"rHb9CJAWyB4rj91VRWn96DkukG4bwdtyTh";
|
||||
jv = wscTxHistory->invoke("subscribe", request);
|
||||
genesisFullHistoryVec.clear();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
BEAST_EXPECT(getTxHash(*wscTxHistory, genesisFullHistoryVec, 31).second);
|
||||
jv = wscTxHistory->invoke("unsubscribe", request);
|
||||
|
||||
@@ -1062,13 +1067,13 @@ public:
|
||||
* subscribe account and subscribe account tx history
|
||||
* and compare txns streamed
|
||||
*/
|
||||
Env env(*this);
|
||||
Env env(*this, single_thread_io(envconfig()));
|
||||
auto wscAccount = makeWSClient(env.app().config());
|
||||
auto wscTxHistory = makeWSClient(env.app().config());
|
||||
|
||||
std::array<Account, 2> accounts = {alice, bob};
|
||||
env.fund(XRP(222222), accounts);
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
// subscribe account
|
||||
Json::Value stream = Json::objectValue;
|
||||
@@ -1131,18 +1136,18 @@ public:
|
||||
* alice issues USD to carol
|
||||
* mix USD and XRP payments
|
||||
*/
|
||||
Env env(*this);
|
||||
Env env(*this, single_thread_io(envconfig()));
|
||||
auto const USD_a = alice["USD"];
|
||||
|
||||
std::array<Account, 2> accounts = {alice, carol};
|
||||
env.fund(XRP(333333), accounts);
|
||||
env.trust(USD_a(20000), carol);
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
auto mixedPayments = [&]() -> int {
|
||||
sendPayments(env, alice, carol, 1, 0);
|
||||
env(pay(alice, carol, USD_a(100)));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
return 2;
|
||||
};
|
||||
|
||||
@@ -1152,6 +1157,7 @@ public:
|
||||
request[jss::account_history_tx_stream][jss::account] = carol.human();
|
||||
auto ws = makeWSClient(env.app().config());
|
||||
auto jv = ws->invoke("subscribe", request);
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
{
|
||||
// take out existing txns from the stream
|
||||
IdxHashVec tempVec;
|
||||
@@ -1169,10 +1175,10 @@ public:
|
||||
/*
|
||||
* long transaction history
|
||||
*/
|
||||
Env env(*this);
|
||||
Env env(*this, single_thread_io(envconfig()));
|
||||
std::array<Account, 2> accounts = {alice, carol};
|
||||
env.fund(XRP(444444), accounts);
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
// many payments, and close lots of ledgers
|
||||
auto oneRound = [&](int numPayments) {
|
||||
@@ -1185,6 +1191,7 @@ public:
|
||||
request[jss::account_history_tx_stream][jss::account] = carol.human();
|
||||
auto wscLong = makeWSClient(env.app().config());
|
||||
auto jv = wscLong->invoke("subscribe", request);
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
{
|
||||
// take out existing txns from the stream
|
||||
IdxHashVec tempVec;
|
||||
@@ -1222,7 +1229,7 @@ public:
|
||||
jtx::testable_amendments() | featurePermissionedDomains | featureCredentials |
|
||||
featurePermissionedDEX};
|
||||
|
||||
Env env(*this, all);
|
||||
Env env(*this, single_thread_io(envconfig()), all);
|
||||
PermissionedDEX permDex(env);
|
||||
auto const alice = permDex.alice;
|
||||
auto const bob = permDex.bob;
|
||||
@@ -1241,10 +1248,10 @@ public:
|
||||
if (!BEAST_EXPECT(jv[jss::status] == "success"))
|
||||
return;
|
||||
env(offer(alice, XRP(10), USD(10)), domain(domainID), txflags(tfHybrid));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
env(pay(bob, carol, USD(5)), path(~USD), sendmax(XRP(5)), domain(domainID));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
BEAST_EXPECT(wsc->findMsg(5s, [&](auto const& jv) {
|
||||
if (jv[jss::changes].size() != 1)
|
||||
@@ -1284,9 +1291,9 @@ public:
|
||||
Account const bob{"bob"};
|
||||
Account const broker{"broker"};
|
||||
|
||||
Env env{*this, features};
|
||||
Env env{*this, single_thread_io(envconfig()), features};
|
||||
env.fund(XRP(10000), alice, bob, broker);
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
|
||||
auto wsc = test::makeWSClient(env.app().config());
|
||||
Json::Value stream;
|
||||
@@ -1350,12 +1357,12 @@ public:
|
||||
// Verify the NFTokenIDs are correct in the NFTokenMint tx meta
|
||||
uint256 const nftId1{token::getNextID(env, alice, 0u, tfTransferable)};
|
||||
env(token::mint(alice, 0u), txflags(tfTransferable));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenID(nftId1);
|
||||
|
||||
uint256 const nftId2{token::getNextID(env, alice, 0u, tfTransferable)};
|
||||
env(token::mint(alice, 0u), txflags(tfTransferable));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenID(nftId2);
|
||||
|
||||
// Alice creates one sell offer for each NFT
|
||||
@@ -1363,32 +1370,32 @@ public:
|
||||
// meta
|
||||
uint256 const aliceOfferIndex1 = keylet::nftoffer(alice, env.seq(alice)).key;
|
||||
env(token::createOffer(alice, nftId1, drops(1)), txflags(tfSellNFToken));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenOfferID(aliceOfferIndex1);
|
||||
|
||||
uint256 const aliceOfferIndex2 = keylet::nftoffer(alice, env.seq(alice)).key;
|
||||
env(token::createOffer(alice, nftId2, drops(1)), txflags(tfSellNFToken));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenOfferID(aliceOfferIndex2);
|
||||
|
||||
// Alice cancels two offers she created
|
||||
// Verify the NFTokenIDs are correct in the NFTokenCancelOffer tx
|
||||
// meta
|
||||
env(token::cancelOffer(alice, {aliceOfferIndex1, aliceOfferIndex2}));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenIDsInCancelOffer({nftId1, nftId2});
|
||||
|
||||
// Bobs creates a buy offer for nftId1
|
||||
// Verify the offer id is correct in the NFTokenCreateOffer tx meta
|
||||
auto const bobBuyOfferIndex = keylet::nftoffer(bob, env.seq(bob)).key;
|
||||
env(token::createOffer(bob, nftId1, drops(1)), token::owner(alice));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenOfferID(bobBuyOfferIndex);
|
||||
|
||||
// Alice accepts bob's buy offer
|
||||
// Verify the NFTokenID is correct in the NFTokenAcceptOffer tx meta
|
||||
env(token::acceptBuyOffer(alice, bobBuyOfferIndex));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenID(nftId1);
|
||||
}
|
||||
|
||||
@@ -1397,7 +1404,7 @@ public:
|
||||
// Alice mints a NFT
|
||||
uint256 const nftId{token::getNextID(env, alice, 0u, tfTransferable)};
|
||||
env(token::mint(alice, 0u), txflags(tfTransferable));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenID(nftId);
|
||||
|
||||
// Alice creates sell offer and set broker as destination
|
||||
@@ -1405,18 +1412,18 @@ public:
|
||||
env(token::createOffer(alice, nftId, drops(1)),
|
||||
token::destination(broker),
|
||||
txflags(tfSellNFToken));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenOfferID(offerAliceToBroker);
|
||||
|
||||
// Bob creates buy offer
|
||||
uint256 const offerBobToBroker = keylet::nftoffer(bob, env.seq(bob)).key;
|
||||
env(token::createOffer(bob, nftId, drops(1)), token::owner(alice));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenOfferID(offerBobToBroker);
|
||||
|
||||
// Check NFTokenID meta for NFTokenAcceptOffer in brokered mode
|
||||
env(token::brokerOffers(broker, offerBobToBroker, offerAliceToBroker));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenID(nftId);
|
||||
}
|
||||
|
||||
@@ -1426,24 +1433,24 @@ public:
|
||||
// Alice mints a NFT
|
||||
uint256 const nftId{token::getNextID(env, alice, 0u, tfTransferable)};
|
||||
env(token::mint(alice, 0u), txflags(tfTransferable));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenID(nftId);
|
||||
|
||||
// Alice creates 2 sell offers for the same NFT
|
||||
uint256 const aliceOfferIndex1 = keylet::nftoffer(alice, env.seq(alice)).key;
|
||||
env(token::createOffer(alice, nftId, drops(1)), txflags(tfSellNFToken));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenOfferID(aliceOfferIndex1);
|
||||
|
||||
uint256 const aliceOfferIndex2 = keylet::nftoffer(alice, env.seq(alice)).key;
|
||||
env(token::createOffer(alice, nftId, drops(1)), txflags(tfSellNFToken));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenOfferID(aliceOfferIndex2);
|
||||
|
||||
// Make sure the metadata only has 1 nft id, since both offers are
|
||||
// for the same nft
|
||||
env(token::cancelOffer(alice, {aliceOfferIndex1, aliceOfferIndex2}));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenIDsInCancelOffer({nftId});
|
||||
}
|
||||
|
||||
@@ -1451,7 +1458,7 @@ public:
|
||||
{
|
||||
uint256 const aliceMintWithOfferIndex1 = keylet::nftoffer(alice, env.seq(alice)).key;
|
||||
env(token::mint(alice), token::amount(XRP(0)));
|
||||
env.close();
|
||||
BEAST_EXPECT(env.syncClose());
|
||||
verifyNFTokenOfferID(aliceMintWithOfferIndex1);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -107,10 +107,8 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
|
||||
// Tell the ledger acquire system that we need the consensus ledger
|
||||
acquiringLedger_ = hash;
|
||||
|
||||
app_.getJobQueue().addJob(jtADVANCE, "GetConsL1", [id = hash, &app = app_, this]() {
|
||||
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger1 started";
|
||||
app.getInboundLedgers().acquireAsync(id, 0, InboundLedger::Reason::CONSENSUS);
|
||||
});
|
||||
app_.getInboundLedgers().acquireAsync(
|
||||
jtADVANCE, "GetConsL1", hash, 0, InboundLedger::Reason::CONSENSUS);
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
@@ -985,7 +983,7 @@ void
|
||||
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
|
||||
{
|
||||
if (!positions && app_.getOPs().isFull())
|
||||
app_.getOPs().setMode(OperatingMode::CONNECTED);
|
||||
app_.getOPs().setMode(OperatingMode::CONNECTED, "updateOperatingMode: no positions");
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -117,12 +117,8 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
|
||||
{
|
||||
JLOG(j_.warn()) << "Need validated ledger for preferred ledger analysis " << hash;
|
||||
|
||||
Application* pApp = &app_;
|
||||
|
||||
app_.getJobQueue().addJob(jtADVANCE, "GetConsL2", [pApp, hash, this]() {
|
||||
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger2 started";
|
||||
pApp->getInboundLedgers().acquireAsync(hash, 0, InboundLedger::Reason::CONSENSUS);
|
||||
});
|
||||
app_.getInboundLedgers().acquireAsync(
|
||||
jtADVANCE, "GetConsL2", hash, 0, InboundLedger::Reason::CONSENSUS);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
|
||||
@@ -26,7 +26,12 @@ public:
|
||||
// Queue. TODO review whether all callers of acquire() can use this
|
||||
// instead. Inbound ledger acquisition is asynchronous anyway.
|
||||
virtual void
|
||||
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) = 0;
|
||||
acquireAsync(
|
||||
JobType type,
|
||||
std::string const& name,
|
||||
uint256 const& hash,
|
||||
std::uint32_t seq,
|
||||
InboundLedger::Reason reason) = 0;
|
||||
|
||||
virtual std::shared_ptr<InboundLedger>
|
||||
find(LedgerHash const& hash) = 0;
|
||||
|
||||
@@ -353,7 +353,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
|
||||
|
||||
if (!wasProgress)
|
||||
{
|
||||
checkLocal();
|
||||
if (checkLocal())
|
||||
{
|
||||
// Done. Something else (probably consensus) built the ledger
|
||||
// locally while waiting for data (or possibly before requesting)
|
||||
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
|
||||
JLOG(journal_.info()) << "Finished while waiting " << hash_;
|
||||
return;
|
||||
}
|
||||
|
||||
mByHash = true;
|
||||
|
||||
|
||||
@@ -2,9 +2,9 @@
|
||||
#include <xrpld/app/ledger/LedgerMaster.h>
|
||||
#include <xrpld/app/main/Application.h>
|
||||
|
||||
#include <xrpl/basics/CanProcess.h>
|
||||
#include <xrpl/basics/DecayingSample.h>
|
||||
#include <xrpl/basics/Log.h>
|
||||
#include <xrpl/basics/scope.h>
|
||||
#include <xrpl/beast/container/aged_map.h>
|
||||
#include <xrpl/core/JobQueue.h>
|
||||
#include <xrpl/core/PerfLog.h>
|
||||
@@ -59,12 +59,15 @@ public:
|
||||
(reason != InboundLedger::Reason::CONSENSUS))
|
||||
return {};
|
||||
|
||||
std::stringstream ss;
|
||||
|
||||
bool isNew = true;
|
||||
std::shared_ptr<InboundLedger> inbound;
|
||||
{
|
||||
ScopedLockType sl(mLock);
|
||||
if (stopping_)
|
||||
{
|
||||
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
|
||||
return {};
|
||||
}
|
||||
|
||||
@@ -83,47 +86,61 @@ public:
|
||||
++mCounter;
|
||||
}
|
||||
}
|
||||
ss << " IsNew: " << (isNew ? "true" : "false");
|
||||
|
||||
if (inbound->isFailed())
|
||||
{
|
||||
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
|
||||
return {};
|
||||
}
|
||||
|
||||
if (!isNew)
|
||||
inbound->update(seq);
|
||||
|
||||
if (!inbound->isComplete())
|
||||
{
|
||||
JLOG(j_.debug()) << "InProgress: " << ss.str();
|
||||
return {};
|
||||
}
|
||||
|
||||
JLOG(j_.debug()) << "Complete: " << ss.str();
|
||||
return inbound->getLedger();
|
||||
};
|
||||
using namespace std::chrono_literals;
|
||||
std::shared_ptr<Ledger const> ledger =
|
||||
perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
|
||||
|
||||
return ledger;
|
||||
return perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
|
||||
}
|
||||
|
||||
void
|
||||
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
|
||||
acquireAsync(
|
||||
JobType type,
|
||||
std::string const& name,
|
||||
uint256 const& hash,
|
||||
std::uint32_t seq,
|
||||
InboundLedger::Reason reason) override
|
||||
{
|
||||
std::unique_lock lock(acquiresMutex_);
|
||||
try
|
||||
if (auto check = std::make_shared<CanProcess const>(acquiresMutex_, pendingAcquires_, hash);
|
||||
*check)
|
||||
{
|
||||
if (pendingAcquires_.contains(hash))
|
||||
return;
|
||||
pendingAcquires_.insert(hash);
|
||||
scope_unlock unlock(lock);
|
||||
acquire(hash, seq, reason);
|
||||
app_.getJobQueue().addJob(type, name, [check, name, hash, seq, reason, this]() {
|
||||
JLOG(j_.debug()) << "JOB acquireAsync " << name << " started ";
|
||||
try
|
||||
{
|
||||
acquire(hash, seq, reason);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(j_.warn()) << "Exception thrown for acquiring new "
|
||||
"inbound ledger "
|
||||
<< hash << ": " << e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
|
||||
"inbound ledger "
|
||||
<< hash;
|
||||
}
|
||||
});
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": "
|
||||
<< e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
|
||||
}
|
||||
pendingAcquires_.erase(hash);
|
||||
}
|
||||
|
||||
std::shared_ptr<InboundLedger>
|
||||
|
||||
@@ -907,8 +907,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
|
||||
return;
|
||||
}
|
||||
|
||||
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq
|
||||
<< " with >= " << minVal << " validations";
|
||||
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq << " ("
|
||||
<< to_short_string(ledger->header().hash) << ") with >= " << minVal
|
||||
<< " validations";
|
||||
|
||||
ledger->setValidated();
|
||||
ledger->setFull();
|
||||
|
||||
@@ -13,7 +13,8 @@ TimeoutCounter::TimeoutCounter(
|
||||
QueueJobParameter&& jobParameter,
|
||||
beast::Journal journal)
|
||||
: app_(app)
|
||||
, journal_(journal)
|
||||
, sink_(journal, to_short_string(hash) + " ")
|
||||
, journal_(sink_)
|
||||
, hash_(hash)
|
||||
, timeouts_(0)
|
||||
, complete_(false)
|
||||
@@ -33,6 +34,7 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
|
||||
{
|
||||
if (isDone())
|
||||
return;
|
||||
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() << "ms";
|
||||
timer_.expires_after(timerInterval_);
|
||||
timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
|
||||
if (ec == boost::asio::error::operation_aborted)
|
||||
@@ -40,6 +42,10 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
|
||||
|
||||
if (auto ptr = wptr.lock())
|
||||
{
|
||||
JLOG(ptr->journal_.debug())
|
||||
<< "timer: ec: " << ec
|
||||
<< " (operation_aborted: " << boost::asio::error::operation_aborted << " - "
|
||||
<< (ec == boost::asio::error::operation_aborted ? "aborted" : "other") << ")";
|
||||
ScopedLockType sl(ptr->mtx_);
|
||||
ptr->queueJob(sl);
|
||||
}
|
||||
|
||||
@@ -3,6 +3,7 @@
|
||||
#include <xrpld/app/main/Application.h>
|
||||
|
||||
#include <xrpl/beast/utility/Journal.h>
|
||||
#include <xrpl/beast/utility/WrappedSink.h>
|
||||
#include <xrpl/core/Job.h>
|
||||
|
||||
#include <boost/asio/basic_waitable_timer.hpp>
|
||||
@@ -103,6 +104,7 @@ protected:
|
||||
// Used in this class for access to boost::asio::io_context and
|
||||
// xrpl::Overlay. Used in subtypes for the kitchen sink.
|
||||
Application& app_;
|
||||
beast::WrappedSink sink_;
|
||||
beast::Journal journal_;
|
||||
mutable std::recursive_mutex mtx_;
|
||||
|
||||
|
||||
@@ -1072,6 +1072,12 @@ public:
|
||||
return trapTxID_;
|
||||
}
|
||||
|
||||
size_t
|
||||
getNumberOfThreads() const override
|
||||
{
|
||||
return get_number_of_threads();
|
||||
}
|
||||
|
||||
private:
|
||||
// For a newly-started validator, this is the greatest persisted ledger
|
||||
// and new validations must be greater than this.
|
||||
|
||||
@@ -157,6 +157,10 @@ public:
|
||||
* than the last ledger it persisted. */
|
||||
virtual LedgerIndex
|
||||
getMaxDisallowedLedger() = 0;
|
||||
|
||||
/** Returns the number of io_context (I/O worker) threads used by the application. */
|
||||
virtual size_t
|
||||
getNumberOfThreads() const = 0;
|
||||
};
|
||||
|
||||
std::unique_ptr<Application>
|
||||
|
||||
@@ -23,4 +23,10 @@ public:
|
||||
{
|
||||
return io_context_;
|
||||
}
|
||||
|
||||
size_t
|
||||
get_number_of_threads() const
|
||||
{
|
||||
return threads_.size();
|
||||
}
|
||||
};
|
||||
|
||||
@@ -30,10 +30,10 @@
|
||||
#include <xrpld/rpc/MPTokenIssuanceID.h>
|
||||
#include <xrpld/rpc/ServerHandler.h>
|
||||
|
||||
#include <xrpl/basics/CanProcess.h>
|
||||
#include <xrpl/basics/UptimeClock.h>
|
||||
#include <xrpl/basics/mulDiv.h>
|
||||
#include <xrpl/basics/safe_cast.h>
|
||||
#include <xrpl/basics/scope.h>
|
||||
#include <xrpl/beast/utility/rngfill.h>
|
||||
#include <xrpl/core/HashRouter.h>
|
||||
#include <xrpl/core/NetworkIDService.h>
|
||||
@@ -396,7 +396,7 @@ public:
|
||||
isFull() override;
|
||||
|
||||
void
|
||||
setMode(OperatingMode om) override;
|
||||
setMode(OperatingMode om, char const* reason) override;
|
||||
|
||||
bool
|
||||
isBlocked() override;
|
||||
@@ -841,7 +841,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
|
||||
inline void
|
||||
NetworkOPsImp::setStandAlone()
|
||||
{
|
||||
setMode(OperatingMode::FULL);
|
||||
setMode(OperatingMode::FULL, "setStandAlone");
|
||||
}
|
||||
|
||||
inline void
|
||||
@@ -984,7 +984,7 @@ NetworkOPsImp::processHeartbeatTimer()
|
||||
{
|
||||
if (mMode != OperatingMode::DISCONNECTED)
|
||||
{
|
||||
setMode(OperatingMode::DISCONNECTED);
|
||||
setMode(OperatingMode::DISCONNECTED, "Heartbeat: insufficient peers");
|
||||
std::stringstream ss;
|
||||
ss << "Node count (" << numPeers << ") has fallen "
|
||||
<< "below required minimum (" << minPeerCount_ << ").";
|
||||
@@ -1008,7 +1008,7 @@ NetworkOPsImp::processHeartbeatTimer()
|
||||
|
||||
if (mMode == OperatingMode::DISCONNECTED)
|
||||
{
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
|
||||
JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient.";
|
||||
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
|
||||
}
|
||||
@@ -1018,9 +1018,9 @@ NetworkOPsImp::processHeartbeatTimer()
|
||||
auto origMode = mMode.load();
|
||||
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
|
||||
if (mMode == OperatingMode::SYNCING)
|
||||
setMode(OperatingMode::SYNCING);
|
||||
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
|
||||
else if (mMode == OperatingMode::CONNECTED)
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
|
||||
auto newMode = mMode.load();
|
||||
if (origMode != newMode)
|
||||
{
|
||||
@@ -1710,7 +1710,7 @@ void
|
||||
NetworkOPsImp::setAmendmentBlocked()
|
||||
{
|
||||
amendmentBlocked_ = true;
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
|
||||
}
|
||||
|
||||
inline bool
|
||||
@@ -1741,7 +1741,7 @@ void
|
||||
NetworkOPsImp::setUNLBlocked()
|
||||
{
|
||||
unlBlocked_ = true;
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
|
||||
}
|
||||
|
||||
inline void
|
||||
@@ -1837,7 +1837,7 @@ NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint
|
||||
|
||||
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
|
||||
{
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
|
||||
}
|
||||
|
||||
if (consensus)
|
||||
@@ -1922,8 +1922,8 @@ NetworkOPsImp::beginConsensus(
|
||||
// this shouldn't happen unless we jump ledgers
|
||||
if (mMode == OperatingMode::FULL)
|
||||
{
|
||||
JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
|
||||
setMode(OperatingMode::TRACKING);
|
||||
JLOG(m_journal.warn()) << "beginConsensus Don't have LCL, going to tracking";
|
||||
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
|
||||
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
|
||||
}
|
||||
|
||||
@@ -2052,7 +2052,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
|
||||
// validations we have for LCL. If the ledger is good enough, go to
|
||||
// TRACKING - TODO
|
||||
if (!needNetworkLedger_)
|
||||
setMode(OperatingMode::TRACKING);
|
||||
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
|
||||
}
|
||||
|
||||
if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::TRACKING)) &&
|
||||
@@ -2065,7 +2065,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
|
||||
if (registry_.timeKeeper().now() <
|
||||
(current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
|
||||
{
|
||||
setMode(OperatingMode::FULL);
|
||||
setMode(OperatingMode::FULL, "endConsensus: check full");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2077,7 +2077,7 @@ NetworkOPsImp::consensusViewChange()
|
||||
{
|
||||
if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
|
||||
{
|
||||
setMode(OperatingMode::CONNECTED);
|
||||
setMode(OperatingMode::CONNECTED, "consensusViewChange");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2379,7 +2379,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
|
||||
}
|
||||
|
||||
void
|
||||
NetworkOPsImp::setMode(OperatingMode om)
|
||||
NetworkOPsImp::setMode(OperatingMode om, char const* reason)
|
||||
{
|
||||
using namespace std::chrono_literals;
|
||||
if (om == OperatingMode::CONNECTED)
|
||||
@@ -2399,11 +2399,12 @@ NetworkOPsImp::setMode(OperatingMode om)
|
||||
if (mMode == om)
|
||||
return;
|
||||
|
||||
auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
|
||||
mMode = om;
|
||||
|
||||
accounting_.mode(om);
|
||||
|
||||
JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
|
||||
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
|
||||
pubServer();
|
||||
}
|
||||
|
||||
@@ -2412,32 +2413,24 @@ NetworkOPsImp::recvValidation(std::shared_ptr<STValidation> const& val, std::str
|
||||
{
|
||||
JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;
|
||||
|
||||
std::unique_lock lock(validationsMutex_);
|
||||
BypassAccept bypassAccept = BypassAccept::no;
|
||||
try
|
||||
{
|
||||
if (pendingValidations_.contains(val->getLedgerHash()))
|
||||
bypassAccept = BypassAccept::yes;
|
||||
else
|
||||
pendingValidations_.insert(val->getLedgerHash());
|
||||
scope_unlock unlock(lock);
|
||||
handleNewValidation(registry_.app(), val, source, bypassAccept, m_journal);
|
||||
CanProcess const check(validationsMutex_, pendingValidations_, val->getLedgerHash());
|
||||
try
|
||||
{
|
||||
BypassAccept bypassAccept = check ? BypassAccept::no : BypassAccept::yes;
|
||||
handleNewValidation(registry_.app(), val, source, bypassAccept, m_journal);
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
|
||||
<< val->getLedgerHash() << ": " << e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(m_journal.warn())
|
||||
<< "Unknown exception thrown for handling new validation " << val->getLedgerHash();
|
||||
}
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
|
||||
<< val->getLedgerHash() << ": " << e.what();
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation "
|
||||
<< val->getLedgerHash();
|
||||
}
|
||||
if (bypassAccept == BypassAccept::no)
|
||||
{
|
||||
pendingValidations_.erase(val->getLedgerHash());
|
||||
}
|
||||
lock.unlock();
|
||||
|
||||
pubValidation(val);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user