Merge remote-tracking branch 'upstream/master' into upstream--develop

* upstream/master:
  Set version to 2.2.2
  Allow only 1 job queue slot for each validation ledger check
  Allow only 1 job queue slot for acquiring inbound ledger.
  Track latencies of certain code blocks, and log if they take too long
This commit is contained in:
Ed Hennis
2024-09-03 17:00:51 -04:00
24 changed files with 286 additions and 87 deletions

View File

@@ -32,7 +32,7 @@ Loop: xrpld.core xrpld.net
xrpld.net > xrpld.core
Loop: xrpld.core xrpld.perflog
xrpld.perflog ~= xrpld.core
xrpld.perflog == xrpld.core
Loop: xrpld.net xrpld.rpc
xrpld.rpc ~= xrpld.net

View File

@@ -171,6 +171,7 @@ xrpld.nodestore > xrpl.protocol
xrpld.overlay > xrpl.basics
xrpld.overlay > xrpld.core
xrpld.overlay > xrpld.peerfinder
xrpld.overlay > xrpld.perflog
xrpld.overlay > xrpl.json
xrpld.overlay > xrpl.protocol
xrpld.overlay > xrpl.resource

View File

@@ -106,6 +106,14 @@ public:
return {};
}
// Empty override: the hash, sequence and reason arguments are ignored and
// nothing is acquired.
virtual void
acquireAsync(
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
}
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) override
{

View File

@@ -255,7 +255,7 @@ public:
setup.dataDir = getDatabasePath();
assert(!setup.useGlobalPragma);
auto dbCon = makeTestWalletDB(setup, dbName);
auto dbCon = makeTestWalletDB(setup, dbName, env.journal);
auto getPopulatedManifests =
[](ManifestCache const& cache) -> std::vector<Manifest const*> {

View File

@@ -134,8 +134,12 @@ RCLConsensus::Adaptor::acquireLedger(LedgerHash const& hash)
acquiringLedger_ = hash;
app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger", [id = hash, &app = app_]() {
app.getInboundLedgers().acquire(
jtADVANCE,
"getConsensusLedger1",
[id = hash, &app = app_, this]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger1 started";
app.getInboundLedgers().acquireAsync(
id, 0, InboundLedger::Reason::CONSENSUS);
});
}

View File

@@ -27,6 +27,7 @@
#include <xrpld/consensus/LedgerTiming.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/core/TimeKeeper.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/Log.h>
#include <xrpl/basics/StringUtilities.h>
#include <xrpl/basics/chrono.h>
@@ -126,7 +127,13 @@ RCLValidationsAdaptor::now() const
std::optional<RCLValidatedLedger>
RCLValidationsAdaptor::acquire(LedgerHash const& hash)
{
auto ledger = app_.getLedgerMaster().getLedgerByHash(hash);
using namespace std::chrono_literals;
auto ledger = perf::measureDurationAndLog(
[&]() { return app_.getLedgerMaster().getLedgerByHash(hash); },
"getLedgerByHash",
10ms,
j_);
if (!ledger)
{
JLOG(j_.debug())
@@ -135,8 +142,10 @@ RCLValidationsAdaptor::acquire(LedgerHash const& hash)
Application* pApp = &app_;
app_.getJobQueue().addJob(
jtADVANCE, "getConsensusLedger", [pApp, hash]() {
pApp->getInboundLedgers().acquire(
jtADVANCE, "getConsensusLedger2", [pApp, hash, this]() {
JLOG(j_.debug())
<< "JOB advanceLedger getConsensusLedger2 started";
pApp->getInboundLedgers().acquireAsync(
hash, 0, InboundLedger::Reason::CONSENSUS);
});
return std::nullopt;
@@ -152,7 +161,9 @@ void
handleNewValidation(
Application& app,
std::shared_ptr<STValidation> const& val,
std::string const& source)
std::string const& source,
BypassAccept const bypassAccept,
std::optional<beast::Journal> j)
{
auto const& signingKey = val->getSignerPublic();
auto const& hash = val->getLedgerHash();
@@ -177,7 +188,21 @@ handleNewValidation(
if (outcome == ValStatus::current)
{
if (val->isTrusted())
app.getLedgerMaster().checkAccept(hash, seq);
{
if (bypassAccept == BypassAccept::yes)
{
assert(j.has_value());
if (j.has_value())
{
JLOG(j->trace()) << "Bypassing checkAccept for validation "
<< val->getLedgerHash();
}
}
else
{
app.getLedgerMaster().checkAccept(hash, seq);
}
}
return;
}

View File

@@ -25,12 +25,16 @@
#include <xrpl/protocol/Protocol.h>
#include <xrpl/protocol/RippleLedgerHash.h>
#include <xrpl/protocol/STValidation.h>
#include <optional>
#include <set>
#include <vector>
namespace ripple {
class Application;
/** Tells handleNewValidation whether to skip the checkAccept call.

    With `yes`, a trusted validation whose status is current does not
    trigger checkAccept (a trace message is written instead when a journal
    was supplied); with `no`, checkAccept is called.
*/
enum class BypassAccept : bool { no = false, yes };
/** Wrapper over STValidation for generic Validation code
Wraps an STValidation for compatibility with the generic validation code.
@@ -248,7 +252,9 @@ void
handleNewValidation(
Application& app,
std::shared_ptr<STValidation> const& val,
std::string const& source);
std::string const& source,
BypassAccept const bypassAccept = BypassAccept::no,
std::optional<beast::Journal> j = std::nullopt);
} // namespace ripple

View File

@@ -23,6 +23,7 @@
#include <xrpld/app/ledger/InboundLedger.h>
#include <xrpl/protocol/RippleLedgerHash.h>
#include <memory>
#include <set>
namespace ripple {
@@ -37,11 +38,20 @@ public:
virtual ~InboundLedgers() = default;
// VFALCO TODO Should this be called findOrAdd ?
//
// Callers should use this if they possibly need an authoritative
// response immediately.
virtual std::shared_ptr<Ledger const>
acquire(uint256 const& hash, std::uint32_t seq, InboundLedger::Reason) = 0;
// Callers should use this if they are known to be executing on the Job
// Queue. TODO review whether all callers of acquire() can use this
// instead. Inbound ledger acquisition is asynchronous anyway.
virtual void
acquireAsync(
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) = 0;
virtual std::shared_ptr<InboundLedger>
find(LedgerHash const& hash) = 0;

View File

@@ -495,7 +495,7 @@ InboundLedger::trigger(std::shared_ptr<Peer> const& peer, TriggerReason reason)
return;
}
if (auto stream = journal_.trace())
if (auto stream = journal_.debug())
{
stream << "Trigger acquiring ledger " << hash_;
if (peer)

View File

@@ -22,11 +22,13 @@
#include <xrpld/app/main/Application.h>
#include <xrpld/app/misc/NetworkOPs.h>
#include <xrpld/core/JobQueue.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/DecayingSample.h>
#include <xrpl/basics/Log.h>
#include <xrpl/beast/container/aged_map.h>
#include <xrpl/beast/core/LexicalCast.h>
#include <xrpl/protocol/jss.h>
#include <exception>
#include <memory>
#include <mutex>
#include <vector>
@@ -68,54 +70,92 @@ public:
std::uint32_t seq,
InboundLedger::Reason reason) override
{
assert(hash.isNonZero());
auto doAcquire = [&, seq, reason]() -> std::shared_ptr<Ledger const> {
assert(hash.isNonZero());
// probably not the right rule
if (app_.getOPs().isNeedNetworkLedger() &&
(reason != InboundLedger::Reason::GENERIC) &&
(reason != InboundLedger::Reason::CONSENSUS))
return {};
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
if (stopping_)
{
// probably not the right rule
if (app_.getOPs().isNeedNetworkLedger() &&
(reason != InboundLedger::Reason::GENERIC) &&
(reason != InboundLedger::Reason::CONSENSUS))
return {};
bool isNew = true;
std::shared_ptr<InboundLedger> inbound;
{
ScopedLockType sl(mLock);
if (stopping_)
{
return {};
}
auto it = mLedgers.find(hash);
if (it != mLedgers.end())
{
isNew = false;
inbound = it->second;
}
else
{
inbound = std::make_shared<InboundLedger>(
app_,
hash,
seq,
reason,
std::ref(m_clock),
mPeerSetBuilder->build());
mLedgers.emplace(hash, inbound);
inbound->init(sl);
++mCounter;
}
}
auto it = mLedgers.find(hash);
if (it != mLedgers.end())
{
isNew = false;
inbound = it->second;
}
else
{
inbound = std::make_shared<InboundLedger>(
app_,
hash,
seq,
reason,
std::ref(m_clock),
mPeerSetBuilder->build());
mLedgers.emplace(hash, inbound);
inbound->init(sl);
++mCounter;
}
if (inbound->isFailed())
return {};
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
return {};
return inbound->getLedger();
};
using namespace std::chrono_literals;
std::shared_ptr<Ledger const> ledger = perf::measureDurationAndLog(
doAcquire, "InboundLedgersImp::acquire", 500ms, j_);
return ledger;
}
void
acquireAsync(
uint256 const& hash,
std::uint32_t seq,
InboundLedger::Reason reason) override
{
std::unique_lock lock(acquiresMutex_);
try
{
if (pendingAcquires_.contains(hash))
return;
pendingAcquires_.insert(hash);
lock.unlock();
acquire(hash, seq, reason);
}
if (inbound->isFailed())
return {};
if (!isNew)
inbound->update(seq);
if (!inbound->isComplete())
return {};
return inbound->getLedger();
catch (std::exception const& e)
{
JLOG(j_.warn())
<< "Exception thrown for acquiring new inbound ledger " << hash
<< ": " << e.what();
}
catch (...)
{
JLOG(j_.warn())
<< "Unknown exception thrown for acquiring new inbound ledger "
<< hash;
}
lock.lock();
pendingAcquires_.erase(hash);
}
std::shared_ptr<InboundLedger>
@@ -410,6 +450,9 @@ private:
beast::insight::Counter mCounter;
std::unique_ptr<PeerSetBuilder> mPeerSetBuilder;
std::set<uint256> pendingAcquires_;
std::mutex acquiresMutex_;
};
//------------------------------------------------------------------------------

View File

@@ -839,7 +839,7 @@ public:
auto setup = setup_DatabaseCon(*config_, m_journal);
setup.useGlobalPragma = false;
mWalletDB = makeWalletDB(setup);
mWalletDB = makeWalletDB(setup, m_journal);
}
catch (std::exception const& e)
{

View File

@@ -592,7 +592,7 @@ run(int argc, char** argv)
try
{
auto setup = setup_DatabaseCon(*config);
if (!doVacuumDB(setup))
if (!doVacuumDB(setup, config->journal()))
return -1;
}
catch (std::exception const& e)

View File

@@ -69,8 +69,10 @@
#include <boost/asio/steady_timer.hpp>
#include <algorithm>
#include <exception>
#include <mutex>
#include <optional>
#include <set>
#include <string>
#include <tuple>
#include <unordered_map>
@@ -769,6 +771,9 @@ private:
StateAccounting accounting_{};
std::set<uint256> pendingValidations_;
std::mutex validationsMutex_;
private:
struct Stats
{
@@ -1718,7 +1723,8 @@ NetworkOPsImp::checkLastClosedLedger(
}
JLOG(m_journal.warn()) << "We are not running on the consensus ledger";
JLOG(m_journal.info()) << "Our LCL: " << getJson({*ourClosed, {}});
JLOG(m_journal.info()) << "Our LCL: " << ourClosed->info().hash
<< getJson({*ourClosed, {}});
JLOG(m_journal.info()) << "Net LCL " << closedLedger;
if ((mMode == OperatingMode::TRACKING) || (mMode == OperatingMode::FULL))
@@ -2295,7 +2301,35 @@ NetworkOPsImp::recvValidation(
JLOG(m_journal.trace())
<< "recvValidation " << val->getLedgerHash() << " from " << source;
handleNewValidation(app_, val, source);
std::unique_lock lock(validationsMutex_);
BypassAccept bypassAccept = BypassAccept::no;
try
{
if (pendingValidations_.contains(val->getLedgerHash()))
bypassAccept = BypassAccept::yes;
else
pendingValidations_.insert(val->getLedgerHash());
lock.unlock();
handleNewValidation(app_, val, source, bypassAccept, m_journal);
}
catch (std::exception const& e)
{
JLOG(m_journal.warn())
<< "Exception thrown for handling new validation "
<< val->getLedgerHash() << ": " << e.what();
}
catch (...)
{
JLOG(m_journal.warn())
<< "Unknown exception thrown for handling new validation "
<< val->getLedgerHash();
}
if (bypassAccept == BypassAccept::no)
{
lock.lock();
pendingValidations_.erase(val->getLedgerHash());
lock.unlock();
}
pubValidation(val);

View File

@@ -27,10 +27,11 @@ namespace ripple {
/**
* @brief doVacuumDB Creates, initialises, and performs cleanup on a database.
* @param setup Path to the database and other opening parameters.
* @param j Journal.
* @return True if the vacuum process completed successfully.
*/
bool
doVacuumDB(DatabaseCon::Setup const& setup);
doVacuumDB(DatabaseCon::Setup const& setup, beast::Journal j);
} // namespace ripple

View File

@@ -32,19 +32,24 @@ namespace ripple {
/**
* @brief makeWalletDB Opens the wallet database and returns it.
* @param setup Path to the database and other opening parameters.
* @param j Journal.
* @return Unique pointer to the database descriptor.
*/
std::unique_ptr<DatabaseCon>
makeWalletDB(DatabaseCon::Setup const& setup);
makeWalletDB(DatabaseCon::Setup const& setup, beast::Journal j);
/**
* @brief makeTestWalletDB Opens a test wallet database with an arbitrary name.
* @param setup Path to the database and other opening parameters.
* @param dbname Name of the database.
* @param j Journal.
* @return Unique pointer to the database descriptor.
*/
std::unique_ptr<DatabaseCon>
makeTestWalletDB(DatabaseCon::Setup const& setup, std::string const& dbname);
makeTestWalletDB(
DatabaseCon::Setup const& setup,
std::string const& dbname,
beast::Journal j);
/**
* @brief getManifests Loads a manifest from the wallet database and stores it

View File

@@ -67,11 +67,12 @@ DatabasePairValid
makeLedgerDBs(
Config const& config,
DatabaseCon::Setup const& setup,
DatabaseCon::CheckpointerSetup const& checkpointerSetup)
DatabaseCon::CheckpointerSetup const& checkpointerSetup,
beast::Journal j)
{
// ledger database
auto lgr{std::make_unique<DatabaseCon>(
setup, LgrDBName, LgrDBPragma, LgrDBInit, checkpointerSetup)};
setup, LgrDBName, LgrDBPragma, LgrDBInit, checkpointerSetup, j)};
lgr->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getValueFor(SizedItem::lgrDBCache)));
@@ -80,7 +81,7 @@ makeLedgerDBs(
{
// transaction database
auto tx{std::make_unique<DatabaseCon>(
setup, TxDBName, TxDBPragma, TxDBInit, checkpointerSetup)};
setup, TxDBName, TxDBPragma, TxDBInit, checkpointerSetup, j)};
tx->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config.getValueFor(SizedItem::txnDBCache)));

View File

@@ -47,6 +47,7 @@ struct DatabasePairValid
* @param config Config object.
* @param setup Path to database and opening parameters.
* @param checkpointerSetup Database checkpointer setup.
* @param j Journal.
* @return Struct DatabasePairValid which contains unique pointers to the
* ledger and transaction databases and a flag indicating whether opening
* was successful.
*/
@@ -54,7 +55,8 @@ DatabasePairValid
makeLedgerDBs(
Config const& config,
DatabaseCon::Setup const& setup,
DatabaseCon::CheckpointerSetup const& checkpointerSetup);
DatabaseCon::CheckpointerSetup const& checkpointerSetup,
beast::Journal j);
/**
* @brief getMinLedgerSeq Returns minimum ledger sequence in given table.

View File

@@ -246,7 +246,7 @@ SQLiteDatabaseImp::makeLedgerDBs(
DatabaseCon::CheckpointerSetup const& checkpointerSetup)
{
auto [lgr, tx, res] =
detail::makeLedgerDBs(config, setup, checkpointerSetup);
detail::makeLedgerDBs(config, setup, checkpointerSetup, j_);
txdb_ = std::move(tx);
lgrdb_ = std::move(lgr);
return res;

View File

@@ -23,7 +23,7 @@
namespace ripple {
bool
doVacuumDB(DatabaseCon::Setup const& setup)
doVacuumDB(DatabaseCon::Setup const& setup, beast::Journal j)
{
boost::filesystem::path dbPath = setup.dataDir / TxDBName;
@@ -41,7 +41,7 @@ doVacuumDB(DatabaseCon::Setup const& setup)
}
auto txnDB =
std::make_unique<DatabaseCon>(setup, TxDBName, TxDBPragma, TxDBInit);
std::make_unique<DatabaseCon>(setup, TxDBName, TxDBPragma, TxDBInit, j);
auto& session = txnDB->getSession();
std::uint32_t pageSize;

View File

@@ -23,19 +23,22 @@
namespace ripple {
std::unique_ptr<DatabaseCon>
makeWalletDB(DatabaseCon::Setup const& setup)
makeWalletDB(DatabaseCon::Setup const& setup, beast::Journal j)
{
// wallet database
return std::make_unique<DatabaseCon>(
setup, WalletDBName, std::array<char const*, 0>(), WalletDBInit);
setup, WalletDBName, std::array<char const*, 0>(), WalletDBInit, j);
}
std::unique_ptr<DatabaseCon>
makeTestWalletDB(DatabaseCon::Setup const& setup, std::string const& dbname)
makeTestWalletDB(
DatabaseCon::Setup const& setup,
std::string const& dbname,
beast::Journal j)
{
// wallet database
return std::make_unique<DatabaseCon>(
setup, dbname.data(), std::array<char const*, 0>(), WalletDBInit);
setup, dbname.data(), std::array<char const*, 0>(), WalletDBInit, j);
}
void

View File

@@ -376,6 +376,12 @@ public:
int
getValueFor(SizedItem item, std::optional<std::size_t> node = std::nullopt)
const;
/** Returns a copy of the journal stored in j_. */
beast::Journal
journal() const
{
return j_;
}
};
FeeSetup

View File

@@ -23,6 +23,7 @@
#include <xrpld/app/main/DBInit.h>
#include <xrpld/core/Config.h>
#include <xrpld/core/SociDB.h>
#include <xrpld/perflog/PerfLog.h>
#include <boost/filesystem/path.hpp>
#include <mutex>
#include <optional>
@@ -114,7 +115,8 @@ public:
Setup const& setup,
std::string const& dbName,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL)
std::array<char const*, M> const& initSQL,
beast::Journal journal)
// Use temporary files or regular DB files?
: DatabaseCon(
setup.standAlone && setup.startUp != Config::LOAD &&
@@ -124,7 +126,8 @@ public:
: (setup.dataDir / dbName),
setup.commonPragma(),
pragma,
initSQL)
initSQL,
journal)
{
}
@@ -135,8 +138,9 @@ public:
std::string const& dbName,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL,
CheckpointerSetup const& checkpointerSetup)
: DatabaseCon(setup, dbName, pragma, initSQL)
CheckpointerSetup const& checkpointerSetup,
beast::Journal journal)
: DatabaseCon(setup, dbName, pragma, initSQL, journal)
{
setupCheckpointing(checkpointerSetup.jobQueue, *checkpointerSetup.logs);
}
@@ -146,8 +150,9 @@ public:
boost::filesystem::path const& dataDir,
std::string const& dbName,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL)
: DatabaseCon(dataDir / dbName, nullptr, pragma, initSQL)
std::array<char const*, M> const& initSQL,
beast::Journal journal)
: DatabaseCon(dataDir / dbName, nullptr, pragma, initSQL, journal)
{
}
@@ -158,8 +163,9 @@ public:
std::string const& dbName,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL,
CheckpointerSetup const& checkpointerSetup)
: DatabaseCon(dataDir, dbName, pragma, initSQL)
CheckpointerSetup const& checkpointerSetup,
beast::Journal journal)
: DatabaseCon(dataDir, dbName, pragma, initSQL, journal)
{
setupCheckpointing(checkpointerSetup.jobQueue, *checkpointerSetup.logs);
}
@@ -175,7 +181,14 @@ public:
LockedSociSession
checkoutDb()
{
return LockedSociSession(session_, lock_);
using namespace std::chrono_literals;
LockedSociSession session = perf::measureDurationAndLog(
[&]() { return LockedSociSession(session_, lock_); },
"checkoutDb",
10ms,
j_);
return session;
}
private:
@@ -187,8 +200,9 @@ private:
boost::filesystem::path const& pPath,
std::vector<std::string> const* commonPragma,
std::array<char const*, N> const& pragma,
std::array<char const*, M> const& initSQL)
: session_(std::make_shared<soci::session>())
std::array<char const*, M> const& initSQL,
beast::Journal journal)
: session_(std::make_shared<soci::session>()), j_(journal)
{
open(*session_, "sqlite", pPath.string());
@@ -222,6 +236,8 @@ private:
// shared_ptr in this class. session_ will never be null.
std::shared_ptr<soci::session> const session_;
std::shared_ptr<Checkpointer> checkpointer_;
beast::Journal const j_;
};
// Return the checkpointer from its id. If the checkpointer no longer exists, an

View File

@@ -32,11 +32,13 @@
#include <xrpld/overlay/detail/PeerImp.h>
#include <xrpld/overlay/detail/Tuning.h>
#include <xrpld/overlay/predicates.h>
#include <xrpld/perflog/PerfLog.h>
#include <xrpl/basics/UptimeClock.h>
#include <xrpl/basics/base64.h>
#include <xrpl/basics/random.h>
#include <xrpl/basics/safe_cast.h>
#include <xrpl/beast/core/LexicalCast.h>
// #include <xrpl/beast/core/SemanticVersion.h>
#include <xrpl/protocol/digest.h>
#include <boost/algorithm/string/predicate.hpp>
@@ -894,8 +896,16 @@ PeerImp::onReadMessage(error_code ec, std::size_t bytes_transferred)
while (read_buffer_.size() > 0)
{
std::size_t bytes_consumed;
std::tie(bytes_consumed, ec) =
invokeProtocolMessage(read_buffer_.data(), *this, hint);
using namespace std::chrono_literals;
std::tie(bytes_consumed, ec) = perf::measureDurationAndLog(
[&]() {
return invokeProtocolMessage(read_buffer_.data(), *this, hint);
},
"invokeProtocolMessage",
350ms,
journal_);
if (ec)
return fail("onReadMessage", ec);
if (!socket_.is_open())

View File

@@ -179,6 +179,30 @@ make_PerfLog(
beast::Journal journal,
std::function<void()>&& signalStop);
/** Invoke a callable, time it, and log a warning if it ran too long.
 *
 * The elapsed time is taken from std::chrono::steady_clock so that the
 * measurement is unaffected by adjustments to the system wall clock.
 *
 * @param func Callable taking no arguments. It is invoked exactly once and
 *             must return a non-void value, because the result is stored
 *             and handed back to the caller.
 * @param actionDescription Text placed at the start of the warning message.
 * @param maxDelay Threshold; a warning is written only when the elapsed
 *                 time, truncated to whole milliseconds, is strictly
 *                 greater than this value.
 * @param journal Journal whose warn stream receives the message.
 * @return The value produced by func, returned by value.
 *
 * @note An exception thrown by func propagates to the caller and nothing is
 *       logged in that case.
 */
template <typename Func, class Rep, class Period>
auto
measureDurationAndLog(
    Func&& func,
    const std::string& actionDescription,
    std::chrono::duration<Rep, Period> maxDelay,
    const beast::Journal& journal)
{
    // A monotonic clock is required for interval measurement;
    // high_resolution_clock may alias the adjustable system clock.
    auto const start_time = std::chrono::steady_clock::now();
    auto result = func();
    auto const end_time = std::chrono::steady_clock::now();

    auto const duration =
        std::chrono::duration_cast<std::chrono::milliseconds>(
            end_time - start_time);
    if (duration > maxDelay)
    {
        JLOG(journal.warn())
            << actionDescription << " took " << duration.count() << " ms";
    }

    return result;
}
} // namespace perf
} // namespace ripple