Compare commits

...

5 Commits

Author SHA1 Message Date
Wietse Wind
f2293ce440 Oops 2024-09-06 01:37:22 +02:00
Wietse Wind
f31a23017f clang-format 2024-09-06 00:59:52 +02:00
Wietse Wind
243a357b28 Merge branch 'merge/fbbea9e6e25795a8a6bd1bf64b780771933a9579' into merge/2.2.2-jobqueue 2024-09-06 00:46:56 +02:00
Wietse Wind
ce187dad18 Merge 7741483894 2024-09-06 00:45:59 +02:00
Wietse Wind
0a05243d80 Merge fbbea9e6e2 2024-09-05 12:53:07 +02:00
8 changed files with 130 additions and 10 deletions

View File

@@ -134,8 +134,12 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
acquiringLedger_ = hash; acquiringLedger_ = hash;
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger", [id = hash, &app = app_]() { jtADVANCE,
app.getInboundLedgers().acquire( "getConsensusLedger1",
[id = hash, &app = app_, this]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(
id, 0, InboundLedger::Reason::CONSENSUS); id, 0, InboundLedger::Reason::CONSENSUS);
}); });
} }

View File

@@ -135,8 +135,10 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
Application* pApp = &app_; Application* pApp = &app_;
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger", [pApp, hash]() { jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() {
pApp->getInboundLedgers().acquire( JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(
hash, 0, InboundLedger::Reason::CONSENSUS); hash, 0, InboundLedger::Reason::CONSENSUS);
}); });
return std::nullopt; return std::nullopt;
@@ -152,7 +154,9 @@ void
handleNewValidation( handleNewValidation(
Application& app, Application& app,
std::shared_ptr<STValidation> const& val, std::shared_ptr<STValidation> const& val,
std::string const& source) std::string const& source,
BypassAccept const bypassAccept,
std::optional<beast::Journal> j)
{ {
auto const& signingKey = val->getSignerPublic(); auto const& signingKey = val->getSignerPublic();
auto const& hash = val->getLedgerHash(); auto const& hash = val->getLedgerHash();
@@ -177,7 +181,23 @@ handleNewValidation(
if (outcome == ValStatus::current) if (outcome == ValStatus::current)
{ {
if (val->isTrusted()) if (val->isTrusted())
app.getLedgerMaster().checkAccept(hash, seq); {
// Was: app.getLedgerMaster().checkAccept(hash, seq);
// https://github.com/XRPLF/rippled/commit/fbbea9e6e25795a8a6bd1bf64b780771933a9579
if (bypassAccept == BypassAccept::yes)
{
assert(j.has_value());
if (j.has_value())
{
JLOG(j->trace()) << "Bypassing checkAccept for validation "
<< val->getLedgerHash();
}
}
else
{
app.getLedgerMaster().checkAccept(hash, seq);
}
}
return; return;
} }

View File

@@ -25,12 +25,16 @@
#include <ripple/protocol/Protocol.h> #include <ripple/protocol/Protocol.h>
#include <ripple/protocol/RippleLedgerHash.h> #include <ripple/protocol/RippleLedgerHash.h>
#include <ripple/protocol/STValidation.h> #include <ripple/protocol/STValidation.h>
#include <optional>
#include <set>
#include <vector> #include <vector>
namespace ripple { namespace ripple {
class Application; class Application;
enum class BypassAccept : bool { no = false, yes };
/** Wrapper over STValidation for generic Validation code /** Wrapper over STValidation for generic Validation code
Wraps an STValidation for compatibility with the generic validation code. Wraps an STValidation for compatibility with the generic validation code.
@@ -248,7 +252,9 @@ void
handleNewValidation( handleNewValidation(
Application& app, Application& app,
std::shared_ptr<STValidation> const& val, std::shared_ptr<STValidation> const& val,
std::string const& source); std::string const& source,
BypassAccept const bypassAccept = BypassAccept::no,
std::optional<beast::Journal> j = std::nullopt);
} // namespace ripple } // namespace ripple

View File

@@ -38,10 +38,21 @@ public:
virtual ~InboundLedgers() = default; virtual ~InboundLedgers() = default;
// VFALCO TODO Should this be called findOrAdd ? // VFALCO TODO Should this be called findOrAdd ?
// Callers should use this if they possibly need an authoritative
// response immediately.
// //
virtual std::shared_ptr<Ledger const> virtual std::shared_ptr<Ledger const>
acquire(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason) = 0; acquire(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason) = 0;
// Callers should use this if they are known to be executing on the Job
// Queue. TODO review whether all callers of acquire() can use this
// instead. Inbound ledger acquisition is asynchronous anyway.
virtual void
acquireAsync(
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) = 0;
virtual std::shared_ptr<InboundLedger> virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0; find(LedgerHash const& hash) = 0;

View File

@@ -560,7 +560,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
return; return;
} }
if (auto stream = journal_.trace()) if (auto stream = journal_.debug())
{ {
if (peer) if (peer)
stream << "Trigger acquiring ledger " << hash_ << " from " << peer; stream << "Trigger acquiring ledger " << hash_ << " from " << peer;

View File

@@ -28,6 +28,7 @@
#include <ripple/core/JobQueue.h> #include <ripple/core/JobQueue.h>
#include <ripple/nodestore/DatabaseShard.h> #include <ripple/nodestore/DatabaseShard.h>
#include <ripple/protocol/jss.h> #include <ripple/protocol/jss.h>
#include <exception>
#include <memory> #include <memory>
#include <mutex> #include <mutex>
#include <vector> #include <vector>
@@ -141,6 +142,37 @@ public:
return inbound->getLedger(); return inbound->getLedger();
} }
void
acquireAsync(
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
std::unique_lock lock(acquiresMutex_);
try
{
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
lock.unlock();
acquire(hash, seq, reason);
}
catch (std::exception const& e)
{
JLOG(j_.warn())
<< "Exception thrown for acquiring new inbound ledger " << hash
<< ": " << e.what();
}
catch (...)
{
JLOG(j_.warn())
<< "Unknown exception thrown for acquiring new inbound ledger "
<< hash;
}
lock.lock();
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger> std::shared_ptr<InboundLedger>
find(uint256 const& hash) override find(uint256 const& hash) override
{ {
@@ -426,6 +458,9 @@ private:
beast::insight::Counter mCounter; beast::insight::Counter mCounter;
std::unique_ptr<PeerSetBuilder> mPeerSetBuilder; std::unique_ptr<PeerSetBuilder> mPeerSetBuilder;
std::set<uint256> pendingAcquires_;
std::mutex acquiresMutex_;
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -70,7 +70,9 @@
#include <boost/asio/ip/host_name.hpp> #include <boost/asio/ip/host_name.hpp>
#include <boost/asio/steady_timer.hpp> #include <boost/asio/steady_timer.hpp>
#include <exception>
#include <mutex> #include <mutex>
#include <set>
#include <string> #include <string>
#include <tuple> #include <tuple>
#include <unordered_map> #include <unordered_map>
@@ -776,6 +778,9 @@ private:
StateAccounting accounting_{}; StateAccounting accounting_{};
std::set<uint256> pendingValidations_;
std::mutex validationsMutex_;
private: private:
struct Stats struct Stats
{ {
@@ -1791,7 +1796,8 @@ NetworkOPsImp::checkLastClosedLedger(
} }
JLOG(m_journal.warn()) << "We are not running on the consensus ledger"; JLOG(m_journal.warn()) << "We are not running on the consensus ledger";
JLOG(m_journal.info()) << "Our LCL: " << getJson({*ourClosed, {}}); JLOG(m_journal.info()) << "Our LCL: " << ourClosed->info().hash
<< getJson({*ourClosed, {}});
JLOG(m_journal.info()) << "Net LCL " << closedLedger; JLOG(m_journal.info()) << "Net LCL " << closedLedger;
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL)) if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
@@ -2345,7 +2351,37 @@ NetworkOPsImp::recvValidation(
JLOG(m_journal.trace()) JLOG(m_journal.trace())
<< "recvValidation " << val->getLedgerHash() << " from " << source; << "recvValidation " << val->getLedgerHash() << " from " << source;
handleNewValidation(app_, val, source); // handleNewValidation(app_, val, source);
// https://github.com/XRPLF/rippled/commit/fbbea9e6e25795a8a6bd1bf64b780771933a9579
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::no;
try
{
if (pendingValidations_.contains(val->getLedgerHash()))
bypassAccept = BypassAccept::yes;
else
pendingValidations_.insert(val->getLedgerHash());
lock.unlock();
handleNewValidation(app_, val, source, bypassAccept, m_journal);
}
catch (std::exception const& e)
{
JLOG(m_journal.warn())
<< "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::no)
{
lock.lock();
pendingValidations_.erase(val->getLedgerHash());
lock.unlock();
}
pubValidation(val); pubValidation(val);

View File

@@ -106,6 +106,14 @@ public:
return {}; return {};
} }
virtual void
acquireAsync(
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
}
virtual std::shared_ptr<InboundLedger> virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) override find(LedgerHash const& hash) override
{ {