Reduce duplicate peer traffic for ledger data (#5126)

- Drop duplicate outgoing TMGetLedger messages per peer
  - Allow a retry after 30s in case of peer or network congestion.
  - Addresses RIPD-1870
  - (Changes levelization. That is not desirable, and will need to be fixed.)
- Drop duplicate incoming TMGetLedger messages per peer
  - Allow a retry after 15s in case of peer or network congestion.
  - The requestCookie is ignored when computing the hash, thus increasing
    the chances of detecting duplicate messages.
  - With duplicate messages, keep track of the different requestCookies
    (or lack of cookie). When work is finally done for a given request,
    send the response to all the peers that are waiting on the request,
    sending one message per peer, including all the cookies and
    a "directResponse" flag indicating the data is intended for the
    sender, too.
  - Addresses RIPD-1871
- Drop duplicate incoming TMLedgerData messages
  - Addresses RIPD-1869
- Improve logging related to ledger acquisition
- Class "CanProcess" to keep track of processing of distinct items

---------

Co-authored-by: Valentin Balaschenko <13349202+vlntb@users.noreply.github.com>
This commit is contained in:
Ed Hennis
2025-02-14 18:51:51 -05:00
committed by GitHub
parent 7c9d652d9b
commit dd5e6559dd
27 changed files with 1017 additions and 147 deletions

View File

@@ -14,7 +14,7 @@ Loop: xrpld.app xrpld.net
xrpld.app > xrpld.net
Loop: xrpld.app xrpld.overlay
xrpld.overlay == xrpld.app
xrpld.overlay ~= xrpld.app
Loop: xrpld.app xrpld.peerfinder
xrpld.app > xrpld.peerfinder

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#include <functional>
#include <mutex>
#include <set>
/** RAII class to check if an Item is already being processed on another thread,
 * as indicated by its presence in a Collection.
 *
 * If the Item is not in the Collection, it will be added under lock in the
 * ctor, and removed under lock in the dtor. The object will be considered
 * "usable" and evaluate to `true`.
 *
 * If the Item is in the Collection, no changes will be made to the collection,
 * and the CanProcess object will be considered "unusable".
 *
 * It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
 * Process or skip a block of code, or set a flag.)
 *
 * The current use is to avoid lock contention that would be involved in
 * processing something associated with the Item.
 *
 * Lifetime requirements: the Mutex and the Collection are held by reference
 * and must outlive the CanProcess object. The Item is copied when it is
 * needed for cleanup, so passing a temporary is safe.
 *
 * A CanProcess object is not copyable: a copy would run the same cleanup a
 * second time.
 *
 * Examples:
 *
 *   void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
 *   {
 *       if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
 *       {
 *           acquire(hash, ...);
 *       }
 *   }
 *
 *   bool
 *   NetworkOPsImp::recvValidation(
 *       std::shared_ptr<STValidation> const& val,
 *       std::string const& source)
 *   {
 *       CanProcess check(
 *           validationsMutex_, pendingValidations_, val->getLedgerHash());
 *       BypassAccept bypassAccept =
 *           check ? BypassAccept::no : BypassAccept::yes;
 *       handleNewValidation(app_, val, source, bypassAccept, m_journal);
 *   }
 *
 */
class CanProcess
{
public:
    /** Try to claim `item` by inserting it into `collection` under `mtx`.
     *
     * @param mtx        Mutex protecting `collection`. Locked only for the
     *                   duration of the insert here and of the erase in the
     *                   dtor.
     * @param collection Container supporting `insert(item)` (returning a
     *                   pair whose `.second` reports insertion) and `erase`.
     * @param item       The value identifying the work to be done.
     */
    template <class Mutex, class Collection, class Item>
    CanProcess(Mutex& mtx, Collection& collection, Item const& item)
        : cleanup_(insert(mtx, collection, item))
    {
    }

    // Copying would duplicate the cleanup action and erase the item twice
    // (with the iterator form, through an already-erased iterator).
    CanProcess(CanProcess const&) = delete;
    CanProcess&
    operator=(CanProcess const&) = delete;

    /** Remove the item from the collection if this object inserted it. */
    ~CanProcess()
    {
        if (cleanup_)
            cleanup_();
    }

    /** @return `true` if this object inserted the item ("usable"). */
    explicit
    operator bool() const
    {
        return static_cast<bool>(cleanup_);
    }

private:
    /** Insert `item` under lock and build the matching cleanup action.
     *
     * @return An empty function if the item was already present, otherwise
     *         a function that erases the item under lock.
     */
    template <bool useIterator, class Mutex, class Collection, class Item>
    static std::function<void()>
    doInsert(Mutex& mtx, Collection& collection, Item const& item)
    {
        std::unique_lock<Mutex> lock(mtx);
        // TODO: Use structured binding once LLVM 16 is the minimum supported
        // version. See also: https://github.com/llvm/llvm-project/issues/48582
        // https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
        auto const insertResult = collection.insert(item);
        if (!insertResult.second)
            return {};

        if constexpr (useIterator)
        {
            return [&mtx, &collection, it = insertResult.first]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(it);
            };
        }
        else
        {
            // Capture the item BY VALUE: this closure runs in the dtor, long
            // after the ctor's `item` reference (possibly bound to a
            // temporary) has gone out of scope.
            return [&mtx, &collection, item]() {
                std::unique_lock<Mutex> lock(mtx);
                collection.erase(item);
            };
        }
    }

    // Generic insert() function doesn't use iterators because they may get
    // invalidated
    template <class Mutex, class Collection, class Item>
    static std::function<void()>
    insert(Mutex& mtx, Collection& collection, Item const& item)
    {
        return doInsert<false>(mtx, collection, item);
    }

    // Specialize insert() for std::set, which does not invalidate iterators for
    // insert and erase
    template <class Mutex, class Item>
    static std::function<void()>
    insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
    {
        return doInsert<true>(mtx, collection, item);
    }

    // If set, then the item is "usable"
    std::function<void()> cleanup_;
};
#endif

View File

@@ -631,6 +631,13 @@ to_string(base_uint<Bits, Tag> const& a)
return strHex(a.cbegin(), a.cend());
}
/** Abbreviated hex form of a base_uint: the first 8 characters of the full
    hex string followed by "...". */
template <std::size_t Bits, class Tag>
inline std::string
to_short_string(base_uint<Bits, Tag> const& a)
{
    auto abbreviated = strHex(a.cbegin(), a.cend()).substr(0, 8);
    abbreviated += "...";
    return abbreviated;
}
template <std::size_t Bits, class Tag>
inline std::ostream&
operator<<(std::ostream& out, base_uint<Bits, Tag> const& u)

View File

@@ -321,8 +321,18 @@ message TMLedgerData
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}
message TMPing

View File

@@ -55,6 +55,8 @@ struct LedgerHeader
// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;

View File

@@ -242,6 +242,33 @@ class HashRouter_test : public beast::unit_test::suite
BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s));
}
void
testProcessPeer()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(stopwatch, 5s);
uint256 const key(1);
HashRouter::PeerShortID peer1 = 1;
HashRouter::PeerShortID peer2 = 2;
auto const timeout = 2s;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
++stopwatch;
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
}
public:
void
run() override
@@ -252,6 +279,7 @@ public:
testSetFlags();
testRelay();
testProcess();
testProcessPeer();
}
};

View File

@@ -322,6 +322,11 @@ public:
{
return false;
}
/** This implementation tracks no request cookies: it ignores `requestHash`
    and always returns an empty set. */
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
    return std::set<std::optional<uint64_t>>{};
}
bool ledgerReplayEnabled_;
PublicKey nodePublicKey_;

View File

@@ -151,6 +151,7 @@ struct base_uint_test : beast::unit_test::suite
uset.insert(u);
BEAST_EXPECT(raw.size() == u.size());
BEAST_EXPECT(to_string(u) == "0102030405060708090A0B0C");
BEAST_EXPECT(to_short_string(u) == "01020304...");
BEAST_EXPECT(*u.data() == 1);
BEAST_EXPECT(u.signum() == 1);
BEAST_EXPECT(!!u);
@@ -173,6 +174,7 @@ struct base_uint_test : beast::unit_test::suite
test96 v{~u};
uset.insert(v);
BEAST_EXPECT(to_string(v) == "FEFDFCFBFAF9F8F7F6F5F4F3");
BEAST_EXPECT(to_short_string(v) == "FEFDFCFB...");
BEAST_EXPECT(*v.data() == 0xfe);
BEAST_EXPECT(v.signum() == 1);
BEAST_EXPECT(!!v);
@@ -193,6 +195,7 @@ struct base_uint_test : beast::unit_test::suite
test96 z{beast::zero};
uset.insert(z);
BEAST_EXPECT(to_string(z) == "000000000000000000000000");
BEAST_EXPECT(to_short_string(z) == "00000000...");
BEAST_EXPECT(*z.data() == 0);
BEAST_EXPECT(*z.begin() == 0);
BEAST_EXPECT(*std::prev(z.end(), 1) == 0);
@@ -213,6 +216,7 @@ struct base_uint_test : beast::unit_test::suite
BEAST_EXPECT(n == z);
n--;
BEAST_EXPECT(to_string(n) == "FFFFFFFFFFFFFFFFFFFFFFFF");
BEAST_EXPECT(to_short_string(n) == "FFFFFFFF...");
n = beast::zero;
BEAST_EXPECT(n == z);
@@ -223,6 +227,7 @@ struct base_uint_test : beast::unit_test::suite
test96 x{zm1 ^ zp1};
uset.insert(x);
BEAST_EXPECTS(to_string(x) == "FFFFFFFFFFFFFFFFFFFFFFFE", to_string(x));
BEAST_EXPECTS(to_short_string(x) == "FFFFFFFF...", to_short_string(x));
BEAST_EXPECT(uset.size() == 4);

View File

@@ -87,8 +87,8 @@ public:
negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion(
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
make_protocol(2, 2));
"RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
make_protocol(2, 3));
BEAST_EXPECT(
negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") ==
std::nullopt);

View File

@@ -182,6 +182,11 @@ public:
removeTxQueue(const uint256&) override
{
}
/** This implementation tracks no request cookies: it ignores `requestHash`
    and always returns an empty set. */
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
    return std::set<std::optional<uint64_t>>{};
}
};
/** Manually advanced clock. */

View File

@@ -1073,7 +1073,8 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if (!positions && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED);
app_.getOPs().setMode(
OperatingMode::CONNECTED, "updateOperatingMode: no positions");
}
void

View File

@@ -196,6 +196,25 @@ private:
std::unique_ptr<PeerSet> mPeerSet;
};
/** Name of an InboundLedger::Reason value as a string.

    @return "HISTORY", "GENERIC" or "CONSENSUS" for the matching enumerator;
            any other value triggers UNREACHABLE and yields "unknown".
*/
inline std::string
to_string(InboundLedger::Reason reason)
{
    switch (reason)
    {
        case InboundLedger::Reason::HISTORY:
            return "HISTORY";
        case InboundLedger::Reason::GENERIC:
            return "GENERIC";
        case InboundLedger::Reason::CONSENSUS:
            return "CONSENSUS";
        default:
            UNREACHABLE(
                "ripple::to_string(InboundLedger::Reason) : unknown value");
            return "unknown";
    }
}
} // namespace ripple
#endif

View File

@@ -392,7 +392,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (!wasProgress)
{
checkLocal();
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}
mByHash = true;
@@ -502,15 +509,17 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
if (auto stream = journal_.debug())
{
stream << "Trigger acquiring ledger " << hash_;
std::stringstream ss;
ss << "Trigger acquiring ledger " << hash_;
if (peer)
stream << " from " << peer;
ss << " from " << peer;
if (complete_ || failed_)
stream << "complete=" << complete_ << " failed=" << failed_;
ss << " complete=" << complete_ << " failed=" << failed_;
else
stream << "header=" << mHaveHeader << " tx=" << mHaveTransactions
<< " as=" << mHaveState;
ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions
<< " as=" << mHaveState;
stream << ss.str();
}
if (!mHaveHeader)

View File

@@ -23,9 +23,9 @@
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/container/aged_map.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/protocol/jss.h>
@@ -77,11 +77,85 @@ public:
hash.isNonZero(),
"ripple::InboundLedgersImp::acquire::doAcquire : nonzero hash");
// probably not the right rule
if (app_.getOPs().isNeedNetworkLedger() &&
(reason != InboundLedger::Reason::GENERIC) &&
(reason != InboundLedger::Reason::CONSENSUS))
bool const needNetworkLedger = app_.getOPs().isNeedNetworkLedger();
bool const shouldAcquire = [&]() {
if (!needNetworkLedger)
return true;
if (reason == InboundLedger::Reason::GENERIC)
return true;
if (reason == InboundLedger::Reason::CONSENSUS)
return true;
return false;
}();
std::stringstream ss;
ss << "InboundLedger::acquire: "
<< "Request: " << to_string(hash) << ", " << seq
<< " NeedNetworkLedger: " << (needNetworkLedger ? "yes" : "no")
<< " Reason: " << to_string(reason)
<< " Should acquire: " << (shouldAcquire ? "true." : "false.");
/* Acquiring ledgers is somewhat expensive. It requires lots of
* computation and network communication. Avoid it when it's not
* appropriate. Every validation from a peer for a ledger that
* we do not have locally results in a call to this function: even
* if we are moments away from validating the same ledger.
*/
bool const shouldBroadcast = [&]() {
// If the node is not in "full" state, it needs to sync to
// the network, and doesn't have the necessary tx's and
// ledger entries to build the ledger.
bool const isFull = app_.getOPs().isFull();
// If everything else is ok, don't try to acquire the ledger
// if the requested seq is in the near future relative to
// the validated ledger. If the requested ledger is between
// 1 and 19 inclusive ledgers ahead of the valid ledger this
// node has not built it yet, but it's possible/likely it
// has the tx's necessary to build it and get caught up.
// Plus it might not become validated. On the other hand, if
// it's 20 or more in the future, this node should request
// it so that it can jump ahead and get caught up.
LedgerIndex const validSeq =
app_.getLedgerMaster().getValidLedgerIndex();
constexpr std::size_t lagLeeway = 20;
bool const nearFuture =
(seq > validSeq) && (seq < validSeq + lagLeeway);
// If everything else is ok, don't try to acquire the ledger
// if the request is related to consensus. (Note that
// consensus calls usually pass a seq of 0, so nearFuture
// will be false other than on a brand new network.)
bool const consensus =
reason == InboundLedger::Reason::CONSENSUS;
ss << " Evaluating whether to broadcast requests to peers"
<< ". full: " << (isFull ? "true" : "false")
<< ". ledger sequence " << seq
<< ". Valid sequence: " << validSeq
<< ". Lag leeway: " << lagLeeway
<< ". request for near future ledger: "
<< (nearFuture ? "true" : "false")
<< ". Consensus: " << (consensus ? "true" : "false");
// If the node is not synced, send requests.
if (!isFull)
return true;
// If the ledger is in the near future, do NOT send requests.
// This node is probably about to build it.
if (nearFuture)
return false;
// If the request is because of consensus, do NOT send requests.
// This node is probably about to build it.
if (consensus)
return false;
return true;
}();
ss << ". Would broadcast to peers? "
<< (shouldBroadcast ? "true." : "false.");
if (!shouldAcquire)
{
JLOG(j_.debug()) << "Abort(rule): " << ss.str();
return {};
}
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
@@ -89,6 +163,7 @@ public:
ScopedLockType sl(mLock);
if (stopping_)
{
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {};
}
@@ -112,23 +187,29 @@ public:
++mCounter;
}
}
ss << " IsNew: " << (isNew ? "true" : "false");
if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {};
}
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
{
JLOG(j_.debug()) << "InProgress: " << ss.str();
return {};
}
JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger();
};
using namespace std::chrono_literals;
std::shared_ptr<Ledger const> ledger = perf::measureDurationAndLog(
return perf::measureDurationAndLog(
doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
}
void
@@ -137,28 +218,25 @@ public:
std::uint32_t seq,
InboundLedger::Reason reason) override
{
std::unique_lock lock(acquiresMutex_);
try
if (CanProcess const check{acquiresMutex_, pendingAcquires_, hash})
{
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
scope_unlock unlock(lock);
acquire(hash, seq, reason);
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn())
<< "Exception thrown for acquiring new inbound ledger "
<< hash << ": " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
}
catch (std::exception const& e)
{
JLOG(j_.warn())
<< "Exception thrown for acquiring new inbound ledger " << hash
<< ": " << e.what();
}
catch (...)
{
JLOG(j_.warn())
<< "Unknown exception thrown for acquiring new inbound ledger "
<< hash;
}
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger>

View File

@@ -973,8 +973,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
}
JLOG(m_journal.info()) << "Advancing accepted ledger to "
<< ledger->info().seq << " with >= " << minVal
<< " validations";
<< ledger->info().seq << " ("
<< to_short_string(ledger->info().hash)
<< ") with >= " << minVal << " validations";
ledger->setValidated();
ledger->setFull();

View File

@@ -33,7 +33,8 @@ TimeoutCounter::TimeoutCounter(
QueueJobParameter&& jobParameter,
beast::Journal journal)
: app_(app)
, journal_(journal)
, sink_(journal, to_short_string(hash) + " ")
, journal_(sink_)
, hash_(hash)
, timeouts_(0)
, complete_(false)
@@ -53,6 +54,8 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count()
<< "ms";
timer_.expires_after(timerInterval_);
timer_.async_wait(
[wptr = pmDowncast()](boost::system::error_code const& ec) {
@@ -61,6 +64,12 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock())
{
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec << " (operation_aborted: "
<< boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted"
: "other")
<< ")";
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
}

View File

@@ -24,6 +24,8 @@
#include <xrpld/core/Job.h>
#include <xrpl/beast/clock/abstract_clock.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <boost/asio/basic_waitable_timer.hpp>
#include <mutex>
@@ -121,6 +123,7 @@ protected:
// Used in this class for access to boost::asio::io_service and
// ripple::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::WrappedSink sink_;
beast::Journal journal_;
mutable std::recursive_mutex mtx_;

View File

@@ -90,6 +90,20 @@ HashRouter::shouldProcess(
return s.shouldProcess(suppressionMap_.clock().now(), tx_interval);
}
/** Decide, under the router mutex, whether the item identified by `key`
    should be processed for `peer`.

    Looks up the entry for `key` via emplace() and delegates to
    Entry::shouldProcessForPeer with the suppression map clock's current
    time.
*/
bool
HashRouter::shouldProcessForPeer(
    uint256 const& key,
    PeerShortID peer,
    std::chrono::seconds interval)
{
    std::lock_guard lock(mutex_);

    // Look the entry up first, then read the clock (same order as before).
    auto const lookup = emplace(key);
    Entry& suppression = lookup.first;
    auto const currentTime = suppressionMap_.clock().now();

    return suppression.shouldProcessForPeer(peer, currentTime, interval);
}
int
HashRouter::getFlags(uint256 const& key)
{
@@ -128,4 +142,13 @@ HashRouter::shouldRelay(uint256 const& key)
return s.releasePeerSet();
}
/** Return, under the router mutex, a copy of the peer set held by the entry
    for `key`. The entry is obtained through emplace(). */
auto
HashRouter::getPeers(uint256 const& key) -> std::set<PeerShortID>
{
    std::lock_guard lock(mutex_);

    // The reference returned by peekPeerSet() is copied into the return
    // value while the lock is still held.
    return emplace(key).first.peekPeerSet();
}
} // namespace ripple

View File

@@ -92,6 +92,13 @@ private:
return std::move(peers_);
}
/** Return set of peers waiting for reply. Leaves list unchanged. */
std::set<PeerShortID> const&
peekPeerSet()
{
return peers_;
}
/** Return seated relay time point if the message has been relayed */
std::optional<Stopwatch::time_point>
relayed() const
@@ -125,6 +132,21 @@ private:
return true;
}
/** Decide whether this item should be processed for `peer` at time `now`.

    @return `false` if `peer` already has a recorded processing time and
            that time plus `interval` is still later than `now`. Otherwise
            the peer is added via addPeer(), its processing time is set to
            `now`, and `true` is returned.
*/
bool
shouldProcessForPeer(
    PeerShortID peer,
    Stopwatch::time_point now,
    std::chrono::seconds interval)
{
    auto const previous = peerProcessed_.find(peer);
    bool const stillSuppressed = previous != peerProcessed_.end() &&
        ((previous->second + interval) > now);
    if (stillSuppressed)
        return false;

    // Peer may already be in the list, but adding it again doesn't hurt
    addPeer(peer);
    peerProcessed_[peer] = now;
    return true;
}
private:
int flags_ = 0;
std::set<PeerShortID> peers_;
@@ -132,6 +154,7 @@ private:
// than one flag needs to expire independently.
std::optional<Stopwatch::time_point> relayed_;
std::optional<Stopwatch::time_point> processed_;
std::map<PeerShortID, Stopwatch::time_point> peerProcessed_;
};
public:
@@ -163,7 +186,7 @@ public:
/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the peer is added.
* element 1: true if the key is added.
* element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>
@@ -180,6 +203,18 @@ public:
int& flags,
std::chrono::seconds tx_interval);
/** Determines whether the hashed item should be processed for the given
peer. Could be an incoming or outgoing message.
Items filtered with this function should only be processed for the given
peer once. Unlike shouldProcess, it can be processed for other peers.
*/
bool
shouldProcessForPeer(
uint256 const& key,
PeerShortID peer,
std::chrono::seconds interval);
/** Set the flags on a hash.
@return `true` if the flags were changed. `false` if unchanged.
@@ -205,6 +240,11 @@ public:
std::optional<std::set<PeerShortID>>
shouldRelay(uint256 const& key);
/** Returns a copy of the set of peers in the Entry for the key
*/
std::set<PeerShortID>
getPeers(uint256 const& key);
private:
// pair.second indicates whether the entry was created
std::pair<Entry&, bool>

View File

@@ -50,10 +50,10 @@
#include <xrpld/rpc/DeliveredAmount.h>
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/mulDiv.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/rfc2616.h>
#include <xrpl/beast/utility/rngfill.h>
#include <xrpl/crypto/RFC1751.h>
@@ -403,7 +403,7 @@ public:
isFull() override;
void
setMode(OperatingMode om) override;
setMode(OperatingMode om, const char* reason) override;
bool
isBlocked() override;
@@ -874,7 +874,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
inline void
NetworkOPsImp::setStandAlone()
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "setStandAlone");
}
inline void
@@ -1022,7 +1022,9 @@ NetworkOPsImp::processHeartbeatTimer()
{
if (mMode != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED);
setMode(
OperatingMode::DISCONNECTED,
"Heartbeat: insufficient peers");
JLOG(m_journal.warn())
<< "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
@@ -1038,7 +1040,7 @@ NetworkOPsImp::processHeartbeatTimer()
if (mMode == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
JLOG(m_journal.info())
<< "Node count (" << numPeers << ") is sufficient.";
}
@@ -1046,9 +1048,9 @@ NetworkOPsImp::processHeartbeatTimer()
// Check if the last validated ledger forces a change between these
// states.
if (mMode == OperatingMode::SYNCING)
setMode(OperatingMode::SYNCING);
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
else if (mMode == OperatingMode::CONNECTED)
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
}
mConsensus.timerEntry(app_.timeKeeper().closeTime());
@@ -1614,7 +1616,7 @@ void
NetworkOPsImp::setAmendmentBlocked()
{
amendmentBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
}
inline bool
@@ -1645,7 +1647,7 @@ void
NetworkOPsImp::setUNLBlocked()
{
unlBlocked_ = true;
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
}
inline void
@@ -1746,7 +1748,7 @@ NetworkOPsImp::checkLastClosedLedger(
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
}
if (consensus)
@@ -1833,8 +1835,9 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed)
// this shouldn't happen unless we jump ledgers
if (mMode == OperatingMode::FULL)
{
JLOG(m_journal.warn()) << "Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING);
JLOG(m_journal.warn())
<< "beginConsensus Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
}
return false;
@@ -1944,7 +1947,7 @@ NetworkOPsImp::endConsensus()
// validations we have for LCL. If the ledger is good enough, go to
// TRACKING - TODO
if (!needNetworkLedger_)
setMode(OperatingMode::TRACKING);
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
}
if (((mMode == OperatingMode::CONNECTED) ||
@@ -1958,7 +1961,7 @@ NetworkOPsImp::endConsensus()
if (app_.timeKeeper().now() < (current->info().parentCloseTime +
2 * current->info().closeTimeResolution))
{
setMode(OperatingMode::FULL);
setMode(OperatingMode::FULL, "endConsensus: check full");
}
}
@@ -1970,7 +1973,7 @@ NetworkOPsImp::consensusViewChange()
{
if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
{
setMode(OperatingMode::CONNECTED);
setMode(OperatingMode::CONNECTED, "consensusViewChange");
}
}
@@ -2288,7 +2291,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
}
void
NetworkOPsImp::setMode(OperatingMode om)
NetworkOPsImp::setMode(OperatingMode om, const char* reason)
{
using namespace std::chrono_literals;
if (om == OperatingMode::CONNECTED)
@@ -2308,11 +2311,12 @@ NetworkOPsImp::setMode(OperatingMode om)
if (mMode == om)
return;
auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
mMode = om;
accounting_.mode(om);
JLOG(m_journal.info()) << "STATE->" << strOperatingMode();
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
pubServer();
}
@@ -2324,34 +2328,28 @@ NetworkOPsImp::recvValidation(
JLOG(m_journal.trace())
<< "recvValidation " << val->getLedgerHash() << " from " << source;
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::no;
try
{
if (pendingValidations_.contains(val->getLedgerHash()))
bypassAccept = BypassAccept::yes;
else
pendingValidations_.insert(val->getLedgerHash());
scope_unlock unlock(lock);
handleNewValidation(app_, val, source, bypassAccept, m_journal);
CanProcess const check(
validationsMutex_, pendingValidations_, val->getLedgerHash());
try
{
BypassAccept bypassAccept =
check ? BypassAccept::no : BypassAccept::yes;
handleNewValidation(app_, val, source, bypassAccept, m_journal);
}
catch (std::exception const& e)
{
JLOG(m_journal.warn())
<< "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
}
catch (std::exception const& e)
{
JLOG(m_journal.warn())
<< "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::no)
{
pendingValidations_.erase(val->getLedgerHash());
}
lock.unlock();
pubValidation(val);

View File

@@ -197,7 +197,7 @@ public:
virtual bool
isFull() = 0;
virtual void
setMode(OperatingMode om) = 0;
setMode(OperatingMode om, const char* reason) = 0;
virtual bool
isBlocked() = 0;
virtual bool

View File

@@ -36,6 +36,7 @@ enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation,
LedgerReplay,
LedgerDataCookies
};
/** Represents a peer connection in the overlay. */
@@ -133,6 +134,13 @@ public:
virtual bool
txReduceRelayEnabled() const = 0;
//
// Messages
//
virtual std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) = 0;
};
} // namespace ripple

View File

@@ -30,6 +30,7 @@
#include <xrpld/app/tx/apply.h>
#include <xrpld/overlay/Cluster.h>
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/ProtocolMessage.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/UptimeClock.h>
@@ -57,6 +58,9 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
/** How often we PING the peer to check for latency and sendq probe */
std::chrono::seconds constexpr peerTimerInterval{60};
/** How often we process duplicate incoming TMGetLedger messages */
std::chrono::seconds constexpr getledgerInterval{15};
} // namespace
// TODO: Remove this exclusion once unit tests are added after the hotfix
@@ -504,6 +508,8 @@ PeerImp::supportsFeature(ProtocolFeature f) const
return protocol_ >= make_protocol(2, 2);
case ProtocolFeature::LedgerReplay:
return ledgerReplayEnabled_;
case ProtocolFeature::LedgerDataCookies:
return protocol_ >= make_protocol(2, 3);
}
return false;
}
@@ -1346,8 +1352,9 @@ PeerImp::handleTransaction(
void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{
auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
auto badData = [&](std::string const& msg, bool chargefee = true) {
if (chargefee)
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
};
auto const itype{m->itype()};
@@ -1424,12 +1431,74 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
}
}
// Drop duplicate requests from the same peer for at least
// `getledgerInterval` seconds.
// Append a little junk to prevent the hash of an incoming message
// from matching the hash of the same outgoing message.
// `shouldProcessForPeer` does not distinguish between incoming and
// outgoing, and some of the message relay logic checks the hash to see
// if the message has been relayed already. If the hashes are the same,
// a duplicate will be detected when sending the message is attempted,
// so it will fail.
auto const messageHash = sha512Half(*m, nullptr);
// Request cookies are not included in the hash. Track them here.
auto const requestCookie = [&m]() -> std::optional<uint64_t> {
if (m->has_requestcookie())
return m->requestcookie();
return std::nullopt;
}();
auto const [inserted, pending] = [&] {
std::lock_guard lock{cookieLock_};
auto& cookies = messageRequestCookies_[messageHash];
bool const pending = !cookies.empty();
return std::pair{cookies.emplace(requestCookie).second, pending};
}();
// Check if the request has been seen from this peer.
if (!app_.getHashRouter().shouldProcessForPeer(
messageHash, id_, getledgerInterval))
{
// This request has already been seen from this peer.
// Has it been seen with this request cookie (or lack thereof)?
if (inserted)
{
// This is a duplicate request, but with a new cookie. When a
// response is ready, one will be sent for each request cookie.
JLOG(p_journal_.debug())
<< "TMGetLedger: duplicate request with new request cookie: "
<< requestCookie.value_or(0)
<< ". Job pending: " << (pending ? "yes" : "no") << ": "
<< messageHash;
if (pending)
{
// Don't bother queueing up a new job if other requests are
// already pending. This should limit entries in the job queue
// to one per peer per unique request.
JLOG(p_journal_.debug())
<< "TMGetLedger: Suppressing recvGetLedger job, since one "
"is pending: "
<< messageHash;
return;
}
}
else
{
// Don't punish nodes that don't know any better
return badData(
"duplicate request: " + to_string(messageHash),
supportsFeature(ProtocolFeature::LedgerDataCookies));
}
}
// Queue a job to process the request
JLOG(p_journal_.debug())
<< "TMGetLedger: Adding recvGetLedger job: " << messageHash;
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() {
if (auto peer = weak.lock())
peer->processLedgerRequest(m);
});
app_.getJobQueue().addJob(
jtLEDGER_REQ, "recvGetLedger", [weak, m, messageHash]() {
if (auto peer = weak.lock())
peer->processLedgerRequest(m, messageHash);
});
}
void
@@ -1545,8 +1614,9 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
void
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{
auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, msg);
auto badData = [&](std::string const& msg, bool charge = true) {
if (charge)
fee_.update(Resource::feeInvalidData, msg);
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
};
@@ -1597,23 +1667,99 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
"Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
}
// If there is a request cookie, attempt to relay the message
if (m->has_requestcookie())
auto const messageHash = sha512Half(*m);
if (!app_.getHashRouter().addSuppressionPeer(messageHash, id_))
{
if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
// Don't punish nodes that don't know any better
return badData(
"Duplicate message: " + to_string(messageHash),
supportsFeature(ProtocolFeature::LedgerDataCookies));
}
bool const routed = m->has_directresponse() || m->responsecookies_size() ||
m->has_requestcookie();
{
// Check if this message needs to be forwarded to one or more peers.
// Maximum of one of the relevant fields should be populated.
XRPL_ASSERT(
!m->has_requestcookie() || !m->responsecookies_size(),
"ripple::PeerImp::onMessage(TMLedgerData) : valid cookie fields");
// Make a copy of the response cookies, then wipe the list so it can be
// forwarded cleanly
auto const responseCookies = m->responsecookies();
m->clear_responsecookies();
// Flag indicating if this response should be processed locally,
// possibly in addition to being forwarded.
bool const directResponse =
m->has_directresponse() && m->directresponse();
m->clear_directresponse();
auto const relay = [this, m, &messageHash](auto const cookie) {
if (auto peer = overlay_.findPeerByShortID(cookie))
{
XRPL_ASSERT(
!m->has_requestcookie() && !m->responsecookies_size(),
"ripple::PeerImp::onMessage(TMLedgerData) relay : no "
"cookies");
if (peer->supportsFeature(ProtocolFeature::LedgerDataCookies))
// Setting this flag is not _strictly_ necessary for peers
// that support it if there are no cookies included in the
// message, but it is more accurate.
m->set_directresponse(true);
else
m->clear_directresponse();
peer->send(
std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
}
else
JLOG(p_journal_.info())
<< "Unable to route TX/ledger data reply to peer ["
<< cookie << "]: " << messageHash;
};
// If there is a request cookie, attempt to relay the message
if (m->has_requestcookie())
{
XRPL_ASSERT(
responseCookies.empty(),
"ripple::PeerImp::onMessage(TMLedgerData) : no response "
"cookies");
m->clear_requestcookie();
peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
relay(m->requestcookie());
if (!directResponse && responseCookies.empty())
return;
}
else
// If there's a list of request cookies, attempt to relay the message to
// all of them.
if (responseCookies.size())
{
JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
for (auto const cookie : responseCookies)
relay(cookie);
if (!directResponse)
return;
}
}
// Now that any forwarding is done check the base message (data only, no
// routing info for duplicates)
if (routed)
{
m->clear_directresponse();
XRPL_ASSERT(
!m->has_requestcookie() && !m->responsecookies_size(),
"ripple::PeerImp::onMessage(TMLedgerData) : no cookies");
auto const baseMessageHash = sha512Half(*m);
if (!app_.getHashRouter().addSuppressionPeer(baseMessageHash, id_))
{
// Don't punish nodes that don't know any better
return badData(
"Duplicate message: " + to_string(baseMessageHash),
supportsFeature(ProtocolFeature::LedgerDataCookies));
}
return;
}
uint256 const ledgerHash{m->ledgerhash()};
// Otherwise check if received data for a candidate transaction set
if (m->type() == protocol::liTS_CANDIDATE)
{
@@ -2997,16 +3143,22 @@ PeerImp::checkValidation(
// the TX tree with the specified root hash.
//
static std::shared_ptr<PeerImp>
getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip)
getPeerWithTree(
OverlayImpl& ov,
uint256 const& rootHash,
PeerImp const* skip,
std::function<bool(Peer::id_t)> shouldProcessCallback)
{
std::shared_ptr<PeerImp> ret;
int retScore = 0;
XRPL_ASSERT(
shouldProcessCallback, "ripple::getPeerWithTree : callback provided");
ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
if (p->hasTxSet(rootHash) && p.get() != skip)
{
auto score = p->getScore(true);
if (!ret || (score > retScore))
if (!ret || (score > retScore && shouldProcessCallback(p->id())))
{
ret = std::move(p);
retScore = score;
@@ -3025,16 +3177,19 @@ getPeerWithLedger(
OverlayImpl& ov,
uint256 const& ledgerHash,
LedgerIndex ledger,
PeerImp const* skip)
PeerImp const* skip,
std::function<bool(Peer::id_t)> shouldProcessCallback)
{
std::shared_ptr<PeerImp> ret;
int retScore = 0;
XRPL_ASSERT(
shouldProcessCallback, "ripple::getPeerWithLedger : callback provided");
ov.for_each([&](std::shared_ptr<PeerImp>&& p) {
if (p->hasLedger(ledgerHash, ledger) && p.get() != skip)
{
auto score = p->getScore(true);
if (!ret || (score > retScore))
if (!ret || (score > retScore && shouldProcessCallback(p->id())))
{
ret = std::move(p);
retScore = score;
@@ -3048,7 +3203,8 @@ getPeerWithLedger(
void
PeerImp::sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData)
protocol::TMLedgerData& ledgerData,
PeerCookieMap const& destinations)
{
JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
@@ -3080,15 +3236,102 @@ PeerImp::sendLedgerBase(
}
}
auto message{
std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
send(message);
sendToMultiple(ledgerData, destinations);
}
/** Send a TMLedgerData reply to every peer that is waiting for it.

    @param ledgerData   The reply payload. Its routing fields (request cookie,
                        response cookies, direct response flag) are rewritten
                        for each destination, so the caller's message is
                        modified.
    @param destinations Map of destination peer to the set of request cookies
                        (or `nullopt` for "no cookie") that peer is waiting on.

    Peers that support `ProtocolFeature::LedgerDataCookies` get a single
    message carrying all of their cookies in `responsecookies`, plus the
    `directresponse` flag if one of the requests had no cookie. Other peers
    get one message per cookie, using the single `requestcookie` field.
*/
void
PeerImp::sendToMultiple(
    protocol::TMLedgerData& ledgerData,
    PeerCookieMap const& destinations)
{
    bool foundSelf = false;
    for (auto const& [peer, cookies] : destinations)
    {
        if (peer.get() == this)
            foundSelf = true;
        bool const multipleCookies =
            peer->supportsFeature(ProtocolFeature::LedgerDataCookies);
        std::vector<std::uint64_t> sendCookies;
        bool directResponse = false;
        if (!multipleCookies)
        {
            JLOG(p_journal_.debug())
                << "sendToMultiple: Sending " << cookies.size()
                << " TMLedgerData messages to peer [" << peer->id()
                << "]: " << sha512Half(ledgerData);
            // `ledgerData` is reused across destinations. If an earlier
            // iteration served a peer that supports multiple cookies, the
            // message still carries that peer's response cookies and direct
            // response flag. Strip them so they are not leaked to a peer
            // that only understands the single request cookie.
            ledgerData.clear_responsecookies();
            ledgerData.clear_directresponse();
        }
        for (auto const& cookie : cookies)
        {
            // Unfortunately, need a separate Message object for every
            // combination
            if (cookie)
            {
                if (multipleCookies)
                {
                    // Save this one for later to send a single message
                    sendCookies.emplace_back(*cookie);
                    continue;
                }
                // Feature not supported, so send a single message with a
                // single cookie
                ledgerData.set_requestcookie(*cookie);
            }
            else
            {
                if (multipleCookies)
                {
                    // Set this flag later on the single message
                    directResponse = true;
                    continue;
                }
                ledgerData.clear_requestcookie();
            }
            // Only the one-message-per-cookie path reaches this point; the
            // multiple cookie path always `continue`s above.
            XRPL_ASSERT(
                !multipleCookies,
                "ripple::PeerImp::sendToMultiple : ledger data cookies "
                "unsupported");
            auto message{
                std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
            peer->send(message);
        }
        if (multipleCookies)
        {
            // Send a single message with all the cookies and/or the direct
            // response flag, so the receiver can farm out the single message to
            // multiple peers and/or itself
            XRPL_ASSERT(
                sendCookies.size() || directResponse,
                "ripple::PeerImp::sendToMultiple : valid response options");
            ledgerData.clear_requestcookie();
            ledgerData.clear_responsecookies();
            ledgerData.set_directresponse(directResponse);
            for (auto const& cookie : sendCookies)
                ledgerData.add_responsecookies(cookie);
            auto message{
                std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
            peer->send(message);
            JLOG(p_journal_.debug())
                << "sendToMultiple: Sent 1 TMLedgerData message to peer ["
                << peer->id() << "]: including "
                << (directResponse ? "the direct response flag and " : "")
                << sendCookies.size() << " response cookies. "
                << ": " << sha512Half(ledgerData);
        }
    }
    // The peer doing the work is expected to be one of the destinations.
    XRPL_ASSERT(
        foundSelf, "ripple::PeerImp::sendToMultiple : current peer included");
}
std::shared_ptr<Ledger const>
PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
PeerImp::getLedger(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash)
{
JLOG(p_journal_.trace()) << "getLedger: Ledger";
JLOG(p_journal_.trace()) << "getLedger: Ledger " << mHash;
std::shared_ptr<Ledger const> ledger;
@@ -3105,22 +3348,33 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
if (m->has_querytype() && !m->has_requestcookie())
{
// Attempt to relay the request to a peer
// Note repeated messages will not relay to the same peer
// before `getLedgerInterval` seconds. This prevents one
// peer from getting flooded, and distributes the request
// load. If a request has been relayed to all eligible
// peers, then this message will not be relayed.
if (auto const peer = getPeerWithLedger(
overlay_,
ledgerHash,
m->has_ledgerseq() ? m->ledgerseq() : 0,
this))
this,
[&](Peer::id_t id) {
return app_.getHashRouter().shouldProcessForPeer(
mHash, id, getledgerInterval);
}))
{
m->set_requestcookie(id());
peer->send(
std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
JLOG(p_journal_.debug())
<< "getLedger: Request relayed to peer";
<< "getLedger: Request relayed to peer [" << peer->id()
<< "]: " << mHash;
return ledger;
}
JLOG(p_journal_.trace())
<< "getLedger: Failed to find peer to relay request";
<< "getLedger: Don't have ledger with hash " << ledgerHash
<< ": " << mHash;
}
}
}
@@ -3130,7 +3384,7 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
{
JLOG(p_journal_.debug())
<< "getLedger: Early ledger sequence request";
<< "getLedger: Early ledger sequence request " << mHash;
}
else
{
@@ -3139,7 +3393,7 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
{
JLOG(p_journal_.debug())
<< "getLedger: Don't have ledger with sequence "
<< m->ledgerseq();
<< m->ledgerseq() << ": " << mHash;
}
}
}
@@ -3162,29 +3416,33 @@ PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
Resource::feeMalformedRequest, "get_ledger ledgerSeq");
ledger.reset();
JLOG(p_journal_.warn())
<< "getLedger: Invalid ledger sequence " << ledgerSeq;
JLOG(p_journal_.warn()) << "getLedger: Invalid ledger sequence "
<< ledgerSeq << ": " << mHash;
}
}
else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
{
ledger.reset();
JLOG(p_journal_.debug())
<< "getLedger: Early ledger sequence request " << ledgerSeq;
<< "getLedger: Early ledger sequence request " << ledgerSeq
<< ": " << mHash;
}
}
else
{
JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger";
JLOG(p_journal_.debug())
<< "getLedger: Unable to find ledger " << mHash;
}
return ledger;
}
std::shared_ptr<SHAMap const>
PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
PeerImp::getTxSet(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash) const
{
JLOG(p_journal_.trace()) << "getTxSet: TX set";
JLOG(p_journal_.trace()) << "getTxSet: TX set " << mHash;
uint256 const txSetHash{m->ledgerhash()};
std::shared_ptr<SHAMap> shaMap{
@@ -3194,22 +3452,34 @@ PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
if (m->has_querytype() && !m->has_requestcookie())
{
// Attempt to relay the request to a peer
if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
// Note repeated messages will not relay to the same peer
// before `getLedgerInterval` seconds. This prevents one
// peer from getting flooded, and distributes the request
// load. If a request has been relayed to all eligible
// peers, then this message will not be relayed.
if (auto const peer = getPeerWithTree(
overlay_, txSetHash, this, [&](Peer::id_t id) {
return app_.getHashRouter().shouldProcessForPeer(
mHash, id, getledgerInterval);
}))
{
m->set_requestcookie(id());
peer->send(
std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
JLOG(p_journal_.debug())
<< "getTxSet: Request relayed to peer [" << peer->id()
<< "]: " << mHash;
}
else
{
JLOG(p_journal_.debug())
<< "getTxSet: Failed to find relay peer";
<< "getTxSet: Failed to find relay peer: " << mHash;
}
}
else
{
JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
JLOG(p_journal_.debug())
<< "getTxSet: Failed to find TX set " << mHash;
}
}
@@ -3217,7 +3487,9 @@ PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
}
void
PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
PeerImp::processLedgerRequest(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash)
{
// Do not resource charge a peer responding to a relay
if (!m->has_requestcookie())
@@ -3231,9 +3503,74 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
bool fatLeaves{true};
auto const itype{m->itype()};
auto getDestinations = [&] {
// If a ledger data message is generated, it's going to be sent to every
// peer that is waiting for it.
PeerCookieMap result;
std::size_t numCookies = 0;
{
// Don't do the work under this peer if this peer is not waiting for
// any replies
auto myCookies = releaseRequestCookies(mHash);
if (myCookies.empty())
{
JLOG(p_journal_.debug()) << "TMGetLedger: peer is no longer "
"waiting for response to request: "
<< mHash;
return result;
}
numCookies += myCookies.size();
result[shared_from_this()] = myCookies;
}
std::set<HashRouter::PeerShortID> const peers =
app_.getHashRouter().getPeers(mHash);
for (auto const peerID : peers)
{
// This loop does not need to be done under the HashRouter
// lock because findPeerByShortID and releaseRequestCookies
// are thread safe, and everything else is local
if (auto p = overlay_.findPeerByShortID(peerID))
{
auto cookies = p->releaseRequestCookies(mHash);
numCookies += cookies.size();
if (result.contains(p))
{
// Unlikely, but if a request came in to this peer while
// iterating, add the items instead of copying /
// overwriting.
XRPL_ASSERT(
p.get() == this,
"ripple::PeerImp::processLedgerRequest : found self in "
"map");
for (auto const& cookie : cookies)
result[p].emplace(cookie);
}
else if (cookies.size())
result[p] = cookies;
}
}
JLOG(p_journal_.debug())
<< "TMGetLedger: Processing request for " << result.size()
<< " peers. Will send " << numCookies
<< " messages if successful: " << mHash;
return result;
};
// Will only populate this if we're going to do work.
PeerCookieMap destinations;
if (itype == protocol::liTS_CANDIDATE)
{
if (sharedMap = getTxSet(m); !sharedMap)
destinations = getDestinations();
if (destinations.empty())
// Nowhere to send the response!
return;
if (sharedMap = getTxSet(m, mHash); !sharedMap)
return;
map = sharedMap.get();
@@ -3241,8 +3578,6 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
ledgerData.set_ledgerseq(0);
ledgerData.set_ledgerhash(m->ledgerhash());
ledgerData.set_type(protocol::liTS_CANDIDATE);
if (m->has_requestcookie())
ledgerData.set_requestcookie(m->requestcookie());
// We'll already have most transactions
fatLeaves = false;
@@ -3261,7 +3596,12 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
return;
}
if (ledger = getLedger(m); !ledger)
destinations = getDestinations();
if (destinations.empty())
// Nowhere to send the response!
return;
if (ledger = getLedger(m, mHash); !ledger)
return;
// Fill out the reply
@@ -3269,13 +3609,11 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
ledgerData.set_ledgerseq(ledger->info().seq);
ledgerData.set_type(itype);
if (m->has_requestcookie())
ledgerData.set_requestcookie(m->requestcookie());
switch (itype)
{
case protocol::liBASE:
sendLedgerBase(ledger, ledgerData);
sendLedgerBase(ledger, ledgerData, destinations);
return;
case protocol::liTX_NODE:
@@ -3392,7 +3730,7 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
if (ledgerData.nodes_size() == 0)
return;
send(std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA));
sendToMultiple(ledgerData, destinations);
}
int
@@ -3450,6 +3788,19 @@ PeerImp::reduceRelayReady()
return vpReduceRelayEnabled_ && reduceRelayReady_;
}
/** Remove and return the request cookies recorded for a request hash.

    @param requestHash Hash identifying the request whose cookies are wanted.
    @return The set of cookies (`nullopt` meaning "request without a cookie")
            stored under `requestHash`, or an empty set if there is no entry.
            The entry is erased from `messageRequestCookies_`, so a second
            call with the same hash returns an empty set until new cookies
            are recorded.

    The lookup and removal are done while holding `cookieLock_`.
*/
std::set<std::optional<uint64_t>>
PeerImp::releaseRequestCookies(uint256 const& requestHash)
{
    std::set<std::optional<uint64_t>> result;
    std::lock_guard lock(cookieLock_);
    // One lookup instead of contains() + operator[] + erase(key).
    if (auto const it = messageRequestCookies_.find(requestHash);
        it != messageRequestCookies_.end())
    {
        result = std::move(it->second);
        messageRequestCookies_.erase(it);
    }
    return result;
}
void
PeerImp::Metrics::add_message(std::uint64_t bytes)
{

View File

@@ -195,6 +195,15 @@ private:
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
// Track message requests and responses
// TODO: Use an expiring cache or something
using MessageCookieMap =
std::map<uint256, std::set<std::optional<uint64_t>>>;
using PeerCookieMap =
std::map<std::shared_ptr<Peer>, std::set<std::optional<uint64_t>>>;
std::mutex mutable cookieLock_;
MessageCookieMap messageRequestCookies_;
friend class OverlayImpl;
class Metrics
@@ -441,6 +450,13 @@ public:
return txReduceRelayEnabled_;
}
//
// Messages
//
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override;
private:
void
close();
@@ -639,16 +655,28 @@ private:
void
sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData);
std::shared_ptr<Ledger const>
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
std::shared_ptr<SHAMap const>
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
protocol::TMLedgerData& ledgerData,
PeerCookieMap const& destinations);
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
sendToMultiple(
protocol::TMLedgerData& ledgerData,
PeerCookieMap const& destinations);
std::shared_ptr<Ledger const>
getLedger(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash);
std::shared_ptr<SHAMap const>
getTxSet(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash) const;
void
processLedgerRequest(
std::shared_ptr<protocol::TMGetLedger> const& m,
uint256 const& mHash);
};
//------------------------------------------------------------------------------

View File

@@ -18,9 +18,11 @@
//==============================================================================
#include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/HashRouter.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/overlay/Overlay.h>
#include <xrpld/overlay/PeerSet.h>
#include <xrpl/protocol/digest.h>
namespace ripple {
@@ -104,16 +106,52 @@ PeerSetImpl::sendRequest(
std::shared_ptr<Peer> const& peer)
{
auto packet = std::make_shared<Message>(message, type);
auto const messageHash = [&]() {
auto const packetBuffer =
packet->getBuffer(compression::Compressed::Off);
return sha512Half(Slice(packetBuffer.data(), packetBuffer.size()));
}();
// Allow messages to be re-sent to the same peer after a delay
using namespace std::chrono_literals;
constexpr std::chrono::seconds interval = 30s;
if (peer)
{
peer->send(packet);
if (app_.getHashRouter().shouldProcessForPeer(
messageHash, peer->id(), interval))
{
JLOG(journal_.trace())
<< "Sending " << protocolMessageName(type) << " message to ["
<< peer->id() << "]: " << messageHash;
peer->send(packet);
}
else
JLOG(journal_.debug())
<< "Suppressing sending duplicate " << protocolMessageName(type)
<< " message to [" << peer->id() << "]: " << messageHash;
return;
}
for (auto id : peers_)
{
if (auto p = app_.overlay().findPeerByShortID(id))
p->send(packet);
{
if (app_.getHashRouter().shouldProcessForPeer(
messageHash, p->id(), interval))
{
JLOG(journal_.trace())
<< "Sending " << protocolMessageName(type)
<< " message to [" << p->id() << "]: " << messageHash;
p->send(packet);
}
else
JLOG(journal_.debug())
<< "Suppressing sending duplicate "
<< protocolMessageName(type) << " message to [" << p->id()
<< "]: " << messageHash;
}
}
}

View File

@@ -43,6 +43,12 @@ protocolMessageType(protocol::TMGetLedger const&)
return protocol::mtGET_LEDGER;
}
// Overload for TMLedgerData: returns the protocol message type constant
// (mtLEDGER_DATA) that corresponds to this message class. The argument is
// used only for overload selection.
inline protocol::MessageType
protocolMessageType(protocol::TMLedgerData const&)
{
return protocol::mtLEDGER_DATA;
}
inline protocol::MessageType
protocolMessageType(protocol::TMReplayDeltaRequest const&)
{
@@ -486,4 +492,64 @@ invokeProtocolMessage(
} // namespace ripple
namespace protocol {
// Feed the content of a TMGetLedger request into hasher `h`.
//
// Appended, in this order: the protocol message type, itype, then ltype,
// ledgerhash and ledgerseq when present, every node id followed by the node
// id count, then querytype and querydepth when present.
//
// The request cookie is intentionally left out, so two requests that differ
// only in their cookie produce the same hash.
template <class Hasher>
void
hash_append(Hasher& h, TMGetLedger const& msg)
{
using beast::hash_append;
using namespace ripple;
hash_append(h, safe_cast<int>(protocolMessageType(msg)));
hash_append(h, safe_cast<int>(msg.itype()));
if (msg.has_ltype())
hash_append(h, safe_cast<int>(msg.ltype()));
if (msg.has_ledgerhash())
hash_append(h, msg.ledgerhash());
if (msg.has_ledgerseq())
hash_append(h, msg.ledgerseq());
for (auto const& nodeId : msg.nodeids())
hash_append(h, nodeId);
// The count is appended after the node ids themselves.
hash_append(h, msg.nodeids_size());
// Do NOT include the request cookie. It does not affect the content of the
// request, but only where to route the results.
// if (msg.has_requestcookie())
// hash_append(h, msg.requestcookie());
if (msg.has_querytype())
hash_append(h, safe_cast<int>(msg.querytype()));
if (msg.has_querydepth())
hash_append(h, msg.querydepth());
}
// Feed the content of a TMLedgerData message into hasher `h`.
//
// Appended, in this order: the protocol message type, ledgerhash, ledgerseq,
// type, then for every node its nodedata and (when present) its nodeid,
// followed by the node count, then requestcookie and error when present.
//
// NOTE(review): the responsecookies and directresponse fields are not
// appended here, so messages that differ only in those fields hash the same.
// Confirm this is intended given that requestcookie IS included.
template <class Hasher>
void
hash_append(Hasher& h, TMLedgerData const& msg)
{
using beast::hash_append;
using namespace ripple;
hash_append(h, safe_cast<int>(protocolMessageType(msg)));
hash_append(h, msg.ledgerhash());
hash_append(h, msg.ledgerseq());
hash_append(h, safe_cast<int>(msg.type()));
for (auto const& node : msg.nodes())
{
hash_append(h, node.nodedata());
if (node.has_nodeid())
hash_append(h, node.nodeid());
}
// The count is appended after the per-node data.
hash_append(h, msg.nodes_size());
if (msg.has_requestcookie())
hash_append(h, msg.requestcookie());
if (msg.has_error())
hash_append(h, safe_cast<int>(msg.error()));
}
} // namespace protocol
#endif

View File

@@ -37,7 +37,9 @@ namespace ripple {
constexpr ProtocolVersion const supportedProtocolList[]
{
{2, 1},
{2, 2}
{2, 2},
// Adds TMLedgerData::responseCookies and directResponse
{2, 3}
};
// clang-format on