diff --git a/src/ripple/app/consensus/RCLConsensus.cpp b/src/ripple/app/consensus/RCLConsensus.cpp index 49d593718..24b695e89 100644 --- a/src/ripple/app/consensus/RCLConsensus.cpp +++ b/src/ripple/app/consensus/RCLConsensus.cpp @@ -134,8 +134,12 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash) acquiringLedger_ = hash; app_.getJobQueue().addJob( - jtADVANCE, "getConsensusLedger", [id = hash, &app = app_]() { - app.getInboundLedgers().acquire( + jtADVANCE, + "getConsensusLedger1", + [id = hash, &app = app_, this]() { + JLOG(j_.debug()) + << "JOB advanceLedger getConsensusLedger1 started"; + app.getInboundLedgers().acquireAsync( id, 0, InboundLedger::Reason::CONSENSUS); }); } diff --git a/src/ripple/app/consensus/RCLValidations.cpp b/src/ripple/app/consensus/RCLValidations.cpp index ab9391385..d5512bae1 100644 --- a/src/ripple/app/consensus/RCLValidations.cpp +++ b/src/ripple/app/consensus/RCLValidations.cpp @@ -135,8 +135,10 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash) Application* pApp = &app_; app_.getJobQueue().addJob( - jtADVANCE, "getConsensusLedger", [pApp, hash]() { - pApp->getInboundLedgers().acquire( + jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() { + JLOG(j_.debug()) + << "JOB advanceLedger getConsensusLedger2 started"; + pApp->getInboundLedgers().acquireAsync( hash, 0, InboundLedger::Reason::CONSENSUS); }); return std::nullopt; @@ -152,7 +154,9 @@ void handleNewValidation( Application& app, std::shared_ptr const& val, - std::string const& source) + std::string const& source, + BypassAccept const bypassAccept, + std::optional j) { auto const& signingKey = val->getSignerPublic(); auto const& hash = val->getLedgerHash(); @@ -177,7 +181,23 @@ handleNewValidation( if (outcome == ValStatus::current) { if (val->isTrusted()) - app.getLedgerMaster().checkAccept(hash, seq); + { + // Was: app.getLedgerMaster().checkAccept(hash, seq); + // https://github.com/XRPLF/rippled/commit/fbbea9e6e25795a8a6bd1bf64b780771933a9579 + if (bypassAccept == BypassAccept::yes) + { + assert(j.has_value()); + if (j.has_value()) + { + JLOG(j->trace()) << "Bypassing checkAccept for validation " + << val->getLedgerHash(); + } + } + else + { + app.getLedgerMaster().checkAccept(hash, seq); + } + } return; } diff --git a/src/ripple/app/consensus/RCLValidations.h b/src/ripple/app/consensus/RCLValidations.h index 93628fe16..e141731e1 100644 --- a/src/ripple/app/consensus/RCLValidations.h +++ b/src/ripple/app/consensus/RCLValidations.h @@ -25,12 +25,16 @@ #include #include #include +#include +#include #include namespace ripple { class Application; +enum class BypassAccept : bool { no = false, yes }; + /** Wrapper over STValidation for generic Validation code Wraps an STValidation for compatibility with the generic validation code. @@ -248,7 +252,9 @@ void handleNewValidation( Application& app, std::shared_ptr const& val, - std::string const& source); + std::string const& source, + BypassAccept const bypassAccept = BypassAccept::no, + std::optional j = std::nullopt); } // namespace ripple diff --git a/src/ripple/app/ledger/InboundLedgers.h b/src/ripple/app/ledger/InboundLedgers.h index dca4a80c5..f9b442b66 100644 --- a/src/ripple/app/ledger/InboundLedgers.h +++ b/src/ripple/app/ledger/InboundLedgers.h @@ -38,10 +38,21 @@ public: virtual ~InboundLedgers() = default; // VFALCO TODO Should this be called findOrAdd ? + // Callers should use this if they possibly need an authoritative + // response immediately. // virtual std::shared_ptr acquire(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason) = 0; + // Callers should use this if they are known to be executing on the Job + // Queue. TODO review whether all callers of acquire() can use this + // instead. Inbound ledger acquisition is asynchronous anyway. + virtual void + acquireAsync( + uint256 const& hash, + std::uint32_t seq, + InboundLedger::Reason reason) = 0; + virtual std::shared_ptr find(LedgerHash const& hash) = 0; diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index ee200d273..660c21d53 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -560,7 +560,7 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) return; } - if (auto stream = journal_.trace()) + if (auto stream = journal_.debug()) { if (peer) stream << "Trigger acquiring ledger " << hash_ << " from " << peer; diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp index 7ee49b454..49fc6cb30 100644 --- a/src/ripple/app/ledger/impl/InboundLedgers.cpp +++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp @@ -28,6 +28,7 @@ #include #include #include +#include #include #include #include @@ -141,6 +142,37 @@ public: return inbound->getLedger(); } + void + acquireAsync( + uint256 const& hash, + std::uint32_t seq, + InboundLedger::Reason reason) override + { + std::unique_lock lock(acquiresMutex_); + try + { + if (pendingAcquires_.contains(hash)) + return; + pendingAcquires_.insert(hash); + lock.unlock(); + acquire(hash, seq, reason); + } + catch (std::exception const& e) + { + JLOG(j_.warn()) + << "Exception thrown for acquiring new inbound ledger " << hash + << ": " << e.what(); + } + catch (...) + { + JLOG(j_.warn()) + << "Unknown exception thrown for acquiring new inbound ledger " + << hash; + } + lock.lock(); + pendingAcquires_.erase(hash); + } + std::shared_ptr find(uint256 const& hash) override { @@ -426,6 +458,9 @@ private: beast::insight::Counter mCounter; std::unique_ptr mPeerSetBuilder; + + std::set pendingAcquires_; + std::mutex acquiresMutex_; }; //------------------------------------------------------------------------------ diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 179b1ecbc..6f6bfcc1c 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -70,7 +70,9 @@ #include #include +#include #include +#include #include #include #include @@ -776,6 +778,9 @@ private: StateAccounting accounting_{}; + std::set pendingValidations_; + std::mutex validationsMutex_; + private: struct Stats { @@ -1791,7 +1796,8 @@ NetworkOPsImp::checkLastClosedLedger( } JLOG(m_journal.warn()) << "We are not running on the consensus ledger"; - JLOG(m_journal.info()) << "Our LCL: " << getJson({*ourClosed, {}}); + JLOG(m_journal.info()) << "Our LCL: " << ourClosed->info().hash + << getJson({*ourClosed, {}}); JLOG(m_journal.info()) << "Net LCL " << closedLedger; if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL)) @@ -2345,7 +2351,37 @@ NetworkOPsImp::recvValidation( JLOG(m_journal.trace()) << "recvValidation " << val->getLedgerHash() << " from " << source; - handleNewValidation(app_, val, source); + // handleNewValidation(app_, val, source); + // https://github.com/XRPLF/rippled/commit/fbbea9e6e25795a8a6bd1bf64b780771933a9579 + std::unique_lock lock(validationsMutex_); + BypassAccept bypassAccept = BypassAccept::no; + try + { + if (pendingValidations_.contains(val->getLedgerHash())) + bypassAccept = BypassAccept::yes; + else + pendingValidations_.insert(val->getLedgerHash()); + lock.unlock(); + handleNewValidation(app_, val, source, bypassAccept, m_journal); + } + catch (std::exception const& e) + { + JLOG(m_journal.warn()) + << "Exception thrown for handling new validation " + << val->getLedgerHash() << ": " << e.what(); + } + catch (...) + { + JLOG(m_journal.warn()) + << "Unknown exception thrown for handling new validation " + << val->getLedgerHash(); + } + if (bypassAccept == BypassAccept::no) + { + lock.lock(); + pendingValidations_.erase(val->getLedgerHash()); + lock.unlock(); + } pubValidation(val); diff --git a/src/test/app/LedgerReplay_test.cpp b/src/test/app/LedgerReplay_test.cpp index cff94ee04..7619a0f25 100644 --- a/src/test/app/LedgerReplay_test.cpp +++ b/src/test/app/LedgerReplay_test.cpp @@ -106,6 +106,14 @@ public: return {}; } + virtual void + acquireAsync( + uint256 const& hash, + std::uint32_t seq, + InboundLedger::Reason reason) override + { + } + virtual std::shared_ptr find(LedgerHash const& hash) override {