Compare commits

..

7 Commits

Author SHA1 Message Date
Ed Hennis
a2e93188fc Merge branch 'develop' into ximinez/fix-getledger 2025-11-18 22:39:22 -05:00
Ed Hennis
5406d28357 Merge branch 'develop' into ximinez/fix-getledger 2025-11-15 03:08:34 -05:00
Ed Hennis
7804c09494 Merge branch 'develop' into ximinez/fix-getledger 2025-11-13 12:18:58 -05:00
Ed Hennis
79d294bd2d Merge branch 'develop' into ximinez/fix-getledger 2025-11-12 14:12:47 -05:00
Ed Hennis
acace507d0 Fix formatting 2025-11-10 19:52:59 -05:00
Ed Hennis
98732100fb Reduce duplicate peer traffic for ledger data (#5126)
- Drop duplicate outgoing TMGetLedger messages per peer
  - Allow a retry after 30s in case of peer or network congestion.
  - Addresses RIPD-1870
  - (Changes levelization. That is not desirable, and will need to be fixed.)
- Drop duplicate incoming TMGetLedger messages per peer
  - Allow a retry after 15s in case of peer or network congestion.
  - The requestCookie is ignored when computing the hash, thus increasing
    the chances of detecting duplicate messages.
  - With duplicate messages, keep track of the different requestCookies
    (or lack of cookie). When work is finally done for a given request,
    send the response to all the peers that are waiting on the request,
    sending one message per peer, including all the cookies and
    a "directResponse" flag indicating the data is intended for the
    sender, too.
  - Addresses RIPD-1871
- Drop duplicate incoming TMLedgerData messages
  - Addresses RIPD-1869
2025-11-10 19:52:59 -05:00
Ed Hennis
b186516d0a Improve job queue collision checks and logging
- Improve logging related to ledger acquisition and operating mode
  changes
- Class "CanProcess" to keep track of processing of distinct items
2025-11-10 19:52:59 -05:00
29 changed files with 996 additions and 552 deletions

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#include <functional>
#include <mutex>
#include <set>
/** RAII class to check if an Item is already being processed on another thread,
* as indicated by its presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
/** RAII guard that "claims" an Item in a Collection if it is not already
 * present.
 *
 * On construction, atomically (under `mtx`) tries to insert `item` into
 * `collection`. If the insert succeeded, the object is "usable" (evaluates
 * to true) and the destructor removes the item again under the same mutex.
 * If the item was already present, nothing is changed and the object
 * evaluates to false.
 *
 * The mutex and collection must outlive this object. The item itself is
 * copied into the cleanup closure, so passing a temporary is safe.
 */
class CanProcess
{
public:
    template <class Mutex, class Collection, class Item>
    CanProcess(Mutex& mtx, Collection& collection, Item const& item)
        : cleanup_(insert(mtx, collection, item))
    {
    }

    // Copying would run the cleanup twice (double-erase); forbid it.
    CanProcess(CanProcess const&) = delete;
    CanProcess&
    operator=(CanProcess const&) = delete;

    ~CanProcess()
    {
        if (cleanup_)
            cleanup_();
    }

    /// True if this object claimed the item and will release it on
    /// destruction.
    explicit
    operator bool() const
    {
        return static_cast<bool>(cleanup_);
    }

private:
    template <bool useIterator, class Mutex, class Collection, class Item>
    std::function<void()>
    doInsert(Mutex& mtx, Collection& collection, Item const& item)
    {
        std::unique_lock<Mutex> lock(mtx);
        // TODO: Use structured binding once LLVM 16 is the minimum supported
        // version. See also: https://github.com/llvm/llvm-project/issues/48582
        // https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
        auto const insertResult = collection.insert(item);
        auto const it = insertResult.first;
        if (!insertResult.second)
            return {};
        if constexpr (useIterator)
            // Capture the iterator. Safe for std::set: iterators stay valid
            // across unrelated inserts/erases.
            return [&mtx, &collection, it]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(it);
            };
        else
            // Capture the item BY VALUE: the `item` parameter may bind to a
            // caller temporary (e.g. a hash returned by value) that is gone
            // by the time the destructor runs. A by-reference capture here
            // would dangle.
            return [&mtx, &collection, item]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(item);
            };
    }

    // Generic insert() function doesn't use iterators because they may get
    // invalidated
    template <class Mutex, class Collection, class Item>
    std::function<void()>
    insert(Mutex& mtx, Collection& collection, Item const& item)
    {
        return doInsert<false>(mtx, collection, item);
    }

    // Specialize insert() for std::set, which does not invalidate iterators
    // for insert and erase
    template <class Mutex, class Item>
    std::function<void()>
    insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
    {
        return doInsert<true>(mtx, collection, item);
    }

    // If set, then the item is "usable"
    std::function<void()> cleanup_;
};
#endif

View File

@@ -1,70 +0,0 @@
#ifndef XRPL_BASICS_MALLOCTRIM_H_INCLUDED
#define XRPL_BASICS_MALLOCTRIM_H_INCLUDED
#include <xrpl/beast/utility/Journal.h>
#include <optional>
#include <string>
namespace ripple {
// -----------------------------------------------------------------------------
// Allocator interaction note:
// - This facility invokes glibc's malloc_trim(0) on Linux/glibc to request that
// ptmalloc return free heap pages to the OS.
// - If an alternative allocator (e.g. jemalloc or tcmalloc) is linked or
// preloaded (LD_PRELOAD), calling glibc's malloc_trim typically has no effect
// on the *active* heap. The call is harmless but may not reclaim memory
// because those allocators manage their own arenas.
// - Only glibc sbrk/arena space is eligible for trimming; large mmap-backed
// allocations are usually returned to the OS on free regardless of trimming.
// - Call at known reclamation points (e.g., after cache sweeps / online delete)
// and consider rate limiting to avoid churn.
// -----------------------------------------------------------------------------
/** Result of a mallocTrim() call: platform support, the malloc_trim return
 *  code, and (when sampled) resident-set size before/after in kB.
 */
struct MallocTrimReport
{
    bool supported{false};  // true only on Linux/glibc builds
    int trimResult{-1};     // ::malloc_trim(0) result; -1 = not attempted
    long rssBeforeKB{-1};   // VmRSS before trim; -1 = not sampled
    long rssAfterKB{-1};    // VmRSS after trim; -1 = not sampled

    /// RSS change in kB (negative means memory was released); 0 when
    /// either sample is missing.
    [[nodiscard]] long
    deltaKB() const noexcept
    {
        bool const sampled = rssBeforeKB >= 0 && rssAfterKB >= 0;
        return sampled ? rssAfterKB - rssBeforeKB : 0;
    }
};
/**
* @brief Attempt to return freed memory to the operating system.
*
* On Linux with glibc malloc, this issues ::malloc_trim(0), which may release
* free space from ptmalloc arenas back to the kernel. On other platforms, or if
* a different allocator is in use, this function is a no-op and the report will
* indicate that trimming is unsupported or had no effect.
*
* @param tag Optional identifier for logging/debugging purposes.
* @param journal Journal for diagnostic logging.
* @return Report containing before/after metrics and the trim result.
*
* @note If an alternative allocator (jemalloc/tcmalloc) is linked or preloaded,
* calling glibc's malloc_trim may have no effect on the active heap. The
* call is harmless but typically does not reclaim memory under those
* allocators.
*
* @note Only memory served from glibc's sbrk/arena heaps is eligible for trim.
* Large allocations satisfied via mmap are usually returned on free
* independently of trimming.
*
* @note Intended for use after operations that free significant memory (e.g.,
* cache sweeps, ledger cleanup, online delete). Consider rate limiting.
*/
MallocTrimReport
mallocTrim(std::optional<std::string> const& tag, beast::Journal journal);
} // namespace ripple
#endif

View File

@@ -286,8 +286,18 @@ message TMLedgerData {
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}
message TMPing {

View File

@@ -36,6 +36,8 @@ struct LedgerHeader
// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;

View File

@@ -1,121 +0,0 @@
#include <xrpl/basics/Log.h>
#include <xrpl/basics/MallocTrim.h>
#include <boost/predef.h>
#include <cstdio>
#include <fstream>
#if defined(__GLIBC__) && BOOST_OS_LINUX
#include <malloc.h>
#include <unistd.h>
namespace {
pid_t const cachedPid = ::getpid();
} // namespace
#endif
namespace ripple {
namespace detail {
#if defined(__GLIBC__) && BOOST_OS_LINUX
/** Extract the VmRSS value (in kB) from /proc/<pid>/status content.
 *
 * Scans line by line for a line whose first non-whitespace text is
 * "VmRSS:". Returns the parsed number, or -1 if no such line exists or
 * the value after the key cannot be parsed.
 */
long
parseVmRSSkB(std::string const& status)
{
    std::istringstream input(status);
    for (std::string line; std::getline(input, line);)
    {
        static constexpr char key[] = "VmRSS:";
        static constexpr std::size_t keyLen = sizeof(key) - 1;
        // Ignore leading spaces/tabs; skip whitespace-only lines.
        auto const start = line.find_first_not_of(" \t");
        if (start == std::string::npos)
            continue;
        // The line must begin (after leading whitespace) with "VmRSS:".
        // compare() handles lines shorter than the key safely.
        if (line.compare(start, keyLen, key) != 0)
            continue;
        // Step over whitespace between the key and the number.
        std::size_t pos = start + keyLen;
        while (pos < line.size() &&
               std::isspace(static_cast<unsigned char>(line[pos])))
            ++pos;
        long kB = -1;
        if (std::sscanf(line.c_str() + pos, "%ld", &kB) == 1)
            return kB;
        // Key found but value unparsable: report failure immediately.
        return -1;
    }
    // No VmRSS line found.
    return -1;
}
#endif // __GLIBC__ && BOOST_OS_LINUX
} // namespace detail
/** Attempt to return freed heap memory to the OS via glibc malloc_trim.
 *
 * @param tag Optional label included in the debug log line.
 * @param journal Destination for diagnostic logging.
 * @return Report with the support flag, trim result, and — only when debug
 *         logging is active — RSS before/after in kB read from
 *         /proc/<pid>/status.
 */
MallocTrimReport
mallocTrim(
    [[maybe_unused]] std::optional<std::string> const& tag,
    beast::Journal journal)
{
    MallocTrimReport report;
#if !(defined(__GLIBC__) && BOOST_OS_LINUX)
    // Not glibc on Linux: nothing to do; report.supported stays false.
    JLOG(journal.debug()) << "malloc_trim not supported on this platform";
#else
    report.supported = true;
    // Only pay for the /proc reads when the result will actually be logged.
    if (journal.debug())
    {
        // Slurp an entire (small) file, e.g. /proc/<pid>/status.
        auto readFile = [](std::string const& path) -> std::string {
            std::ifstream ifs(path);
            if (!ifs.is_open())
                return {};
            return std::string(
                std::istreambuf_iterator<char>(ifs),
                std::istreambuf_iterator<char>());
        };
        std::string const tagStr = tag.value_or("default");
        // cachedPid is the file-scope constant captured at startup.
        std::string const statusPath =
            "/proc/" + std::to_string(cachedPid) + "/status";
        auto const statusBefore = readFile(statusPath);
        report.rssBeforeKB = detail::parseVmRSSkB(statusBefore);
        report.trimResult = ::malloc_trim(0);
        auto const statusAfter = readFile(statusPath);
        report.rssAfterKB = detail::parseVmRSSkB(statusAfter);
        JLOG(journal.debug())
            << "malloc_trim tag=" << tagStr << " result=" << report.trimResult
            << " rss_before=" << report.rssBeforeKB << "kB"
            << " rss_after=" << report.rssAfterKB << "kB"
            << " delta=" << report.deltaKB() << "kB";
    }
    else
    {
        // Debug logging disabled: trim without the RSS bookkeeping, so
        // rssBeforeKB/rssAfterKB remain -1 in the report.
        report.trimResult = ::malloc_trim(0);
    }
#endif
    return report;
}
} // namespace ripple

View File

@@ -389,6 +389,33 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(!any(HF::UNDEFINED));
}
void
testProcessPeer()
{
    using namespace std::chrono_literals;
    // Router driven by a manually advanced clock; each ++stopwatch is +1s.
    TestStopwatch stopwatch;
    HashRouter router(getSetup(5s, 5s), stopwatch);
    uint256 const key(1);
    HashRouter::PeerShortID peer1 = 1;
    HashRouter::PeerShortID peer2 = 2;
    auto const timeout = 2s;
    // t=0: first sighting of (key, peer1) processes; repeat is suppressed.
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
    ++stopwatch;
    // t=1: peer1 still inside its 2s interval; peer2 tracked independently.
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
    ++stopwatch;
    // t=2: peer1 (processed at t=0) has expired; peer2 (t=1) has not.
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
    ++stopwatch;
    // t=3: peer2's interval has now expired as well.
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
    ++stopwatch;
    // t=4: peer1 (last processed at t=2) expired again; peer2 (t=3) not.
    BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
    BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
}
public:
void
run() override
@@ -401,6 +428,7 @@ public:
testProcess();
testSetup();
testFlagsOps();
testProcessPeer();
}
};

View File

@@ -305,6 +305,11 @@ public:
{
return false;
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
std::string const&
fingerprint() const override

View File

@@ -69,8 +69,8 @@ public:
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion(
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
make_protocol(2, 2));
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
make_protocol(2, 3));
BEAST_EXPECT(
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
std::nullopt);

View File

@@ -170,6 +170,11 @@ public:
removeTxQueue(uint256 const&) override
{
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
};
/** Manually advanced clock. */

View File

@@ -1,207 +0,0 @@
#include <xrpl/basics/MallocTrim.h>
#include <boost/predef.h>
#include <doctest/doctest.h>
using namespace ripple;
#if defined(__GLIBC__) && BOOST_OS_LINUX
namespace ripple::detail {
long
parseVmRSSkB(std::string const& status);
} // namespace ripple::detail
#endif
// Verify MallocTrimReport's default state and the deltaKB() arithmetic.
TEST_CASE("MallocTrimReport structure")
{
    // Test default construction
    MallocTrimReport report;
    CHECK(report.supported == false);
    CHECK(report.trimResult == -1);
    CHECK(report.rssBeforeKB == -1);
    CHECK(report.rssAfterKB == -1);
    // deltaKB() is defined to be 0 while either RSS sample is missing (-1).
    CHECK(report.deltaKB() == 0);
    // Test deltaKB calculation - memory freed
    report.rssBeforeKB = 1000;
    report.rssAfterKB = 800;
    CHECK(report.deltaKB() == -200);
    // Test deltaKB calculation - memory increased
    report.rssBeforeKB = 500;
    report.rssAfterKB = 600;
    CHECK(report.deltaKB() == 100);
    // Test deltaKB calculation - no change
    report.rssBeforeKB = 1234;
    report.rssAfterKB = 1234;
    CHECK(report.deltaKB() == 0);
}
#if defined(__GLIBC__) && BOOST_OS_LINUX
// Exercise the /proc status parser across well-formed, malformed, and
// missing VmRSS lines.
TEST_CASE("parseVmRSSkB")
{
    using ripple::detail::parseVmRSSkB;
    // Test standard format
    {
        std::string status = "VmRSS: 123456 kB\n";
        long result = parseVmRSSkB(status);
        CHECK(result == 123456);
    }
    // Test with multiple lines
    {
        std::string status =
            "Name: rippled\n"
            "VmPeak: 1234567 kB\n"
            "VmSize: 1234567 kB\n"
            "VmRSS: 987654 kB\n"
            "VmData: 123456 kB\n";
        long result = parseVmRSSkB(status);
        CHECK(result == 987654);
    }
    // Test with minimal whitespace
    {
        std::string status = "VmRSS: 42 kB";
        long result = parseVmRSSkB(status);
        CHECK(result == 42);
    }
    // Test with extra whitespace
    {
        std::string status = "VmRSS: 999999 kB";
        long result = parseVmRSSkB(status);
        CHECK(result == 999999);
    }
    // Test with tabs
    {
        std::string status = "VmRSS:\t\t12345 kB";
        long result = parseVmRSSkB(status);
        // Tabs between the key and the value are consumed by the parser's
        // std::isspace() loop, so the number still parses.
        CHECK(result == 12345);
    }
    // Test zero value
    {
        std::string status = "VmRSS: 0 kB\n";
        long result = parseVmRSSkB(status);
        CHECK(result == 0);
    }
    // Test missing VmRSS
    {
        std::string status =
            "Name: rippled\n"
            "VmPeak: 1234567 kB\n"
            "VmSize: 1234567 kB\n";
        long result = parseVmRSSkB(status);
        CHECK(result == -1);
    }
    // Test empty string
    {
        std::string status = "";
        long result = parseVmRSSkB(status);
        CHECK(result == -1);
    }
    // Test malformed data (VmRSS but no number)
    {
        std::string status = "VmRSS: \n";
        long result = parseVmRSSkB(status);
        // sscanf should fail to parse and return -1 unchanged
        CHECK(result == -1);
    }
    // Test malformed data (VmRSS but invalid number)
    {
        std::string status = "VmRSS: abc kB\n";
        long result = parseVmRSSkB(status);
        // sscanf should fail and return -1 unchanged
        CHECK(result == -1);
    }
    // Test partial match (should not match "NotVmRSS:")
    {
        std::string status = "NotVmRSS: 123456 kB\n";
        long result = parseVmRSSkB(status);
        CHECK(result == -1);
    }
}
#endif
// mallocTrim() should succeed (or report unsupported) with and without a tag.
TEST_CASE("mallocTrim basic functionality")
{
    // Null sink: the call runs but emits no log output.
    beast::Journal journal{beast::Journal::getNullSink()};
    // Test with no tag
    {
        MallocTrimReport report = mallocTrim(std::nullopt, journal);
#if defined(__GLIBC__) && BOOST_OS_LINUX
        // On Linux with glibc, should be supported
        CHECK(report.supported == true);
        // trimResult should be 0 or 1 (success indicators)
        CHECK(report.trimResult >= 0);
#else
        // On other platforms, should be unsupported
        CHECK(report.supported == false);
        CHECK(report.trimResult == -1);
        CHECK(report.rssBeforeKB == -1);
        CHECK(report.rssAfterKB == -1);
#endif
    }
    // Test with tag
    {
        MallocTrimReport report =
            mallocTrim(std::optional<std::string>("test_tag"), journal);
#if defined(__GLIBC__) && BOOST_OS_LINUX
        CHECK(report.supported == true);
        CHECK(report.trimResult >= 0);
#else
        CHECK(report.supported == false);
#endif
    }
}
// NOTE(review): despite the name, getNullSink() presumably leaves
// journal.debug() inactive, so this likely exercises the non-logging branch
// of mallocTrim rather than the debug path — confirm with a debug-level sink.
TEST_CASE("mallocTrim with debug logging")
{
    beast::Journal journal{beast::Journal::getNullSink()};
    MallocTrimReport report =
        mallocTrim(std::optional<std::string>("debug_test"), journal);
#if defined(__GLIBC__) && BOOST_OS_LINUX
    CHECK(report.supported == true);
    // The function should complete without crashing
#else
    CHECK(report.supported == false);
#endif
}
// Repeated invocations must be safe and keep returning sane results.
TEST_CASE("mallocTrim repeated calls")
{
    beast::Journal journal{beast::Journal::getNullSink()};
    // Call malloc_trim multiple times to ensure it's safe
    for (int i = 0; i < 5; ++i)
    {
        MallocTrimReport report = mallocTrim(
            std::optional<std::string>("iteration_" + std::to_string(i)),
            journal);
#if defined(__GLIBC__) && BOOST_OS_LINUX
        CHECK(report.supported == true);
        CHECK(report.trimResult >= 0);
#else
        CHECK(report.supported == false);
#endif
    }
}

View File

@@ -1056,7 +1056,8 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if (!positions && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
app_.getOPs().setMode(
OperatingMode::CONNECTED, "updateOperatingMode: no positions");
}
void

View File

@@ -179,6 +179,25 @@ private:
std::unique_ptr<PeerSet> mPeerSet;
};
/** Human-readable name of an InboundLedger::Reason, for logging. */
inline std::string
to_string(InboundLedger::Reason reason)
{
    using enum InboundLedger::Reason;
    switch (reason)
    {
        case HISTORY:
            return "HISTORY";
        case GENERIC:
            return "GENERIC";
        case CONSENSUS:
            return "CONSENSUS";
        default:
            // All known enumerators are handled above; flag any future
            // addition that is not given a name here.
            UNREACHABLE(
                "ripple::to_string(InboundLedger::Reason) : unknown value");
            return "unknown";
    }
}
} // namespace ripple
#endif

View File

@@ -373,7 +373,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}
mByHash = true;

View File

@@ -5,9 +5,9 @@
#include <xrpld/core/JobQueue.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/container/aged_map.h>
#include <xrpl/protocol/jss.h>
@@ -58,11 +58,85 @@ public:
hash.isNonZero(),
"ripple::InboundLedgersImp::acquire::doAcquire : nonzero hash");
// probably not the right rule
if (app_.getOPs().isNeedNetworkLedger() &&
(reason != InboundLedger::Reason::GENERIC) &&
(reason != InboundLedger::Reason::CONSENSUS))
bool const needNetworkLedger = app_.getOPs().isNeedNetworkLedger();
bool const shouldAcquire = [&]() {
if (!needNetworkLedger)
return true;
if (reason == InboundLedger::Reason::GENERIC)
return true;
if (reason == InboundLedger::Reason::CONSENSUS)
return true;
return false;
}();
std::stringstream ss;
ss << "InboundLedger::acquire: "
<< "Request: " << to_string(hash) << ", " << seq
<< " NeedNetworkLedger: " << (needNetworkLedger ? "yes" : "no")
<< " Reason: " << to_string(reason)
<< " Should acquire: " << (shouldAcquire ? "true." : "false.");
/* Acquiring ledgers is somewhat expensive. It requires lots of
* computation and network communication. Avoid it when it's not
* appropriate. Every validation from a peer for a ledger that
* we do not have locally results in a call to this function: even
* if we are moments away from validating the same ledger.
*/
bool const shouldBroadcast = [&]() {
// If the node is not in "full" state, it needs to sync to
// the network, and doesn't have the necessary tx's and
// ledger entries to build the ledger.
bool const isFull = app_.getOPs().isFull();
// If everything else is ok, don't try to acquire the ledger
// if the requested seq is in the near future relative to
// the validated ledger. If the requested ledger is between
// 1 and 19 inclusive ledgers ahead of the valid ledger this
// node has not built it yet, but it's possible/likely it
// has the tx's necessary to build it and get caught up.
// Plus it might not become validated. On the other hand, if
// it's more than 20 in the future, this node should request
// it so that it can jump ahead and get caught up.
LedgerIndex const validSeq =
app_.getLedgerMaster().getValidLedgerIndex();
constexpr std::size_t lagLeeway = 20;
bool const nearFuture =
(seq > validSeq) && (seq < validSeq + lagLeeway);
// If everything else is ok, don't try to acquire the ledger
// if the request is related to consensus. (Note that
// consensus calls usually pass a seq of 0, so nearFuture
// will be false other than on a brand new network.)
bool const consensus =
reason == InboundLedger::Reason::CONSENSUS;
ss << " Evaluating whether to broadcast requests to peers"
<< ". full: " << (isFull ? "true" : "false")
<< ". ledger sequence " << seq
<< ". Valid sequence: " << validSeq
<< ". Lag leeway: " << lagLeeway
<< ". request for near future ledger: "
<< (nearFuture ? "true" : "false")
<< ". Consensus: " << (consensus ? "true" : "false");
// If the node is not synced, send requests.
if (!isFull)
return true;
// If the ledger is in the near future, do NOT send requests.
// This node is probably about to build it.
if (nearFuture)
return false;
// If the request is because of consensus, do NOT send requests.
// This node is probably about to build it.
if (consensus)
return false;
return true;
}();
ss << ". Would broadcast to peers? "
<< (shouldBroadcast ? "true." : "false.");
if (!shouldAcquire)
{
JLOG(j_.debug()) << "Abort(rule): " << ss.str();
return {};
}
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
@@ -70,6 +144,7 @@ public:
ScopedLockType sl(mLock);
if (stopping_)
{
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {};
}
@@ -93,23 +168,29 @@ public:
++mCounter;
}
}
ss << " IsNew: " << (isNew ? "true" : "false");
if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {};
}
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
{
JLOG(j_.debug()) << "InProgress: " << ss.str();
return {};
}
JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger();
};
using namespace std::chrono_literals;
std::shared_ptr<Ledger const> ledger = perf::measureDurationAndLog(
return perf::measureDurationAndLog(
doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
}
void
@@ -118,28 +199,25 @@ public:
std::uint32_t seq,
InboundLedger::Reason reason) override
{
std::unique_lock lock(acquiresMutex_);
try
if (CanProcess const check{acquiresMutex_, pendingAcquires_, hash})
{
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
scope_unlock unlock(lock);
acquire(hash, seq, reason);
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn())
<< "Exception thrown for acquiring new inbound ledger "
<< hash << ": " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
}
catch (std::exception const& e)
{
JLOG(j_.warn())
<< "Exception thrown for acquiring new inbound ledger " << hash
<< ": " << e.what();
}
catch (...)
{
JLOG(j_.warn())
<< "Unknown exception thrown for acquiring new inbound ledger "
<< hash;
}
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger>

View File

@@ -942,8 +942,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
}
JLOG(m_journal.info()) << "Advancing accepted ledger to "
<< ledger->info().seq << " with >= " << minVal
<< " validations";
<< ledger->info().seq << " ("
<< to_short_string(ledger->info().hash)
<< ") with >= " << minVal << " validations";
ledger->setValidated();
ledger->setFull();

View File

@@ -12,7 +12,8 @@ TimeoutCounter::TimeoutCounter(
QueueJobParameter&& jobParameter,
beast::Journal journal)
: app_(app)
, journal_(journal)
, sink_(journal, to_short_string(hash) + " ")
, journal_(sink_)
, hash_(hash)
, timeouts_(0)
, complete_(false)
@@ -32,6 +33,8 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count()
<< "ms";
timer_.expires_after(timerInterval_);
timer_.async_wait(
[wptr = pmDowncast()](boost::system::error_code const& ec) {
@@ -40,6 +43,12 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock())
{
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec << " (operation_aborted: "
<< boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted"
: "other")
<< ")";
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
}

View File

@@ -5,6 +5,7 @@
#include <xrpld/core/Job.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <boost/asio/basic_waitable_timer.hpp>
@@ -104,6 +105,7 @@ protected:
// Used in this class for access to boost::asio::io_context and
// ripple::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::WrappedSink sink_;
beast::Journal journal_;
mutable std::recursive_mutex mtx_;

View File

@@ -37,7 +37,6 @@
#include <xrpld/shamap/NodeFamily.h>
#include <xrpl/basics/ByteUtilities.h>
#include <xrpl/basics/MallocTrim.h>
#include <xrpl/basics/ResolverAsio.h>
#include <xrpl/basics/random.h>
#include <xrpl/beast/asio/io_latency_probe.h>
@@ -1107,8 +1106,6 @@ public:
<< "; size after: " << cachedSLEs_.size();
}
mallocTrim(std::optional<std::string>("doSweep"), m_journal);
// Set timer to do another sweep later.
setSweepTimer();
}

View File

@@ -75,6 +75,20 @@ HashRouter::shouldProcess(
return s.shouldProcess(suppressionMap_.clock().now(), tx_interval);
}
/** Decide whether the keyed item should be processed for this peer.
 *
 * Creates the suppression entry for `key` if needed, then defers to the
 * per-entry rate limit: at most once per `interval` per (key, peer) pair.
 * Guarded by mutex_.
 */
bool
HashRouter::shouldProcessForPeer(
    uint256 const& key,
    PeerShortID peer,
    std::chrono::seconds interval)
{
    std::lock_guard lock(mutex_);
    auto& entry = emplace(key).first;
    return entry.shouldProcessForPeer(
        peer, suppressionMap_.clock().now(), interval);
}
HashRouterFlags
HashRouter::getFlags(uint256 const& key)
{
@@ -149,4 +163,13 @@ setup_HashRouter(Config const& config)
return setup;
}
/** Return a copy of the set of peers recorded for `key`.
 *
 * NOTE(review): emplace() creates an (empty) entry when the key is
 * unknown, so querying a never-seen key inserts it — confirm intended.
 */
auto
HashRouter::getPeers(uint256 const& key) -> std::set<PeerShortID>
{
    std::lock_guard lock(mutex_);
    auto& s = emplace(key).first;
    return s.peekPeerSet();
}
} // namespace ripple

View File

@@ -140,6 +140,13 @@ private:
return std::move(peers_);
}
/** Return set of peers waiting for reply. Leaves list unchanged.
 *  The returned reference is tied to this Entry's lifetime.
 */
std::set<PeerShortID> const&
peekPeerSet()
{
    return peers_;
}
/** Return seated relay time point if the message has been relayed */
std::optional<Stopwatch::time_point>
relayed() const
@@ -173,6 +180,21 @@ private:
return true;
}
/** Rate-limit processing of this item for a single peer.
 *
 * @param peer The peer asking to process the item.
 * @param now Current stopwatch time.
 * @param interval Minimum time between processings for the same peer.
 * @return true (and records `now` for the peer) if the peer has not been
 *         processed within `interval`; false otherwise. The peer is also
 *         added to the waiting-peers set on success.
 */
bool
shouldProcessForPeer(
    PeerShortID peer,
    Stopwatch::time_point now,
    std::chrono::seconds interval)
{
    // Single lookup instead of contains() followed by two operator[]s.
    if (auto const it = peerProcessed_.find(peer);
        it != peerProcessed_.end() && (it->second + interval) > now)
        return false;
    // Peer may already be in the list, but adding it again doesn't hurt
    addPeer(peer);
    peerProcessed_[peer] = now;
    return true;
}
private:
HashRouterFlags flags_ = HashRouterFlags::UNDEFINED;
std::set<PeerShortID> peers_;
@@ -180,6 +202,7 @@ private:
// than one flag needs to expire independently.
std::optional<Stopwatch::time_point> relayed_;
std::optional<Stopwatch::time_point> processed_;
std::map<PeerShortID, Stopwatch::time_point> peerProcessed_;
};
public:
@@ -203,7 +226,7 @@ public:
/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the peer is added.
* element 1: true if the key is added.
* element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>
@@ -223,6 +246,18 @@ public:
HashRouterFlags& flags,
std::chrono::seconds tx_interval);
/** Determines whether the hashed item should be processed for the given
peer. Could be an incoming or outgoing message.
Items filtered with this function should only be processed for the given
peer once. Unlike shouldProcess, it can be processed for other peers.
*/
bool
shouldProcessForPeer(
uint256 const& key,
PeerShortID peer,
std::chrono::seconds interval);
/** Set the flags on a hash.
@return `true` if the flags were changed. `false` if unchanged.
@@ -248,6 +283,11 @@ public:
std::optional<std::set<PeerShortID>>
shouldRelay(uint256 const& key);
/** Returns a copy of the set of peers in the Entry for the key
*/
std::set<PeerShortID>
getPeers(uint256 const& key);
private:
// pair.second indicates whether the entry was created
std::pair<Entry&, bool>

View File

@@ -34,11 +34,10 @@
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpl/basics/MallocTrim.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/mulDiv.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/utility/rngfill.h>
#include <xrpl/crypto/RFC1751.h>
#include <xrpl/crypto/csprng.h>
@@ -409,7 +408,7 @@ public:
isFull() override;
void
setMode(OperatingMode om) override;
setMode(OperatingMode om, char const* reason) override;
bool
isBlocked() override;
@@ -887,7 +886,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
inline void
NetworkOPsImp::setStandAlone()
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "setStandAlone");
}
inline void
@@ -1037,7 +1036,9 @@ NetworkOPsImp::processHeartbeatTimer()
{
if (mMode != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED);
setMode(
OperatingMode::DISCONNECTED,
"Heartbeat: insufficient peers");
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
@@ -1062,7 +1063,7 @@ NetworkOPsImp::processHeartbeatTimer()
if (mMode == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
JLOG(m_journal.info())
<< "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
@@ -1074,9 +1075,9 @@ NetworkOPsImp::processHeartbeatTimer()
auto origMode = mMode.load();
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mMode == OperatingMode::SYNCING)
setMode(OperatingMode::SYNCING);
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
else if (mMode == OperatingMode::CONNECTED)
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
auto newMode = mMode.load();
if (origMode != newMode)
{
@@ -1825,7 +1826,7 @@ void
NetworkOPsImp::setAmendmentBlocked()
{
amendmentBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
}
inline bool
@@ -1856,7 +1857,7 @@ void
NetworkOPsImp::setUNLBlocked()
{
unlBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
}
inline void
@@ -1957,7 +1958,7 @@ NetworkOPsImp::checkLastClosedLedger(
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
}
if (consensus)
@@ -2046,8 +2047,9 @@ NetworkOPsImp::beginConsensus(
// this shouldn't happen unless we jump ledgers
if (mMode == OperatingMode::FULL)
{
JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING);
JLOG(m_journal.warn())
<< "beginConsensus Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
}
@@ -2182,7 +2184,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// validations we have for LCL. If the ledger is good enough, go to
// TRACKING - TODO
if (!needNetworkLedger_)
setMode(OperatingMode::TRACKING);
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
}
if (((mMode == OperatingMode::CONNECTED) ||
@@ -2196,7 +2198,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
if (app_.timeKeeper().now() < (current->info().parentCloseTime +
2 * current->info().closeTimeResolution))
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "endConsensus: check full");
}
}
@@ -2208,7 +2210,7 @@ NetworkOPsImp::consensusViewChange()
{
if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "consensusViewChange");
}
}
@@ -2527,7 +2529,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
}
void
NetworkOPsImp::setMode(OperatingMode om)
NetworkOPsImp::setMode(OperatingMode om, char const* reason)
{
using namespace std::chrono_literals;
if (om == OperatingMode::CONNECTED)
@@ -2547,15 +2549,12 @@ NetworkOPsImp::setMode(OperatingMode om)
if (mMode == om)
return;
auto const oldMode = mMode.load(std::memory_order_relaxed);
auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
mMode = om;
accounting_.mode(om);
if (oldMode != OperatingMode::FULL && om == OperatingMode::FULL)
mallocTrim(std::optional<std::string>("SyncComplete"), m_journal);
JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
pubServer();
}
@@ -2567,34 +2566,28 @@ NetworkOPsImp::recvValidation(
JLOG(m_journal.trace())
<< "recvValidation " << val->getLedgerHash() << " from " << source;
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::no;
try
{
if (pendingValidations_.contains(val->getLedgerHash()))
bypassAccept = BypassAccept::yes;
else
pendingValidations_.insert(val->getLedgerHash());
scope_unlock unlock(lock);
handleNewValidation(app_, val, source, bypassAccept, m_journal);
CanProcess const check(
validationsMutex_, pendingValidations_, val->getLedgerHash());
try
{
BypassAccept bypassAccept =
check ? BypassAccept::no : BypassAccept::yes;
handleNewValidation(app_, val, source, bypassAccept, m_journal);
}
catch (std::exception const& e)
{
JLOG(m_journal.warn())
<< "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
}
catch (std::exception const& e)
{
JLOG(m_journal.warn())
<< "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::no)
{
pendingValidations_.erase(val->getLedgerHash());
}
lock.unlock();
pubValidation(val);

View File

@@ -191,7 +191,7 @@ public:
virtual bool
isFull() = 0;
virtual void
setMode(OperatingMode om) = 0;
setMode(OperatingMode om, char const* reason) = 0;
virtual bool
isBlocked() = 0;
virtual bool

View File

@@ -5,7 +5,6 @@
#include <xrpld/app/rdb/backend/SQLiteDatabase.h>
#include <xrpld/core/ConfigSections.h>
#include <xrpl/basics/MallocTrim.h>
#include <xrpl/beast/core/CurrentThreadName.h>
#include <xrpl/nodestore/Scheduler.h>
#include <xrpl/nodestore/detail/DatabaseRotatingImp.h>
@@ -546,8 +545,6 @@ SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
{
ledgerMaster_->clearLedgerCachePrior(validatedSeq);
fullBelowCache_->clear();
mallocTrim(std::optional<std::string>("clearCaches"), journal_);
}
void
@@ -613,8 +610,6 @@ SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
});
if (healthWait() == stopping)
return;
mallocTrim(std::optional<std::string>("clearPrior"), journal_);
}
SHAMapStoreImp::HealthResult

View File

@@ -18,6 +18,7 @@ enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation,
LedgerReplay,
LedgerDataCookies
};
/** Represents a peer connection in the overlay. */
@@ -117,6 +118,13 @@ public:
virtual bool
txReduceRelayEnabled() const = 0;
//
// Messages
//
virtual std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) = 0;
};
} // namespace ripple

View File

@@ -11,6 +11,7 @@
#include <xrpld/app/tx/apply.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/ProtocolMessage.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/perflog/PerfLog.h>
@@ -45,6 +46,8 @@ std::chrono::seconds constexpr peerTimerInterval{60};
/** The timeout for a shutdown timer */
std::chrono::seconds constexpr shutdownTimerInterval{5};
/** How often we process duplicate incoming TMGetLedger messages */
std::chrono::seconds constexpr getledgerInterval{15};
} // namespace
// TODO: Remove this exclusion once unit tests are added after the hotfix
@@ -499,6 +502,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const
return protocol_ >= make_protocol(2, 2);
case ProtocolFeature::LedgerReplay:
return ledgerReplayEnabled_;
case ProtocolFeature::LedgerDataCookies:
return protocol_ >= make_protocol(2, 3);
}
return false;
}
@@ -1460,8 +1465,9 @@ PeerImp::handleTransaction(
void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{
auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
auto badData = [&](std::string const& msg, bool chargefee = true) {
if (chargefee)
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
};
auto const itype{m->itype()};
@@ -1538,12 +1544,74 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
}
}
// Drop duplicate requests from the same peer for at least
// `getledgerInterval` seconds.
// Append a little junk to prevent the hash of an incoming message
// from matching the hash of the same outgoing message.
// `shouldProcessForPeer` does not distinguish between incoming and
// outgoing, and some of the message relay logic checks the hash to see
// if the message has been relayed already. If the hashes are the same,
// a duplicate will be detected when sending the message is attempted,
// so it will fail.
auto const messageHash = sha512Half(*m, nullptr);
// Request cookies are not included in the hash. Track them here.
auto const requestCookie = [&m]() -> std::optional<uint64_t> {
if (m->has_requestcookie())
return m->requestcookie();
return std::nullopt;
}();
auto const [inserted, pending] = [&] {
std::lock_guard lock{cookieLock_};
auto& cookies = messageRequestCookies_[messageHash];
bool const pending = !cookies.empty();
return std::pair{cookies.emplace(requestCookie).second, pending};
}();
// Check if the request has been seen from this peer.
if (!app_.getHashRouter().shouldProcessForPeer(
messageHash, id_, getledgerInterval))
{
// This request has already been seen from this peer.
// Has it been seen with this request cookie (or lack thereof)?
if (inserted)
{
// This is a duplicate request, but with a new cookie. When a
// response is ready, one will be sent for each request cookie.
JLOG(p_journal_.debug())
<< "TMGetLedger: duplicate request with new request cookie: "
<< requestCookie.value_or(0)
<< ". Job pending: " << (pending ? "yes" : "no") << ": "
<< messageHash;
if (pending)
{
// Don't bother queueing up a new job if other requests are
// already pending. This should limit entries in the job queue
// to one per peer per unique request.
JLOG(p_journal_.debug())
<< "TMGetLedger: Suppressing recvGetLedger job, since one "
"is pending: "
<< messageHash;
return;
}
}
else
{
// Don't punish nodes that don't know any better
return badData(
"duplicate request: " + to_string(messageHash),
supportsFeature(ProtocolFeature::LedgerDataCookies));
}
}
// Queue a job to process the request
JLOG(p_journal_.debug())
<< "TMGetLedger: Adding recvGetLedger job: " << messageHash;
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
if (auto peer = weak.lock())
peer->processLedgerRequest(m);
});
app_.getJobQueue().addJob(
jtLEDGER_REQ, "recvGetLedger", [weak, m, messageHash]() {
if (auto peer = weak.lock())
peer->processLedgerRequest(m, messageHash);
});
}
void
@@ -1659,8 +1727,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{
auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, msg);
auto badData = [&](std::string const& msg, bool charge = true) {
if (charge)
fee_.update(Resource::feeInvalidData, msg);
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
};
@@ -1711,23 +1780,99 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
"Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
}
// If there is a request cookie, attempt to relay the message
if (m->has_requestcookie())
auto const messageHash = sha512Half(*m);
if (!app_.getHashRouter().addSuppressionPeer(messageHash, id_))
{
if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
// Don't punish nodes that don't know any better
return badData(
"Duplicate message: " + to_string(messageHash),
supportsFeature(ProtocolFeature::LedgerDataCookies));
}
bool const routed = m->has_directresponse() || m->responsecookies_size() ||
m->has_requestcookie();
{
// Check if this message needs to be forwarded to one or more peers.
// Maximum of one of the relevant fields should be populated.
XRPL_ASSERT(
!m->has_requestcookie() || !m->responsecookies_size(),
"ripple::PeerImp::onMessage(TMLedgerData) : valid cookie fields");
// Make a copy of the response cookies, then wipe the list so it can be
// forwarded cleanly
auto const responseCookies = m->responsecookies();
m->clear_responsecookies();
// Flag indicating if this response should be processed locally,
// possibly in addition to being forwarded.
bool const directResponse =
m->has_directresponse() && m->directresponse();
m->clear_directresponse();
auto const relay = [this, m, &messageHash](auto const cookie) {
if (auto peer = overlay_.findPeerByShortID(cookie))
{
XRPL_ASSERT(
!m->has_requestcookie() && !m->responsecookies_size(),
"ripple::PeerImp::onMessage(TMLedgerData) relay : no "
"cookies");
if (peer->supportsFeature(ProtocolFeature::LedgerDataCookies))
// Setting this flag is not _strictly_ necessary for peers
// that support it if there are no cookies included in the
// message, but it is more accurate.
m->set_directresponse(true);
else
m->clear_directresponse();
peer->send(
std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
}
else
JLOG(p_journal_.info())
<< "Unable to route TX/ledger data reply to peer ["
<< cookie << "]: " << messageHash;
};
// If there is a request cookie, attempt to relay the message
if (m->has_requestcookie())
{
XRPL_ASSERT(
responseCookies.empty(),
"ripple::PeerImp::onMessage(TMLedgerData) : no response "
"cookies");
m->clear_requestcookie();
peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
relay(m->requestcookie());
if (!directResponse && responseCookies.empty())
return;
}
else
// If there's a list of response cookies, attempt to relay the message to
// all of them.
if (responseCookies.size())
{
JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
for (auto const cookie : responseCookies)
relay(cookie);
if (!directResponse)
return;
}
}
// Now that any forwarding is done check the base message (data only, no
// routing info for duplicates)
if (routed)
{
m->clear_directresponse();
XRPL_ASSERT(
!m->has_requestcookie() && !m->responsecookies_size(),
"ripple::PeerImp::onMessage(TMLedgerData) : no cookies");
auto const baseMessageHash = sha512Half(*m);
if (!app_.getHashRouter().addSuppressionPeer(baseMessageHash, id_))
{
// Don't punish nodes that don't know any better
return badData(
"Duplicate message: " + to_string(baseMessageHash),
supportsFeature(ProtocolFeature::LedgerDataCookies));
}
return;
}
uint256 const ledgerHash{m->ledgerhash()};
// Otherwise check if received data for a candidate transaction set
if (m->type() == protocol::liTS_CANDIDATE)
{
@@ -3142,16 +3287,22 @@ PeerImp::checkValidation(
// the TX tree with the specified root hash.
//
static std::shared_ptr<PeerImp>
getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
getPeerWithTree(
OverlayImpl& ov,
uint256 const& rootHash,
PeerImp const* skip,
std::function<bool(Peer::id_t)> shouldProcessCallback)
{
std::shared_ptr<PeerImp> ret;
int retScore = 0;
XRPL_ASSERT(
shouldProcessCallback, "ripple::getPeerWithTree : callback provided");
ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
if (p->hasTxSet(rootHash) && p.get() != skip)
{
auto score = p->getScore(true);
if (!ret || (score > retScore))
if (!ret || (score > retScore && shouldProcessCallback(p->id())))
{
ret = std::move(p);
retScore = score;
@@ -3170,16 +3321,19 @@ getPeerWithLedger(
OverlayImpl& ov,
uint256 const& ledgerHash,
LedgerIndex ledger,
PeerImp const* skip)
PeerImp const* skip,
std::function<bool(Peer::id_t)> shouldProcessCallback)
{
std::shared_ptr<PeerImp> ret;
int retScore = 0;
XRPL_ASSERT(
shouldProcessCallback, "ripple::getPeerWithLedger : callback provided");
ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
{
auto score = p->getScore(true);
if (!ret || (score > retScore))
if (!ret || (score > retScore && shouldProcessCallback(p->id())))
{
ret = std::move(p);
retScore = score;
@@ -3193,7 +3347,8 @@ getPeerWithLedger(
void
PeerImp::sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData)
protocol::TMLedgerData& ledgerData,
PeerCookieMap const& destinations)
{
JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
@@ -3225,15 +3380,102 @@ PeerImp::sendLedgerBase(
}
}
auto message{
std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
send(message);
sendToMultiple(ledgerData, destinations);
}
/** Sends a TMLedgerData reply to every peer that is waiting for it.

    @param ledgerData the reply; its routing fields (requestcookie,
        responsecookies, directresponse) are rewritten in place for each
        destination before sending.
    @param destinations map of peer to the set of request cookies it is
        waiting on; std::nullopt in a set means that peer made the request
        directly, without a relay cookie.

    Peers that support ProtocolFeature::LedgerDataCookies receive one
    message carrying all of their response cookies plus the direct-response
    flag; legacy peers receive one message per cookie, since the older
    format can only carry a single requestcookie.
*/
void
PeerImp::sendToMultiple(
    protocol::TMLedgerData& ledgerData,
    PeerCookieMap const& destinations)
{
    // Tracks whether this peer (the one that generated the reply) is among
    // the destinations; asserted at the end.
    bool foundSelf = false;
    for (auto const& [peer, cookies] : destinations)
    {
        if (peer.get() == this)
            foundSelf = true;
        // True if the destination peer can accept a single message carrying
        // multiple response cookies and a direct-response flag.
        bool const multipleCookies =
            peer->supportsFeature(ProtocolFeature::LedgerDataCookies);
        std::vector<std::uint64_t> sendCookies;
        bool directResponse = false;
        if (!multipleCookies)
        {
            // Legacy peer: one message per cookie is sent in the loop below.
            JLOG(p_journal_.debug())
                << "sendToMultiple: Sending " << cookies.size()
                << " TMLedgerData messages to peer [" << peer->id()
                << "]: " << sha512Half(ledgerData);
        }
        for (auto const& cookie : cookies)
        {
            // Unfortunately, need a separate Message object for every
            // combination
            if (cookie)
            {
                if (multipleCookies)
                {
                    // Save this one for later to send a single message
                    sendCookies.emplace_back(*cookie);
                    continue;
                }
                // Feature not supported, so send a single message with a
                // single cookie
                ledgerData.set_requestcookie(*cookie);
            }
            else
            {
                // std::nullopt: the peer requested this data directly,
                // without a relay cookie.
                if (multipleCookies)
                {
                    // Set this flag later on the single message
                    directResponse = true;
                    continue;
                }
                ledgerData.clear_requestcookie();
            }
            // Only legacy peers reach this per-cookie send.
            XRPL_ASSERT(
                !multipleCookies,
                "ripple::PeerImp::sendToMultiple : ledger data cookies "
                "unsupported");
            auto message{
                std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
            peer->send(message);
        }
        if (multipleCookies)
        {
            // Send a single message with all the cookies and/or the direct
            // response flag, so the receiver can farm out the single message to
            // multiple peers and/or itself
            XRPL_ASSERT(
                sendCookies.size() || directResponse,
                "ripple::PeerImp::sendToMultiple : valid response options");
            ledgerData.clear_requestcookie();
            ledgerData.clear_responsecookies();
            ledgerData.set_directresponse(directResponse);
            for (auto const& cookie : sendCookies)
                ledgerData.add_responsecookies(cookie);
            auto message{
                std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
            peer->send(message);
            JLOG(p_journal_.debug())
                << "sendToMultiple: Sent 1 TMLedgerData message to peer ["
                << peer->id() << "]: including "
                << (directResponse ? "the direct response flag and " : "")
                << sendCookies.size() << " response cookies. "
                << ": " << sha512Half(ledgerData);
        }
    }
    XRPL_ASSERT(
        foundSelf, "ripple::PeerImp::sendToMultiple : current peer included");
}
std::shared_ptr<Ledger const>
PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
PeerImp::getLedger(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash)
{
JLOG(p_journal_.trace()) << "getLedger: Ledger";
JLOG(p_journal_.trace()) << "getLedger: Ledger " << mHash;
std::shared_ptr<Ledger const> ledger;
@@ -3250,22 +3492,33 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
if (m->has_querytype() && !m->has_requestcookie())
{
// Attempt to relay the request to a peer
// Note repeated messages will not relay to the same peer
// before `getledgerInterval` seconds. This prevents one
// peer from getting flooded, and distributes the request
// load. If a request has been relayed to all eligible
// peers, then this message will not be relayed.
if (auto const peer = getPeerWithLedger(
overlay_,
ledgerHash,
m->has_ledgerseq() ? m->ledgerseq() : 0,
this))
this,
[&](Peer::id_t id) {
return app_.getHashRouter().shouldProcessForPeer(
mHash, id, getledgerInterval);
}))
{
m->set_requestcookie(id());
peer->send(
std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
JLOG(p_journal_.debug())
<< "getLedger: Request relayed to peer";
<< "getLedger: Request relayed to peer [" << peer->id()
<< "]: " << mHash;
return ledger;
}
JLOG(p_journal_.trace())
<< "getLedger: Failed to find peer to relay request";
<< "getLedger: Don't have ledger with hash " << ledgerHash
<< ": " << mHash;
}
}
}
@@ -3275,7 +3528,7 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
{
JLOG(p_journal_.debug())
<< "getLedger: Early ledger sequence request";
<< "getLedger: Early ledger sequence request " << mHash;
}
else
{
@@ -3284,7 +3537,7 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
{
JLOG(p_journal_.debug())
<< "getLedger: Don't have ledger with sequence "
<< m->ledgerseq();
<< m->ledgerseq() << ": " << mHash;
}
}
}
@@ -3307,29 +3560,33 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
Resource::feeMalformedRequest, "get_ledger ledgerSeq");
ledger.reset();
JLOG(p_journal_.warn())
<< "getLedger: Invalid ledger sequence " << ledgerSeq;
JLOG(p_journal_.warn()) << "getLedger: Invalid ledger sequence "
<< ledgerSeq << ": " << mHash;
}
}
else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
{
ledger.reset();
JLOG(p_journal_.debug())
<< "getLedger: Early ledger sequence request " << ledgerSeq;
<< "getLedger: Early ledger sequence request " << ledgerSeq
<< ": " << mHash;
}
}
else
{
JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
JLOG(p_journal_.debug())
<< "getLedger: Unable to find ledger " << mHash;
}
return ledger;
}
std::shared_ptr<SHAMap const>
PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
PeerImp::getTxSet(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash) const
{
JLOG(p_journal_.trace()) << "getTxSet: TX set";
JLOG(p_journal_.trace()) << "getTxSet: TX set " << mHash;
uint256 const txSetHash{m->ledgerhash()};
std::shared_ptr<SHAMap> shaMap{
@@ -3339,22 +3596,34 @@ PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
if (m->has_querytype() && !m->has_requestcookie())
{
// Attempt to relay the request to a peer
if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
// Note repeated messages will not relay to the same peer
// before `getledgerInterval` seconds. This prevents one
// peer from getting flooded, and distributes the request
// load. If a request has been relayed to all eligible
// peers, then this message will not be relayed.
if (auto const peer = getPeerWithTree(
overlay_, txSetHash, this, [&](Peer::id_t id) {
return app_.getHashRouter().shouldProcessForPeer(
mHash, id, getledgerInterval);
}))
{
m->set_requestcookie(id());
peer->send(
std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
JLOG(p_journal_.debug())
<< "getTxSet: Request relayed to peer [" << peer->id()
<< "]: " << mHash;
}
else
{
JLOG(p_journal_.debug())
<< "getTxSet: Failed to find relay peer";
<< "getTxSet: Failed to find relay peer: " << mHash;
}
}
else
{
JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
JLOG(p_journal_.debug())
<< "getTxSet: Failed to find TX set " << mHash;
}
}
@@ -3362,7 +3631,9 @@ PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
}
void
PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
PeerImp::processLedgerRequest(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash)
{
// Do not resource charge a peer responding to a relay
if (!m->has_requestcookie())
@@ -3376,9 +3647,74 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
bool fatLeaves{true};
auto const itype{m->itype()};
auto getDestinations = [&] {
// If a ledger data message is generated, it's going to be sent to every
// peer that is waiting for it.
PeerCookieMap result;
std::size_t numCookies = 0;
{
// Don't do the work under this peer if this peer is not waiting for
// any replies
auto myCookies = releaseRequestCookies(mHash);
if (myCookies.empty())
{
JLOG(p_journal_.debug()) << "TMGetLedger: peer is no longer "
"waiting for response to request: "
<< mHash;
return result;
}
numCookies += myCookies.size();
result[shared_from_this()] = myCookies;
}
std::set<HashRouter::PeerShortID> const peers =
app_.getHashRouter().getPeers(mHash);
for (auto const peerID : peers)
{
// This loop does not need to be done under the HashRouter
// lock because findPeerByShortID and releaseRequestCookies
// are thread safe, and everything else is local
if (auto p = overlay_.findPeerByShortID(peerID))
{
auto cookies = p->releaseRequestCookies(mHash);
numCookies += cookies.size();
if (result.contains(p))
{
// Unlikely, but if a request came in to this peer while
// iterating, add the items instead of copying /
// overwriting.
XRPL_ASSERT(
p.get() == this,
"ripple::PeerImp::processLedgerRequest : found self in "
"map");
for (auto const& cookie : cookies)
result[p].emplace(cookie);
}
else if (cookies.size())
result[p] = cookies;
}
}
JLOG(p_journal_.debug())
<< "TMGetLedger: Processing request for " << result.size()
<< " peers. Will send " << numCookies
<< " messages if successful: " << mHash;
return result;
};
// Will only populate this if we're going to do work.
PeerCookieMap destinations;
if (itype == protocol::liTS_CANDIDATE)
{
if (sharedMap = getTxSet(m); !sharedMap)
destinations = getDestinations();
if (destinations.empty())
// Nowhere to send the response!
return;
if (sharedMap = getTxSet(m, mHash); !sharedMap)
return;
map = sharedMap.get();
@@ -3386,8 +3722,6 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
ledgerData.set_ledgerseq(0);
ledgerData.set_ledgerhash(m->ledgerhash());
ledgerData.set_type(protocol::liTS_CANDIDATE);
if (m->has_requestcookie())
ledgerData.set_requestcookie(m->requestcookie());
// We'll already have most transactions
fatLeaves = false;
@@ -3406,7 +3740,12 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
return;
}
if (ledger = getLedger(m); !ledger)
destinations = getDestinations();
if (destinations.empty())
// Nowhere to send the response!
return;
if (ledger = getLedger(m, mHash); !ledger)
return;
// Fill out the reply
@@ -3414,13 +3753,11 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
ledgerData.set_ledgerseq(ledger->info().seq);
ledgerData.set_type(itype);
if (m->has_requestcookie())
ledgerData.set_requestcookie(m->requestcookie());
switch (itype)
{
case protocol::liBASE:
sendLedgerBase(ledger, ledgerData);
sendLedgerBase(ledger, ledgerData, destinations);
return;
case protocol::liTX_NODE:
@@ -3537,7 +3874,7 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
if (ledgerData.nodes_size() == 0)
return;
send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
sendToMultiple(ledgerData, destinations);
}
int
@@ -3585,6 +3922,19 @@ PeerImp::isHighLatency() const
return latency_ >= peerHighLatency;
}
/** Atomically removes and returns all request cookies recorded for a request.

    Each entry in messageRequestCookies_ tracks the set of request cookies
    (std::nullopt representing a direct, cookie-less request) received from
    this peer for a single deduplicated TMGetLedger request. When a response
    is ready, the caller claims the whole set so each waiting requester can
    be answered exactly once.

    @param requestHash hash identifying the deduplicated request
    @return the cookies that were pending for the hash; empty if none

    Thread safety: guarded by cookieLock_, so safe to call from any thread
    (callers iterate peers concurrently).
*/
std::set<std::optional<uint64_t>>
PeerImp::releaseRequestCookies(uint256 const& requestHash)
{
    std::set<std::optional<uint64_t>> result;

    std::lock_guard lock(cookieLock_);
    // Single lookup instead of contains() + operator[] + erase(key), which
    // traverses the map three times.
    if (auto const it = messageRequestCookies_.find(requestHash);
        it != messageRequestCookies_.end())
    {
        result = std::move(it->second);
        messageRequestCookies_.erase(it);
    }
    return result;
}
void
PeerImp::Metrics::add_message(std::uint64_t bytes)
{

View File

@@ -253,6 +253,15 @@ private:
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
// Track message requests and responses
// TODO: Use an expiring cache or something
using MessageCookieMap =
std::map<uint256, std::set<std::optional<uint64_t>>>;
using PeerCookieMap =
std::map<std::shared_ptr<Peer>, std::set<std::optional<uint64_t>>>;
std::mutex mutable cookieLock_;
MessageCookieMap messageRequestCookies_;
friend class OverlayImpl;
class Metrics
@@ -496,6 +505,13 @@ public:
return txReduceRelayEnabled_;
}
//
// Messages
//
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override;
private:
/**
* @brief Handles a failure associated with a specific error code.
@@ -798,16 +814,28 @@ private:
void
sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData);
std::shared_ptr<Ledger const>
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
std::shared_ptr<SHAMap const>
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
protocol::TMLedgerData& ledgerData,
PeerCookieMap const& destinations);
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
sendToMultiple(
protocol::TMLedgerData& ledgerData,
PeerCookieMap const& destinations);
std::shared_ptr<Ledger const>
getLedger(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash);
std::shared_ptr<SHAMap const>
getTxSet(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash) const;
void
processLedgerRequest(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash);
};
//------------------------------------------------------------------------------

View File

@@ -1,8 +1,11 @@
#include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/overlay/Overlay.h>
#include <xrpld/overlay/PeerSet.h>
#include <xrpl/protocol/digest.h>
namespace ripple {
class PeerSetImpl : public PeerSet
@@ -85,16 +88,52 @@ PeerSetImpl::sendRequest(
std::shared_ptr<Peer> const& peer)
{
auto packet = std::make_shared<Message>(message, type);
auto const messageHash = [&]() {
auto const packetBuffer =
packet->getBuffer(compression::Compressed::Off);
return sha512Half(Slice(packetBuffer.data(), packetBuffer.size()));
}();
// Allow messages to be re-sent to the same peer after a delay
using namespace std::chrono_literals;
constexpr std::chrono::seconds interval = 30s;
if (peer)
{
peer->send(packet);
if (app_.getHashRouter().shouldProcessForPeer(
messageHash, peer->id(), interval))
{
JLOG(journal_.trace())
<< "Sending " << protocolMessageName(type) << " message to ["
<< peer->id() << "]: " << messageHash;
peer->send(packet);
}
else
JLOG(journal_.debug())
<< "Suppressing sending duplicate " << protocolMessageName(type)
<< " message to [" << peer->id() << "]: " << messageHash;
return;
}
for (auto id : peers_)
{
if (auto p = app_.overlay().findPeerByShortID(id))
p->send(packet);
{
if (app_.getHashRouter().shouldProcessForPeer(
messageHash, p->id(), interval))
{
JLOG(journal_.trace())
<< "Sending " << protocolMessageName(type)
<< " message to [" << p->id() << "]: " << messageHash;
p->send(packet);
}
else
JLOG(journal_.debug())
<< "Suppressing sending duplicate "
<< protocolMessageName(type) << " message to [" << p->id()
<< "]: " << messageHash;
}
}
}

View File

@@ -24,6 +24,12 @@ protocolMessageType(protocol::TMGetLedger const&)
return protocol::mtGET_LEDGER;
}
// Maps a TMLedgerData protobuf message to its wire-protocol message type
// identifier (overload used by generic message-handling templates).
inline protocol::MessageType
protocolMessageType(protocol::TMLedgerData const&)
{
    return protocol::mtLEDGER_DATA;
}
inline protocol::MessageType
protocolMessageType(protocol::TMReplayDeltaRequest const&)
{
@@ -467,4 +473,64 @@ invokeProtocolMessage(
} // namespace ripple
namespace protocol {
/** Hashes the semantic content of a TMGetLedger message for duplicate
    detection.

    Only fields that affect *what* is being requested are hashed; the
    request cookie (which only affects *where* the reply is routed) is
    deliberately excluded, so the same request arriving via different
    relay cookies hashes identically.

    NOTE(review): optional fields are appended only when present, with no
    presence marker, so messages differing only in which optional fields are
    set could in principle collide — presumably acceptable for duplicate
    suppression; confirm if this hash is ever used for anything stronger.
*/
template <class Hasher>
void
hash_append(Hasher& h, TMGetLedger const& msg)
{
    using beast::hash_append;
    using namespace ripple;
    hash_append(h, safe_cast<int>(protocolMessageType(msg)));
    hash_append(h, safe_cast<int>(msg.itype()));
    if (msg.has_ltype())
        hash_append(h, safe_cast<int>(msg.ltype()));
    if (msg.has_ledgerhash())
        hash_append(h, msg.ledgerhash());
    if (msg.has_ledgerseq())
        hash_append(h, msg.ledgerseq());
    for (auto const& nodeId : msg.nodeids())
        hash_append(h, nodeId);
    // The element count acts as a terminator for the variable-length list
    hash_append(h, msg.nodeids_size());
    // Do NOT include the request cookie. It does not affect the content of the
    // request, but only where to route the results.
    // if (msg.has_requestcookie())
    //     hash_append(h, msg.requestcookie());
    if (msg.has_querytype())
        hash_append(h, safe_cast<int>(msg.querytype()));
    if (msg.has_querydepth())
        hash_append(h, msg.querydepth());
}
/** Hashes a TMLedgerData message for duplicate detection.

    Unlike the TMGetLedger hash, the requestcookie IS included here: the
    cookie determines where this reply will be forwarded, so replies routed
    to different requesters hash differently. Callers wanting a
    routing-independent hash clear the routing fields before hashing.
*/
template <class Hasher>
void
hash_append(Hasher& h, TMLedgerData const& msg)
{
    using beast::hash_append;
    using namespace ripple;
    hash_append(h, safe_cast<int>(protocolMessageType(msg)));
    hash_append(h, msg.ledgerhash());
    hash_append(h, msg.ledgerseq());
    hash_append(h, safe_cast<int>(msg.type()));
    for (auto const& node : msg.nodes())
    {
        hash_append(h, node.nodedata());
        if (node.has_nodeid())
            hash_append(h, node.nodeid());
    }
    // The element count acts as a terminator for the variable-length list
    hash_append(h, msg.nodes_size());
    if (msg.has_requestcookie())
        hash_append(h, msg.requestcookie());
    if (msg.has_error())
        hash_append(h, safe_cast<int>(msg.error()));
}
} // namespace protocol
#endif

View File

@@ -21,7 +21,9 @@ namespace ripple {
constexpr ProtocolVersion const supportedProtocolList[]
{
{2, 1},
{2, 2}
{2, 2},
// Adds TMLedgerData::responseCookies and directResponse
{2, 3}
};
// clang-format on