diff --git a/Builds/levelization/results/loops.txt b/Builds/levelization/results/loops.txt index 06ab5266c9..7c132f5429 100644 --- a/Builds/levelization/results/loops.txt +++ b/Builds/levelization/results/loops.txt @@ -14,7 +14,7 @@ Loop: xrpld.app xrpld.net xrpld.app > xrpld.net Loop: xrpld.app xrpld.overlay - xrpld.overlay ~= xrpld.app + xrpld.overlay == xrpld.app Loop: xrpld.app xrpld.peerfinder xrpld.app > xrpld.peerfinder diff --git a/include/xrpl/basics/CanProcess.h b/include/xrpl/basics/CanProcess.h deleted file mode 100644 index 3ee49d0087..0000000000 --- a/include/xrpl/basics/CanProcess.h +++ /dev/null @@ -1,134 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2024 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED -#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED - -#include -#include -#include - -/** RAII class to check if an Item is already being processed on another thread, - * as indicated by it's presence in a Collection. - * - * If the Item is not in the Collection, it will be added under lock in the - * ctor, and removed under lock in the dtor. The object will be considered - * "usable" and evaluate to `true`. - * - * If the Item is in the Collection, no changes will be made to the collection, - * and the CanProcess object will be considered "unusable". - * - * It's up to the caller to decide what "usable" and "unusable" mean. (e.g. - * Process or skip a block of code, or set a flag.) - * - * The current use is to avoid lock contention that would be involved in - * processing something associated with the Item. - * - * Examples: - * - * void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...) - * { - * if (CanProcess check{acquiresMutex_, pendingAcquires_, hash}) - * { - * acquire(hash, ...); - * } - * } - * - * bool - * NetworkOPsImp::recvValidation( - * std::shared_ptr const& val, - * std::string const& source) - * { - * CanProcess check( - * validationsMutex_, pendingValidations_, val->getLedgerHash()); - * BypassAccept bypassAccept = - * check ? BypassAccept::no : BypassAccept::yes; - * handleNewValidation(app_, val, source, bypassAccept, m_journal); - * } - * - */ -class CanProcess -{ -public: - template - CanProcess(Mutex& mtx, Collection& collection, Item const& item) - : cleanup_(insert(mtx, collection, item)) - { - } - - ~CanProcess() - { - if (cleanup_) - cleanup_(); - } - - explicit - operator bool() const - { - return static_cast(cleanup_); - } - -private: - template - std::function - doInsert(Mutex& mtx, Collection& collection, Item const& item) - { - std::unique_lock lock(mtx); - // TODO: Use structured binding once LLVM 16 is the minimum supported - // version. See also: https://github.com/llvm/llvm-project/issues/48582 - // https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c - auto const insertResult = collection.insert(item); - auto const it = insertResult.first; - if (!insertResult.second) - return {}; - if constexpr (useIterator) - return [&, it]() { - std::unique_lock lock(mtx); - collection.erase(it); - }; - else - return [&]() { - std::unique_lock lock(mtx); - collection.erase(item); - }; - } - - // Generic insert() function doesn't use iterators because they may get - // invalidated - template - std::function - insert(Mutex& mtx, Collection& collection, Item const& item) - { - return doInsert(mtx, collection, item); - } - - // Specialize insert() for std::set, which does not invalidate iterators for - // insert and erase - template - std::function - insert(Mutex& mtx, std::set& collection, Item const& item) - { - return doInsert(mtx, collection, item); - } - - // If set, then the item is "usable" - std::function cleanup_; -}; - -#endif diff --git a/include/xrpl/basics/base_uint.h b/include/xrpl/basics/base_uint.h index a2c714f4be..05d83b3bb0 100644 --- a/include/xrpl/basics/base_uint.h +++ b/include/xrpl/basics/base_uint.h @@ -631,13 +631,6 @@ to_string(base_uint const& a) return strHex(a.cbegin(), a.cend()); } -template -inline std::string -to_short_string(base_uint const& a) -{ - return strHex(a.cbegin(), a.cend()).substr(0, 8) + "..."; -} - template inline std::ostream& operator<<(std::ostream& out, base_uint const& u) diff --git a/include/xrpl/proto/ripple.proto b/include/xrpl/proto/ripple.proto index e121a39706..a06bbd9a31 100644 --- a/include/xrpl/proto/ripple.proto +++ b/include/xrpl/proto/ripple.proto @@ -321,18 +321,8 @@ message TMLedgerData required uint32 ledgerSeq = 2; required TMLedgerInfoType type = 3; repeated TMLedgerNode nodes = 4; - // If the peer supports "responseCookies", this field will - // never be populated. optional uint32 requestCookie = 5; optional TMReplyError error = 6; - // The old field is called "requestCookie", but this is - // a response, so this name makes more sense - repeated uint32 responseCookies = 7; - // If a TMGetLedger request was received without a "requestCookie", - // and the peer supports it, this flag will be set to true to - // indicate that the receiver should process the result in addition - // to forwarding it to its "responseCookies" peers. - optional bool directResponse = 8; } message TMPing diff --git a/include/xrpl/protocol/LedgerHeader.h b/include/xrpl/protocol/LedgerHeader.h index 806e732593..0b35979971 100644 --- a/include/xrpl/protocol/LedgerHeader.h +++ b/include/xrpl/protocol/LedgerHeader.h @@ -55,8 +55,6 @@ struct LedgerHeader // If validated is false, it means "not yet validated." // Once validated is true, it will never be set false at a later time. - // NOTE: If you are accessing this directly, you are probably doing it - // wrong. Use LedgerMaster::isValidated(). // VFALCO TODO Make this not mutable bool mutable validated = false; bool accepted = false; diff --git a/src/test/app/HashRouter_test.cpp b/src/test/app/HashRouter_test.cpp index 68e0d83065..1234bc5b9c 100644 --- a/src/test/app/HashRouter_test.cpp +++ b/src/test/app/HashRouter_test.cpp @@ -242,33 +242,6 @@ class HashRouter_test : public beast::unit_test::suite BEAST_EXPECT(router.shouldProcess(key, peer, flags, 1s)); } - void - testProcessPeer() - { - using namespace std::chrono_literals; - TestStopwatch stopwatch; - HashRouter router(stopwatch, 5s); - uint256 const key(1); - HashRouter::PeerShortID peer1 = 1; - HashRouter::PeerShortID peer2 = 2; - auto const timeout = 2s; - - BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout)); - BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout)); - ++stopwatch; - BEAST_EXPECT(!router.shouldProcessForPeer(key, peer1, timeout)); - BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout)); - BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout)); - ++stopwatch; - BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout)); - BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout)); - ++stopwatch; - BEAST_EXPECT(router.shouldProcessForPeer(key, peer2, timeout)); - ++stopwatch; - BEAST_EXPECT(router.shouldProcessForPeer(key, peer1, timeout)); - BEAST_EXPECT(!router.shouldProcessForPeer(key, peer2, timeout)); - } - public: void run() override @@ -279,7 +252,6 @@ public: testSetFlags(); testRelay(); testProcess(); - testProcessPeer(); } }; diff --git a/src/test/app/LedgerReplay_test.cpp b/src/test/app/LedgerReplay_test.cpp index d4911f8283..883aca7bce 100644 --- a/src/test/app/LedgerReplay_test.cpp +++ b/src/test/app/LedgerReplay_test.cpp @@ -322,11 +322,6 @@ public: { return false; } - std::set> - releaseRequestCookies(uint256 const& requestHash) override - { - return {}; - } bool ledgerReplayEnabled_; PublicKey nodePublicKey_; diff --git a/src/test/basics/base_uint_test.cpp b/src/test/basics/base_uint_test.cpp index 50411461e0..9f3194f4fb 100644 --- a/src/test/basics/base_uint_test.cpp +++ b/src/test/basics/base_uint_test.cpp @@ -151,7 +151,6 @@ struct base_uint_test : beast::unit_test::suite uset.insert(u); BEAST_EXPECT(raw.size() == u.size()); BEAST_EXPECT(to_string(u) == "0102030405060708090A0B0C"); - BEAST_EXPECT(to_short_string(u) == "01020304..."); BEAST_EXPECT(*u.data() == 1); BEAST_EXPECT(u.signum() == 1); BEAST_EXPECT(!!u); @@ -174,7 +173,6 @@ struct base_uint_test : beast::unit_test::suite test96 v{~u}; uset.insert(v); BEAST_EXPECT(to_string(v) == "FEFDFCFBFAF9F8F7F6F5F4F3"); - BEAST_EXPECT(to_short_string(v) == "FEFDFCFB..."); BEAST_EXPECT(*v.data() == 0xfe); BEAST_EXPECT(v.signum() == 1); BEAST_EXPECT(!!v); @@ -195,7 +193,6 @@ struct base_uint_test : beast::unit_test::suite test96 z{beast::zero}; uset.insert(z); BEAST_EXPECT(to_string(z) == "000000000000000000000000"); - BEAST_EXPECT(to_short_string(z) == "00000000..."); BEAST_EXPECT(*z.data() == 0); BEAST_EXPECT(*z.begin() == 0); BEAST_EXPECT(*std::prev(z.end(), 1) == 0); @@ -216,7 +213,6 @@ struct base_uint_test : beast::unit_test::suite BEAST_EXPECT(n == z); n--; BEAST_EXPECT(to_string(n) == "FFFFFFFFFFFFFFFFFFFFFFFF"); - BEAST_EXPECT(to_short_string(n) == "FFFFFFFF..."); n = beast::zero; BEAST_EXPECT(n == z); @@ -227,7 +223,6 @@ struct base_uint_test : beast::unit_test::suite test96 x{zm1 ^ zp1}; uset.insert(x); BEAST_EXPECTS(to_string(x) == "FFFFFFFFFFFFFFFFFFFFFFFE", to_string(x)); - BEAST_EXPECTS(to_short_string(x) == "FFFFFFFF...", to_short_string(x)); BEAST_EXPECT(uset.size() == 4); diff --git a/src/test/overlay/ProtocolVersion_test.cpp b/src/test/overlay/ProtocolVersion_test.cpp index 97469c5980..dfc0ee70b8 100644 --- a/src/test/overlay/ProtocolVersion_test.cpp +++ b/src/test/overlay/ProtocolVersion_test.cpp @@ -87,8 +87,8 @@ public: negotiateProtocolVersion("XRPL/2.2") == make_protocol(2, 2)); BEAST_EXPECT( negotiateProtocolVersion( - "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/2.4, XRPL/999.999") == - make_protocol(2, 3)); + "RTXP/1.2, XRPL/2.2, XRPL/2.3, XRPL/999.999") == + make_protocol(2, 2)); BEAST_EXPECT( negotiateProtocolVersion("XRPL/999.999, WebSocket/1.0") == std::nullopt); diff --git a/src/test/overlay/reduce_relay_test.cpp b/src/test/overlay/reduce_relay_test.cpp index e907f60b0e..e0edae5489 100644 --- a/src/test/overlay/reduce_relay_test.cpp +++ b/src/test/overlay/reduce_relay_test.cpp @@ -182,11 +182,6 @@ public: removeTxQueue(const uint256&) override { } - std::set> - releaseRequestCookies(uint256 const& requestHash) override - { - return {}; - } }; /** Manually advanced clock. */ diff --git a/src/xrpld/app/consensus/RCLConsensus.cpp b/src/xrpld/app/consensus/RCLConsensus.cpp index 47414cd20a..a746b30357 100644 --- a/src/xrpld/app/consensus/RCLConsensus.cpp +++ b/src/xrpld/app/consensus/RCLConsensus.cpp @@ -1073,8 +1073,7 @@ void RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const { if (!positions && app_.getOPs().isFull()) - app_.getOPs().setMode( - OperatingMode::CONNECTED, "updateOperatingMode: no positions"); + app_.getOPs().setMode(OperatingMode::CONNECTED); } void diff --git a/src/xrpld/app/ledger/InboundLedger.h b/src/xrpld/app/ledger/InboundLedger.h index ccd9aa0710..13f603e79d 100644 --- a/src/xrpld/app/ledger/InboundLedger.h +++ b/src/xrpld/app/ledger/InboundLedger.h @@ -196,25 +196,6 @@ private: std::unique_ptr mPeerSet; }; -inline std::string -to_string(InboundLedger::Reason reason) -{ - using enum InboundLedger::Reason; - switch (reason) - { - case HISTORY: - return "HISTORY"; - case GENERIC: - return "GENERIC"; - case CONSENSUS: - return "CONSENSUS"; - default: - UNREACHABLE( - "ripple::to_string(InboundLedger::Reason) : unknown value"); - return "unknown"; - } -} - } // namespace ripple #endif diff --git a/src/xrpld/app/ledger/detail/InboundLedger.cpp b/src/xrpld/app/ledger/detail/InboundLedger.cpp index ca955d14ff..32fdff76ab 100644 --- a/src/xrpld/app/ledger/detail/InboundLedger.cpp +++ b/src/xrpld/app/ledger/detail/InboundLedger.cpp @@ -392,14 +392,7 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&) if (!wasProgress) { - if (checkLocal()) - { - // Done. Something else (probably consensus) built the ledger - // locally while waiting for data (or possibly before requesting) - XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done"); - JLOG(journal_.info()) << "Finished while waiting " << hash_; - return; - } + checkLocal(); mByHash = true; @@ -509,17 +502,15 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) if (auto stream = journal_.debug()) { - std::stringstream ss; - ss << "Trigger acquiring ledger " << hash_; + stream << "Trigger acquiring ledger " << hash_; if (peer) - ss << " from " << peer; + stream << " from " << peer; if (complete_ || failed_) - ss << " complete=" << complete_ << " failed=" << failed_; + stream << "complete=" << complete_ << " failed=" << failed_; else - ss << " header=" << mHaveHeader << " tx=" << mHaveTransactions - << " as=" << mHaveState; - stream << ss.str(); + stream << "header=" << mHaveHeader << " tx=" << mHaveTransactions + << " as=" << mHaveState; } if (!mHaveHeader) diff --git a/src/xrpld/app/ledger/detail/InboundLedgers.cpp b/src/xrpld/app/ledger/detail/InboundLedgers.cpp index a6699aa73f..99a26ce8f9 100644 --- a/src/xrpld/app/ledger/detail/InboundLedgers.cpp +++ b/src/xrpld/app/ledger/detail/InboundLedgers.cpp @@ -23,9 +23,9 @@ #include #include #include -#include #include #include +#include #include #include #include @@ -77,85 +77,11 @@ public: hash.isNonZero(), "ripple::InboundLedgersImp::acquire::doAcquire : nonzero hash"); - bool const needNetworkLedger = app_.getOPs().isNeedNetworkLedger(); - bool const shouldAcquire = [&]() { - if (!needNetworkLedger) - return true; - if (reason == InboundLedger::Reason::GENERIC) - return true; - if (reason == InboundLedger::Reason::CONSENSUS) - return true; - return false; - }(); - - std::stringstream ss; - ss << "InboundLedger::acquire: " - << "Request: " << to_string(hash) << ", " << seq - << " NeedNetworkLedger: " << (needNetworkLedger ? "yes" : "no") - << " Reason: " << to_string(reason) - << " Should acquire: " << (shouldAcquire ? "true." : "false."); - - /* Acquiring ledgers is somewhat expensive. It requires lots of - * computation and network communication. Avoid it when it's not - * appropriate. Every validation from a peer for a ledger that - * we do not have locally results in a call to this function: even - * if we are moments away from validating the same ledger. - */ - bool const shouldBroadcast = [&]() { - // If the node is not in "full" state, it needs to sync to - // the network, and doesn't have the necessary tx's and - // ledger entries to build the ledger. - bool const isFull = app_.getOPs().isFull(); - // If everything else is ok, don't try to acquire the ledger - // if the requested seq is in the near future relative to - // the validated ledger. If the requested ledger is between - // 1 and 19 inclusive ledgers ahead of the valid ledger this - // node has not built it yet, but it's possible/likely it - // has the tx's necessary to build it and get caught up. - // Plus it might not become validated. On the other hand, if - // it's more than 20 in the future, this node should request - // it so that it can jump ahead and get caught up. - LedgerIndex const validSeq = - app_.getLedgerMaster().getValidLedgerIndex(); - constexpr std::size_t lagLeeway = 20; - bool const nearFuture = - (seq > validSeq) && (seq < validSeq + lagLeeway); - // If everything else is ok, don't try to acquire the ledger - // if the request is related to consensus. (Note that - // consensus calls usually pass a seq of 0, so nearFuture - // will be false other than on a brand new network.) - bool const consensus = - reason == InboundLedger::Reason::CONSENSUS; - ss << " Evaluating whether to broadcast requests to peers" - << ". full: " << (isFull ? "true" : "false") - << ". ledger sequence " << seq - << ". Valid sequence: " << validSeq - << ". Lag leeway: " << lagLeeway - << ". request for near future ledger: " - << (nearFuture ? "true" : "false") - << ". Consensus: " << (consensus ? "true" : "false"); - - // If the node is not synced, send requests. - if (!isFull) - return true; - // If the ledger is in the near future, do NOT send requests. - // This node is probably about to build it. - if (nearFuture) - return false; - // If the request is because of consensus, do NOT send requests. - // This node is probably about to build it. - if (consensus) - return false; - return true; - }(); - ss << ". Would broadcast to peers? " - << (shouldBroadcast ? "true." : "false."); - - if (!shouldAcquire) - { - JLOG(j_.debug()) << "Abort(rule): " << ss.str(); + // probably not the right rule + if (app_.getOPs().isNeedNetworkLedger() && + (reason != InboundLedger::Reason::GENERIC) && + (reason != InboundLedger::Reason::CONSENSUS)) return {}; - } bool isNew = true; std::shared_ptr inbound; @@ -163,7 +89,6 @@ public: ScopedLockType sl(mLock); if (stopping_) { - JLOG(j_.debug()) << "Abort(stopping): " << ss.str(); return {}; } @@ -187,29 +112,23 @@ public: ++mCounter; } } - ss << " IsNew: " << (isNew ? "true" : "false"); if (inbound->isFailed()) - { - JLOG(j_.debug()) << "Abort(failed): " << ss.str(); return {}; - } if (!isNew) inbound->update(seq); if (!inbound->isComplete()) - { - JLOG(j_.debug()) << "InProgress: " << ss.str(); return {}; - } - JLOG(j_.debug()) << "Complete: " << ss.str(); return inbound->getLedger(); }; using namespace std::chrono_literals; - return perf::measureDurationAndLog( + std::shared_ptr ledger = perf::measureDurationAndLog( doAcquire, "InboundLedgersImp::acquire", 500ms, j_); + + return ledger; } void @@ -218,25 +137,28 @@ public: std::uint32_t seq, InboundLedger::Reason reason) override { - if (CanProcess const check{acquiresMutex_, pendingAcquires_, hash}) + std::unique_lock lock(acquiresMutex_); + try { - try - { - acquire(hash, seq, reason); - } - catch (std::exception const& e) - { - JLOG(j_.warn()) - << "Exception thrown for acquiring new inbound ledger " - << hash << ": " << e.what(); - } - catch (...) - { - JLOG(j_.warn()) << "Unknown exception thrown for acquiring new " - "inbound ledger " - << hash; - } + if (pendingAcquires_.contains(hash)) + return; + pendingAcquires_.insert(hash); + scope_unlock unlock(lock); + acquire(hash, seq, reason); } + catch (std::exception const& e) + { + JLOG(j_.warn()) + << "Exception thrown for acquiring new inbound ledger " << hash + << ": " << e.what(); + } + catch (...) + { + JLOG(j_.warn()) + << "Unknown exception thrown for acquiring new inbound ledger " + << hash; + } + pendingAcquires_.erase(hash); } std::shared_ptr diff --git a/src/xrpld/app/ledger/detail/LedgerMaster.cpp b/src/xrpld/app/ledger/detail/LedgerMaster.cpp index 7875541e7b..6bc894da48 100644 --- a/src/xrpld/app/ledger/detail/LedgerMaster.cpp +++ b/src/xrpld/app/ledger/detail/LedgerMaster.cpp @@ -973,9 +973,8 @@ LedgerMaster::checkAccept(std::shared_ptr const& ledger) } JLOG(m_journal.info()) << "Advancing accepted ledger to " - << ledger->info().seq << " (" - << to_short_string(ledger->info().hash) - << ") with >= " << minVal << " validations"; + << ledger->info().seq << " with >= " << minVal + << " validations"; ledger->setValidated(); ledger->setFull(); diff --git a/src/xrpld/app/ledger/detail/TimeoutCounter.cpp b/src/xrpld/app/ledger/detail/TimeoutCounter.cpp index 343bbd83db..35d8f1fffb 100644 --- a/src/xrpld/app/ledger/detail/TimeoutCounter.cpp +++ b/src/xrpld/app/ledger/detail/TimeoutCounter.cpp @@ -33,8 +33,7 @@ TimeoutCounter::TimeoutCounter( QueueJobParameter&& jobParameter, beast::Journal journal) : app_(app) - , sink_(journal, to_short_string(hash) + " ") - , journal_(sink_) + , journal_(journal) , hash_(hash) , timeouts_(0) , complete_(false) @@ -54,8 +53,6 @@ TimeoutCounter::setTimer(ScopedLockType& sl) { if (isDone()) return; - JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() - << "ms"; timer_.expires_after(timerInterval_); timer_.async_wait( [wptr = pmDowncast()](boost::system::error_code const& ec) { @@ -64,12 +61,6 @@ TimeoutCounter::setTimer(ScopedLockType& sl) if (auto ptr = wptr.lock()) { - JLOG(ptr->journal_.debug()) - << "timer: ec: " << ec << " (operation_aborted: " - << boost::asio::error::operation_aborted << " - " - << (ec == boost::asio::error::operation_aborted ? "aborted" - : "other") - << ")"; ScopedLockType sl(ptr->mtx_); ptr->queueJob(sl); } diff --git a/src/xrpld/app/ledger/detail/TimeoutCounter.h b/src/xrpld/app/ledger/detail/TimeoutCounter.h index a65208a938..228e879d4d 100644 --- a/src/xrpld/app/ledger/detail/TimeoutCounter.h +++ b/src/xrpld/app/ledger/detail/TimeoutCounter.h @@ -24,8 +24,6 @@ #include #include #include -#include - #include #include @@ -123,7 +121,6 @@ protected: // Used in this class for access to boost::asio::io_service and // ripple::Overlay. Used in subtypes for the kitchen sink. Application& app_; - beast::WrappedSink sink_; beast::Journal journal_; mutable std::recursive_mutex mtx_; diff --git a/src/xrpld/app/misc/HashRouter.cpp b/src/xrpld/app/misc/HashRouter.cpp index 28d2449db5..58e811d4b8 100644 --- a/src/xrpld/app/misc/HashRouter.cpp +++ b/src/xrpld/app/misc/HashRouter.cpp @@ -90,20 +90,6 @@ HashRouter::shouldProcess( return s.shouldProcess(suppressionMap_.clock().now(), tx_interval); } -bool -HashRouter::shouldProcessForPeer( - uint256 const& key, - PeerShortID peer, - std::chrono::seconds interval) -{ - std::lock_guard lock(mutex_); - - auto& entry = emplace(key).first; - - return entry.shouldProcessForPeer( - peer, suppressionMap_.clock().now(), interval); -} - int HashRouter::getFlags(uint256 const& key) { @@ -142,13 +128,4 @@ HashRouter::shouldRelay(uint256 const& key) return s.releasePeerSet(); } -auto -HashRouter::getPeers(uint256 const& key) -> std::set -{ - std::lock_guard lock(mutex_); - - auto& s = emplace(key).first; - return s.peekPeerSet(); -} - } // namespace ripple diff --git a/src/xrpld/app/misc/HashRouter.h b/src/xrpld/app/misc/HashRouter.h index 403c7ce860..e9d040fc8b 100644 --- a/src/xrpld/app/misc/HashRouter.h +++ b/src/xrpld/app/misc/HashRouter.h @@ -92,13 +92,6 @@ private: return std::move(peers_); } - /** Return set of peers waiting for reply. Leaves list unchanged. */ - std::set const& - peekPeerSet() - { - return peers_; - } - /** Return seated relay time point if the message has been relayed */ std::optional relayed() const @@ -132,21 +125,6 @@ private: return true; } - bool - shouldProcessForPeer( - PeerShortID peer, - Stopwatch::time_point now, - std::chrono::seconds interval) - { - if (peerProcessed_.contains(peer) && - ((peerProcessed_[peer] + interval) > now)) - return false; - // Peer may already be in the list, but adding it again doesn't hurt - addPeer(peer); - peerProcessed_[peer] = now; - return true; - } - private: int flags_ = 0; std::set peers_; @@ -154,7 +132,6 @@ private: // than one flag needs to expire independently. std::optional relayed_; std::optional processed_; - std::map peerProcessed_; }; public: @@ -186,7 +163,7 @@ public: /** Add a suppression peer and get message's relay status. * Return pair: - * element 1: true if the key is added. + * element 1: true if the peer is added. * element 2: optional is seated to the relay time point or * is unseated if has not relayed yet. */ std::pair> @@ -203,18 +180,6 @@ public: int& flags, std::chrono::seconds tx_interval); - /** Determines whether the hashed item should be processed for the given - peer. Could be an incoming or outgoing message. - - Items filtered with this function should only be processed for the given - peer once. Unlike shouldProcess, it can be processed for other peers. - */ - bool - shouldProcessForPeer( - uint256 const& key, - PeerShortID peer, - std::chrono::seconds interval); - /** Set the flags on a hash. @return `true` if the flags were changed. `false` if unchanged. @@ -240,11 +205,6 @@ public: std::optional> shouldRelay(uint256 const& key); - /** Returns a copy of the set of peers in the Entry for the key - */ - std::set - getPeers(uint256 const& key); - private: // pair.second indicates whether the entry was created std::pair diff --git a/src/xrpld/app/misc/NetworkOPs.cpp b/src/xrpld/app/misc/NetworkOPs.cpp index 3800b359ef..a5c8200ccb 100644 --- a/src/xrpld/app/misc/NetworkOPs.cpp +++ b/src/xrpld/app/misc/NetworkOPs.cpp @@ -50,10 +50,10 @@ #include #include #include -#include #include #include #include +#include #include #include #include @@ -403,7 +403,7 @@ public: isFull() override; void - setMode(OperatingMode om, const char* reason) override; + setMode(OperatingMode om) override; bool isBlocked() override; @@ -874,7 +874,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const inline void NetworkOPsImp::setStandAlone() { - setMode(OperatingMode::FULL, "setStandAlone"); + setMode(OperatingMode::FULL); } inline void @@ -1022,9 +1022,7 @@ NetworkOPsImp::processHeartbeatTimer() { if (mMode != OperatingMode::DISCONNECTED) { - setMode( - OperatingMode::DISCONNECTED, - "Heartbeat: insufficient peers"); + setMode(OperatingMode::DISCONNECTED); JLOG(m_journal.warn()) << "Node count (" << numPeers << ") has fallen " << "below required minimum (" << minPeerCount_ << ")."; @@ -1040,7 +1038,7 @@ NetworkOPsImp::processHeartbeatTimer() if (mMode == OperatingMode::DISCONNECTED) { - setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers"); + setMode(OperatingMode::CONNECTED); JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient."; } @@ -1048,9 +1046,9 @@ NetworkOPsImp::processHeartbeatTimer() // Check if the last validated ledger forces a change between these // states. if (mMode == OperatingMode::SYNCING) - setMode(OperatingMode::SYNCING, "Heartbeat: check syncing"); + setMode(OperatingMode::SYNCING); else if (mMode == OperatingMode::CONNECTED) - setMode(OperatingMode::CONNECTED, "Heartbeat: check connected"); + setMode(OperatingMode::CONNECTED); } mConsensus.timerEntry(app_.timeKeeper().closeTime()); @@ -1616,7 +1614,7 @@ void NetworkOPsImp::setAmendmentBlocked() { amendmentBlocked_ = true; - setMode(OperatingMode::CONNECTED, "setAmendmentBlocked"); + setMode(OperatingMode::CONNECTED); } inline bool @@ -1647,7 +1645,7 @@ void NetworkOPsImp::setUNLBlocked() { unlBlocked_ = true; - setMode(OperatingMode::CONNECTED, "setUNLBlocked"); + setMode(OperatingMode::CONNECTED); } inline void @@ -1748,7 +1746,7 @@ NetworkOPsImp::checkLastClosedLedger( if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL)) { - setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger"); + setMode(OperatingMode::CONNECTED); } if (consensus) @@ -1835,9 +1833,8 @@ NetworkOPsImp::beginConsensus(uint256 const& networkClosed) // this shouldn't happen unless we jump ledgers if (mMode == OperatingMode::FULL) { - JLOG(m_journal.warn()) - << "beginConsensus Don't have LCL, going to tracking"; - setMode(OperatingMode::TRACKING, "beginConsensus: No LCL"); + JLOG(m_journal.warn()) << "Don't have LCL, going to tracking"; + setMode(OperatingMode::TRACKING); } return false; @@ -1947,7 +1944,7 @@ NetworkOPsImp::endConsensus() // validations we have for LCL. If the ledger is good enough, go to // TRACKING - TODO if (!needNetworkLedger_) - setMode(OperatingMode::TRACKING, "endConsensus: check tracking"); + setMode(OperatingMode::TRACKING); } if (((mMode == OperatingMode::CONNECTED) || @@ -1961,7 +1958,7 @@ NetworkOPsImp::endConsensus() if (app_.timeKeeper().now() < (current->info().parentCloseTime + 2 * current->info().closeTimeResolution)) { - setMode(OperatingMode::FULL, "endConsensus: check full"); + setMode(OperatingMode::FULL); } } @@ -1973,7 +1970,7 @@ NetworkOPsImp::consensusViewChange() { if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING)) { - setMode(OperatingMode::CONNECTED, "consensusViewChange"); + setMode(OperatingMode::CONNECTED); } } @@ -2291,7 +2288,7 @@ NetworkOPsImp::pubPeerStatus(std::function const& func) } void -NetworkOPsImp::setMode(OperatingMode om, const char* reason) +NetworkOPsImp::setMode(OperatingMode om) { using namespace std::chrono_literals; if (om == OperatingMode::CONNECTED) @@ -2311,12 +2308,11 @@ NetworkOPsImp::setMode(OperatingMode om, const char* reason) if (mMode == om) return; - auto const sink = om < mMode ? m_journal.warn() : m_journal.info(); mMode = om; accounting_.mode(om); - JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason; + JLOG(m_journal.info()) << "STATE->" << strOperatingMode(); pubServer(); } @@ -2328,28 +2324,34 @@ NetworkOPsImp::recvValidation( JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source; + std::unique_lock lock(validationsMutex_); + BypassAccept bypassAccept = BypassAccept::no; + try { - CanProcess const check( - validationsMutex_, pendingValidations_, val->getLedgerHash()); - try - { - BypassAccept bypassAccept = - check ? BypassAccept::no : BypassAccept::yes; - handleNewValidation(app_, val, source, bypassAccept, m_journal); - } - catch (std::exception const& e) - { - JLOG(m_journal.warn()) - << "Exception thrown for handling new validation " - << val->getLedgerHash() << ": " << e.what(); - } - catch (...) - { - JLOG(m_journal.warn()) - << "Unknown exception thrown for handling new validation " - << val->getLedgerHash(); - } + if (pendingValidations_.contains(val->getLedgerHash())) + bypassAccept = BypassAccept::yes; + else + pendingValidations_.insert(val->getLedgerHash()); + scope_unlock unlock(lock); + handleNewValidation(app_, val, source, bypassAccept, m_journal); } + catch (std::exception const& e) + { + JLOG(m_journal.warn()) + << "Exception thrown for handling new validation " + << val->getLedgerHash() << ": " << e.what(); + } + catch (...) + { + JLOG(m_journal.warn()) + << "Unknown exception thrown for handling new validation " + << val->getLedgerHash(); + } + if (bypassAccept == BypassAccept::no) + { + pendingValidations_.erase(val->getLedgerHash()); + } + lock.unlock(); pubValidation(val); diff --git a/src/xrpld/app/misc/NetworkOPs.h b/src/xrpld/app/misc/NetworkOPs.h index 96969f4bcb..166b9e9e11 100644 --- a/src/xrpld/app/misc/NetworkOPs.h +++ b/src/xrpld/app/misc/NetworkOPs.h @@ -197,7 +197,7 @@ public: virtual bool isFull() = 0; virtual void - setMode(OperatingMode om, const char* reason) = 0; + setMode(OperatingMode om) = 0; virtual bool isBlocked() = 0; virtual bool diff --git a/src/xrpld/overlay/Peer.h b/src/xrpld/overlay/Peer.h index b53fcb21a9..2646b24a3e 100644 --- a/src/xrpld/overlay/Peer.h +++ b/src/xrpld/overlay/Peer.h @@ -36,7 +36,6 @@ enum class ProtocolFeature { ValidatorListPropagation, ValidatorList2Propagation, LedgerReplay, - LedgerDataCookies }; /** Represents a peer connection in the overlay. */ @@ -134,13 +133,6 @@ public: virtual bool txReduceRelayEnabled() const = 0; - - // - // Messages - // - - virtual std::set> - releaseRequestCookies(uint256 const& requestHash) = 0; }; } // namespace ripple diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index 0fcff03111..c3656c9445 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -30,7 +30,6 @@ #include #include #include -#include #include #include #include @@ -58,9 +57,6 @@ std::chrono::milliseconds constexpr peerHighLatency{300}; /** How often we PING the peer to check for latency and sendq probe */ std::chrono::seconds constexpr peerTimerInterval{60}; - -/** How often we process duplicate incoming TMGetLedger messages */ -std::chrono::seconds constexpr getledgerInterval{15}; } // namespace // TODO: Remove this exclusion once unit tests are added after the hotfix @@ -508,8 +504,6 @@ PeerImp::supportsFeature(ProtocolFeature f) const return protocol_ >= make_protocol(2, 2); case ProtocolFeature::LedgerReplay: return ledgerReplayEnabled_; - case ProtocolFeature::LedgerDataCookies: - return protocol_ >= make_protocol(2, 3); } return false; } @@ -1352,9 +1346,8 @@ PeerImp::handleTransaction( void PeerImp::onMessage(std::shared_ptr const& m) { - auto badData = [&](std::string const& msg, bool chargefee = true) { - if (chargefee) - fee_.update(Resource::feeInvalidData, "get_ledger " + msg); + auto badData = [&](std::string const& msg) { + fee_.update(Resource::feeInvalidData, "get_ledger " + msg); JLOG(p_journal_.warn()) << "TMGetLedger: " << msg; }; auto const itype{m->itype()}; @@ -1431,74 +1424,12 @@ PeerImp::onMessage(std::shared_ptr const& m) } } - // Drop duplicate requests from the same peer for at least - // `getLedgerInterval` seconds. - // Append a little junk to prevent the hash of an incoming messsage - // from matching the hash of the same outgoing message. - // `shouldProcessForPeer` does not distingish between incoming and - // outgoing, and some of the message relay logic checks the hash to see - // if the message has been relayed already. If the hashes are the same, - // a duplicate will be detected when sending the message is attempted, - // so it will fail. - auto const messageHash = sha512Half(*m, nullptr); - // Request cookies are not included in the hash. Track them here. - auto const requestCookie = [&m]() -> std::optional { - if (m->has_requestcookie()) - return m->requestcookie(); - return std::nullopt; - }(); - auto const [inserted, pending] = [&] { - std::lock_guard lock{cookieLock_}; - auto& cookies = messageRequestCookies_[messageHash]; - bool const pending = !cookies.empty(); - return std::pair{cookies.emplace(requestCookie).second, pending}; - }(); - // Check if the request has been seen from this peer. - if (!app_.getHashRouter().shouldProcessForPeer( - messageHash, id_, getledgerInterval)) - { - // This request has already been seen from this peer. - // Has it been seen with this request cookie (or lack thereof)? - - if (inserted) - { - // This is a duplicate request, but with a new cookie. When a - // response is ready, one will be sent for each request cookie. - JLOG(p_journal_.debug()) - << "TMGetLedger: duplicate request with new request cookie: " - << requestCookie.value_or(0) - << ". Job pending: " << (pending ? "yes" : "no") << ": " - << messageHash; - if (pending) - { - // Don't bother queueing up a new job if other requests are - // already pending. This should limit entries in the job queue - // to one per peer per unique request. - JLOG(p_journal_.debug()) - << "TMGetLedger: Suppressing recvGetLedger job, since one " - "is pending: " - << messageHash; - return; - } - } - else - { - // Don't punish nodes that don't know any better - return badData( - "duplicate request: " + to_string(messageHash), - supportsFeature(ProtocolFeature::LedgerDataCookies)); - } - } - // Queue a job to process the request - JLOG(p_journal_.debug()) - << "TMGetLedger: Adding recvGetLedger job: " << messageHash; std::weak_ptr weak = shared_from_this(); - app_.getJobQueue().addJob( - jtLEDGER_REQ, "recvGetLedger", [weak, m, messageHash]() { - if (auto peer = weak.lock()) - peer->processLedgerRequest(m, messageHash); - }); + app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m]() { + if (auto peer = weak.lock()) + peer->processLedgerRequest(m); + }); } void @@ -1614,9 +1545,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { - auto badData = [&](std::string const& msg, bool charge = true) { - if (charge) - fee_.update(Resource::feeInvalidData, msg); + auto badData = [&](std::string const& msg) { + fee_.update(Resource::feeInvalidData, msg); JLOG(p_journal_.warn()) << "TMLedgerData: " << msg; }; @@ -1667,99 +1597,23 @@ PeerImp::onMessage(std::shared_ptr const& m) "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size())); } - auto const messageHash = sha512Half(*m); - if (!app_.getHashRouter().addSuppressionPeer(messageHash, id_)) + // If there is a request cookie, attempt to relay the message + if (m->has_requestcookie()) { - // Don't punish nodes that don't know any better - return badData( - "Duplicate message: " + to_string(messageHash), - supportsFeature(ProtocolFeature::LedgerDataCookies)); - } - - bool const routed = m->has_directresponse() || m->responsecookies_size() || - m->has_requestcookie(); - - { - // Check if this message needs to be forwarded to one or more peers. - // Maximum of one of the relevant fields should be populated. - XRPL_ASSERT( - !m->has_requestcookie() || !m->responsecookies_size(), - "ripple::PeerImp::onMessage(TMLedgerData) : valid cookie fields"); - - // Make a copy of the response cookies, then wipe the list so it can be - // forwarded cleanly - auto const responseCookies = m->responsecookies(); - m->clear_responsecookies(); - // Flag indicating if this response should be processed locally, - // possibly in addition to being forwarded. - bool const directResponse = - m->has_directresponse() && m->directresponse(); - m->clear_directresponse(); - - auto const relay = [this, m, &messageHash](auto const cookie) { - if (auto peer = overlay_.findPeerByShortID(cookie)) - { - XRPL_ASSERT( - !m->has_requestcookie() && !m->responsecookies_size(), - "ripple::PeerImp::onMessage(TMLedgerData) relay : no " - "cookies"); - if (peer->supportsFeature(ProtocolFeature::LedgerDataCookies)) - // Setting this flag is not _strictly_ necessary for peers - // that support it if there are no cookies included in the - // message, but it is more accurate. - m->set_directresponse(true); - else - m->clear_directresponse(); - peer->send( - std::make_shared(*m, protocol::mtLEDGER_DATA)); - } - else - JLOG(p_journal_.info()) - << "Unable to route TX/ledger data reply to peer [" - << cookie << "]: " << messageHash; - }; - // If there is a request cookie, attempt to relay the message - if (m->has_requestcookie()) + if (auto peer = overlay_.findPeerByShortID(m->requestcookie())) { - XRPL_ASSERT( - responseCookies.empty(), - "ripple::PeerImp::onMessage(TMLedgerData) : no response " - "cookies"); m->clear_requestcookie(); - relay(m->requestcookie()); - if (!directResponse && responseCookies.empty()) - return; + peer->send(std::make_shared(*m, protocol::mtLEDGER_DATA)); } - // If there's a list of request cookies, attempt to relay the message to - // all of them. - if (responseCookies.size()) + else { - for (auto const cookie : responseCookies) - relay(cookie); - if (!directResponse) - return; - } - } - - // Now that any forwarding is done check the base message (data only, no - // routing info for duplicates) - if (routed) - { - m->clear_directresponse(); - XRPL_ASSERT( - !m->has_requestcookie() && !m->responsecookies_size(), - "ripple::PeerImp::onMessage(TMLedgerData) : no cookies"); - auto const baseMessageHash = sha512Half(*m); - if (!app_.getHashRouter().addSuppressionPeer(baseMessageHash, id_)) - { - // Don't punish nodes that don't know any better - return badData( - "Duplicate message: " + to_string(baseMessageHash), - supportsFeature(ProtocolFeature::LedgerDataCookies)); + JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply"; } + return; } uint256 const ledgerHash{m->ledgerhash()}; + // Otherwise check if received data for a candidate transaction set if (m->type() == protocol::liTS_CANDIDATE) { @@ -3143,22 +2997,16 @@ PeerImp::checkValidation( // the TX tree with the specified root hash. // static std::shared_ptr -getPeerWithTree( - OverlayImpl& ov, - uint256 const& rootHash, - PeerImp const* skip, - std::function shouldProcessCallback) +getPeerWithTree(OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip) { std::shared_ptr ret; int retScore = 0; - XRPL_ASSERT( - shouldProcessCallback, "ripple::getPeerWithTree : callback provided"); ov.for_each([&](std::shared_ptr&& p) { if (p->hasTxSet(rootHash) && p.get() != skip) { auto score = p->getScore(true); - if (!ret || (score > retScore && shouldProcessCallback(p->id()))) + if (!ret || (score > retScore)) { ret = std::move(p); retScore = score; @@ -3177,19 +3025,16 @@ getPeerWithLedger( OverlayImpl& ov, uint256 const& ledgerHash, LedgerIndex ledger, - PeerImp const* skip, - std::function shouldProcessCallback) + PeerImp const* skip) { std::shared_ptr ret; int retScore = 0; - XRPL_ASSERT( - shouldProcessCallback, "ripple::getPeerWithLedger : callback provided"); ov.for_each([&](std::shared_ptr&& p) { if (p->hasLedger(ledgerHash, ledger) && p.get() != skip) { auto score = p->getScore(true); - if (!ret || (score > retScore && shouldProcessCallback(p->id()))) + if (!ret || (score > retScore)) { ret = std::move(p); retScore = score; @@ -3203,8 +3048,7 @@ getPeerWithLedger( void PeerImp::sendLedgerBase( std::shared_ptr const& ledger, - protocol::TMLedgerData& ledgerData, - PeerCookieMap const& destinations) + protocol::TMLedgerData& ledgerData) { JLOG(p_journal_.trace()) << "sendLedgerBase: Base data"; @@ -3236,102 +3080,15 @@ PeerImp::sendLedgerBase( } } - sendToMultiple(ledgerData, destinations); -} - -void -PeerImp::sendToMultiple( - protocol::TMLedgerData& ledgerData, - PeerCookieMap const& destinations) -{ - bool foundSelf = false; - for (auto const& [peer, cookies] : destinations) - { - if (peer.get() == this) - foundSelf = true; - bool const multipleCookies = - peer->supportsFeature(ProtocolFeature::LedgerDataCookies); - std::vector sendCookies; - - bool directResponse = false; - if (!multipleCookies) - { - JLOG(p_journal_.debug()) - << "sendToMultiple: Sending " << cookies.size() - << " TMLedgerData messages to peer [" << peer->id() - << "]: " << sha512Half(ledgerData); - } - for (auto const& cookie : cookies) - { - // Unfortunately, need a separate Message object for every - // combination - if (cookie) - { - if (multipleCookies) - { - // Save this one for later to send a single message - sendCookies.emplace_back(*cookie); - continue; - } - - // Feature not supported, so send a single message with a - // single cookie - ledgerData.set_requestcookie(*cookie); - } - else - { - if (multipleCookies) - { - // Set this flag later on the single message - directResponse = true; - continue; - } - - ledgerData.clear_requestcookie(); - } - XRPL_ASSERT( - !multipleCookies, - "ripple::PeerImp::sendToMultiple : ledger data cookies " - "unsupported"); - auto message{ - std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; - peer->send(message); - } - if (multipleCookies) - { - // Send a single message with all the cookies and/or the direct - // response flag, so the receiver can farm out the single message to - // multiple peers and/or itself - XRPL_ASSERT( - sendCookies.size() || directResponse, - "ripple::PeerImp::sendToMultiple : valid response options"); - ledgerData.clear_requestcookie(); - ledgerData.clear_responsecookies(); - ledgerData.set_directresponse(directResponse); - for (auto const& cookie : sendCookies) - ledgerData.add_responsecookies(cookie); - auto message{ - std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; - peer->send(message); - - JLOG(p_journal_.debug()) - << "sendToMultiple: Sent 1 TMLedgerData message to peer [" - << peer->id() << "]: including " - << (directResponse ? "the direct response flag and " : "") - << sendCookies.size() << " response cookies. " - << ": " << sha512Half(ledgerData); - } - } - XRPL_ASSERT( - foundSelf, "ripple::PeerImp::sendToMultiple : current peer included"); + auto message{ + std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; + send(message); } std::shared_ptr -PeerImp::getLedger( - std::shared_ptr const& m, - uint256 const& mHash) +PeerImp::getLedger(std::shared_ptr const& m) { - JLOG(p_journal_.trace()) << "getLedger: Ledger " << mHash; + JLOG(p_journal_.trace()) << "getLedger: Ledger"; std::shared_ptr ledger; @@ -3348,33 +3105,22 @@ PeerImp::getLedger( if (m->has_querytype() && !m->has_requestcookie()) { // Attempt to relay the request to a peer - // Note repeated messages will not relay to the same peer - // before `getLedgerInterval` seconds. This prevents one - // peer from getting flooded, and distributes the request - // load. If a request has been relayed to all eligible - // peers, then this message will not be relayed. if (auto const peer = getPeerWithLedger( overlay_, ledgerHash, m->has_ledgerseq() ? m->ledgerseq() : 0, - this, - [&](Peer::id_t id) { - return app_.getHashRouter().shouldProcessForPeer( - mHash, id, getledgerInterval); - })) + this)) { m->set_requestcookie(id()); peer->send( std::make_shared(*m, protocol::mtGET_LEDGER)); JLOG(p_journal_.debug()) - << "getLedger: Request relayed to peer [" << peer->id() - << "]: " << mHash; + << "getLedger: Request relayed to peer"; return ledger; } JLOG(p_journal_.trace()) - << "getLedger: Don't have ledger with hash " << ledgerHash - << ": " << mHash; + << "getLedger: Failed to find peer to relay request"; } } } @@ -3384,7 +3130,7 @@ PeerImp::getLedger( if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch()) { JLOG(p_journal_.debug()) - << "getLedger: Early ledger sequence request " << mHash; + << "getLedger: Early ledger sequence request"; } else { @@ -3393,7 +3139,7 @@ PeerImp::getLedger( { JLOG(p_journal_.debug()) << "getLedger: Don't have ledger with sequence " - << m->ledgerseq() << ": " << mHash; + << m->ledgerseq(); } } } @@ -3416,33 +3162,29 @@ PeerImp::getLedger( Resource::feeMalformedRequest, "get_ledger ledgerSeq"); ledger.reset(); - JLOG(p_journal_.warn()) << "getLedger: Invalid ledger sequence " - << ledgerSeq << ": " << mHash; + JLOG(p_journal_.warn()) + << "getLedger: Invalid ledger sequence " << ledgerSeq; } } else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch()) { ledger.reset(); JLOG(p_journal_.debug()) - << "getLedger: Early ledger sequence request " << ledgerSeq - << ": " << mHash; + << "getLedger: Early ledger sequence request " << ledgerSeq; } } else { - JLOG(p_journal_.debug()) - << "getLedger: Unable to find ledger " << mHash; + JLOG(p_journal_.debug()) << "getLedger: Unable to find ledger"; } return ledger; } std::shared_ptr -PeerImp::getTxSet( - std::shared_ptr const& m, - uint256 const& mHash) const +PeerImp::getTxSet(std::shared_ptr const& m) const { - JLOG(p_journal_.trace()) << "getTxSet: TX set " << mHash; + JLOG(p_journal_.trace()) << "getTxSet: TX set"; uint256 const txSetHash{m->ledgerhash()}; std::shared_ptr shaMap{ @@ -3452,34 +3194,22 @@ PeerImp::getTxSet( if (m->has_querytype() && !m->has_requestcookie()) { // Attempt to relay the request to a peer - // Note repeated messages will not relay to the same peer - // before `getLedgerInterval` seconds. This prevents one - // peer from getting flooded, and distributes the request - // load. If a request has been relayed to all eligible - // peers, then this message will not be relayed. - if (auto const peer = getPeerWithTree( - overlay_, txSetHash, this, [&](Peer::id_t id) { - return app_.getHashRouter().shouldProcessForPeer( - mHash, id, getledgerInterval); - })) + if (auto const peer = getPeerWithTree(overlay_, txSetHash, this)) { m->set_requestcookie(id()); peer->send( std::make_shared(*m, protocol::mtGET_LEDGER)); - JLOG(p_journal_.debug()) - << "getTxSet: Request relayed to peer [" << peer->id() - << "]: " << mHash; + JLOG(p_journal_.debug()) << "getTxSet: Request relayed"; } else { JLOG(p_journal_.debug()) - << "getTxSet: Failed to find relay peer: " << mHash; + << "getTxSet: Failed to find relay peer"; } } else { - JLOG(p_journal_.debug()) - << "getTxSet: Failed to find TX set " << mHash; + JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set"; } } @@ -3487,9 +3217,7 @@ PeerImp::getTxSet( } void -PeerImp::processLedgerRequest( - std::shared_ptr const& m, - uint256 const& mHash) +PeerImp::processLedgerRequest(std::shared_ptr const& m) { // Do not resource charge a peer responding to a relay if (!m->has_requestcookie()) @@ -3503,74 +3231,9 @@ PeerImp::processLedgerRequest( bool fatLeaves{true}; auto const itype{m->itype()}; - auto getDestinations = [&] { - // If a ledger data message is generated, it's going to be sent to every - // peer that is waiting for it. - - PeerCookieMap result; - - std::size_t numCookies = 0; - { - // Don't do the work under this peer if this peer is not waiting for - // any replies - auto myCookies = releaseRequestCookies(mHash); - if (myCookies.empty()) - { - JLOG(p_journal_.debug()) << "TMGetLedger: peer is no longer " - "waiting for response to request: " - << mHash; - return result; - } - numCookies += myCookies.size(); - result[shared_from_this()] = myCookies; - } - - std::set const peers = - app_.getHashRouter().getPeers(mHash); - for (auto const peerID : peers) - { - // This loop does not need to be done under the HashRouter - // lock because findPeerByShortID and releaseRequestCookies - // are thread safe, and everything else is local - if (auto p = overlay_.findPeerByShortID(peerID)) - { - auto cookies = p->releaseRequestCookies(mHash); - numCookies += cookies.size(); - if (result.contains(p)) - { - // Unlikely, but if a request came in to this peer while - // iterating, add the items instead of copying / - // overwriting. - XRPL_ASSERT( - p.get() == this, - "ripple::PeerImp::processLedgerRequest : found self in " - "map"); - for (auto const& cookie : cookies) - result[p].emplace(cookie); - } - else if (cookies.size()) - result[p] = cookies; - } - } - - JLOG(p_journal_.debug()) - << "TMGetLedger: Processing request for " << result.size() - << " peers. Will send " << numCookies - << " messages if successful: " << mHash; - - return result; - }; - // Will only populate this if we're going to do work. - PeerCookieMap destinations; - if (itype == protocol::liTS_CANDIDATE) { - destinations = getDestinations(); - if (destinations.empty()) - // Nowhere to send the response! - return; - - if (sharedMap = getTxSet(m, mHash); !sharedMap) + if (sharedMap = getTxSet(m); !sharedMap) return; map = sharedMap.get(); @@ -3578,6 +3241,8 @@ PeerImp::processLedgerRequest( ledgerData.set_ledgerseq(0); ledgerData.set_ledgerhash(m->ledgerhash()); ledgerData.set_type(protocol::liTS_CANDIDATE); + if (m->has_requestcookie()) + ledgerData.set_requestcookie(m->requestcookie()); // We'll already have most transactions fatLeaves = false; @@ -3596,12 +3261,7 @@ PeerImp::processLedgerRequest( return; } - destinations = getDestinations(); - if (destinations.empty()) - // Nowhere to send the response! - return; - - if (ledger = getLedger(m, mHash); !ledger) + if (ledger = getLedger(m); !ledger) return; // Fill out the reply @@ -3609,11 +3269,13 @@ PeerImp::processLedgerRequest( ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size()); ledgerData.set_ledgerseq(ledger->info().seq); ledgerData.set_type(itype); + if (m->has_requestcookie()) + ledgerData.set_requestcookie(m->requestcookie()); switch (itype) { case protocol::liBASE: - sendLedgerBase(ledger, ledgerData, destinations); + sendLedgerBase(ledger, ledgerData); return; case protocol::liTX_NODE: @@ -3730,7 +3392,7 @@ PeerImp::processLedgerRequest( if (ledgerData.nodes_size() == 0) return; - sendToMultiple(ledgerData, destinations); + send(std::make_shared(ledgerData, protocol::mtLEDGER_DATA)); } int @@ -3788,19 +3450,6 @@ PeerImp::reduceRelayReady() return vpReduceRelayEnabled_ && reduceRelayReady_; } -std::set> -PeerImp::releaseRequestCookies(uint256 const& requestHash) -{ - std::set> result; - std::lock_guard lock(cookieLock_); - if (messageRequestCookies_.contains(requestHash)) - { - std::swap(result, messageRequestCookies_[requestHash]); - messageRequestCookies_.erase(requestHash); - } - return result; -}; - void PeerImp::Metrics::add_message(std::uint64_t bytes) { diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index 7db2ecf5f0..14591efbb1 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -195,15 +195,6 @@ private: bool ledgerReplayEnabled_ = false; LedgerReplayMsgHandler ledgerReplayMsgHandler_; - // Track message requests and responses - // TODO: Use an expiring cache or something - using MessageCookieMap = - std::map>>; - using PeerCookieMap = - std::map, std::set>>; - std::mutex mutable cookieLock_; - MessageCookieMap messageRequestCookies_; - friend class OverlayImpl; class Metrics @@ -450,13 +441,6 @@ public: return txReduceRelayEnabled_; } - // - // Messages - // - - std::set> - releaseRequestCookies(uint256 const& requestHash) override; - private: void close(); @@ -655,28 +639,16 @@ private: void sendLedgerBase( std::shared_ptr const& ledger, - protocol::TMLedgerData& ledgerData, - PeerCookieMap const& destinations); - - void - sendToMultiple( - protocol::TMLedgerData& ledgerData, - PeerCookieMap const& destinations); + protocol::TMLedgerData& ledgerData); std::shared_ptr - getLedger( - std::shared_ptr const& m, - uint256 const& mHash); + getLedger(std::shared_ptr const& m); std::shared_ptr - getTxSet( - std::shared_ptr const& m, - uint256 const& mHash) const; + getTxSet(std::shared_ptr const& m) const; void - processLedgerRequest( - std::shared_ptr const& m, - uint256 const& mHash); + processLedgerRequest(std::shared_ptr const& m); }; //------------------------------------------------------------------------------ diff --git a/src/xrpld/overlay/detail/PeerSet.cpp b/src/xrpld/overlay/detail/PeerSet.cpp index cb7b77db7f..909b20c307 100644 --- a/src/xrpld/overlay/detail/PeerSet.cpp +++ b/src/xrpld/overlay/detail/PeerSet.cpp @@ -18,11 +18,9 @@ //============================================================================== #include -#include #include #include #include -#include namespace ripple { @@ -106,52 +104,16 @@ PeerSetImpl::sendRequest( std::shared_ptr const& peer) { auto packet = std::make_shared(message, type); - - auto const messageHash = [&]() { - auto const packetBuffer = - packet->getBuffer(compression::Compressed::Off); - return sha512Half(Slice(packetBuffer.data(), packetBuffer.size())); - }(); - - // Allow messages to be re-sent to the same peer after a delay - using namespace std::chrono_literals; - constexpr std::chrono::seconds interval = 30s; - if (peer) { - if (app_.getHashRouter().shouldProcessForPeer( - messageHash, peer->id(), interval)) - { - JLOG(journal_.trace()) - << "Sending " << protocolMessageName(type) << " message to [" - << peer->id() << "]: " << messageHash; - peer->send(packet); - } - else - JLOG(journal_.debug()) - << "Suppressing sending duplicate " << protocolMessageName(type) - << " message to [" << peer->id() << "]: " << messageHash; + peer->send(packet); return; } for (auto id : peers_) { if (auto p = app_.overlay().findPeerByShortID(id)) - { - if (app_.getHashRouter().shouldProcessForPeer( - messageHash, p->id(), interval)) - { - JLOG(journal_.trace()) - << "Sending " << protocolMessageName(type) - << " message to [" << p->id() << "]: " << messageHash; - p->send(packet); - } - else - JLOG(journal_.debug()) - << "Suppressing sending duplicate " - << protocolMessageName(type) << " message to [" << p->id() - << "]: " << messageHash; - } + p->send(packet); } } diff --git a/src/xrpld/overlay/detail/ProtocolMessage.h b/src/xrpld/overlay/detail/ProtocolMessage.h index 86b630081a..54f99eb73d 100644 --- a/src/xrpld/overlay/detail/ProtocolMessage.h +++ b/src/xrpld/overlay/detail/ProtocolMessage.h @@ -43,12 +43,6 @@ protocolMessageType(protocol::TMGetLedger const&) return protocol::mtGET_LEDGER; } -inline protocol::MessageType -protocolMessageType(protocol::TMLedgerData const&) -{ - return protocol::mtLEDGER_DATA; -} - inline protocol::MessageType protocolMessageType(protocol::TMReplayDeltaRequest const&) { @@ -492,64 +486,4 @@ invokeProtocolMessage( } // namespace ripple -namespace protocol { - -template -void -hash_append(Hasher& h, TMGetLedger const& msg) -{ - using beast::hash_append; - using namespace ripple; - hash_append(h, safe_cast(protocolMessageType(msg))); - hash_append(h, safe_cast(msg.itype())); - if (msg.has_ltype()) - hash_append(h, safe_cast(msg.ltype())); - - if (msg.has_ledgerhash()) - hash_append(h, msg.ledgerhash()); - - if (msg.has_ledgerseq()) - hash_append(h, msg.ledgerseq()); - - for (auto const& nodeId : msg.nodeids()) - hash_append(h, nodeId); - hash_append(h, msg.nodeids_size()); - - // Do NOT include the request cookie. It does not affect the content of the - // request, but only where to route the results. - // if (msg.has_requestcookie()) - // hash_append(h, msg.requestcookie()); - - if (msg.has_querytype()) - hash_append(h, safe_cast(msg.querytype())); - - if (msg.has_querydepth()) - hash_append(h, msg.querydepth()); -} - -template -void -hash_append(Hasher& h, TMLedgerData const& msg) -{ - using beast::hash_append; - using namespace ripple; - hash_append(h, safe_cast(protocolMessageType(msg))); - hash_append(h, msg.ledgerhash()); - hash_append(h, msg.ledgerseq()); - hash_append(h, safe_cast(msg.type())); - for (auto const& node : msg.nodes()) - { - hash_append(h, node.nodedata()); - if (node.has_nodeid()) - hash_append(h, node.nodeid()); - } - hash_append(h, msg.nodes_size()); - if (msg.has_requestcookie()) - hash_append(h, msg.requestcookie()); - if (msg.has_error()) - hash_append(h, safe_cast(msg.error())); -} - -} // namespace protocol - #endif diff --git a/src/xrpld/overlay/detail/ProtocolVersion.cpp b/src/xrpld/overlay/detail/ProtocolVersion.cpp index ce6c1e6fa3..0fecb301f7 100644 --- a/src/xrpld/overlay/detail/ProtocolVersion.cpp +++ b/src/xrpld/overlay/detail/ProtocolVersion.cpp @@ -37,9 +37,7 @@ namespace ripple { constexpr ProtocolVersion const supportedProtocolList[] { {2, 1}, - {2, 2}, - // Adds TMLedgerData::responseCookies and directResponse - {2, 3} + {2, 2} }; // clang-format on