Compare commits

..

2 Commits

Author SHA1 Message Date
Vito
fc620bf29a test: Fix clang-tidy issues in new Vault+AMM tests
- AMM_test: directly include <xrpl/protocol/MPTIssue.h>
  (misc-include-cleaner).
- Vault_test::testMutateCanTransferAfterDeposit: declare local Vault
  const (misc-const-correctness).
2026-04-29 12:03:50 +02:00
Vito
b01ba1fce2 test: Add Vault+AMM integration coverage
- AMM_test::testVaultSharesAMM: pair vault-share MPT with XRP, do a
  single-asset deposit of additional shares.
- AMM_test::testLockedVaultMPTCashOut: lock the underlying MPT of a
  vault that backs an AMM and confirm the freeze cascades through
  isVaultPseudoAccountFrozen, blocking LP-token Payment (tecPATH_DRY)
  and AMM withdrawal (tecFROZEN).
- Vault_test::testMutateCanTransferAfterDeposit: exercise issuer
  mutating lsfMPTCanTransfer / lsfMPTCanTrade after holders have
  deposited. Confirms CanTrade has no effect on vault I/O, the
  issuer-as-depositor path is exempt from canTransfer, and a holder
  can still exit via VaultWithdraw with Destination = issuer while
  CanTransfer is cleared.
2026-04-29 11:45:43 +02:00
17 changed files with 410 additions and 415 deletions

View File

@@ -1,139 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#include <functional>
#include <mutex>
#include <set>
/** RAII class to check if an Item is already being processed on another thread,
* as indicated by it's presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
class CanProcess
{
public:
template <class Mutex, class Collection, class Item>
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: cleanup_(insert(mtx, collection, item))
{
}
~CanProcess()
{
if (cleanup_)
cleanup_();
}
CanProcess(CanProcess const&) = delete;
CanProcess&
operator=(CanProcess const&) = delete;
explicit
operator bool() const
{
return static_cast<bool>(cleanup_);
}
private:
template <bool useIterator, class Mutex, class Collection, class Item>
std::function<void()>
doInsert(Mutex& mtx, Collection& collection, Item const& item)
{
std::unique_lock<Mutex> lock(mtx);
// TODO: Use structured binding once LLVM 16 is the minimum supported
// version. See also: https://github.com/llvm/llvm-project/issues/48582
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
auto const insertResult = collection.insert(item);
auto const it = insertResult.first;
if (!insertResult.second)
return {};
if constexpr (useIterator)
return [&, it]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(it);
};
else
return [&]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(item);
};
}
// Generic insert() function doesn't use iterators because they may get
// invalidated
template <class Mutex, class Collection, class Item>
std::function<void()>
insert(Mutex& mtx, Collection& collection, Item const& item)
{
return doInsert<false>(mtx, collection, item);
}
// Specialize insert() for std::set, which does not invalidate iterators for
// insert and erase
template <class Mutex, class Item>
std::function<void()>
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
{
return doInsert<true>(mtx, collection, item);
}
// If set, then the item is "usable"
std::function<void()> cleanup_;
};
#endif

View File

@@ -197,7 +197,7 @@ public:
/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the key is added.
* element 1: true if the peer is added.
* element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>

View File

@@ -35,8 +35,6 @@ struct LedgerHeader
// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;

View File

@@ -185,7 +185,7 @@ public:
virtual bool
isFull() = 0;
virtual void
setMode(OperatingMode om, char const* reason) = 0;
setMode(OperatingMode om) = 0;
virtual bool
isBlocked() = 0;
virtual bool

View File

@@ -8,6 +8,7 @@
#include <test/jtx/escrow.h>
#include <test/jtx/fee.h>
#include <test/jtx/flags.h>
#include <test/jtx/mpt.h>
#include <test/jtx/offer.h>
#include <test/jtx/paths.h>
#include <test/jtx/pay.h>
@@ -19,6 +20,7 @@
#include <test/jtx/ter.h>
#include <test/jtx/trust.h>
#include <test/jtx/txflags.h>
#include <test/jtx/vault.h>
#include <xrpl/basics/Number.h>
#include <xrpl/basics/base_uint.h>
@@ -35,6 +37,7 @@
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/Issue.h>
#include <xrpl/protocol/LedgerFormats.h>
#include <xrpl/protocol/MPTIssue.h>
#include <xrpl/protocol/Protocol.h>
#include <xrpl/protocol/Quality.h>
#include <xrpl/protocol/Rules.h>
@@ -7060,10 +7063,200 @@ private:
{all});
}
// Create a single-asset vault, deposit assets so the depositor receives
// shares (an MPT issued by the vault pseudo-account), then pair those
// shares with XRP in an AMM. Finally do a single-asset deposit of more
// shares into the AMM.
void
testVaultSharesAMM()
{
testcase("Vault Shares paired with XRP in AMM");
using namespace jtx;
// Vaults rely on featureSingleAssetVault (which the AMM_test class
// strips by default). MPT-AMM pairs require featureMPTokensV2.
FeatureBitset const features{
jtx::testable_amendments() | featureSingleAssetVault | featureMPTokensV2};
Env env{*this, features};
Account const owner{"vaultOwner"};
env.fund(XRP(1'000'000), owner);
env.close();
// Use XRP as the vault asset for simplicity.
PrettyAsset const asset{xrpIssue(), 1'000'000};
Vault const vault{env};
auto [vaultTx, vaultKeylet] = vault.create({.owner = owner, .asset = asset});
env(vaultTx);
env.close();
if (!BEAST_EXPECT(env.le(vaultKeylet)))
return;
// Deposit 10,000 XRP into the vault. Owner receives shares (MPT)
// issued by the vault's pseudo-account.
env(vault.deposit(
{.depositor = owner, .id = vaultKeylet.key, .amount = asset(10'000).value()}));
env.close();
auto const vaultSle = env.le(vaultKeylet);
if (!BEAST_EXPECT(vaultSle))
return;
MPTID const shareMptID = vaultSle->at(sfShareMPTID);
MPTIssue const shareIssue{shareMptID};
// The share MPT is issued by the vault's pseudo-account. Memoize so
// env.balance() can format share amounts.
env.memoize(Account{"vaultPseudo", vaultSle->at(sfAccount)});
// XRP vaults use scale=6, so a 10,000 XRP deposit yields
// 10,000 * 1e6 = 10^10 share units (raw MPT amount).
STAmount const sharesHeld = env.balance(owner, shareIssue);
BEAST_EXPECT(sharesHeld.mantissa() == 10'000'000'000ull);
BEAST_EXPECT(sharesHeld.asset() == shareIssue);
// Seed the AMM with half the shares + 5,000 XRP.
STAmount const halfShares(shareIssue, std::uint64_t{5'000'000'000});
AMM ammOwner(env, owner, halfShares, XRP(5'000));
BEAST_EXPECT(ammOwner.ammExists());
// Single-asset deposit: add 2,500,000,000 more shares (a quarter of
// the original holding) to the share side of the pool.
STAmount const extraShares(shareIssue, std::uint64_t{2'500'000'000});
ammOwner.deposit(owner, extraShares);
// The share-side pool should now equal halfShares + extraShares,
// while the XRP-side balance is unchanged at 5,000 XRP.
auto const [shareBalance, xrpBalance, lpt] = ammOwner.balances(shareIssue, xrpIssue());
BEAST_EXPECT(shareBalance == halfShares + extraShares);
BEAST_EXPECT(xrpBalance == XRP(5'000));
// Owner now holds the original 10B shares minus what was put into the
// AMM (5B seed + 2.5B single-asset deposit) = 2.5B.
STAmount const expectedOwnerShares(shareIssue, std::uint64_t{2'500'000'000});
BEAST_EXPECT(env.balance(owner, shareIssue) == expectedOwnerShares);
}
// Create a Vault whose underlying asset is a lockable / clawback-able
// MPT. Pair the vault shares with XRP in an AMM. Transfer half of the
// owner's LP tokens to a second account, then issuer-lock the
// underlying MPT, then try to transfer LP tokens / cash out again.
//
// Locking the underlying MPT cascades up via
// `isVaultPseudoAccountFrozen`: the vault-share MPT is treated as
// frozen because its underlying is locked. So:
// - LP-token Payment after lock fails (`tecPATH_DRY`).
// - AMM withdrawal of LP tokens fails (`tecFROZEN`).
// The LP tokens are effectively stuck for as long as the underlying
// MPT remains locked.
void
testLockedVaultMPTCashOut()
{
testcase("Cash out LP Tokens after vault MPT locked");
using namespace jtx;
FeatureBitset const features{
jtx::testable_amendments() | featureSingleAssetVault | featureMPTokensV2};
Env env{*this, features};
Account const issuer{"issuer"};
Account const owner{"vaultOwner"};
Account const trader{"trader"};
env.fund(XRP(1'000'000), issuer, owner, trader);
env.close();
// Underlying MPT supports lock + clawback. MPTDEXFlags adds
// CanTransfer + CanTrade so the vault and AMM can route it.
MPTTester mpt(
{.env = env,
.issuer = issuer,
.holders = {owner},
.pay = 100'000,
.flags = tfMPTCanLock | tfMPTCanClawback | MPTDEXFlags});
PrettyAsset const asset = MPT(mpt);
// Create the vault.
Vault const vault{env};
auto [vaultTx, vaultKeylet] = vault.create({.owner = owner, .asset = asset});
env(vaultTx);
env.close();
if (!BEAST_EXPECT(env.le(vaultKeylet)))
return;
// Deposit 50,000 of the underlying MPT.
env(vault.deposit(
{.depositor = owner, .id = vaultKeylet.key, .amount = asset(50'000).value()}));
env.close();
auto const vaultSle = env.le(vaultKeylet);
if (!BEAST_EXPECT(vaultSle))
return;
MPTID const shareMptID = vaultSle->at(sfShareMPTID);
MPTIssue const shareIssue{shareMptID};
env.memoize(Account{"vaultPseudo", vaultSle->at(sfAccount)});
// MPT vaults use scale=0, so 50,000 deposit -> 50,000 share units.
STAmount const sharesHeld = env.balance(owner, shareIssue);
BEAST_EXPECT(sharesHeld.mantissa() == 50'000);
// Create the AMM: 25,000 vault shares + 1,000 XRP.
STAmount const seedShares(shareIssue, std::uint64_t{25'000});
AMM ammOwner(env, owner, seedShares, XRP(1'000));
BEAST_EXPECT(ammOwner.ammExists());
// The AMM pseudo-account issues the LP tokens; memoize so
// env.balance() can format LP-token amounts.
env.memoize(Account{"ammPseudo", ammOwner.ammAccount()});
// Owner's LP token balance after AMM creation.
auto const lptIssue = ammOwner.lptIssue();
STAmount const lptOwner0 = env.balance(owner, lptIssue);
STAmount const lptZero(lptIssue, std::uint32_t{0});
BEAST_EXPECT(lptOwner0 != lptZero);
// Trader needs a trust line to receive LP tokens.
STAmount const lptTrustLimit(lptIssue, std::uint64_t{1'000'000'000});
env(trust(trader, lptTrustLimit));
env.close();
// Step 1: transfer half the LP tokens from owner -> trader.
STAmount const halfLpt(lptIssue, lptOwner0.mantissa() / 2, lptOwner0.exponent());
env(pay(owner, trader, halfLpt));
env.close();
BEAST_EXPECT(env.balance(trader, lptIssue) == halfLpt);
// Step 2: issuer locks the underlying MPT.
mpt.set({.flags = tfMPTLock});
env.close();
// Step 3: transfer LP tokens again. The lock on the underlying MPT
// cascades through the vault-share issuance via
// isVaultPseudoAccountFrozen, so the AMM-routed Payment fails.
STAmount const quarterLpt(lptIssue, lptOwner0.mantissa() / 4, lptOwner0.exponent());
env(pay(owner, trader, quarterLpt), ter(tecPATH_DRY));
env.close();
// Trader's balance is still just the half from before the lock.
BEAST_EXPECT(env.balance(trader, lptIssue) == halfLpt);
// Step 4: try to cash out the LP tokens. The AMM withdrawal must
// touch the vault-share side, which is now treated as frozen
// because its underlying is locked, so the withdrawal fails.
ammOwner.withdrawAll(trader, std::nullopt, ter(tecFROZEN));
env.close();
// Trader still holds the LP tokens; nothing was redeemed.
BEAST_EXPECT(env.balance(trader, lptIssue) == halfLpt);
BEAST_EXPECT(env.balance(trader, shareIssue) == STAmount(shareIssue, std::uint64_t{0}));
}
void
run() override
{
FeatureBitset const all{testable_amendments()};
testVaultSharesAMM();
testLockedVaultMPTCashOut();
testInvalidInstance();
testInstanceCreate();
testInvalidDeposit(all);

View File

@@ -130,12 +130,7 @@ public:
}
void
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
{
}

View File

@@ -6139,6 +6139,141 @@ class Vault_test : public beast::unit_test::suite
runTest(amendments);
}
// Issuer mutates the underlying MPT's lsfMPTCanTransfer / lsfMPTCanTrade
// flags after holders have already deposited into a vault. Demonstrates:
//
// - VaultDeposit and VaultWithdraw both go through `canTransfer`,
// so clearing lsfMPTCanTransfer freezes every holder's funds in
// the vault until the issuer re-enables the flag (`tecNO_AUTH`).
//
// - The issuer is exempt: `canTransfer` short-circuits when either
// side of the transfer is the issuer, so the issuer can still
// deposit and withdraw.
//
// - lsfMPTCanTrade is *not* checked by VaultDeposit/VaultWithdraw at
// all — clearing it has no effect on vault I/O. (It only gates
// DEX/AMM operations via `canTrade`.)
void
testMutateCanTransferAfterDeposit()
{
using namespace test::jtx;
testcase("MPT vault: clearing CanTransfer/CanTrade after deposit");
Env env{*this, testable_amendments() | featureSingleAssetVault};
Account const issuer{"issuer"};
Account const alice{"alice"};
Account const bob{"bob"};
env.fund(XRP(1'000), issuer, alice, bob);
env.close();
// MPT is transferable, tradable, lockable, and clawback-capable. Both
// CanTransfer and CanTrade are mutable so the issuer can flip them
// later via MPTokenIssuanceSet.
MPTTester mptt{env, issuer, mptInitNoFund};
mptt.create(
{.flags = tfMPTCanTransfer | tfMPTCanTrade | tfMPTCanLock | tfMPTCanClawback,
.mutableFlags = tmfMPTCanMutateCanTransfer | tmfMPTCanMutateCanTrade});
PrettyAsset const asset = mptt.issuanceID();
mptt.authorize({.account = alice});
mptt.authorize({.account = bob});
env(pay(issuer, alice, asset(100'000)));
env(pay(issuer, bob, asset(100'000)));
env.close();
Vault const vault{env};
auto [createTx, vaultKeylet] = vault.create({.owner = alice, .asset = asset});
env(createTx);
env.close();
BEAST_EXPECT(env.le(vaultKeylet));
// Both holders deposit. Issuer also deposits (issuer can be a
// depositor too) so we can later confirm the issuer-exempt path.
env(vault.deposit({.depositor = alice, .id = vaultKeylet.key, .amount = asset(50'000)}));
env(vault.deposit({.depositor = bob, .id = vaultKeylet.key, .amount = asset(30'000)}));
env(vault.deposit({.depositor = issuer, .id = vaultKeylet.key, .amount = asset(20'000)}));
env.close();
// -- 1. Issuer clears lsfMPTCanTransfer ---------------------------
mptt.set({.mutableFlags = tmfMPTClearCanTransfer});
env.close();
{
auto const sle = env.le(keylet::mptIssuance(asset.raw().get<MPTIssue>().getMptID()));
BEAST_EXPECT(sle && !sle->isFlag(lsfMPTCanTransfer));
BEAST_EXPECT(sle && sle->isFlag(lsfMPTCanTrade));
}
// 2. Holder deposits and withdrawals are blocked: vault pseudo-
// account is neither sender nor receiver = issuer, so
// canTransfer returns tecNO_AUTH.
env(vault.deposit({.depositor = alice, .id = vaultKeylet.key, .amount = asset(1'000)}),
ter(tecNO_AUTH));
env(vault.withdraw({.depositor = alice, .id = vaultKeylet.key, .amount = asset(1'000)}),
ter(tecNO_AUTH));
env(vault.withdraw({.depositor = bob, .id = vaultKeylet.key, .amount = asset(1'000)}),
ter(tecNO_AUTH));
env.close();
// 3. Issuer-as-depositor is exempt — `canTransfer` short-circuits
// on the issuer side. Both deposit and withdraw succeed.
env(vault.deposit({.depositor = issuer, .id = vaultKeylet.key, .amount = asset(5'000)}));
env(vault.withdraw({.depositor = issuer, .id = vaultKeylet.key, .amount = asset(5'000)}));
env.close();
// 3b. A holder can also escape by withdrawing *to the issuer* via
// sfDestination. `canTransfer`'s issuer short-circuit fires on
// `to == issuer`, so the withdrawal succeeds even though
// CanTransfer is cleared. The holder's shares are burned and
// the underlying MPT lands at the issuer (presumably part of
// an off-ledger redemption arrangement).
auto const aliceMptBefore = env.balance(alice, asset);
auto withdrawToIssuer =
vault.withdraw({.depositor = alice, .id = vaultKeylet.key, .amount = asset(2'000)});
withdrawToIssuer[sfDestination] = issuer.human();
env(withdrawToIssuer);
env.close();
// Alice's MPT balance is unchanged — the asset went to the issuer,
// not back to her — but her share holding was burned.
BEAST_EXPECT(env.balance(alice, asset) == aliceMptBefore);
// -- 4. Also clear lsfMPTCanTrade. Vault paths don't consult
// CanTrade, so this changes nothing for vault I/O. ----------
mptt.set({.mutableFlags = tmfMPTClearCanTrade});
env.close();
{
auto const sle = env.le(keylet::mptIssuance(asset.raw().get<MPTIssue>().getMptID()));
BEAST_EXPECT(sle && !sle->isFlag(lsfMPTCanTrade));
}
// Holder ops still fail the same way (CanTransfer-driven), and the
// issuer is still exempt.
env(vault.withdraw({.depositor = alice, .id = vaultKeylet.key, .amount = asset(1'000)}),
ter(tecNO_AUTH));
env(vault.deposit({.depositor = issuer, .id = vaultKeylet.key, .amount = asset(1'000)}));
env.close();
// -- 5. Re-enable CanTransfer; leave CanTrade cleared. ------------
mptt.set({.mutableFlags = tmfMPTSetCanTransfer});
env.close();
// Holders can now withdraw all their stake — confirms CanTrade is
// not consulted by the vault transactors. Alice already redeemed
// 2,000 to the issuer, so only 48,000 remains for her.
env(vault.withdraw({.depositor = alice, .id = vaultKeylet.key, .amount = asset(48'000)}));
env(vault.withdraw({.depositor = bob, .id = vaultKeylet.key, .amount = asset(30'000)}));
env.close();
{
auto const sle = env.le(keylet::mptIssuance(asset.raw().get<MPTIssue>().getMptID()));
BEAST_EXPECT(sle && sle->isFlag(lsfMPTCanTransfer));
BEAST_EXPECT(sle && !sle->isFlag(lsfMPTCanTrade));
}
}
public:
void
run() override
@@ -6162,6 +6297,7 @@ public:
testAssetsMaximum();
testBug6_LimitBypassWithShares();
testRemoveEmptyHoldingLockedAmount();
testMutateCanTransferAfterDeposit();
}
};

View File

@@ -1,165 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012-2016 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <xrpl/basics/CanProcess.h>
#include <xrpl/beast/unit_test.h>
#include <memory>
namespace ripple {
namespace test {
struct CanProcess_test : beast::unit_test::suite
{
template <class Mutex, class Collection, class Item>
void
test(
std::string const& name,
Mutex& mtx,
Collection& collection,
std::vector<Item> const& items)
{
testcase(name);
if (!BEAST_EXPECT(!items.empty()))
return;
if (!BEAST_EXPECT(collection.empty()))
return;
// CanProcess objects can't be copied or moved. To make that easier,
// store shared_ptrs
std::vector<std::shared_ptr<CanProcess>> trackers;
// Fill up the vector with two CanProcess for each Item. The first
// inserts the item into the collection and is "good". The second does
// not and is "bad".
for (int i = 0; i < items.size(); ++i)
{
{
auto const& good =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(*good);
}
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
BEAST_EXPECT(collection.size() == i + 1);
{
auto const& bad =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(!*bad);
}
BEAST_EXPECT(trackers.size() == 2 * (i + 1));
BEAST_EXPECT(collection.size() == i + 1);
}
BEAST_EXPECT(collection.size() == items.size());
// Now remove the items from the vector<CanProcess> two at a time, and
// try to get another CanProcess for that item.
for (int i = 0; i < items.size(); ++i)
{
// Remove the "bad" one in the second position
// This will have no effect on the collection
{
auto const iter = trackers.begin() + 1;
BEAST_EXPECT(!**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
BEAST_EXPECT(collection.size() == items.size());
{
// Append a new "bad" one
auto const& bad =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(!*bad);
}
BEAST_EXPECT(trackers.size() == 2 * items.size());
BEAST_EXPECT(collection.size() == items.size());
// Remove the "good" one from the front
{
auto const iter = trackers.begin();
BEAST_EXPECT(**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * items.size()) - 1);
BEAST_EXPECT(collection.size() == items.size() - 1);
{
// Append a new "good" one
auto const& good =
trackers.emplace_back(std::make_shared<CanProcess>(mtx, collection, items[i]));
BEAST_EXPECT(*good);
}
BEAST_EXPECT(trackers.size() == 2 * items.size());
BEAST_EXPECT(collection.size() == items.size());
}
// Now remove them all two at a time
for (int i = items.size() - 1; i >= 0; --i)
{
// Remove the "bad" one from the front
{
auto const iter = trackers.begin();
BEAST_EXPECT(!**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == (2 * i) + 1);
BEAST_EXPECT(collection.size() == i + 1);
// Remove the "good" one now in front
{
auto const iter = trackers.begin();
BEAST_EXPECT(**iter);
trackers.erase(iter);
}
BEAST_EXPECT(trackers.size() == 2 * i);
BEAST_EXPECT(collection.size() == i);
}
BEAST_EXPECT(trackers.empty());
BEAST_EXPECT(collection.empty());
}
void
run() override
{
{
std::mutex m;
std::set<int> collection;
std::vector<int> const items{1, 2, 3, 4, 5};
test("set of int", m, collection, items);
}
{
std::mutex m;
std::set<std::string> collection;
std::vector<std::string> const items{"one", "two", "three", "four", "five"};
test("set of string", m, collection, items);
}
{
std::mutex m;
std::unordered_set<char> collection;
std::vector<char> const items{'1', '2', '3', '4', '5'};
test("unorderd_set of char", m, collection, items);
}
{
std::mutex m;
std::unordered_set<std::uint64_t> collection;
std::vector<std::uint64_t> const items{100u, 1000u, 150u, 4u, 0u};
test("unordered_set of uint64_t", m, collection, items);
}
}
};
BEAST_DEFINE_TESTSUITE(CanProcess, ripple_basics, ripple);
} // namespace test
} // namespace ripple

View File

@@ -159,8 +159,10 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
// Tell the ledger acquire system that we need the consensus ledger
acquiringLedger_ = hash;
app_.getInboundLedgers().acquireAsync(
jtADVANCE, "GetConsL1", hash, 0, InboundLedger::Reason::CONSENSUS);
app_.getJobQueue().addJob(jtADVANCE, "GetConsL1", [id = hash, &app = app_, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(id, 0, InboundLedger::Reason::CONSENSUS);
});
}
return std::nullopt;
}
@@ -1050,7 +1052,7 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if ((positions == 0u) && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED, "updateOperatingMode: no positions");
app_.getOPs().setMode(OperatingMode::CONNECTED);
}
void

View File

@@ -128,8 +128,12 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
{
JLOG(j_.warn()) << "Need validated ledger for preferred ledger analysis " << hash;
app_.getInboundLedgers().acquireAsync(
jtADVANCE, "GetConsL2", hash, 0, InboundLedger::Reason::CONSENSUS);
Application* pApp = &app_;
app_.getJobQueue().addJob(jtADVANCE, "GetConsL2", [pApp, hash, this]() {
JLOG(j_.debug()) << "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(hash, 0, InboundLedger::Reason::CONSENSUS);
});
return std::nullopt;
}

View File

@@ -26,12 +26,7 @@ public:
// Queue. TODO review whether all callers of acquire() can use this
// instead. Inbound ledger acquisition is asynchronous anyway.
virtual void
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) = 0;
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) = 0;
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;

View File

@@ -385,14 +385,7 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (!wasProgress)
{
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}
checkLocal();
mByHash = true;

View File

@@ -6,7 +6,6 @@
#include <xrpld/overlay/PeerSet.h>
#include <xrpl/basics/Blob.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/Slice.h>
@@ -84,15 +83,12 @@ public:
(reason != InboundLedger::Reason::CONSENSUS))
return {};
std::stringstream ss;
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
if (stopping_)
{
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {};
}
@@ -111,61 +107,47 @@ public:
++mCounter;
}
}
ss << " IsNew: " << (isNew ? "true" : "false");
if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {};
}
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
{
JLOG(j_.debug()) << "InProgress: " << ss.str();
return {};
}
JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger();
};
using namespace std::chrono_literals;
return perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
std::shared_ptr<Ledger const> ledger =
perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
}
void
acquireAsync(
JobType type,
std::string const& name,
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
{
if (auto check = std::make_shared<CanProcess const>(acquiresMutex_, pendingAcquires_, hash);
*check)
std::unique_lock lock(acquiresMutex_);
try
{
app_.getJobQueue().addJob(type, name, [check, name, hash, seq, reason, this]() {
JLOG(j_.debug()) << "JOB acquireAsync " << name << " started ";
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new "
"inbound ledger "
<< hash << ": " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
});
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
scope_unlock const unlock(lock);
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": "
<< e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
}
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger>

View File

@@ -965,9 +965,8 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
return;
}
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq << " ("
<< to_short_string(ledger->header().hash) << ") with >= " << minVal
<< " validations";
JLOG(m_journal.info()) << "Advancing accepted ledger to " << ledger->header().seq
<< " with >= " << minVal << " validations";
ledger->setValidated();
ledger->setFull();

View File

@@ -25,8 +25,7 @@ TimeoutCounter::TimeoutCounter(
QueueJobParameter&& jobParameter,
beast::Journal journal)
: app_(app)
, sink_(journal, to_short_string(hash) + " ")
, journal_(sink_)
, journal_(journal)
, hash_(hash)
, timerInterval_(interval)
, queueJobParameter_(std::move(jobParameter))
@@ -42,7 +41,6 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() << "ms";
timer_.expires_after(timerInterval_);
timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
if (ec == boost::asio::error::operation_aborted)
@@ -50,10 +48,6 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock())
{
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec
<< " (operation_aborted: " << boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted" : "other") << ")";
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
}

View File

@@ -3,7 +3,6 @@
#include <xrpld/app/main/Application.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/core/Job.h>
#include <boost/asio/basic_waitable_timer.hpp>
@@ -104,7 +103,6 @@ protected:
// Used in this class for access to boost::asio::io_context and
// xrpl::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::WrappedSink sink_;
beast::Journal journal_;
mutable std::recursive_mutex mtx_;

View File

@@ -35,7 +35,6 @@
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/ToString.h>
#include <xrpl/basics/UnorderedContainers.h>
@@ -486,7 +485,7 @@ public:
isFull() override;
void
setMode(OperatingMode om, char const* reason) override;
setMode(OperatingMode om) override;
bool
isBlocked() override;
@@ -924,7 +923,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
inline void
NetworkOPsImp::setStandAlone()
{
setMode(OperatingMode::FULL, "setStandAlone");
setMode(OperatingMode::FULL);
}
inline void
@@ -1067,7 +1066,7 @@ NetworkOPsImp::processHeartbeatTimer()
{
if (mMode != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED, "Heartbeat: insufficient peers");
setMode(OperatingMode::DISCONNECTED);
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
@@ -1091,7 +1090,7 @@ NetworkOPsImp::processHeartbeatTimer()
if (mMode == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
setMode(OperatingMode::CONNECTED);
JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
}
@@ -1102,11 +1101,11 @@ NetworkOPsImp::processHeartbeatTimer()
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mMode == OperatingMode::SYNCING)
{
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
setMode(OperatingMode::SYNCING);
}
else if (mMode == OperatingMode::CONNECTED)
{
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
setMode(OperatingMode::CONNECTED);
}
auto newMode = mMode.load();
if (origMode != newMode)
@@ -1810,7 +1809,7 @@ void
NetworkOPsImp::setAmendmentBlocked()
{
amendmentBlocked_ = true;
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
setMode(OperatingMode::CONNECTED);
}
inline bool
@@ -1841,7 +1840,7 @@ void
NetworkOPsImp::setUNLBlocked()
{
unlBlocked_ = true;
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
setMode(OperatingMode::CONNECTED);
}
inline void
@@ -1941,7 +1940,7 @@ NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
{
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
setMode(OperatingMode::CONNECTED);
}
if (consensus)
@@ -2029,8 +2028,8 @@ NetworkOPsImp::beginConsensus(
// this shouldn't happen unless we jump ledgers
if (mMode == OperatingMode::FULL)
{
JLOG(m_journal.warn()) << "beginConsensus Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING);
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
}
@@ -2158,7 +2157,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// validations we have for LCL. If the ledger is good enough, go to
// TRACKING - TODO
if (!needNetworkLedger_)
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
setMode(OperatingMode::TRACKING);
}
if (((mMode == OperatingMode::CONNECTED) || (mMode == OperatingMode::TRACKING)) &&
@@ -2171,7 +2170,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
if (registry_.get().getTimeKeeper().now() <
(current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
{
setMode(OperatingMode::FULL, "endConsensus: check full");
setMode(OperatingMode::FULL);
}
}
@@ -2183,7 +2182,7 @@ NetworkOPsImp::consensusViewChange()
{
if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
{
setMode(OperatingMode::CONNECTED, "consensusViewChange");
setMode(OperatingMode::CONNECTED);
}
}
@@ -2487,7 +2486,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
}
void
NetworkOPsImp::setMode(OperatingMode om, char const* reason)
NetworkOPsImp::setMode(OperatingMode om)
{
using namespace std::chrono_literals;
if (om == OperatingMode::CONNECTED)
@@ -2507,12 +2506,11 @@ NetworkOPsImp::setMode(OperatingMode om, char const* reason)
if (mMode == om)
return;
auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
mMode = om;
accounting_.mode(om);
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
pubServer();
}
@@ -2521,24 +2519,36 @@ NetworkOPsImp::recvValidation(std::shared_ptr<STValidation> const& val, std::str
{
JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::no;
try
{
CanProcess const check(validationsMutex_, pendingValidations_, val->getLedgerHash());
try
if (pendingValidations_.contains(val->getLedgerHash()))
{
BypassAccept bypassAccept = check ? BypassAccept::no : BypassAccept::yes;
handleNewValidation(registry_.app(), val, source, bypassAccept, m_journal);
bypassAccept = BypassAccept::yes;
}
catch (std::exception const& e)
else
{
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation " << val->getLedgerHash();
pendingValidations_.insert(val->getLedgerHash());
}
scope_unlock const unlock(lock);
handleNewValidation(registry_.get().getApp(), val, source, bypassAccept, m_journal);
}
catch (std::exception const& e)
{
JLOG(m_journal.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn()) << "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::no)
{
pendingValidations_.erase(val->getLedgerHash());
}
lock.unlock();
pubValidation(val);