Compare commits

..

6 Commits

Author SHA1 Message Date
Mayukha Vadari
72c3537d8c what did i ever do to you clang-tidy 2026-06-23 00:23:21 -04:00
Mayukha Vadari
fd82f68602 Merge branch 'develop' of https://github.com/XRPLF/rippled into mvadari/pd-tests 2026-06-23 00:17:03 -04:00
Bart
ff02269c0d refactor: Use dispatch instead of post (#7438)
Co-authored-by: Bart <11445373+bthomee@users.noreply.github.com>
2026-06-22 22:35:28 +00:00
Mayukha Vadari
cd9407e310 fix clang-tidy issues 2026-06-22 13:25:15 -04:00
Mayukha Vadari
71cf8bb589 Potential fix for pull request finding
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
2026-06-22 12:55:33 -04:00
Mayukha Vadari
840100b541 test: Add test for Permissioned Domain sequence fix 2026-06-22 12:29:13 -04:00
27 changed files with 341 additions and 1121 deletions

View File

@@ -1,134 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#include <functional>
#include <mutex>
#include <set>
/** RAII class to check if an Item is already being processed on another thread,
* as indicated by it's presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
class CanProcess
{
public:
template <class Mutex, class Collection, class Item>
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: cleanup_(insert(mtx, collection, item))
{
}
~CanProcess()
{
if (cleanup_)
cleanup_();
}
explicit
operator bool() const
{
return static_cast<bool>(cleanup_);
}
private:
template <bool useIterator, class Mutex, class Collection, class Item>
std::function<void()>
doInsert(Mutex& mtx, Collection& collection, Item const& item)
{
std::unique_lock<Mutex> lock(mtx);
// TODO: Use structured binding once LLVM 16 is the minimum supported
// version. See also: https://github.com/llvm/llvm-project/issues/48582
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
auto const insertResult = collection.insert(item);
auto const it = insertResult.first;
if (!insertResult.second)
return {};
if constexpr (useIterator)
return [&, it]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(it);
};
else
return [&]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(item);
};
}
// Generic insert() function doesn't use iterators because they may get
// invalidated
template <class Mutex, class Collection, class Item>
std::function<void()>
insert(Mutex& mtx, Collection& collection, Item const& item)
{
return doInsert<false>(mtx, collection, item);
}
// Specialize insert() for std::set, which does not invalidate iterators for
// insert and erase
template <class Mutex, class Item>
std::function<void()>
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
{
return doInsert<true>(mtx, collection, item);
}
// If set, then the item is "usable"
std::function<void()> cleanup_;
};
#endif

View File

@@ -44,7 +44,7 @@ template <class T>
inline void
maybeReverseBytes(T& t, std::true_type)
{
reverseBytes(t);
reverse_bytes(t);
}
template <class T, class Hasher>

View File

@@ -137,13 +137,6 @@ private:
return std::move(peers_);
}
/** Return set of peers waiting for reply. Leaves list unchanged. */
std::set<PeerShortID> const&
peekPeerSet()
{
return peers_;
}
/** Return seated relay time point if the message has been relayed */
[[nodiscard]] std::optional<Stopwatch::time_point>
relayed() const
@@ -175,20 +168,6 @@ private:
return true;
}
bool
shouldProcessForPeer(
PeerShortID peer,
Stopwatch::time_point now,
std::chrono::seconds interval)
{
if (peerProcessed_.contains(peer) && ((peerProcessed_[peer] + interval) > now))
return false;
// Peer may already be in the list, but adding it again doesn't hurt
addPeer(peer);
peerProcessed_[peer] = now;
return true;
}
private:
HashRouterFlags flags_ = HashRouterFlags::UNDEFINED;
std::set<PeerShortID> peers_;
@@ -196,7 +175,6 @@ private:
// than one flag needs to expire independently.
std::optional<Stopwatch::time_point> relayed_;
std::optional<Stopwatch::time_point> processed_;
std::map<PeerShortID, Stopwatch::time_point> peerProcessed_;
};
public:
@@ -219,7 +197,7 @@ public:
/** Add a suppression peer and get message's relay status.
* Return pair:
* element 1: true if the key is added.
* element 1: true if the peer is added.
* element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>>
@@ -236,15 +214,6 @@ public:
HashRouterFlags& flags,
std::chrono::seconds txInterval);
/** Determines whether the hashed item should be processed for the given
peer. Could be an incoming or outgoing message.
Items filtered with this function should only be processed for the given
peer once. Unlike shouldProcess, it can be processed for other peers.
*/
bool
shouldProcessForPeer(uint256 const& key, PeerShortID peer, std::chrono::seconds interval);
/** Set the flags on a hash.
@return `true` if the flags were changed. `false` if unchanged.
@@ -270,11 +239,6 @@ public:
std::optional<std::set<PeerShortID>>
shouldRelay(uint256 const& key);
/** Returns a copy of the set of peers in the Entry for the key
*/
std::set<PeerShortID>
getPeers(uint256 const& key);
private:
// pair.second indicates whether the entry was created
std::pair<Entry&, bool>

View File

@@ -288,18 +288,8 @@ message TMLedgerData {
required uint32 ledgerSeq = 2;
required TMLedgerInfoType type = 3;
repeated TMLedgerNode nodes = 4;
// If the peer supports "responseCookies", this field will
// never be populated.
optional uint32 requestCookie = 5;
optional TMReplyError error = 6;
// The old field is called "requestCookie", but this is
// a response, so this name makes more sense
repeated uint32 responseCookies = 7;
// If a TMGetLedger request was received without a "requestCookie",
// and the peer supports it, this flag will be set to true to
// indicate that the receiver should process the result in addition
// to forwarding it to its "responseCookies" peers.
optional bool directResponse = 8;
}
message TMPing {

View File

@@ -35,8 +35,6 @@ struct LedgerHeader
// If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable
bool mutable validated = false;
bool accepted = false;

View File

@@ -185,7 +185,7 @@ public:
virtual bool
isFull() = 0;
virtual void
setMode(OperatingMode om, char const* reason) = 0;
setMode(OperatingMode om) = 0;
virtual bool
isBlocked() = 0;
virtual bool

View File

@@ -82,19 +82,6 @@ HashRouter::shouldProcess(
return s.shouldProcess(suppressionMap_.clock().now(), txInterval);
}
bool
HashRouter::shouldProcessForPeer(
uint256 const& key,
PeerShortID peer,
std::chrono::seconds interval)
{
std::lock_guard lock(mutex_);
auto& entry = emplace(key).first;
return entry.shouldProcessForPeer(peer, suppressionMap_.clock().now(), interval);
}
HashRouterFlags
HashRouter::getFlags(uint256 const& key)
{
@@ -132,13 +119,4 @@ HashRouter::shouldRelay(uint256 const& key) -> std::optional<std::set<PeerShortI
return s.releasePeerSet();
}
auto
HashRouter::getPeers(uint256 const& key) -> std::set<PeerShortID>
{
std::lock_guard lock(mutex_);
auto& s = emplace(key).first;
return s.peekPeerSet();
}
} // namespace xrpl

View File

@@ -395,33 +395,6 @@ class HashRouter_test : public beast::unit_test::Suite
BEAST_EXPECT(!any(HF::UNDEFINED));
}
void
testProcessPeer()
{
using namespace std::chrono_literals;
TestStopwatch stopwatch;
HashRouter router(getSetup(5s, 5s), stopwatch);
uint256 const key(1);
HashRouter::PeerShortID peer1 = 1;
HashRouter::PeerShortID peer2 = 2;
auto const timeout = 2s;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
++stopwatch;
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout));
++stopwatch;
BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout));
BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout));
}
public:
void
run() override
@@ -434,7 +407,6 @@ public:
testProcess();
testSetup();
testFlagsOps();
testProcessPeer();
}
};

View File

@@ -339,11 +339,6 @@ public:
{
return false;
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
[[nodiscard]] std::string const&
fingerprint() const override

View File

@@ -8,18 +8,21 @@
#include <test/jtx/pay.h>
#include <test/jtx/permissioned_domains.h>
#include <test/jtx/ter.h>
#include <test/jtx/ticket.h>
#include <test/jtx/txflags.h>
#include <xrpl/basics/base_uint.h>
#include <xrpl/beast/unit_test/suite.h>
#include <xrpl/json/json_value.h>
#include <xrpl/protocol/Feature.h>
#include <xrpl/protocol/Indexes.h>
#include <xrpl/protocol/Protocol.h>
#include <xrpl/protocol/TER.h>
#include <xrpl/protocol/TxFlags.h>
#include <xrpl/protocol/jss.h>
#include <cstddef>
#include <cstdint>
#include <exception>
#include <map>
#include <optional>
@@ -526,6 +529,47 @@ class PermissionedDomains_test : public beast::unit_test::Suite
BEAST_EXPECT(env.ownerCount(alice) == 1);
}
void
testTicket(FeatureBitset features)
{
testcase("Tickets");
using namespace test::jtx;
Env env(*this, features);
Account const alice("alice");
env.fund(XRP(1000), alice);
pdomain::Credentials const credentials{
{.issuer = alice, .credType = "credential1"},
};
std::uint32_t seq{env.seq(alice)};
env(ticket::create(alice, 2));
{
env(pdomain::setTx(alice, credentials), ticket::Use(++seq));
auto domain = pdomain::getNewDomain(env.meta());
if (features[fixCleanup3_1_3])
{
BEAST_EXPECT(domain == keylet::permissionedDomain(alice.id(), seq).key);
}
else
{
BEAST_EXPECT(domain == keylet::permissionedDomain(alice.id(), 0).key);
}
}
if (features[fixCleanup3_1_3])
{
env(pdomain::setTx(alice, credentials), ticket::Use(++seq));
}
else
{
env(pdomain::setTx(alice, credentials), ticket::Use(++seq), Ter(tefEXCEPTION));
}
}
public:
void
run() override
@@ -540,6 +584,8 @@ public:
testDelete(withFix_);
testAccountReserve(withFeature_);
testAccountReserve(withFix_);
testTicket(withFeature_);
testTicket(withFix_);
}
};

View File

@@ -63,8 +63,8 @@ public:
negotiateProtocolVersion("RTXP/1.2, XRPL/2.0, XRPL/2.1") == makeProtocol(2, 1));
BEAST_EXPECT(negotiateProtocolVersion("XRPL/2.2") == makeProtocol(2, 2));
BEAST_EXPECT(
negotiateProtocolVersion("RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") ==
makeProtocol(2, 3));
negotiateProtocolVersion("RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") ==
makeProtocol(2, 2));
BEAST_EXPECT(negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") == std::nullopt);
BEAST_EXPECT(negotiateProtocolVersion("") == std::nullopt);
}

View File

@@ -190,11 +190,6 @@ public:
removeTxQueue(uint256 const&) override
{
}
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override
{
return {};
}
};
/** Manually advanced clock. */

View File

@@ -1052,7 +1052,7 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{
if ((positions == 0u) && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED, "updateOperatingMode: no positions");
app_.getOPs().setMode(OperatingMode::CONNECTED);
}
void

View File

@@ -172,22 +172,4 @@ private:
std::unique_ptr<PeerSet> peerSet_;
};
inline std::string
to_string(InboundLedger::Reason reason)
{
using enum InboundLedger::Reason;
switch (reason)
{
case HISTORY:
return "HISTORY";
case GENERIC:
return "GENERIC";
case CONSENSUS:
return "CONSENSUS";
default:
UNREACHABLE("ripple::to_string(InboundLedger::Reason) : unknown value");
return "unknown";
}
}
} // namespace xrpl

View File

@@ -367,14 +367,7 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (!wasProgress)
{
if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}
checkLocal();
byHash_ = true;

View File

@@ -6,7 +6,6 @@
#include <xrpld/overlay/PeerSet.h>
#include <xrpl/basics/Blob.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/Slice.h>
@@ -79,78 +78,10 @@ public:
XRPL_ASSERT(
hash.isNonZero(), "xrpl::InboundLedgersImp::acquire::doAcquire : nonzero hash");
bool const needNetworkLedger = app_.getOPs().isNeedNetworkLedger();
bool const shouldAcquire = [&]() {
if (!needNetworkLedger)
return true;
if (reason == InboundLedger::Reason::GENERIC)
return true;
if (reason == InboundLedger::Reason::CONSENSUS)
return true;
return false;
}();
std::stringstream ss;
ss << "InboundLedger::acquire: "
<< "Request: " << to_string(hash) << ", " << seq
<< " NeedNetworkLedger: " << (needNetworkLedger ? "yes" : "no")
<< " Reason: " << to_string(reason)
<< " Should acquire: " << (shouldAcquire ? "true." : "false.");
/* Acquiring ledgers is somewhat expensive. It requires lots of
* computation and network communication. Avoid it when it's not
* appropriate. Every validation from a peer for a ledger that
* we do not have locally results in a call to this function: even
* if we are moments away from validating the same ledger.
*/
bool const shouldBroadcast = [&]() {
// If the node is not in "full" state, it needs to sync to
// the network, and doesn't have the necessary tx's and
// ledger entries to build the ledger.
bool const isFull = app_.getOPs().isFull();
// If everything else is ok, don't try to acquire the ledger
// if the requested seq is in the near future relative to
// the validated ledger. If the requested ledger is between
// 1 and 19 inclusive ledgers ahead of the valid ledger this
// node has not built it yet, but it's possible/likely it
// has the tx's necessary to build it and get caught up.
// Plus it might not become validated. On the other hand, if
// it's more than 20 in the future, this node should request
// it so that it can jump ahead and get caught up.
LedgerIndex const validSeq = app_.getLedgerMaster().getValidLedgerIndex();
constexpr std::size_t lagLeeway = 20;
bool const nearFuture = (seq > validSeq) && (seq < validSeq + lagLeeway);
// If everything else is ok, don't try to acquire the ledger
// if the request is related to consensus. (Note that
// consensus calls usually pass a seq of 0, so nearFuture
// will be false other than on a brand new network.)
bool const consensus = reason == InboundLedger::Reason::CONSENSUS;
ss << " Evaluating whether to broadcast requests to peers"
<< ". full: " << (isFull ? "true" : "false") << ". ledger sequence " << seq
<< ". Valid sequence: " << validSeq << ". Lag leeway: " << lagLeeway
<< ". request for near future ledger: " << (nearFuture ? "true" : "false")
<< ". Consensus: " << (consensus ? "true" : "false");
// If the node is not synced, send requests.
if (!isFull)
return true;
// If the ledger is in the near future, do NOT send requests.
// This node is probably about to build it.
if (nearFuture)
return false;
// If the request is because of consensus, do NOT send requests.
// This node is probably about to build it.
if (consensus)
return false;
return true;
}();
ss << ". Would broadcast to peers? " << (shouldBroadcast ? "true." : "false.");
if (!shouldAcquire)
{
JLOG(j_.debug()) << "Abort(rule): " << ss.str();
// probably not the right rule
if (app_.getOPs().isNeedNetworkLedger() && (reason != InboundLedger::Reason::GENERIC) &&
(reason != InboundLedger::Reason::CONSENSUS))
return {};
}
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
@@ -158,7 +89,6 @@ public:
ScopedLockType sl(lock_);
if (stopping_)
{
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {};
}
@@ -177,51 +107,47 @@ public:
++counter_;
}
}
ss << " IsNew: " << (isNew ? "true" : "false");
if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {};
}
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
{
JLOG(j_.debug()) << "InProgress: " << ss.str();
return {};
}
JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger();
};
using namespace std::chrono_literals;
return perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
std::shared_ptr<Ledger const> ledger =
perf::measureDurationAndLog(doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
}
void
acquireAsync(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason reason) override
{
if (CanProcess const check{acquiresMutex_, pendingAcquires_, hash})
std::unique_lock lock(acquiresMutex_);
try
{
try
{
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash
<< ": " << e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
"inbound ledger "
<< hash;
}
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
ScopeUnlock const unlock(lock);
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn()) << "Exception thrown for acquiring new inbound ledger " << hash << ": "
<< e.what();
}
catch (...)
{
JLOG(j_.warn()) << "Unknown exception thrown for acquiring new inbound ledger " << hash;
}
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger>

View File

@@ -965,9 +965,8 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
return;
}
JLOG(journal_.info()) << "Advancing accepted ledger to " << ledger->header().seq << " ("
<< toShortString(ledger->header().hash) << ") with >= " << minVal
<< " validations";
JLOG(journal_.info()) << "Advancing accepted ledger to " << ledger->header().seq
<< " with >= " << minVal << " validations";
ledger->setValidated();
ledger->setFull();

View File

@@ -25,8 +25,7 @@ TimeoutCounter::TimeoutCounter(
QueueJobParameter&& jobParameter,
beast::Journal journal)
: app_(app)
, sink_(journal, toShortString(hash) + " ")
, journal_(sink_)
, journal_(journal)
, hash_(hash)
, timerInterval_(interval)
, queueJobParameter_(std::move(jobParameter))
@@ -42,7 +41,6 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{
if (isDone())
return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() << "ms";
timer_.expires_after(timerInterval_);
timer_.async_wait([wptr = pmDowncast()](boost::system::error_code const& ec) {
if (ec == boost::asio::error::operation_aborted)
@@ -50,10 +48,6 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock())
{
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec
<< " (operation_aborted: " << boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted" : "other") << ")";
ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl);
}

View File

@@ -3,7 +3,6 @@
#include <xrpld/app/main/Application.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <xrpl/core/Job.h>
#include <boost/asio/basic_waitable_timer.hpp>
@@ -104,7 +103,6 @@ protected:
// Used in this class for access to boost::asio::io_context and
// xrpl::Overlay. Used in subtypes for the kitchen sink.
Application& app_;
beast::WrappedSink sink_;
beast::Journal journal_;
mutable std::recursive_mutex mtx_;

View File

@@ -34,7 +34,6 @@
#include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/ToString.h>
#include <xrpl/basics/UnorderedContainers.h>
@@ -486,7 +485,7 @@ public:
isFull() override;
void
setMode(OperatingMode om, char const* reason) override;
setMode(OperatingMode om) override;
bool
isBlocked() override;
@@ -972,7 +971,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
inline void
NetworkOPsImp::setStandAlone()
{
setMode(OperatingMode::FULL, "setStandAlone");
setMode(OperatingMode::FULL);
}
inline void
@@ -1115,7 +1114,7 @@ NetworkOPsImp::processHeartbeatTimer()
{
if (mode_ != OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::DISCONNECTED, "Heartbeat: insufficient peers");
setMode(OperatingMode::DISCONNECTED);
std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ").";
@@ -1139,7 +1138,7 @@ NetworkOPsImp::processHeartbeatTimer()
if (mode_ == OperatingMode::DISCONNECTED)
{
setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
setMode(OperatingMode::CONNECTED);
JLOG(journal_.info()) << "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers << " peers. ";
}
@@ -1150,11 +1149,11 @@ NetworkOPsImp::processHeartbeatTimer()
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mode_ == OperatingMode::SYNCING)
{
setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
setMode(OperatingMode::SYNCING);
}
else if (mode_ == OperatingMode::CONNECTED)
{
setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
setMode(OperatingMode::CONNECTED);
}
auto newMode = mode_.load();
if (origMode != newMode)
@@ -1859,7 +1858,7 @@ void
NetworkOPsImp::setAmendmentBlocked()
{
amendmentBlocked_ = true;
setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
setMode(OperatingMode::CONNECTED);
}
inline bool
@@ -1890,7 +1889,7 @@ void
NetworkOPsImp::setUNLBlocked()
{
unlBlocked_ = true;
setMode(OperatingMode::CONNECTED, "setUNLBlocked");
setMode(OperatingMode::CONNECTED);
}
inline void
@@ -1990,7 +1989,7 @@ NetworkOPsImp::checkLastClosedLedger(Overlay::PeerSequence const& peerList, uint
if ((mode_ == OperatingMode::TRACKING) || (mode_ == OperatingMode::FULL))
{
setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
setMode(OperatingMode::CONNECTED);
}
if (consensus)
@@ -2078,8 +2077,8 @@ NetworkOPsImp::beginConsensus(
// this shouldn't happen unless we jump ledgers
if (mode_ == OperatingMode::FULL)
{
JLOG(journal_.warn()) << "beginConsensus Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
JLOG(journal_.warn()) << "Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING);
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
}
@@ -2207,7 +2206,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// validations we have for LCL. If the ledger is good enough, go to
// TRACKING - TODO
if (!needNetworkLedger_)
setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
setMode(OperatingMode::TRACKING);
}
if (((mode_ == OperatingMode::CONNECTED) || (mode_ == OperatingMode::TRACKING)) &&
@@ -2220,7 +2219,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
if (registry_.get().getTimeKeeper().now() <
(current->header().parentCloseTime + 2 * current->header().closeTimeResolution))
{
setMode(OperatingMode::FULL, "endConsensus: check full");
setMode(OperatingMode::FULL);
}
}
@@ -2232,7 +2231,7 @@ NetworkOPsImp::consensusViewChange()
{
if ((mode_ == OperatingMode::FULL) || (mode_ == OperatingMode::TRACKING))
{
setMode(OperatingMode::CONNECTED, "consensusViewChange");
setMode(OperatingMode::CONNECTED);
}
}
@@ -2536,7 +2535,7 @@ NetworkOPsImp::pubPeerStatus(std::function<json::Value(void)> const& func)
}
void
NetworkOPsImp::setMode(OperatingMode om, char const* reason)
NetworkOPsImp::setMode(OperatingMode om)
{
using namespace std::chrono_literals;
if (om == OperatingMode::CONNECTED)
@@ -2556,12 +2555,11 @@ NetworkOPsImp::setMode(OperatingMode om, char const* reason)
if (mode_ == om)
return;
auto const sink = om < mode_ ? journal_.warn() : journal_.info();
mode_ = om;
accounting_.mode(om);
JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
JLOG(journal_.info()) << "STATE->" << strOperatingMode();
pubServer();
}
@@ -2570,24 +2568,36 @@ NetworkOPsImp::recvValidation(std::shared_ptr<STValidation> const& val, std::str
{
JLOG(journal_.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source;
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::No;
try
{
CanProcess const check(validationsMutex_, pendingValidations_, val->getLedgerHash());
try
if (pendingValidations_.contains(val->getLedgerHash()))
{
BypassAccept bypassAccept = check ? BypassAccept::No : BypassAccept::Yes;
handleNewValidation(registry_.get().getApp(), val, source, bypassAccept, journal_);
bypassAccept = BypassAccept::Yes;
}
catch (std::exception const& e)
else
{
JLOG(journal_.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(journal_.warn()) << "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
pendingValidations_.insert(val->getLedgerHash());
}
ScopeUnlock const unlock(lock);
handleNewValidation(registry_.get().getApp(), val, source, bypassAccept, journal_);
}
catch (std::exception const& e)
{
JLOG(journal_.warn()) << "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(journal_.warn()) << "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::No)
{
pendingValidations_.erase(val->getLedgerHash());
}
lock.unlock();
pubValidation(val);

View File

@@ -200,11 +200,11 @@ public:
iter->second.upVotes |
boost::adaptors::transformed(to_string<256, void>),
", ");
// TODO: Maybe transform using toShortString once #5126 is
// TODO: Maybe transform using to_short_string once #5126 is
// merged
//
// iter->second.upVotes |
// boost::adaptors::transformed(toShortString<256, void>)
// boost::adaptors::transformed(to_short_string<256, void>)
}
else
{

View File

@@ -17,7 +17,6 @@ enum class ProtocolFeature {
ValidatorListPropagation,
ValidatorList2Propagation,
LedgerReplay,
LedgerDataCookies
};
/** Represents a peer connection in the overlay. */
@@ -117,13 +116,6 @@ public:
[[nodiscard]] virtual bool
txReduceRelayEnabled() const = 0;
//
// Messages
//
virtual std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) = 0;
};
} // namespace xrpl

File diff suppressed because it is too large Load Diff

View File

@@ -178,13 +178,6 @@ private:
bool ledgerReplayEnabled_ = false;
LedgerReplayMsgHandler ledgerReplayMsgHandler_;
// Track message requests and responses
// TODO: Use an expiring cache or something
using MessageCookieMap = std::map<uint256, std::set<std::optional<uint64_t>>>;
using PeerCookieMap = std::map<std::shared_ptr<Peer>, std::set<std::optional<uint64_t>>>;
std::mutex mutable cookieLock_;
MessageCookieMap messageRequestCookies_;
friend class OverlayImpl;
class Metrics
@@ -429,13 +422,6 @@ public:
return txReduceRelayEnabled_;
}
//
// Messages
//
std::set<std::optional<uint64_t>>
releaseRequestCookies(uint256 const& requestHash) override;
private:
void
close();
@@ -634,22 +620,16 @@ private:
std::shared_ptr<protocol::TMValidation> const& packet);
void
sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData,
PeerCookieMap const& destinations);
void
sendToMultiple(protocol::TMLedgerData& ledgerData, PeerCookieMap const& destinations);
sendLedgerBase(std::shared_ptr<Ledger const> const& ledger, protocol::TMLedgerData& ledgerData);
std::shared_ptr<Ledger const>
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash);
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
std::shared_ptr<SHAMap const>
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash) const;
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m, uint256 const& mHash);
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
protected:
// Kept `protected` so test subclasses (see

View File

@@ -7,9 +7,6 @@
#include <xrpl/basics/Log.h>
#include <xrpl/beast/utility/Journal.h>
#include <xrpl/core/HashRouter.h>
#include <xrpl/core/JobQueue.h>
#include <xrpl/protocol/digest.h>
#include <google/protobuf/message.h>
@@ -100,45 +97,16 @@ PeerSetImpl::sendRequest(
std::shared_ptr<Peer> const& peer)
{
auto packet = std::make_shared<Message>(message, type);
auto const messageHash = [&]() {
auto const packetBuffer = packet->getBuffer(compression::Compressed::Off);
return sha512Half(Slice(packetBuffer.data(), packetBuffer.size()));
}();
// Allow messages to be re-sent to the same peer after a delay
using namespace std::chrono_literals;
constexpr std::chrono::seconds interval = 30s;
if (peer)
{
if (app_.getHashRouter().shouldProcessForPeer(messageHash, peer->id(), interval))
{
JLOG(journal_.trace()) << "Sending " << protocolMessageName(type) << " message to ["
<< peer->id() << "]: " << messageHash;
peer->send(packet);
}
else
JLOG(journal_.debug()) << "Suppressing sending duplicate " << protocolMessageName(type)
<< " message to [" << peer->id() << "]: " << messageHash;
peer->send(packet);
return;
}
for (auto id : peers_)
{
if (auto p = app_.getOverlay().findPeerByShortID(id))
{
if (app_.getHashRouter().shouldProcessForPeer(messageHash, p->id(), interval))
{
JLOG(journal_.trace()) << "Sending " << protocolMessageName(type) << " message to ["
<< p->id() << "]: " << messageHash;
p->send(packet);
}
else
JLOG(journal_.debug())
<< "Suppressing sending duplicate " << protocolMessageName(type)
<< " message to [" << p->id() << "]: " << messageHash;
}
p->send(packet);
}
}

View File

@@ -23,12 +23,6 @@ protocolMessageType(protocol::TMGetLedger const&)
return protocol::mtGET_LEDGER;
}
inline protocol::MessageType
protocolMessageType(protocol::TMLedgerData const&)
{
return protocol::mtLEDGER_DATA;
}
inline protocol::MessageType
protocolMessageType(protocol::TMReplayDeltaRequest const&)
{
@@ -440,63 +434,3 @@ invokeProtocolMessage(Buffers const& buffers, Handler& handler, std::size_t& hin
}
} // namespace xrpl
namespace protocol {
template <class Hasher>
void
hash_append(Hasher& h, TMGetLedger const& msg)
{
using beast::hash_append;
using namespace xrpl;
hash_append(h, safeCast<int>(protocolMessageType(msg)));
hash_append(h, safeCast<int>(msg.itype()));
if (msg.has_ltype())
hash_append(h, safeCast<int>(msg.ltype()));
if (msg.has_ledgerhash())
hash_append(h, msg.ledgerhash());
if (msg.has_ledgerseq())
hash_append(h, msg.ledgerseq());
for (auto const& nodeId : msg.nodeids())
hash_append(h, nodeId);
hash_append(h, msg.nodeids_size());
// Do NOT include the request cookie. It does not affect the content of the
// request, but only where to route the results.
// if (msg.has_requestcookie())
// hash_append(h, msg.requestcookie());
if (msg.has_querytype())
hash_append(h, safeCast<int>(msg.querytype()));
if (msg.has_querydepth())
hash_append(h, msg.querydepth());
}
template <class Hasher>
void
hash_append(Hasher& h, TMLedgerData const& msg)
{
using beast::hash_append;
using namespace xrpl;
hash_append(h, safeCast<int>(protocolMessageType(msg)));
hash_append(h, msg.ledgerhash());
hash_append(h, msg.ledgerseq());
hash_append(h, safeCast<int>(msg.type()));
for (auto const& node : msg.nodes())
{
hash_append(h, node.nodedata());
if (node.has_nodeid())
hash_append(h, node.nodeid());
}
hash_append(h, msg.nodes_size());
if (msg.has_requestcookie())
hash_append(h, msg.requestcookie());
if (msg.has_error())
hash_append(h, safeCast<int>(msg.error()));
}
} // namespace protocol

View File

@@ -28,8 +28,6 @@ namespace xrpl {
constexpr ProtocolVersion const kSupportedProtocolList[]{
{2, 1},
{2, 2},
// Adds TMLedgerData::responseCookies and directResponse
{2, 3},
};
// This ugly construct ensures that supportedProtocolList is sorted in strictly