Improve job queue collision checks and logging

- Improve logging related to ledger acquisition and operating mode
  changes
- Class "CanProcess" to keep track of processing of distinct items
This commit is contained in:
Ed Hennis
2025-04-25 11:23:49 -04:00
parent 33309480d4
commit b186516d0a
11 changed files with 234 additions and 74 deletions

View File

@@ -0,0 +1,134 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2024 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#define RIPPLE_BASICS_CANPROCESS_H_INCLUDED
#include <functional>
#include <mutex>
#include <set>
/** RAII class to check if an Item is already being processed on another thread,
* as indicated by it's presence in a Collection.
*
* If the Item is not in the Collection, it will be added under lock in the
* ctor, and removed under lock in the dtor. The object will be considered
* "usable" and evaluate to `true`.
*
* If the Item is in the Collection, no changes will be made to the collection,
* and the CanProcess object will be considered "unusable".
*
* It's up to the caller to decide what "usable" and "unusable" mean. (e.g.
* Process or skip a block of code, or set a flag.)
*
* The current use is to avoid lock contention that would be involved in
* processing something associated with the Item.
*
* Examples:
*
* void IncomingLedgers::acquireAsync(LedgerHash const& hash, ...)
* {
* if (CanProcess check{acquiresMutex_, pendingAcquires_, hash})
* {
* acquire(hash, ...);
* }
* }
*
* bool
* NetworkOPsImp::recvValidation(
* std::shared_ptr<STValidation> const& val,
* std::string const& source)
* {
* CanProcess check(
* validationsMutex_, pendingValidations_, val->getLedgerHash());
* BypassAccept bypassAccept =
* check ? BypassAccept::no : BypassAccept::yes;
* handleNewValidation(app_, val, source, bypassAccept, m_journal);
* }
*
*/
class CanProcess
{
public:
template <class Mutex, class Collection, class Item>
CanProcess(Mutex& mtx, Collection& collection, Item const& item)
: cleanup_(insert(mtx, collection, item))
{
}
~CanProcess()
{
if (cleanup_)
cleanup_();
}
explicit
operator bool() const
{
return static_cast<bool>(cleanup_);
}
private:
template <bool useIterator, class Mutex, class Collection, class Item>
std::function<void()>
doInsert(Mutex& mtx, Collection& collection, Item const& item)
{
std::unique_lock<Mutex> lock(mtx);
// TODO: Use structured binding once LLVM 16 is the minimum supported
// version. See also: https://github.com/llvm/llvm-project/issues/48582
// https://github.com/llvm/llvm-project/commit/127bf44385424891eb04cff8e52d3f157fc2cb7c
auto const insertResult = collection.insert(item);
auto const it = insertResult.first;
if (!insertResult.second)
return {};
if constexpr (useIterator)
return [&, it]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(it);
};
else
return [&]() {
std::unique_lock<Mutex> lock(mtx);
collection.erase(item);
};
}
// Generic insert() function doesn't use iterators because they may get
// invalidated
template <class Mutex, class Collection, class Item>
std::function<void()>
insert(Mutex& mtx, Collection& collection, Item const& item)
{
return doInsert<false>(mtx, collection, item);
}
// Specialize insert() for std::set, which does not invalidate iterators for
// insert and erase
template <class Mutex, class Item>
std::function<void()>
insert(Mutex& mtx, std::set<Item>& collection, Item const& item)
{
return doInsert<true>(mtx, collection, item);
}
// If set, then the item is "usable"
std::function<void()> cleanup_;
};
#endif

View File

@@ -36,6 +36,8 @@ struct LedgerHeader
// If validated is false, it means "not yet validated." // If validated is false, it means "not yet validated."
// Once validated is true, it will never be set false at a later time. // Once validated is true, it will never be set false at a later time.
// NOTE: If you are accessing this directly, you are probably doing it
// wrong. Use LedgerMaster::isValidated().
// VFALCO TODO Make this not mutable // VFALCO TODO Make this not mutable
bool mutable validated = false; bool mutable validated = false;
bool accepted = false; bool accepted = false;

View File

@@ -1059,7 +1059,8 @@ void
RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const RCLConsensus::Adaptor::updateOperatingMode(std::size_t const positions) const
{ {
if (!positions && app_.getOPs().isFull()) if (!positions && app_.getOPs().isFull())
app_.getOPs().setMode(OperatingMode::CONNECTED); app_.getOPs().setMode(
OperatingMode::CONNECTED, "updateOperatingMode: no positions");
} }
void void

View File

@@ -373,7 +373,14 @@ InboundLedger::onTimer(bool wasProgress, ScopedLockType&)
if (!wasProgress) if (!wasProgress)
{ {
checkLocal(); if (checkLocal())
{
// Done. Something else (probably consensus) built the ledger
// locally while waiting for data (or possibly before requesting)
XRPL_ASSERT(isDone(), "ripple::InboundLedger::onTimer : done");
JLOG(journal_.info()) << "Finished while waiting " << hash_;
return;
}
mByHash = true; mByHash = true;

View File

@@ -5,9 +5,9 @@
#include <xrpld/core/JobQueue.h> #include <xrpld/core/JobQueue.h>
#include <xrpld/perflog/PerfLog.h> #include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/DecayingSample.h> #include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h> #include <xrpl/basics/Log.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/container/aged_map.h> #include <xrpl/beast/container/aged_map.h>
#include <xrpl/protocol/jss.h> #include <xrpl/protocol/jss.h>
@@ -64,12 +64,15 @@ public:
(reason != InboundLedger::Reason::CONSENSUS)) (reason != InboundLedger::Reason::CONSENSUS))
return {}; return {};
std::stringstream ss;
bool isNew = true; bool isNew = true;
std::shared_ptr<InboundLedger> inbound; std::shared_ptr<InboundLedger> inbound;
{ {
ScopedLockType sl(mLock); ScopedLockType sl(mLock);
if (stopping_) if (stopping_)
{ {
JLOG(j_.debug()) << "Abort(stopping): " << ss.str();
return {}; return {};
} }
@@ -93,23 +96,29 @@ public:
++mCounter; ++mCounter;
} }
} }
ss << " IsNew: " << (isNew ? "true" : "false");
if (inbound->isFailed()) if (inbound->isFailed())
{
JLOG(j_.debug()) << "Abort(failed): " << ss.str();
return {}; return {};
}
if (!isNew) if (!isNew)
inbound->update(seq); inbound->update(seq);
if (!inbound->isComplete()) if (!inbound->isComplete())
{
JLOG(j_.debug()) << "InProgress: " << ss.str();
return {}; return {};
}
JLOG(j_.debug()) << "Complete: " << ss.str();
return inbound->getLedger(); return inbound->getLedger();
}; };
using namespace std::chrono_literals; using namespace std::chrono_literals;
std::shared_ptr<Ledger const> ledger = perf::measureDurationAndLog( return perf::measureDurationAndLog(
doAcquire, "InboundLedgersImp::acquire", 500ms, j_); doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
} }
void void
@@ -118,28 +127,25 @@ public:
std::uint32_t seq, std::uint32_t seq,
InboundLedger::Reason reason) override InboundLedger::Reason reason) override
{ {
std::unique_lock lock(acquiresMutex_); if (CanProcess const check{acquiresMutex_, pendingAcquires_, hash})
{
try try
{ {
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
scope_unlock unlock(lock);
acquire(hash, seq, reason); acquire(hash, seq, reason);
} }
catch (std::exception const& e) catch (std::exception const& e)
{ {
JLOG(j_.warn()) JLOG(j_.warn())
<< "Exception thrown for acquiring new inbound ledger " << hash << "Exception thrown for acquiring new inbound ledger "
<< ": " << e.what(); << hash << ": " << e.what();
} }
catch (...) catch (...)
{ {
JLOG(j_.warn()) JLOG(j_.warn()) << "Unknown exception thrown for acquiring new "
<< "Unknown exception thrown for acquiring new inbound ledger " "inbound ledger "
<< hash; << hash;
} }
pendingAcquires_.erase(hash); }
} }
std::shared_ptr<InboundLedger> std::shared_ptr<InboundLedger>

View File

@@ -942,8 +942,9 @@ LedgerMaster::checkAccept(std::shared_ptr<Ledger const> const& ledger)
} }
JLOG(m_journal.info()) << "Advancing accepted ledger to " JLOG(m_journal.info()) << "Advancing accepted ledger to "
<< ledger->info().seq << " with >= " << minVal << ledger->info().seq << " ("
<< " validations"; << to_short_string(ledger->info().hash)
<< ") with >= " << minVal << " validations";
ledger->setValidated(); ledger->setValidated();
ledger->setFull(); ledger->setFull();

View File

@@ -12,7 +12,8 @@ TimeoutCounter::TimeoutCounter(
QueueJobParameter&& jobParameter, QueueJobParameter&& jobParameter,
beast::Journal journal) beast::Journal journal)
: app_(app) : app_(app)
, journal_(journal) , sink_(journal, to_short_string(hash) + " ")
, journal_(sink_)
, hash_(hash) , hash_(hash)
, timeouts_(0) , timeouts_(0)
, complete_(false) , complete_(false)
@@ -32,6 +33,8 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
{ {
if (isDone()) if (isDone())
return; return;
JLOG(journal_.debug()) << "Setting timer for " << timerInterval_.count()
<< "ms";
timer_.expires_after(timerInterval_); timer_.expires_after(timerInterval_);
timer_.async_wait( timer_.async_wait(
[wptr = pmDowncast()](boost::system::error_code const& ec) { [wptr = pmDowncast()](boost::system::error_code const& ec) {
@@ -40,6 +43,12 @@ TimeoutCounter::setTimer(ScopedLockType& sl)
if (auto ptr = wptr.lock()) if (auto ptr = wptr.lock())
{ {
JLOG(ptr->journal_.debug())
<< "timer: ec: " << ec << " (operation_aborted: "
<< boost::asio::error::operation_aborted << " - "
<< (ec == boost::asio::error::operation_aborted ? "aborted"
: "other")
<< ")";
ScopedLockType sl(ptr->mtx_); ScopedLockType sl(ptr->mtx_);
ptr->queueJob(sl); ptr->queueJob(sl);
} }

View File

@@ -5,6 +5,7 @@
#include <xrpld/core/Job.h> #include <xrpld/core/Job.h>
#include <xrpl/beast/utility/Journal.h> #include <xrpl/beast/utility/Journal.h>
#include <xrpl/beast/utility/WrappedSink.h>
#include <boost/asio/basic_waitable_timer.hpp> #include <boost/asio/basic_waitable_timer.hpp>
@@ -104,6 +105,7 @@ protected:
// Used in this class for access to boost::asio::io_context and // Used in this class for access to boost::asio::io_context and
// ripple::Overlay. Used in subtypes for the kitchen sink. // ripple::Overlay. Used in subtypes for the kitchen sink.
Application& app_; Application& app_;
beast::WrappedSink sink_;
beast::Journal journal_; beast::Journal journal_;
mutable std::recursive_mutex mtx_; mutable std::recursive_mutex mtx_;

View File

@@ -203,7 +203,7 @@ public:
/** Add a suppression peer and get message's relay status. /** Add a suppression peer and get message's relay status.
* Return pair: * Return pair:
* element 1: true if the peer is added. * element 1: true if the key is added.
* element 2: optional is seated to the relay time point or * element 2: optional is seated to the relay time point or
* is unseated if has not relayed yet. */ * is unseated if has not relayed yet. */
std::pair<bool, std::optional<Stopwatch::time_point>> std::pair<bool, std::optional<Stopwatch::time_point>>

View File

@@ -34,10 +34,10 @@
#include <xrpld/rpc/MPTokenIssuanceID.h> #include <xrpld/rpc/MPTokenIssuanceID.h>
#include <xrpld/rpc/ServerHandler.h> #include <xrpld/rpc/ServerHandler.h>
#include <xrpl/basics/CanProcess.h>
#include <xrpl/basics/UptimeClock.h> #include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/mulDiv.h> #include <xrpl/basics/mulDiv.h>
#include <xrpl/basics/safe_cast.h> #include <xrpl/basics/safe_cast.h>
#include <xrpl/basics/scope.h>
#include <xrpl/beast/utility/rngfill.h> #include <xrpl/beast/utility/rngfill.h>
#include <xrpl/crypto/RFC1751.h> #include <xrpl/crypto/RFC1751.h>
#include <xrpl/crypto/csprng.h> #include <xrpl/crypto/csprng.h>
@@ -408,7 +408,7 @@ public:
isFull() override; isFull() override;
void void
setMode(OperatingMode om) override; setMode(OperatingMode om, const char* reason) override;
bool bool
isBlocked() override; isBlocked() override;
@@ -886,7 +886,7 @@ NetworkOPsImp::strOperatingMode(bool const admin /* = false */) const
inline void inline void
NetworkOPsImp::setStandAlone() NetworkOPsImp::setStandAlone()
{ {
setMode(OperatingMode::FULL); setMode(OperatingMode::FULL, "setStandAlone");
} }
inline void inline void
@@ -1036,7 +1036,9 @@ NetworkOPsImp::processHeartbeatTimer()
{ {
if (mMode != OperatingMode::DISCONNECTED) if (mMode != OperatingMode::DISCONNECTED)
{ {
setMode(OperatingMode::DISCONNECTED); setMode(
OperatingMode::DISCONNECTED,
"Heartbeat: insufficient peers");
std::stringstream ss; std::stringstream ss;
ss << "Node count (" << numPeers << ") has fallen " ss << "Node count (" << numPeers << ") has fallen "
<< "below required minimum (" << minPeerCount_ << ")."; << "below required minimum (" << minPeerCount_ << ").";
@@ -1061,7 +1063,7 @@ NetworkOPsImp::processHeartbeatTimer()
if (mMode == OperatingMode::DISCONNECTED) if (mMode == OperatingMode::DISCONNECTED)
{ {
setMode(OperatingMode::CONNECTED); setMode(OperatingMode::CONNECTED, "Heartbeat: sufficient peers");
JLOG(m_journal.info()) JLOG(m_journal.info())
<< "Node count (" << numPeers << ") is sufficient."; << "Node count (" << numPeers << ") is sufficient.";
CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers CLOG(clog.ss()) << "setting mode to CONNECTED based on " << numPeers
@@ -1073,9 +1075,9 @@ NetworkOPsImp::processHeartbeatTimer()
auto origMode = mMode.load(); auto origMode = mMode.load();
CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true); CLOG(clog.ss()) << "mode: " << strOperatingMode(origMode, true);
if (mMode == OperatingMode::SYNCING) if (mMode == OperatingMode::SYNCING)
setMode(OperatingMode::SYNCING); setMode(OperatingMode::SYNCING, "Heartbeat: check syncing");
else if (mMode == OperatingMode::CONNECTED) else if (mMode == OperatingMode::CONNECTED)
setMode(OperatingMode::CONNECTED); setMode(OperatingMode::CONNECTED, "Heartbeat: check connected");
auto newMode = mMode.load(); auto newMode = mMode.load();
if (origMode != newMode) if (origMode != newMode)
{ {
@@ -1824,7 +1826,7 @@ void
NetworkOPsImp::setAmendmentBlocked() NetworkOPsImp::setAmendmentBlocked()
{ {
amendmentBlocked_ = true; amendmentBlocked_ = true;
setMode(OperatingMode::CONNECTED); setMode(OperatingMode::CONNECTED, "setAmendmentBlocked");
} }
inline bool inline bool
@@ -1855,7 +1857,7 @@ void
NetworkOPsImp::setUNLBlocked() NetworkOPsImp::setUNLBlocked()
{ {
unlBlocked_ = true; unlBlocked_ = true;
setMode(OperatingMode::CONNECTED); setMode(OperatingMode::CONNECTED, "setUNLBlocked");
} }
inline void inline void
@@ -1956,7 +1958,7 @@ NetworkOPsImp::checkLastClosedLedger(
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL)) if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
{ {
setMode(OperatingMode::CONNECTED); setMode(OperatingMode::CONNECTED, "check LCL: not on consensus ledger");
} }
if (consensus) if (consensus)
@@ -2045,8 +2047,9 @@ NetworkOPsImp::beginConsensus(
// this shouldn't happen unless we jump ledgers // this shouldn't happen unless we jump ledgers
if (mMode == OperatingMode::FULL) if (mMode == OperatingMode::FULL)
{ {
JLOG(m_journal.warn()) << "Don't have LCL, going to tracking"; JLOG(m_journal.warn())
setMode(OperatingMode::TRACKING); << "beginConsensus Don't have LCL, going to tracking";
setMode(OperatingMode::TRACKING, "beginConsensus: No LCL");
CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. "; CLOG(clog) << "beginConsensus Don't have LCL, going to tracking. ";
} }
@@ -2182,7 +2185,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
// validations we have for LCL. If the ledger is good enough, go to // validations we have for LCL. If the ledger is good enough, go to
// TRACKING - TODO // TRACKING - TODO
if (!needNetworkLedger_) if (!needNetworkLedger_)
setMode(OperatingMode::TRACKING); setMode(OperatingMode::TRACKING, "endConsensus: check tracking");
} }
if (((mMode == OperatingMode::CONNECTED) || if (((mMode == OperatingMode::CONNECTED) ||
@@ -2196,7 +2199,7 @@ NetworkOPsImp::endConsensus(std::unique_ptr<std::stringstream> const& clog)
if (app_.timeKeeper().now() < (current->info().parentCloseTime + if (app_.timeKeeper().now() < (current->info().parentCloseTime +
2 * current->info().closeTimeResolution)) 2 * current->info().closeTimeResolution))
{ {
setMode(OperatingMode::FULL); setMode(OperatingMode::FULL, "endConsensus: check full");
} }
} }
@@ -2208,7 +2211,7 @@ NetworkOPsImp::consensusViewChange()
{ {
if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING)) if ((mMode == OperatingMode::FULL) || (mMode == OperatingMode::TRACKING))
{ {
setMode(OperatingMode::CONNECTED); setMode(OperatingMode::CONNECTED, "consensusViewChange");
} }
} }
@@ -2527,7 +2530,7 @@ NetworkOPsImp::pubPeerStatus(std::function<Json::Value(void)> const& func)
} }
void void
NetworkOPsImp::setMode(OperatingMode om) NetworkOPsImp::setMode(OperatingMode om, const char* reason)
{ {
using namespace std::chrono_literals; using namespace std::chrono_literals;
if (om == OperatingMode::CONNECTED) if (om == OperatingMode::CONNECTED)
@@ -2547,11 +2550,12 @@ NetworkOPsImp::setMode(OperatingMode om)
if (mMode == om) if (mMode == om)
return; return;
auto const sink = om < mMode ? m_journal.warn() : m_journal.info();
mMode = om; mMode = om;
accounting_.mode(om); accounting_.mode(om);
JLOG(m_journal.info()) << "STATE->" << strOperatingMode(); JLOG(sink) << "STATE->" << strOperatingMode() << " - " << reason;
pubServer(); pubServer();
} }
@@ -2563,15 +2567,13 @@ NetworkOPsImp::recvValidation(
JLOG(m_journal.trace()) JLOG(m_journal.trace())
<< "recvValidation " << val->getLedgerHash() << " from " << source; << "recvValidation " << val->getLedgerHash() << " from " << source;
std::unique_lock lock(validationsMutex_); {
BypassAccept bypassAccept = BypassAccept::no; CanProcess const check(
validationsMutex_, pendingValidations_, val->getLedgerHash());
try try
{ {
if (pendingValidations_.contains(val->getLedgerHash())) BypassAccept bypassAccept =
bypassAccept = BypassAccept::yes; check ? BypassAccept::no : BypassAccept::yes;
else
pendingValidations_.insert(val->getLedgerHash());
scope_unlock unlock(lock);
handleNewValidation(app_, val, source, bypassAccept, m_journal); handleNewValidation(app_, val, source, bypassAccept, m_journal);
} }
catch (std::exception const& e) catch (std::exception const& e)
@@ -2586,11 +2588,7 @@ NetworkOPsImp::recvValidation(
<< "Unknown exception thrown for handling new validation " << "Unknown exception thrown for handling new validation "
<< val->getLedgerHash(); << val->getLedgerHash();
} }
if (bypassAccept == BypassAccept::no)
{
pendingValidations_.erase(val->getLedgerHash());
} }
lock.unlock();
pubValidation(val); pubValidation(val);

View File

@@ -191,7 +191,7 @@ public:
virtual bool virtual bool
isFull() = 0; isFull() = 0;
virtual void virtual void
setMode(OperatingMode om) = 0; setMode(OperatingMode om, const char* reason) = 0;
virtual bool virtual bool
isBlocked() = 0; isBlocked() = 0;
virtual bool virtual bool