Compare commits

..

37 Commits

Author SHA1 Message Date
Ed Hennis
1cf85a5a8d Merge remote-tracking branch 'XRPLF/develop' into ximinez/fix-getledger
* XRPLF/develop: (99 commits)
  refactor: Clean up tec object deletion logic (6588)
  fix: Move AMMInvariant weakInvariantCheck logic into the transaction (7032)
  fix: Improve ValidAMM invariant (7295)
  feat: Remove clear mutable flags for DynamicMPT XLS-94 (7439)
  ci: Build and push docker images in forks too (7588)
  ci: [DEPENDABOT] bump actions/checkout from 6.0.3 to 7.0.0 (7585)
  fix: Ensure xrpld service directories exist at startup (7565)
  fix: Use template for granular delegation permissions (6613)
  docs: Fix some comments to improve readability (7405)
  ci: Disable assertions on Release builds (7443)
  build: Add graphviz to Nix images (7566)
  refactor: Rerevert "Explicitly trim the heap after cache sweeps (6022)"
  release: Bump version to 3.3.0-b0
  ci: Make clang-tidy workflow adjustments to stay in sync with Clio (7563)
  build: Add git-lfs to Nix images (7561)
  build: Add zip to Nix images (7551)
  docs: Rewrite build environment docs (7533)
  release: Bump version to 3.2.0
  ci: [DEPENDABOT] bump codecov/codecov-action from 6.0.1 to 7.0.0 (7426)
  release: Bump version to 3.2.0-rc6
  ...
2026-06-22 17:30:27 -04:00
Ed Hennis
eb109eccf7 Rename getLedgerInterval to kGetLedgerInterval 2026-05-26 16:26:16 -04:00
Ed Hennis
5bbc553acd Merge remote-tracking branch 'XRPLF/develop' into ximinez/fix-getledger
* XRPLF/develop: (30 commits)
  chore: Pin Python packages for codegen using uv (7329)
  style: Use shfmt instead of bashate (7326)
  fix: Fix edge-case where vault-depositor may get stuck (7139)
  fix: Fix `VaultInvariant` and `VaultDeposit` precision bugs at IOU scale boundaries (7272)
  ci: Add clang to nix images (7308)
  fix: Include management-fee delta in doOverpayment assertion (7039)
  fix: Fix clang-tidy pre-commit hook to locate compile_commands.json from repo root (7325)
  fix: Use consistent scale for `debtTotal` (7093)
  fix: Skip deleted book directories and non-root modifications in `ValidBookDirectory` invariant (7312)
  fix: Address review feedback on FD/handle guarding (5823 follow-up) (7310)
  fix: Fix non-canonical MPT amount (7117)
  release: Bump version to 3.2.0-b7 (7316)
  fix: Check if the MPT first loss cover can be sent to the broker before deleting the broker (7125)
  fix: Fix RPM prerelease ordering and start xrpld on DEB install (7313)
  ci: Re-enable full nproc for Linux (7315)
  fix: Add assorted MPT/DEX fixes (7040)
  refactor: Remove dead `fetchBatch` code (7309)
  release: Bump version to 3.2.0-b6 (7311)
  chore: Revert graceful peer disconnection and follow-up fix (7296)
  fix: Fix IOU precision issues in LoanBrokerCover transactions (7274)
  ...
2026-05-26 16:22:23 -04:00
Ed Hennis
2bc15d9838 Merge branch 'develop' into ximinez/fix-getledger 2026-05-19 16:53:41 -04:00
Ed Hennis
77fd2c3cf7 Merge branch 'develop' into ximinez/fix-getledger 2026-05-19 10:15:20 -04:00
Ed Hennis
518bd4ec46 Merge branch 'develop' into ximinez/fix-getledger 2026-05-19 05:15:56 -04:00
Ed Hennis
b551562db9 Merge remote-tracking branch 'XRPLF/develop' into ximinez/fix-getledger
* XRPLF/develop:
  refactor: Clean up comments post-clang-tidy changes (7283)
  release: Set version to 3.3.0-b0 (7280)
  refactor: Rename static constants (7120)
  refactor: Use `isFlag` where possible instead of bitwise math (7278)
  ci: Update XRPLF/actions (7281)
2026-05-15 21:41:59 -04:00
Ed Hennis
98604be7a2 Merge branch 'develop' into ximinez/fix-getledger 2026-05-14 20:39:15 -04:00
Ed Hennis
edbb9daa36 Merge branch 'develop' into ximinez/fix-getledger 2026-05-13 22:31:59 -04:00
Ed Hennis
2bcbef1b63 Merge branch 'develop' into ximinez/fix-getledger 2026-05-13 12:03:44 -04:00
Ed Hennis
59189e6234 Merge branch 'develop' into ximinez/fix-getledger 2026-05-12 19:18:53 -04:00
Ed Hennis
9f0ca26875 Merge remote-tracking branch 'XRPLF/develop' into ximinez/fix-getledger
* XRPLF/develop:
  refactor: Fill txJson based on apiVersion (7109)
  fix: Fix touchy "funds are conserved" assertion in LoanPay (6231) (6967)
  refactor: Remove erroneous base_uint ctor from container (7123)
  ci: Make `Show test failure summary` work with no build dir (7124)
  refactor: Replace `featureInvariantsV1_1` with `fixCleanup3_2_0` (7116)
2026-05-12 16:03:45 -04:00
Ed Hennis
2aa493e0e4 Merge branch 'develop' into ximinez/fix-getledger 2026-05-11 13:34:17 -04:00
Ed Hennis
6074510d47 Merge branch 'develop' into ximinez/fix-getledger 2026-05-07 18:10:09 -04:00
Ed Hennis
3d39d9e10e Merge branch 'develop' into ximinez/fix-getledger 2026-05-07 14:18:34 -04:00
Ed Hennis
2ec1276b5f Merge branch 'develop' into ximinez/fix-getledger 2026-05-07 13:28:38 -04:00
Ed Hennis
61049d9ef8 Merge branch 'develop' into ximinez/fix-getledger 2026-05-06 22:34:29 -04:00
Ed Hennis
d52851eecb Merge branch 'develop' into ximinez/fix-getledger 2026-05-06 14:18:07 -04:00
Ed Hennis
e2a7a51e78 Merge branch 'develop' into ximinez/fix-getledger 2026-05-05 16:46:49 -04:00
Ed Hennis
3e38ee2997 Merge branch 'develop' into ximinez/fix-getledger 2026-05-05 10:50:39 -04:00
Ed Hennis
25d246fc92 Merge remote-tracking branch 'XRPLF/develop' into ximinez/fix-getledger
* XRPLF/develop:
  ci: Rewrite clang-tidy workflow(s) in a reusable manner (7062)
  chore: Ignore identifier-naming update in git blame (7066)
  refactor: Enable clang-tidy `readability-identifier-naming` check (6571)
  refactor: Revert certain `Throw`s by `LogicError`s (7036)
  ci: Rename print-env -> print-build-env (7061)
  fix: Gate -mcmodel flags to x86_64 in sanitizer builds (7049)
  fix: Prevents overwriting a bool value in an invariant (6609)
  fix: Address code review comments regarding `boost::coroutine2` (6977)
  refactor: Apply various minor improvements and corrections (7045)
  fix: Store `Delegate` object in delegating and authorized account directories for proper deletion (6681)
  ci: Use print-env from XRPLF/actions (7052)
  fix: Make assorted RPC fixes (6529)
  chore: Enable clang-tidy v21 new checks (7031)
  feat: Create new transaction testing framework `TxTest` (6537)
  feat: Add cleanup amendment for 3.2.0 (7037)
  fix: Fix ubsan flagged issues (6151)
2026-05-05 00:08:59 -04:00
Ed Hennis
2881c08d2e Merge branch 'develop' into ximinez/fix-getledger 2026-04-25 14:45:50 -04:00
Ed Hennis
ddbe7e87fb Merge branch 'develop' into ximinez/fix-getledger 2026-04-23 15:56:06 -04:00
Ed Hennis
f1ea5233a8 Merge branch 'develop' into ximinez/fix-getledger 2026-04-22 23:40:16 -04:00
Ed Hennis
85342554b7 Merge branch 'develop' into ximinez/fix-getledger 2026-04-22 14:49:08 -04:00
Ed Hennis
cb9b5f0399 Merge branch 'develop' into ximinez/fix-getledger 2026-04-22 13:10:39 -04:00
Ed Hennis
2f79477190 Merge branch 'develop' into ximinez/fix-getledger 2026-04-21 18:48:35 -04:00
Ed Hennis
55e5374f56 Merge branch 'develop' into ximinez/fix-getledger 2026-04-21 15:20:21 -04:00
Ed Hennis
846f29e8ad Merge branch 'develop' into ximinez/fix-getledger 2026-04-20 17:49:43 -04:00
Ed Hennis
d236569282 Merge branch 'develop' into ximinez/fix-getledger 2026-04-20 15:45:00 -04:00
Ed Hennis
58170b5ece Merge branch 'develop' into ximinez/fix-getledger 2026-04-20 11:38:57 -04:00
Ed Hennis
8bb87e9ca9 Merge remote-tracking branch 'XRPLF/develop' into ximinez/fix-getledger
* XRPLF/develop:
  chore: Enable clang-tidy include cleaner (6947)
  fix: Change AMMClawback return code to tecNO_PERMISSION (6946)
  ci: [DEPENDABOT] bump actions/upload-pages-artifact from 4.0.0 to 5.0.0 (6927)
  ci: [DEPENDABOT] bump actions/upload-artifact from 7.0.0 to 7.0.1 (6928)
  chore: Enable clang-tidy readability checks (6930)
2026-04-17 18:08:54 -04:00
Ed Hennis
9545e1f7ce Merge branch 'develop' into ximinez/fix-getledger 2026-04-16 13:44:33 -04:00
Ed Hennis
22b3cbbada Merge branch 'develop' into ximinez/fix-getledger 2026-04-15 19:06:26 -04:00
Ed Hennis
cbc2288bdc Merge branch 'develop' into ximinez/fix-getledger 2026-04-15 14:28:51 -04:00
Ed Hennis
5fa9fc55bb Merge branch 'develop' into ximinez/fix-getledger 2026-04-13 20:28:41 -04:00
Ed Hennis
4ab2950410 Reduce duplicate peer traffic for ledger data (#5126)
Improve job queue collision checks and logging
- Improve logging related to ledger acquisition and operating mode
  changes
- Class "CanProcess" to keep track of processing of distinct items

- Drop duplicate outgoing TMGetLedger messages per peer
  - Allow a retry after 30s in case of peer or network congestion.
  - Addresses RIPD-1870
  - (Changes levelization. That is not desirable, and will need to be fixed.)
- Drop duplicate incoming TMGetLedger messages per peer
  - Allow a retry after 15s in case of peer or network congestion.
  - The requestCookie is ignored when computing the hash, thus increasing
    the chances of detecting duplicate messages.
  - With duplicate messages, keep track of the different requestCookies
    (or lack of cookie). When work is finally done for a given request,
    send the response to all the peers that are waiting on the request,
    sending one message per peer, including all the cookies and
    a "directResponse" flag indicating the data is intended for the
    sender, too.
  - Addresses RIPD-1871
- Drop duplicate incoming TMLedgerData messages
  - Addresses RIPD-1869
2026-04-13 18:40:46 -04:00
28 changed files with 1152 additions and 338 deletions

View File

@@ -227,8 +227,7 @@ jobs:
--build . \
--config "${BUILD_TYPE}" \
--parallel "${BUILD_NPROC}" \
--target "${CMAKE_TARGET}" \
2>&1 | tee build.log
--target "${CMAKE_TARGET}"
# This step is needed to allow running in non-Nix environments
- name: Patch binary to use default loader and remove rpath (Linux)
@@ -341,7 +340,7 @@ jobs:
LD_PRELOAD="$PRELOAD" ./xrpld --unittest --unittest-jobs "${BUILD_NPROC}" 2>&1 | tee unittest.log
- name: Show test failure summary
if: ${{ failure() }}
if: ${{ failure() && !inputs.build_only }}
env:
WORKING_DIR: ${{ runner.os == 'Windows' && format('{0}\{1}', env.BUILD_DIR, inputs.build_type) || env.BUILD_DIR }}
run: |
@@ -352,17 +351,13 @@ jobs:
cd "${WORKING_DIR}"
if [ -f unittest.log ]; then
if ! grep -E "failed" unittest.log | grep -vE "^I[0-9]|^[0-9]+> (ERR:|FTL:)"; then
echo "unittest.log present but no failure lines found."
fi
else
if [ ! -f unittest.log ]; then
echo "unittest.log not found; embedded tests may not have run."
if [ -f build.log ]; then
if ! grep -E "error:" build.log; then
echo "build.log present but no compile errors found."
fi
fi
exit 0
fi
if ! grep -E "failed" unittest.log; then
echo "Log present but no failure lines found in unittest.log."
fi
- name: Debug failure (Linux)
if: ${{ failure() && runner.os == 'Linux' && !inputs.build_only }}

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#include <functional>
#include <mutex>
#include <set>
/** RAII class to check if an Item is already being processed on another thread,
* as indicated by it's presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
class CanProcess
{
public:
template <class Mutex, class Collection, class Item>
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: cleanup_(insert(mtx, collection, item))
{
}
~CanProcess()
{
if (cleanup_)
cleanup_();
}
explicit
operator bool() const
{
return static_cast<bool>(cleanup_);
}
private:
template <bool useIterator, class Mutex, class Collection, class Item>
std::function<void()>
doInsert(Mutex& mtx, Collection& collection, Item const& item)
{
std::unique_lock<Mutex> lock(mtx);
// TODO: Use structured binding once LLVM 16 is the minimum supported
// version. See also: https://github.com/llvm/llvm-project/issues/48582
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
auto const insertResult = collection.insert(item);
auto const it = insertResult.first;
if (!insertResult.second)
return {};
if constexpr (useIterator)
return [&, it]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(it);
};
else
return [&]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(item);
};
}
// Generic insert() function doesn't use iterators because they may get
// invalidated
template <class Mutex, class Collection, class Item>
std::function<void()>
insert(Mutex& mtx, Collection& collection, Item const& item)
{
return doInsert<false>(mtx, collection, item);
}
// Specialize insert() for std::set, which does not invalidate iterators for
// insert and erase
template <class Mutex, class Item>
std::function<void()>
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
{
return doInsert<true>(mtx, collection, item);
}
// If set, then the item is "usable"
std::function<void()> cleanup_;
};
#endif

View File

@@ -44,7 +44,7 @@ template <class T>
inline void
maybeReverseBytes(T& t, std::true_type)
{
reverse_bytes(t);
reverseBytes(t);
}
template <class T, class Hasher>

View File

@@ -137,6 +137,13 @@ private:
return std::move(peers_);
}
/** Return set of peers waiting for reply. Leaves list unchanged. */
std::set<PeerShortID> const&
peekPeerSet()
{
return peers_;
}
/** Return seated relay time point if the message has been relayed */
[[nodiscard]] std::optional<Stopwatch::time_point>
relayed() const
@@ -168,6 +175,20 @@ private:
return true;
}
bool
shouldProcessForPeer(
PeerShortID peer,
Stopwatch::time_point now,
std::chrono::seconds interval)
{
if (peerProcessed_.contains(peer) && ((peerProcessed_[peer] + interval) > now))
return false;
// Peer may already be in the list, but adding it again doesn't hurt
addPeer(peer);
peerProcessed_[peer] = now;
return true;
}
private:
HashRouterFlags flags_ = HashRouterFlags::UNDEFINED;
std::set<PeerShortID> peers_;
@@ -175,6 +196,7 @@ private:
// than one flag needs to expire independently.
std::optional<Stopwatch::time_point> relayed_;
std::optional<Stopwatch::time_point> processed_;
std::map<PeerShortID, Stopwatch::time_point> peerProcessed_;
};
public:
@@ -197,7 +219,7 @@ public:
/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the peer is added.
* element 1: true if the key is added.
* element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>
@@ -214,6 +236,15 @@ public:
HashRouterFlags& flags,
std::chrono::seconds txInterval);
/** Determines whether the hashed item should be processed for the given
peer. Could be an incoming or outgoing message.
Items filtered with this function should only be processed for the given
peer once. Unlike shouldProcess, it can be processed for other peers.
*/
bool
shouldProcessForPeer(uint256 const& key, PeerShortID peer, std::chrono::seconds interval);
/** Set the flags on a hash.
@return `true` if the flags were changed. `false` if unchanged.
@@ -239,6 +270,11 @@ public:
std::optional<std::set<PeerShortID>>
shouldRelay(uint256 const& key);
/** Returns a copy of the set of peers in the Entry for the key
*/
std::set<PeerShortID>
getPeers(uint256 const& key);
private:
// pair.second indicates whether the entry was created
std::pair<Entry&, bool>

View File

@@ -288,8 +288,18 @@ message TMLedgerData {
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}
message TMPing {

View File

@@ -35,6 +35,8 @@ struct LedgerHeader
// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;

View File

@@ -185,7 +185,7 @@ public:
virtual bool
isFull() = 0;
virtual void
setMode(OperatingMode om) = 0;
setMode(OperatingMode om, char const* reason) = 0;
virtual bool
isBlocked() = 0;
virtual bool

View File

@@ -82,6 +82,19 @@ HashRouter::shouldProcess(
return s.shouldProcess(suppressionMap_.clock().now(), txInterval);
}
bool
HashRouter::shouldProcessForPeer(
uint256 const& key,
PeerShortID peer,
std::chrono::seconds interval)
{
std::lock_guard lock(mutex_);
auto& entry = emplace(key).first;
return entry.shouldProcessForPeer(peer, suppressionMap_.clock().now(), interval);
}
HashRouterFlags
HashRouter::getFlags(uint256 const& key)
{
@@ -119,4 +132,13 @@ HashRouter::shouldRelay(uint256 const& key) -> std::optional<std::set<PeerShortI
return s.releasePeerSet();
}
auto
HashRouter::getPeers(uint256 const& key) -> std::set<PeerShortID>
{
std::lock_guard lock(mutex_);
auto& s = emplace(key).first;
return s.peekPeerSet();
}
} // namespace xrpl

View File

@@ -395,6 +395,33 @@ class HashRouter_test : public beast::unit_test::Suite
BEAST_EXPECT(!any(HF::UNDEFINED));
}
void
testProcessPeer()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(getSetup(5s, 5s), stopwatch);
uint256 const key(1);
HashRouter::PeerShortID peer1 = 1;
HashRouter::PeerShortID peer2 = 2;
auto const timeout = 2s;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
++stopwatch;
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
}
public:
void
run() override
@@ -407,6 +434,7 @@ public:
testProcess();
testSetup();
testFlagsOps();
testProcessPeer();
}
};

View File

@@ -3,7 +3,6 @@
#include <test/jtx/Env.h>
#include <test/jtx/TestHelpers.h>
#include <test/jtx/amount.h>
#include <test/jtx/envconfig.h>
#include <test/jtx/fee.h>
#include <test/jtx/mpt.h>
#include <test/jtx/pay.h>
@@ -101,12 +100,6 @@ class Invariants_test : public beast::unit_test::Suite
return xrpl::test::jtx::testableAmendments() | fixCleanup3_1_3 | fixCleanup3_2_0;
}
test::jtx::Env
makeEnv(FeatureBitset features)
{
return {*this, test::jtx::envconfig(), features, nullptr, beast::Severity::Disabled};
}
/** Run a specific test case to put the ledger into a state that will be
* detected by an invariant. Simulates the actions of a transaction that
* would violate an invariant.
@@ -135,7 +128,7 @@ class Invariants_test : public beast::unit_test::Suite
TxAccount setTxAccount = TxAccount::None)
{
doInvariantCheck(
makeEnv(defaultAmendments()),
test::jtx::Env(*this, defaultAmendments()),
expectLogs,
precheck,
fee,
@@ -1412,7 +1405,7 @@ class Invariants_test : public beast::unit_test::Suite
testcase << "PermissionedDomain" + std::string(fixEnabled ? " fix" : "");
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"permissioned domain with no rules."}},
[](Account const& a1, Account const& a2, ApplyContext& ac) {
return createPermissionedDomain(ac, a1, a2, 0).get();
@@ -1425,7 +1418,7 @@ class Invariants_test : public beast::unit_test::Suite
static constexpr auto kTooBig = kMaxPermissionedDomainCredentialsArraySize + 1;
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"permissioned domain bad credentials size " + std::to_string(kTooBig)}},
[](Account const& a1, Account const& a2, ApplyContext& ac) {
return !!createPermissionedDomain(ac, a1, a2, kTooBig);
@@ -1436,7 +1429,7 @@ class Invariants_test : public beast::unit_test::Suite
testcase << "PermissionedDomain 3";
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"permissioned domain credentials aren't sorted"}},
[](Account const& a1, Account const& a2, ApplyContext& ac) {
auto slePd = createPermissionedDomain(ac, a1, a2, 0);
@@ -1460,7 +1453,7 @@ class Invariants_test : public beast::unit_test::Suite
testcase << "PermissionedDomain 4";
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"permissioned domain credentials aren't unique"}},
[](Account const& a1, Account const& a2, ApplyContext& ac) {
auto slePd = createPermissionedDomain(ac, a1, a2, 0);
@@ -1483,7 +1476,7 @@ class Invariants_test : public beast::unit_test::Suite
testcase << "PermissionedDomain Set 1";
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"permissioned domain with no rules."}},
[&](Account const& a1, Account const& a2, ApplyContext& ac) {
// create PD
@@ -1504,7 +1497,7 @@ class Invariants_test : public beast::unit_test::Suite
testcase << "PermissionedDomain Set 2";
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"permissioned domain bad credentials size " + std::to_string(kTooBig)}},
[&](Account const& a1, Account const& a2, ApplyContext& ac) {
// create PD
@@ -1535,7 +1528,7 @@ class Invariants_test : public beast::unit_test::Suite
testcase << "PermissionedDomain Set 3";
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"permissioned domain credentials aren't sorted"}},
[&](Account const& a1, Account const& a2, ApplyContext& ac) {
// create PD
@@ -1565,7 +1558,7 @@ class Invariants_test : public beast::unit_test::Suite
testcase << "PermissionedDomain Set 4";
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"permissioned domain credentials aren't unique"}},
[&](Account const& a1, Account const& a2, ApplyContext& ac) {
// create PD
@@ -1606,7 +1599,7 @@ class Invariants_test : public beast::unit_test::Suite
{
testcase << "PermissionedDomain set 2 domains ";
doInvariantCheck(
makeEnv(features),
Env(*this, features),
fixEnabled ? badMoreThan1 : emptyV,
[](Account const& a1, Account const& a2, ApplyContext& ac) {
createPermissionedDomain(ac, a1, a2);
@@ -1652,7 +1645,7 @@ class Invariants_test : public beast::unit_test::Suite
{
testcase << "PermissionedDomain set 0 domains ";
doInvariantCheck(
makeEnv(features),
Env(*this, features),
fixEnabled ? badNoDomains : emptyV,
[](Account const&, Account const&, ApplyContext&) { return true; },
XRPAmount{},
@@ -1675,7 +1668,7 @@ class Invariants_test : public beast::unit_test::Suite
env1.close();
doInvariantCheck(
makeEnv(features),
Env(*this, features),
a1,
a2,
fixEnabled ? badNoDomains : emptyV,
@@ -1716,7 +1709,7 @@ class Invariants_test : public beast::unit_test::Suite
{
testcase << "PermissionedDomain del, create domain ";
doInvariantCheck(
makeEnv(features),
Env(*this, features),
fixEnabled ? badNotDeleted : emptyV,
[](Account const& a1, Account const& a2, ApplyContext& ac) {
createPermissionedDomain(ac, a1, a2);
@@ -1896,7 +1889,7 @@ class Invariants_test : public beast::unit_test::Suite
testcase << "PermissionedDEX" + std::string(fixEnabled ? " fix" : "");
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"domain doesn't exist"}},
[](Account const& a1, Account const&, ApplyContext& ac) {
Keylet const offerKey = keylet::offer(a1.id(), 10);
@@ -1923,7 +1916,7 @@ class Invariants_test : public beast::unit_test::Suite
// missing domain ID in offer object
doInvariantCheck(
makeEnv(features),
Env(*this, features),
{{"hybrid offer is malformed"}},
[&](Account const& a1, Account const& a2, ApplyContext& ac) {
Keylet const offerKey = keylet::offer(a2.id(), 10);
@@ -4237,7 +4230,7 @@ class Invariants_test : public beast::unit_test::Suite
};
doInvariantCheck(
makeEnv(defaultAmendments() - fixCleanup3_2_0),
Env{*this, defaultAmendments() - fixCleanup3_2_0},
{},
[](Account const&, Account const&, ApplyContext&) { return true; },
XRPAmount{},
@@ -4756,7 +4749,7 @@ class Invariants_test : public beast::unit_test::Suite
// sfHighLimit issue, not the keylet currency).
testcase << "overwrite: NoXRPTrustLines" + std::string(fixEnabled ? " fix" : "");
doInvariantCheck(
makeEnv(features),
Env(*this, features),
fixEnabled ? std::vector<std::string>{{"an XRP trust line was created"}}
: std::vector<std::string>{},
[&insertOrderedTrustLinePair](Account const& a1, Account const& a2, ApplyContext& ac) {
@@ -4784,7 +4777,7 @@ class Invariants_test : public beast::unit_test::Suite
// Regression: bad deep-freeze trust line followed by a valid one.
testcase << "overwrite: NoDeepFreeze" + std::string(fixEnabled ? " fix" : "");
doInvariantCheck(
makeEnv(features),
Env(*this, features),
fixEnabled ? std::vector<std::string>{{"a trust line with deep freeze flag without "
"normal freeze was created"}}
: std::vector<std::string>{},
@@ -4818,7 +4811,7 @@ class Invariants_test : public beast::unit_test::Suite
// still fires ("a MPT issuance was created").
testcase << "overwrite: NoZeroEscrow MPT" + std::string(fixEnabled ? " fix" : "");
doInvariantCheck(
makeEnv(features),
Env(*this, features),
fixEnabled ? std::vector<std::string>{{"escrow specifies invalid amount"}}
: std::vector<std::string>{{"a MPT issuance was created"}},
[](Account const& a1, Account const&, ApplyContext& ac) {

View File

@@ -339,6 +339,11 @@ public:
{
return false;
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
[[nodiscard]] std::string const&
fingerprint() const override

View File

@@ -63,8 +63,8 @@ public:
negotiateProtocolVersion("RTXP/1.2, XRPL/2.0, XRPL/2.1") == makeProtocol(2, 1));
BEAST_EXPECT(negotiateProtocolVersion("XRPL/2.2") == makeProtocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion("RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
makeProtocol(2, 2));
negotiateProtocolVersion("RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
makeProtocol(2, 3));
BEAST_EXPECT(negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") == std::nullopt);
BEAST_EXPECT(negotiateProtocolVersion("") == std::nullopt);
}

View File

@@ -190,6 +190,11 @@ public:
removeTxQueue(uint256 const&) override
{
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
};
/** Manually advanced clock. */

View File

@@ -1052,7 +1052,7 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if ((positions == 0u) && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
app_.getOPs().setMode(OperatingMode::CONNECTED, "updateOperatingMode: no positions");
}
void

View File

@@ -172,4 +172,22 @@ private:
std::unique_ptr<PeerSet> peerSet_;
};
inline std::string
to_string(InboundLedger::Reason reason)
{
using enum InboundLedger::Reason;
switch (reason)
{
case HISTORY:
return "HISTORY";
case GENERIC:
return "GENERIC";
case CONSENSUS:
return "CONSENSUS";
default:
UNREACHABLE("ripple::to_string(InboundLedger::Reason) : unknown value");
return "unknown";
}
}
} // namespace xrpl

View File

@@ -367,7 +367,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}
byHash_ = true;

View File

@@ -6,6 +6,7 @@
#include <xrpld/overlay/PeerSet.h>
#include <xrpl/basics/Blob.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/Slice.h>
@@ -78,10 +79,78 @@ public:
XRPL_ASSERT(
hash.isNonZero(), "xrpl::InboundLedgersImp::acquire::doAcquire : nonzero hash");
// probably not the right rule
if (app_.getOPs().isNeedNetworkLedger() && (reason != InboundLedger::Reason::GENERIC) &&
(reason != InboundLedger::Reason::CONSENSUS))
bool const needNetworkLedger = app_.getOPs().isNeedNetworkLedger();
bool const shouldAcquire = [&]() {
if (!needNetworkLedger)
return true;
if (reason == InboundLedger::Reason::GENERIC)
return true;
if (reason == InboundLedger::Reason::CONSENSUS)
return true;
return false;
}();
std::stringstream ss;
ss << "InboundLedger::acquire: "
<< "Request: " << to_string(hash) << ", " << seq
<< " NeedNetworkLedger: " << (needNetworkLedger ? "yes" : "no")
<< " Reason: " << to_string(reason)
<< " Should acquire: " << (shouldAcquire ? "true." : "false.");
/* Acquiring ledgers is somewhat expensive. It requires lots of
* computation and network communication. Avoid it when it's not
* appropriate. Every validation from a peer for a ledger that
* we do not have locally results in a call to this function: even
* if we are moments away from validating the same ledger.
*/
bool const shouldBroadcast = [&]() {
// If the node is not in "full" state, it needs to sync to
// the network, and doesn't have the necessary tx's and
// ledger entries to build the ledger.
bool const isFull = app_.getOPs().isFull();
// If everything else is ok, don't try to acquire the ledger
// if the requested seq is in the near future relative to
// the validated ledger. If the requested ledger is between
// 1 and 19 inclusive ledgers ahead of the valid ledger this
// node has not built it yet, but it's possible/likely it
// has the tx's necessary to build it and get caught up.
// Plus it might not become validated. On the other hand, if
// it's more than 20 in the future, this node should request
// it so that it can jump ahead and get caught up.
LedgerIndex const validSeq = app_.getLedgerMaster().getValidLedgerIndex();
constexpr std::size_t lagLeeway = 20;
bool const nearFuture = (seq > validSeq) && (seq < validSeq + lagLeeway);
// If everything else is ok, don't try to acquire the ledger
// if the request is related to consensus. (Note that
// consensus calls usually pass a seq of 0, so nearFuture
// will be false other than on a brand new network.)
bool const consensus = reason == InboundLedger::Reason::CONSENSUS;
ss << " Evaluating whether to broadcast requests to peers"
<< ". full: " << (isFull ? "true" : "false") << ". ledger sequence " << seq
<< ". Valid sequence: " << validSeq << ". Lag leeway: " << lagLeeway
<< ". request for near future ledger: " << (nearFuture ? "true" : "false")
<< ". Consensus: " << (consensus ? "true" : "false");
// If the node is not synced, send requests.
if (!isFull)
return true;
// If the ledger is in the near future, do NOT send requests.
// This node is probably about to build it.
if (nearFuture)
return false;
// If the request is because of consensus, do NOT send requests.
// This node is probably about to build it.
if (consensus)
return false;
return true;
}();
ss << ". Would broadcast to peers? " << (shouldBroadcast ? "true." : "false.");
if (!shouldAcquire)
{
JLOG(j_.debug()) << "Abort(rule): " << ss.str();
return {};
}
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
@@ -89,6 +158,7 @@ public:
ScopedLockType sl(lock_);
if (stopping_)
{
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {};
}
@@ -107,47 +177,51 @@ public:
++counter_;
}
}
ss << " IsNew: " << (isNew ? "true" : "false");
if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {};
}
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
{
JLOG(j_.debug()) << "InProgress: " << ss.str();
return {};
}
JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger();
};
using namespace std::chrono_literals;
std::shared_ptr<Ledger const> ledger =
perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
return perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
}
void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
{
std::unique_lock lock(acquiresMutex_);
try
if (CanProcess const check{acquiresMutex_, pendingAcquires_, hash})
{
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
ScopeUnlock const unlock(lock);
acquire(hash, seq, reason);
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash
<< ": " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": "
<< e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
}
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger>

View File

@@ -965,8 +965,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
return;
}
JLOG(journal_.info()) << "Advancing accepted ledger to " << ledger->header().seq
<< " with >= " << minVal << " validations";
JLOG(journal_.info()) << "Advancing accepted ledger to " << ledger->header().seq << " ("
<< toShortString(ledger->header().hash) << ") with >= " << minVal
<< " validations";
ledger->setValidated();
ledger->setFull();

View File

@@ -25,7 +25,8 @@ TimeoutCounter::TimeoutCounter(
QueueJobParameter&& jobParameter,
beast::Journal journal)
: app_(app)
, journal_(journal)
, sink_(journal, toShortString(hash) + " ")
, journal_(sink_)
, hash_(hash)
, timerInterval_(interval)
, queueJobParameter_(std::move(jobParameter))
@@ -41,6 +42,7 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() << "ms";
timer_.expires_after(timerInterval_);
timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
if (ec == boost::asio::error::operation_aborted)
@@ -48,6 +50,10 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock())
{
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec
<< " (operation_aborted: " << boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted" : "other") << ")";
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
}

View File

@@ -3,6 +3,7 @@
#include <xrpld/app/main/Application.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/core/Job.h>
#include <boost/asio/basic_waitable_timer.hpp>
@@ -103,6 +104,7 @@ protected:
// Used in this class for access to boost::asio::io_context and
// xrpl::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::WrappedSink sink_;
beast::Journal journal_;
mutable std::recursive_mutex mtx_;

View File

@@ -34,6 +34,7 @@
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/ToString.h>
#include <xrpl/basics/UnorderedContainers.h>
@@ -485,7 +486,7 @@ public:
isFull() override;
void
setMode(OperatingMode om) override;
setMode(OperatingMode om, char const* reason) override;
bool
isBlocked() override;
@@ -971,7 +972,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
inline void
NetworkOPsImp::setStandAlone()
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "setStandAlone");
}
inline void
@@ -1114,7 +1115,7 @@ NetworkOPsImp::processHeartbeatTimer()
{
if (mode_ != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED);
setMode(OperatingMode::DISCONNECTED, "Heartbeat: insufficient peers");
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
@@ -1138,7 +1139,7 @@ NetworkOPsImp::processHeartbeatTimer()
if (mode_ == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
JLOG(journal_.info()) << "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
}
@@ -1149,11 +1150,11 @@ NetworkOPsImp::processHeartbeatTimer()
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mode_ == OperatingMode::SYNCING)
{
setMode(OperatingMode::SYNCING);
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
}
else if (mode_ == OperatingMode::CONNECTED)
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
}
auto newMode = mode_.load();
if (origMode != newMode)
@@ -1858,7 +1859,7 @@ void
NetworkOPsImp::setAmendmentBlocked()
{
amendmentBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
}
inline bool
@@ -1889,7 +1890,7 @@ void
NetworkOPsImp::setUNLBlocked()
{
unlBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
}
inline void
@@ -1989,7 +1990,7 @@ NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint
if ((mode_ == OperatingMode::TRACKING) || (mode_ == OperatingMode::FULL))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
}
if (consensus)
@@ -2077,8 +2078,8 @@ NetworkOPsImp::beginConsensus(
// this shouldn't happen unless we jump ledgers
if (mode_ == OperatingMode::FULL)
{
JLOG(journal_.warn()) << "Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING);
JLOG(journal_.warn()) << "beginConsensus Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
}
@@ -2206,7 +2207,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// validations we have for LCL. If the ledger is good enough, go to
// TRACKING - TODO
if (!needNetworkLedger_)
setMode(OperatingMode::TRACKING);
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
}
if (((mode_ == OperatingMode::CONNECTED) || (mode_ == OperatingMode::TRACKING)) &&
@@ -2219,7 +2220,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
if (registry_.get().getTimeKeeper().now() <
(current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "endConsensus: check full");
}
}
@@ -2231,7 +2232,7 @@ NetworkOPsImp::consensusViewChange()
{
if ((mode_ == OperatingMode::FULL) || (mode_ == OperatingMode::TRACKING))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "consensusViewChange");
}
}
@@ -2535,7 +2536,7 @@ NetworkOPsImp::pubPeerStatus(std::function<json::Value(void)> const& func)
}
void
NetworkOPsImp::setMode(OperatingMode om)
NetworkOPsImp::setMode(OperatingMode om, char const* reason)
{
using namespace std::chrono_literals;
if (om == OperatingMode::CONNECTED)
@@ -2555,11 +2556,12 @@ NetworkOPsImp::setMode(OperatingMode om)
if (mode_ == om)
return;
auto const sink = om < mode_ ? journal_.warn() : journal_.info();
mode_ = om;
accounting_.mode(om);
JLOG(journal_.info()) << "STATE->" << strOperatingMode();
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
pubServer();
}
@@ -2568,36 +2570,24 @@ NetworkOPsImp::recvValidation(std::shared_ptr<STValidation> const& val, std::str
{
JLOG(journal_.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::No;
try
{
if (pendingValidations_.contains(val->getLedgerHash()))
CanProcess const check(validationsMutex_, pendingValidations_, val->getLedgerHash());
try
{
bypassAccept = BypassAccept::Yes;
BypassAccept bypassAccept = check ? BypassAccept::No : BypassAccept::Yes;
handleNewValidation(registry_.get().getApp(), val, source, bypassAccept, journal_);
}
else
catch (std::exception const& e)
{
pendingValidations_.insert(val->getLedgerHash());
JLOG(journal_.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(journal_.warn()) << "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
ScopeUnlock const unlock(lock);
handleNewValidation(registry_.get().getApp(), val, source, bypassAccept, journal_);
}
catch (std::exception const& e)
{
JLOG(journal_.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(journal_.warn()) << "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::No)
{
pendingValidations_.erase(val->getLedgerHash());
}
lock.unlock();
pubValidation(val);

View File

@@ -200,11 +200,11 @@ public:
iter->second.upVotes |
boost::adaptors::transformed(to_string<256, void>),
", ");
// TODO: Maybe transform using to_short_string once #5126 is
// TODO: Maybe transform using toShortString once #5126 is
// merged
//
// iter->second.upVotes |
// boost::adaptors::transformed(to_short_string<256, void>)
// boost::adaptors::transformed(toShortString<256, void>)
}
else
{

View File

@@ -17,6 +17,7 @@ enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation,
LedgerReplay,
LedgerDataCookies
};
/** Represents a peer connection in the overlay. */
@@ -116,6 +117,13 @@ public:
[[nodiscard]] virtual bool
txReduceRelayEnabled() const = 0;
//
// Messages
//
virtual std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) = 0;
};
} // namespace xrpl

File diff suppressed because it is too large Load Diff

View File

@@ -178,6 +178,13 @@ private:
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
// Track message requests and responses
// TODO: Use an expiring cache or something
using MessageCookieMap = std::map<uint256, std::set<std::optional<uint64_t>>>;
using PeerCookieMap = std::map<std::shared_ptr<Peer>, std::set<std::optional<uint64_t>>>;
std::mutex mutable cookieLock_;
MessageCookieMap messageRequestCookies_;
friend class OverlayImpl;
class Metrics
@@ -422,6 +429,13 @@ public:
return txReduceRelayEnabled_;
}
//
// Messages
//
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override;
private:
void
close();
@@ -620,16 +634,22 @@ private:
std::shared_ptr<protocol::TMValidation> const& packet);
void
sendLedgerBase(std::shared_ptr<Ledger const> const& ledger, protocol::TMLedgerData& ledgerData);
std::shared_ptr<Ledger const>
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
std::shared_ptr<SHAMap const>
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData,
PeerCookieMap const& destinations);
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
sendToMultiple(protocol::TMLedgerData& ledgerData, PeerCookieMap const& destinations);
std::shared_ptr<Ledger const>
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash);
std::shared_ptr<SHAMap const>
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash) const;
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash);
protected:
// Kept `protected` so test subclasses (see

View File

@@ -7,6 +7,9 @@
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/HashRouter.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/protocol/digest.h>
#include <google/protobuf/message.h>
@@ -97,16 +100,45 @@ PeerSetImpl::sendRequest(
std::shared_ptr<Peer> const& peer)
{
auto packet = std::make_shared<Message>(message, type);
auto const messageHash = [&]() {
auto const packetBuffer = packet->getBuffer(compression::Compressed::Off);
return sha512Half(Slice(packetBuffer.data(), packetBuffer.size()));
}();
// Allow messages to be re-sent to the same peer after a delay
using namespace std::chrono_literals;
constexpr std::chrono::seconds interval = 30s;
if (peer)
{
peer->send(packet);
if (app_.getHashRouter().shouldProcessForPeer(messageHash, peer->id(), interval))
{
JLOG(journal_.trace()) << "Sending " << protocolMessageName(type) << " message to ["
<< peer->id() << "]: " << messageHash;
peer->send(packet);
}
else
JLOG(journal_.debug()) << "Suppressing sending duplicate " << protocolMessageName(type)
<< " message to [" << peer->id() << "]: " << messageHash;
return;
}
for (auto id : peers_)
{
if (auto p = app_.getOverlay().findPeerByShortID(id))
p->send(packet);
{
if (app_.getHashRouter().shouldProcessForPeer(messageHash, p->id(), interval))
{
JLOG(journal_.trace()) << "Sending " << protocolMessageName(type) << " message to ["
<< p->id() << "]: " << messageHash;
p->send(packet);
}
else
JLOG(journal_.debug())
<< "Suppressing sending duplicate " << protocolMessageName(type)
<< " message to [" << p->id() << "]: " << messageHash;
}
}
}

View File

@@ -23,6 +23,12 @@ protocolMessageType(protocol::TMGetLedger const&)
return protocol::mtGET_LEDGER;
}
inline protocol::MessageType
protocolMessageType(protocol::TMLedgerData const&)
{
return protocol::mtLEDGER_DATA;
}
inline protocol::MessageType
protocolMessageType(protocol::TMReplayDeltaRequest const&)
{
@@ -434,3 +440,63 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler, std::size_t& hin
}
} // namespace xrpl
namespace protocol {
template <class Hasher>
void
hash_append(Hasher& h, TMGetLedger const& msg)
{
using beast::hash_append;
using namespace xrpl;
hash_append(h, safeCast<int>(protocolMessageType(msg)));
hash_append(h, safeCast<int>(msg.itype()));
if (msg.has_ltype())
hash_append(h, safeCast<int>(msg.ltype()));
if (msg.has_ledgerhash())
hash_append(h, msg.ledgerhash());
if (msg.has_ledgerseq())
hash_append(h, msg.ledgerseq());
for (auto const& nodeId : msg.nodeids())
hash_append(h, nodeId);
hash_append(h, msg.nodeids_size());
// Do NOT include the request cookie. It does not affect the content of the
// request, but only where to route the results.
// if (msg.has_requestcookie())
// hash_append(h, msg.requestcookie());
if (msg.has_querytype())
hash_append(h, safeCast<int>(msg.querytype()));
if (msg.has_querydepth())
hash_append(h, msg.querydepth());
}
template <class Hasher>
void
hash_append(Hasher& h, TMLedgerData const& msg)
{
using beast::hash_append;
using namespace xrpl;
hash_append(h, safeCast<int>(protocolMessageType(msg)));
hash_append(h, msg.ledgerhash());
hash_append(h, msg.ledgerseq());
hash_append(h, safeCast<int>(msg.type()));
for (auto const& node : msg.nodes())
{
hash_append(h, node.nodedata());
if (node.has_nodeid())
hash_append(h, node.nodeid());
}
hash_append(h, msg.nodes_size());
if (msg.has_requestcookie())
hash_append(h, msg.requestcookie());
if (msg.has_error())
hash_append(h, safeCast<int>(msg.error()));
}
} // namespace protocol

View File

@@ -28,6 +28,8 @@ namespace xrpl {
constexpr ProtocolVersion const kSupportedProtocolList[]{
{2, 1},
{2, 2},
// Adds TMLedgerData::responseCookies and directResponse
{2, 3},
};
// This ugly construct ensures that supportedProtocolList is sorted in strictly