diff --git a/include/xrpl/basics/CanProcess.h b/include/xrpl/basics/CanProcess.h new file mode 100644 index 0000000000..3ee49d0087 --- /dev/null +++ b/include/xrpl/basics/CanProcess.h @@ -0,0 +1,134 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2024 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED +#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED + +#include +#include +#include + +/** RAII class to check if an Item is already being processed on another thread, + * as indicated by its presence in a Collection. + * + * If the Item is not in the Collection, it will be added under lock in the + * ctor, and removed under lock in the dtor. The object will be considered + * "usable" and evaluate to `true`. + * + * If the Item is in the Collection, no changes will be made to the collection, + * and the CanProcess object will be considered "unusable". + * + * It's up to the caller to decide what "usable" and "unusable" mean. (e.g. + * Process or skip a block of code, or set a flag.) 
+ * + * The current use is to avoid lock contention that would be involved in + * processing something associated with the Item. + * + * Examples: + * + * void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...) + * { + * if (CanProcess check{acquiresMutex_, pendingAcquires_, hash}) + * { + * acquire(hash, ...); + * } + * } + * + * bool + * NetworkOPsImp::recvValidation( + * std::shared_ptr const& val, + * std::string const& source) + * { + * CanProcess check( + * validationsMutex_, pendingValidations_, val->getLedgerHash()); + * BypassAccept bypassAccept = + * check ? BypassAccept::no : BypassAccept::yes; + * handleNewValidation(app_, val, source, bypassAccept, m_journal); + * } + * + */ +class CanProcess +{ +public: + template <class Mutex, class Collection, class Item> + CanProcess(Mutex& mtx, Collection& collection, Item const& item) + : cleanup_(insert(mtx, collection, item)) + { + } + + ~CanProcess() + { + if (cleanup_) + cleanup_(); + } + + explicit + operator bool() const + { + return static_cast<bool>(cleanup_); + } + +private: + template <bool useIterator, class Mutex, class Collection, class Item> + std::function<void()> + doInsert(Mutex& mtx, Collection& collection, Item const& item) + { + std::unique_lock lock(mtx); + // TODO: Use structured binding once LLVM 16 is the minimum supported + // version. 
See also: https://github.com/llvm/llvm-project/issues/48582 + // https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c + auto const insertResult = collection.insert(item); + auto const it = insertResult.first; + if (!insertResult.second) + return {}; + if constexpr (useIterator) + return [&mtx, &collection, it]() { + std::unique_lock lock(mtx); + collection.erase(it); + }; + else + return [&mtx, &collection, item]() { + std::unique_lock lock(mtx); + collection.erase(item); + }; + } + + // Generic insert() function doesn't use iterators because they may get + // invalidated; cleanup erases by key using its own copy of the item + template <class Mutex, class Collection, class Item> + std::function<void()> + insert(Mutex& mtx, Collection& collection, Item const& item) + { + return doInsert<false>(mtx, collection, item); + } + + // Specialize insert() for std::set, which does not invalidate iterators for + // insert and erase + template <class Mutex, class Item> + std::function<void()> + insert(Mutex& mtx, std::set<Item>& collection, Item const& item) + { + return doInsert<true>(mtx, collection, item); + } + + // If set, then the item is "usable" + std::function<void()> cleanup_; +}; + +#endif diff --git a/include/xrpl/protocol/LedgerHeader.h b/include/xrpl/protocol/LedgerHeader.h index 69368f9e5e..e37bda697b 100644 --- a/include/xrpl/protocol/LedgerHeader.h +++ b/include/xrpl/protocol/LedgerHeader.h @@ -36,6 +36,8 @@ struct LedgerHeader // If validated is false, it means "not yet validated." // Once validated is true, it will never be set false at a later time. + // NOTE: If you are accessing this directly, you are probably doing it + // wrong. Use LedgerMaster::isValidated(). 
// VFALCO TODO Make this not mutable bool mutable validated = false; bool accepted = false; diff --git a/src/xrpld/app/consensus/RCLConsensus.cpp b/src/xrpld/app/consensus/RCLConsensus.cpp index 7734ab790d..6823101411 100644 --- a/src/xrpld/app/consensus/RCLConsensus.cpp +++ b/src/xrpld/app/consensus/RCLConsensus.cpp @@ -1059,7 +1059,8 @@ void RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const { if (!positions && app_.getOPs().isFull()) - app_.getOPs().setMode(OperatingMode::CONNECTED); + app_.getOPs().setMode( + OperatingMode::CONNECTED, "updateOperatingMode: no positions"); } void diff --git a/src/xrpld/app/ledger/detail/InboundLedger.cpp b/src/xrpld/app/ledger/detail/InboundLedger.cpp index 5bfa9144d3..7ebb21ecc3 100644 --- a/src/xrpld/app/ledger/detail/InboundLedger.cpp +++ b/src/xrpld/app/ledger/detail/InboundLedger.cpp @@ -373,7 +373,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&) if (!wasProgress) { - checkLocal(); + if (checkLocal()) + { + // Done. Something else (probably consensus) built the ledger + // locally while waiting for data (or possibly before requesting) + XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done"); + JLOG(journal_.info()) << "Finished while waiting " << hash_; + return; + } mByHash = true; diff --git a/src/xrpld/app/ledger/detail/InboundLedgers.cpp b/src/xrpld/app/ledger/detail/InboundLedgers.cpp index 7e1ba88094..93f787399d 100644 --- a/src/xrpld/app/ledger/detail/InboundLedgers.cpp +++ b/src/xrpld/app/ledger/detail/InboundLedgers.cpp @@ -5,9 +5,9 @@ #include #include +#include #include #include -#include #include #include @@ -64,12 +64,15 @@ public: (reason != InboundLedger::Reason::CONSENSUS)) return {}; + std::stringstream ss; + bool isNew = true; std::shared_ptr inbound; { ScopedLockType sl(mLock); if (stopping_) { + JLOG(j_.debug()) << "Abort(stopping): " << ss.str(); return {}; } @@ -93,23 +96,29 @@ public: ++mCounter; } } + ss << " IsNew: " << (isNew ? 
"true" : "false"); if (inbound->isFailed()) + { + JLOG(j_.debug()) << "Abort(failed): " << ss.str(); return {}; + } if (!isNew) inbound->update(seq); if (!inbound->isComplete()) + { + JLOG(j_.debug()) << "InProgress: " << ss.str(); return {}; + } + JLOG(j_.debug()) << "Complete: " << ss.str(); return inbound->getLedger(); }; using namespace std::chrono_literals; - std::shared_ptr ledger = perf::measureDurationAndLog( + return perf::measureDurationAndLog( doAcquire, "InboundLedgersImp::acquire", 500ms, j_); - - return ledger; } void @@ -118,28 +127,25 @@ public: std::uint32_t seq, InboundLedger::Reason reason) override { - std::unique_lock lock(acquiresMutex_); - try + if (CanProcess const check{acquiresMutex_, pendingAcquires_, hash}) { - if (pendingAcquires_.contains(hash)) - return; - pendingAcquires_.insert(hash); - scope_unlock unlock(lock); - acquire(hash, seq, reason); + try + { + acquire(hash, seq, reason); + } + catch (std::exception const& e) + { + JLOG(j_.warn()) + << "Exception thrown for acquiring new inbound ledger " + << hash << ": " << e.what(); + } + catch (...) + { + JLOG(j_.warn()) << "Unknown exception thrown for acquiring new " + "inbound ledger " + << hash; + } } - catch (std::exception const& e) - { - JLOG(j_.warn()) - << "Exception thrown for acquiring new inbound ledger " << hash - << ": " << e.what(); - } - catch (...) 
- { - JLOG(j_.warn()) - << "Unknown exception thrown for acquiring new inbound ledger " - << hash; - } - pendingAcquires_.erase(hash); } std::shared_ptr diff --git a/src/xrpld/app/ledger/detail/LedgerMaster.cpp b/src/xrpld/app/ledger/detail/LedgerMaster.cpp index 0c3b3266d9..eb86c099dc 100644 --- a/src/xrpld/app/ledger/detail/LedgerMaster.cpp +++ b/src/xrpld/app/ledger/detail/LedgerMaster.cpp @@ -942,8 +942,9 @@ LedgerMaster::checkAccept(std::shared_ptr const& ledger) } JLOG(m_journal.info()) << "Advancing accepted ledger to " - << ledger->info().seq << " with >= " << minVal - << " validations"; + << ledger->info().seq << " (" + << to_short_string(ledger->info().hash) + << ") with >= " << minVal << " validations"; ledger->setValidated(); ledger->setFull(); diff --git a/src/xrpld/app/ledger/detail/TimeoutCounter.cpp b/src/xrpld/app/ledger/detail/TimeoutCounter.cpp index 6db280ce8e..19410b77c1 100644 --- a/src/xrpld/app/ledger/detail/TimeoutCounter.cpp +++ b/src/xrpld/app/ledger/detail/TimeoutCounter.cpp @@ -12,7 +12,8 @@ TimeoutCounter::TimeoutCounter( QueueJobParameter&& jobParameter, beast::Journal journal) : app_(app) - , journal_(journal) + , sink_(journal, to_short_string(hash) + " ") + , journal_(sink_) , hash_(hash) , timeouts_(0) , complete_(false) @@ -32,6 +33,8 @@ TimeoutCounter::setTimer(ScopedLockType& sl) { if (isDone()) return; + JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count() + << "ms"; timer_.expires_after(timerInterval_); timer_.async_wait( [wptr = pmDowncast()](boost::system::error_code const& ec) { @@ -40,6 +43,12 @@ TimeoutCounter::setTimer(ScopedLockType& sl) if (auto ptr = wptr.lock()) { + JLOG(ptr->journal_.debug()) + << "timer: ec: " << ec << " (operation_aborted: " + << boost::asio::error::operation_aborted << " - " + << (ec == boost::asio::error::operation_aborted ? 
"aborted" + : "other") + << ")"; ScopedLockType sl(ptr->mtx_); ptr->queueJob(sl); } diff --git a/src/xrpld/app/ledger/detail/TimeoutCounter.h b/src/xrpld/app/ledger/detail/TimeoutCounter.h index e97882ef1e..956b0e9ea8 100644 --- a/src/xrpld/app/ledger/detail/TimeoutCounter.h +++ b/src/xrpld/app/ledger/detail/TimeoutCounter.h @@ -5,6 +5,7 @@ #include #include +#include #include @@ -104,6 +105,7 @@ protected: // Used in this class for access to boost::asio::io_context and // ripple::Overlay. Used in subtypes for the kitchen sink. Application& app_; + beast::WrappedSink sink_; beast::Journal journal_; mutable std::recursive_mutex mtx_; diff --git a/src/xrpld/app/misc/HashRouter.h b/src/xrpld/app/misc/HashRouter.h index f7ecb153fe..6b8ec58fa8 100644 --- a/src/xrpld/app/misc/HashRouter.h +++ b/src/xrpld/app/misc/HashRouter.h @@ -203,7 +203,7 @@ public: /** Add a suppression peer and get message's relay status. * Return pair: - * element 1: true if the peer is added. + * element 1: true if the key is added. * element 2: optional is seated to the relay time point or * is unseated if has not relayed yet. 
*/ std::pair> diff --git a/src/xrpld/app/misc/NetworkOPs.cpp b/src/xrpld/app/misc/NetworkOPs.cpp index ebe539523b..5686e46458 100644 --- a/src/xrpld/app/misc/NetworkOPs.cpp +++ b/src/xrpld/app/misc/NetworkOPs.cpp @@ -34,10 +34,10 @@ #include #include +#include #include #include #include -#include #include #include #include @@ -408,7 +408,7 @@ public: isFull() override; void - setMode(OperatingMode om) override; + setMode(OperatingMode om, const char* reason) override; bool isBlocked() override; @@ -886,7 +886,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const inline void NetworkOPsImp::setStandAlone() { - setMode(OperatingMode::FULL); + setMode(OperatingMode::FULL, "setStandAlone"); } inline void @@ -1036,7 +1036,9 @@ NetworkOPsImp::processHeartbeatTimer() { if (mMode != OperatingMode::DISCONNECTED) { - setMode(OperatingMode::DISCONNECTED); + setMode( + OperatingMode::DISCONNECTED, + "Heartbeat: insufficient peers"); std::stringstream ss; ss << "Node count (" << numPeers << ") has fallen " << "below required minimum (" << minPeerCount_ << ")."; @@ -1061,7 +1063,7 @@ NetworkOPsImp::processHeartbeatTimer() if (mMode == OperatingMode::DISCONNECTED) { - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers"); JLOG(m_journal.info()) << "Node count (" << numPeers << ") is sufficient."; CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers @@ -1073,9 +1075,9 @@ NetworkOPsImp::processHeartbeatTimer() auto origMode = mMode.load(); CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true); if (mMode == OperatingMode::SYNCING) - setMode(OperatingMode::SYNCING); + setMode(OperatingMode::SYNCING, "Heartbeat: check syncing"); else if (mMode == OperatingMode::CONNECTED) - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "Heartbeat: check connected"); auto newMode = mMode.load(); if (origMode != newMode) { @@ -1824,7 +1826,7 @@ void NetworkOPsImp::setAmendmentBlocked() 
{ amendmentBlocked_ = true; - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "setAmendmentBlocked"); } inline bool @@ -1855,7 +1857,7 @@ void NetworkOPsImp::setUNLBlocked() { unlBlocked_ = true; - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "setUNLBlocked"); } inline void @@ -1956,7 +1958,7 @@ NetworkOPsImp::checkLastClosedLedger( if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL)) { - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger"); } if (consensus) @@ -2045,8 +2047,9 @@ NetworkOPsImp::beginConsensus( // this shouldn't happen unless we jump ledgers if (mMode == OperatingMode::FULL) { - JLOG(m_journal.warn()) << "Don't have LCL, going to tracking"; - setMode(OperatingMode::TRACKING); + JLOG(m_journal.warn()) + << "beginConsensus Don't have LCL, going to tracking"; + setMode(OperatingMode::TRACKING, "beginConsensus: No LCL"); CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. "; } @@ -2182,7 +2185,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr const& clog) // validations we have for LCL. 
If the ledger is good enough, go to // TRACKING - TODO if (!needNetworkLedger_) - setMode(OperatingMode::TRACKING); + setMode(OperatingMode::TRACKING, "endConsensus: check tracking"); } if (((mMode == OperatingMode::CONNECTED) || @@ -2196,7 +2199,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr const& clog) if (app_.timeKeeper().now() < (current->info().parentCloseTime + 2 * current->info().closeTimeResolution)) { - setMode(OperatingMode::FULL); + setMode(OperatingMode::FULL, "endConsensus: check full"); } } @@ -2208,7 +2211,7 @@ NetworkOPsImp::consensusViewChange() { if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING)) { - setMode(OperatingMode::CONNECTED); + setMode(OperatingMode::CONNECTED, "consensusViewChange"); } } @@ -2527,7 +2530,7 @@ NetworkOPsImp::pubPeerStatus(std::function const& func) } void -NetworkOPsImp::setMode(OperatingMode om) +NetworkOPsImp::setMode(OperatingMode om, const char* reason) { using namespace std::chrono_literals; if (om == OperatingMode::CONNECTED) @@ -2547,11 +2550,12 @@ NetworkOPsImp::setMode(OperatingMode om) if (mMode == om) return; + auto const sink = om < mMode ? m_journal.warn() : m_journal.info(); mMode = om; accounting_.mode(om); - JLOG(m_journal.info()) << "STATE->" << strOperatingMode(); + JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason; pubServer(); } @@ -2563,34 +2567,28 @@ NetworkOPsImp::recvValidation( JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source; - std::unique_lock lock(validationsMutex_); - BypassAccept bypassAccept = BypassAccept::no; - try { - if (pendingValidations_.contains(val->getLedgerHash())) - bypassAccept = BypassAccept::yes; - else - pendingValidations_.insert(val->getLedgerHash()); - scope_unlock unlock(lock); - handleNewValidation(app_, val, source, bypassAccept, m_journal); + CanProcess const check( + validationsMutex_, pendingValidations_, val->getLedgerHash()); + try + { + BypassAccept bypassAccept = + check ? 
BypassAccept::no : BypassAccept::yes; + handleNewValidation(app_, val, source, bypassAccept, m_journal); + } + catch (std::exception const& e) + { + JLOG(m_journal.warn()) + << "Exception thrown for handling new validation " + << val->getLedgerHash() << ": " << e.what(); + } + catch (...) + { + JLOG(m_journal.warn()) + << "Unknown exception thrown for handling new validation " + << val->getLedgerHash(); + } } - catch (std::exception const& e) - { - JLOG(m_journal.warn()) - << "Exception thrown for handling new validation " - << val->getLedgerHash() << ": " << e.what(); - } - catch (...) - { - JLOG(m_journal.warn()) - << "Unknown exception thrown for handling new validation " - << val->getLedgerHash(); - } - if (bypassAccept == BypassAccept::no) - { - pendingValidations_.erase(val->getLedgerHash()); - } - lock.unlock(); pubValidation(val); diff --git a/src/xrpld/app/misc/NetworkOPs.h b/src/xrpld/app/misc/NetworkOPs.h index 544e6bfe93..0cbf6b5c7d 100644 --- a/src/xrpld/app/misc/NetworkOPs.h +++ b/src/xrpld/app/misc/NetworkOPs.h @@ -191,7 +191,7 @@ public: virtual bool isFull() = 0; virtual void - setMode(OperatingMode om) = 0; + setMode(OperatingMode om, const char* reason) = 0; virtual bool isBlocked() = 0; virtual bool