mirror of
				https://github.com/Xahau/xahaud.git
				synced 2025-11-04 10:45:50 +00:00 
			
		
		
		
	Parallel ledger loader & I/O performance improvements:
- Only duplicate records from archive to writable during online_delete. - Log duration of nodestore reads. - Include nodestore counters in perf_log output. - Remove gratuitous nodestore activity counting. - Report initial sync duration in server_info and perfLog. - Report state_accounting in perfLog. - Make state_accounting durations more accurate. - Parallel ledger loader. - Config parameter to load ledgers on start.
This commit is contained in:
		
				
					committed by
					
						
						Nik Bougalis
					
				
			
			
				
	
			
			
			
						parent
						
							5a4654a0da
						
					
				
				
					commit
					7c12f01358
				
			@@ -8,7 +8,7 @@ Loop: ripple.app ripple.net
 | 
			
		||||
  ripple.app > ripple.net
 | 
			
		||||
 | 
			
		||||
Loop: ripple.app ripple.nodestore
 | 
			
		||||
  ripple.nodestore ~= ripple.app
 | 
			
		||||
  ripple.app > ripple.nodestore
 | 
			
		||||
 | 
			
		||||
Loop: ripple.app ripple.overlay
 | 
			
		||||
  ripple.overlay ~= ripple.app
 | 
			
		||||
@@ -28,6 +28,9 @@ Loop: ripple.basics ripple.core
 | 
			
		||||
Loop: ripple.basics ripple.json
 | 
			
		||||
  ripple.json ~= ripple.basics
 | 
			
		||||
 | 
			
		||||
Loop: ripple.basics ripple.nodestore
 | 
			
		||||
  ripple.nodestore > ripple.basics
 | 
			
		||||
 | 
			
		||||
Loop: ripple.basics ripple.protocol
 | 
			
		||||
  ripple.protocol > ripple.basics
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -29,7 +29,6 @@ ripple.net > ripple.beast
 | 
			
		||||
ripple.net > ripple.json
 | 
			
		||||
ripple.net > ripple.protocol
 | 
			
		||||
ripple.net > ripple.resource
 | 
			
		||||
ripple.nodestore > ripple.basics
 | 
			
		||||
ripple.nodestore > ripple.beast
 | 
			
		||||
ripple.nodestore > ripple.core
 | 
			
		||||
ripple.nodestore > ripple.json
 | 
			
		||||
 
 | 
			
		||||
@@ -1073,6 +1073,12 @@
 | 
			
		||||
#                           Note: the cache will not be created if online_delete
 | 
			
		||||
#                           is specified, or if shards are used.
 | 
			
		||||
#
 | 
			
		||||
#       fast_load           Boolean. If set, load the last persisted ledger
 | 
			
		||||
#                           from disk upon process start before syncing to
 | 
			
		||||
#                           the network. This is likely to improve performance
 | 
			
		||||
#                           if sufficient IOPS capacity is available.
 | 
			
		||||
#                           Default 0.
 | 
			
		||||
#
 | 
			
		||||
#   Optional keys for NuDB or RocksDB:
 | 
			
		||||
#
 | 
			
		||||
#       earliest_seq        The default is 32570 to match the XRP ledger
 | 
			
		||||
 
 | 
			
		||||
@@ -760,7 +760,7 @@ Ledger::updateNegativeUNL()
 | 
			
		||||
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
bool
 | 
			
		||||
Ledger::walkLedger(beast::Journal j) const
 | 
			
		||||
Ledger::walkLedger(beast::Journal j, bool parallel) const
 | 
			
		||||
{
 | 
			
		||||
    std::vector<SHAMapMissingNode> missingNodes1;
 | 
			
		||||
    std::vector<SHAMapMissingNode> missingNodes2;
 | 
			
		||||
@@ -773,7 +773,10 @@ Ledger::walkLedger(beast::Journal j) const
 | 
			
		||||
    }
 | 
			
		||||
    else
 | 
			
		||||
    {
 | 
			
		||||
        stateMap_->walkMap(missingNodes1, 32);
 | 
			
		||||
        if (parallel)
 | 
			
		||||
            stateMap_->walkMapParallel(missingNodes1, 32);
 | 
			
		||||
        else
 | 
			
		||||
            stateMap_->walkMap(missingNodes1, 32);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (!missingNodes1.empty())
 | 
			
		||||
 
 | 
			
		||||
@@ -337,7 +337,7 @@ public:
 | 
			
		||||
    updateSkipList();
 | 
			
		||||
 | 
			
		||||
    bool
 | 
			
		||||
    walkLedger(beast::Journal j) const;
 | 
			
		||||
    walkLedger(beast::Journal j, bool parallel = false) const;
 | 
			
		||||
 | 
			
		||||
    bool
 | 
			
		||||
    assertSensible(beast::Journal ledgerJ) const;
 | 
			
		||||
 
 | 
			
		||||
@@ -279,6 +279,7 @@ public:
 | 
			
		||||
              perf::setup_PerfLog(
 | 
			
		||||
                  config_->section("perf"),
 | 
			
		||||
                  config_->CONFIG_DIR),
 | 
			
		||||
              *this,
 | 
			
		||||
              logs_->journal("PerfLog"),
 | 
			
		||||
              [this] { signalStop(); }))
 | 
			
		||||
 | 
			
		||||
@@ -1312,6 +1313,7 @@ ApplicationImp::setup()
 | 
			
		||||
    Pathfinder::initPathTable();
 | 
			
		||||
 | 
			
		||||
    auto const startUp = config_->START_UP;
 | 
			
		||||
    JLOG(m_journal.debug()) << "startUp: " << startUp;
 | 
			
		||||
    if (!config_->reporting())
 | 
			
		||||
    {
 | 
			
		||||
        if (startUp == Config::FRESH)
 | 
			
		||||
@@ -1333,7 +1335,18 @@ ApplicationImp::setup()
 | 
			
		||||
            {
 | 
			
		||||
                JLOG(m_journal.error())
 | 
			
		||||
                    << "The specified ledger could not be loaded.";
 | 
			
		||||
                return false;
 | 
			
		||||
                if (config_->FAST_LOAD)
 | 
			
		||||
                {
 | 
			
		||||
                    // Fall back to syncing from the network, such as
 | 
			
		||||
                    // when there's no existing data.
 | 
			
		||||
                    if (startUp == Config::NETWORK && !config_->standalone())
 | 
			
		||||
                        m_networkOPs->setNeedNetworkLedger();
 | 
			
		||||
                    startGenesisLedger();
 | 
			
		||||
                }
 | 
			
		||||
                else
 | 
			
		||||
                {
 | 
			
		||||
                    return false;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
        else if (startUp == Config::NETWORK)
 | 
			
		||||
@@ -2007,7 +2020,7 @@ ApplicationImp::loadOldLedger(
 | 
			
		||||
            return false;
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (!loadLedger->walkLedger(journal("Ledger")))
 | 
			
		||||
        if (!loadLedger->walkLedger(journal("Ledger"), true))
 | 
			
		||||
        {
 | 
			
		||||
            JLOG(m_journal.fatal()) << "Ledger is missing nodes.";
 | 
			
		||||
            assert(false);
 | 
			
		||||
 
 | 
			
		||||
@@ -632,17 +632,12 @@ run(int argc, char** argv)
 | 
			
		||||
        config->START_LEDGER = vm["ledgerfile"].as<std::string>();
 | 
			
		||||
        config->START_UP = Config::LOAD_FILE;
 | 
			
		||||
    }
 | 
			
		||||
    else if (vm.count("load"))
 | 
			
		||||
    else if (vm.count("load") || config->FAST_LOAD)
 | 
			
		||||
    {
 | 
			
		||||
        config->START_UP = Config::LOAD;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (vm.count("valid"))
 | 
			
		||||
    {
 | 
			
		||||
        config->START_VALID = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (vm.count("net"))
 | 
			
		||||
    if (vm.count("net") && !config->FAST_LOAD)
 | 
			
		||||
    {
 | 
			
		||||
        if ((config->START_UP == Config::LOAD) ||
 | 
			
		||||
            (config->START_UP == Config::REPLAY))
 | 
			
		||||
@@ -655,6 +650,11 @@ run(int argc, char** argv)
 | 
			
		||||
        config->START_UP = Config::NETWORK;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (vm.count("valid"))
 | 
			
		||||
    {
 | 
			
		||||
        config->START_VALID = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    // Override the RPC destination IP address. This must
 | 
			
		||||
    // happen after the config file is loaded.
 | 
			
		||||
    if (vm.count("rpc_ip"))
 | 
			
		||||
 
 | 
			
		||||
@@ -134,15 +134,17 @@ class NetworkOPsImp final : public NetworkOPs
 | 
			
		||||
        {
 | 
			
		||||
            explicit Counters() = default;
 | 
			
		||||
 | 
			
		||||
            std::uint32_t transitions = 0;
 | 
			
		||||
            std::uint64_t transitions = 0;
 | 
			
		||||
            std::chrono::microseconds dur = std::chrono::microseconds(0);
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        OperatingMode mode_ = OperatingMode::DISCONNECTED;
 | 
			
		||||
        std::array<Counters, 5> counters_;
 | 
			
		||||
        mutable std::mutex mutex_;
 | 
			
		||||
        std::chrono::system_clock::time_point start_ =
 | 
			
		||||
            std::chrono::system_clock::now();
 | 
			
		||||
        std::chrono::steady_clock::time_point start_ =
 | 
			
		||||
            std::chrono::steady_clock::now();
 | 
			
		||||
        std::chrono::steady_clock::time_point const processStart_ = start_;
 | 
			
		||||
        std::uint64_t initialSyncUs_{0};
 | 
			
		||||
        static std::array<Json::StaticString const, 5> const states_;
 | 
			
		||||
 | 
			
		||||
    public:
 | 
			
		||||
@@ -161,33 +163,27 @@ class NetworkOPsImp final : public NetworkOPs
 | 
			
		||||
        void
 | 
			
		||||
        mode(OperatingMode om);
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
         * Json-formatted state accounting data.
 | 
			
		||||
         * 1st member: state accounting object.
 | 
			
		||||
         * 2nd member: duration in current state.
 | 
			
		||||
         */
 | 
			
		||||
        using StateCountersJson = std::pair<Json::Value, std::string>;
 | 
			
		||||
 | 
			
		||||
        /**
 | 
			
		||||
         * Output state counters in JSON format.
 | 
			
		||||
         *
 | 
			
		||||
         * @return JSON object.
 | 
			
		||||
         * @obj Json object to which to add state accounting data.
 | 
			
		||||
         */
 | 
			
		||||
        StateCountersJson
 | 
			
		||||
        json() const;
 | 
			
		||||
        void
 | 
			
		||||
        json(Json::Value& obj) const;
 | 
			
		||||
 | 
			
		||||
        struct CounterData
 | 
			
		||||
        {
 | 
			
		||||
            decltype(counters_) counters;
 | 
			
		||||
            decltype(mode_) mode;
 | 
			
		||||
            decltype(start_) start;
 | 
			
		||||
            decltype(initialSyncUs_) initialSyncUs;
 | 
			
		||||
        };
 | 
			
		||||
 | 
			
		||||
        CounterData
 | 
			
		||||
        getCounterData() const
 | 
			
		||||
        {
 | 
			
		||||
            std::lock_guard lock(mutex_);
 | 
			
		||||
            return {counters_, mode_, start_};
 | 
			
		||||
            return {counters_, mode_, start_, initialSyncUs_};
 | 
			
		||||
        }
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
@@ -595,6 +591,9 @@ public:
 | 
			
		||||
        waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    stateAccounting(Json::Value& obj) override;
 | 
			
		||||
 | 
			
		||||
private:
 | 
			
		||||
    void
 | 
			
		||||
    setTimer(
 | 
			
		||||
@@ -2613,8 +2612,7 @@ NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters)
 | 
			
		||||
            info[jss::published_ledger] = lpPublished->info().seq;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    std::tie(info[jss::state_accounting], info[jss::server_state_duration_us]) =
 | 
			
		||||
        accounting_.json();
 | 
			
		||||
    accounting_.json(info);
 | 
			
		||||
    info[jss::uptime] = UptimeClock::now().time_since_epoch().count();
 | 
			
		||||
    if (!app_.config().reporting())
 | 
			
		||||
    {
 | 
			
		||||
@@ -3925,6 +3923,12 @@ NetworkOPsImp::subValidations(InfoSub::ref isrListener)
 | 
			
		||||
        .second;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
NetworkOPsImp::stateAccounting(Json::Value& obj)
 | 
			
		||||
{
 | 
			
		||||
    accounting_.json(obj);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// <-- bool: true=erased, false=was not there
 | 
			
		||||
bool
 | 
			
		||||
NetworkOPsImp::unsubValidations(std::uint64_t uSeq)
 | 
			
		||||
@@ -4360,9 +4364,9 @@ NetworkOPsImp::getBookPage(
 | 
			
		||||
inline void
 | 
			
		||||
NetworkOPsImp::collect_metrics()
 | 
			
		||||
{
 | 
			
		||||
    auto [counters, mode, start] = accounting_.getCounterData();
 | 
			
		||||
    auto [counters, mode, start, initialSync] = accounting_.getCounterData();
 | 
			
		||||
    auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
 | 
			
		||||
        std::chrono::system_clock::now() - start);
 | 
			
		||||
        std::chrono::steady_clock::now() - start);
 | 
			
		||||
    counters[static_cast<std::size_t>(mode)].dur += current;
 | 
			
		||||
 | 
			
		||||
    std::lock_guard lock(m_statsMutex);
 | 
			
		||||
@@ -4398,10 +4402,17 @@ NetworkOPsImp::collect_metrics()
 | 
			
		||||
void
 | 
			
		||||
NetworkOPsImp::StateAccounting::mode(OperatingMode om)
 | 
			
		||||
{
 | 
			
		||||
    auto now = std::chrono::system_clock::now();
 | 
			
		||||
    auto now = std::chrono::steady_clock::now();
 | 
			
		||||
 | 
			
		||||
    std::lock_guard lock(mutex_);
 | 
			
		||||
    ++counters_[static_cast<std::size_t>(om)].transitions;
 | 
			
		||||
    if (om == OperatingMode::FULL &&
 | 
			
		||||
        counters_[static_cast<std::size_t>(om)].transitions == 1)
 | 
			
		||||
    {
 | 
			
		||||
        initialSyncUs_ = std::chrono::duration_cast<std::chrono::microseconds>(
 | 
			
		||||
                             now - processStart_)
 | 
			
		||||
                             .count();
 | 
			
		||||
    }
 | 
			
		||||
    counters_[static_cast<std::size_t>(mode_)].dur +=
 | 
			
		||||
        std::chrono::duration_cast<std::chrono::microseconds>(now - start_);
 | 
			
		||||
 | 
			
		||||
@@ -4409,27 +4420,27 @@ NetworkOPsImp::StateAccounting::mode(OperatingMode om)
 | 
			
		||||
    start_ = now;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
NetworkOPsImp::StateAccounting::StateCountersJson
 | 
			
		||||
NetworkOPsImp::StateAccounting::json() const
 | 
			
		||||
void
 | 
			
		||||
NetworkOPsImp::StateAccounting::json(Json::Value& obj) const
 | 
			
		||||
{
 | 
			
		||||
    auto [counters, mode, start] = getCounterData();
 | 
			
		||||
    auto [counters, mode, start, initialSync] = getCounterData();
 | 
			
		||||
    auto const current = std::chrono::duration_cast<std::chrono::microseconds>(
 | 
			
		||||
        std::chrono::system_clock::now() - start);
 | 
			
		||||
        std::chrono::steady_clock::now() - start);
 | 
			
		||||
    counters[static_cast<std::size_t>(mode)].dur += current;
 | 
			
		||||
 | 
			
		||||
    Json::Value ret = Json::objectValue;
 | 
			
		||||
 | 
			
		||||
    obj[jss::state_accounting] = Json::objectValue;
 | 
			
		||||
    for (std::size_t i = static_cast<std::size_t>(OperatingMode::DISCONNECTED);
 | 
			
		||||
         i <= static_cast<std::size_t>(OperatingMode::FULL);
 | 
			
		||||
         ++i)
 | 
			
		||||
    {
 | 
			
		||||
        ret[states_[i]] = Json::objectValue;
 | 
			
		||||
        auto& state = ret[states_[i]];
 | 
			
		||||
        state[jss::transitions] = counters[i].transitions;
 | 
			
		||||
        obj[jss::state_accounting][states_[i]] = Json::objectValue;
 | 
			
		||||
        auto& state = obj[jss::state_accounting][states_[i]];
 | 
			
		||||
        state[jss::transitions] = std::to_string(counters[i].transitions);
 | 
			
		||||
        state[jss::duration_us] = std::to_string(counters[i].dur.count());
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    return {ret, std::to_string(current.count())};
 | 
			
		||||
    obj[jss::server_state_duration_us] = std::to_string(current.count());
 | 
			
		||||
    if (initialSync)
 | 
			
		||||
        obj[jss::initial_sync_duration_us] = std::to_string(initialSync);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
 
 | 
			
		||||
@@ -269,6 +269,9 @@ public:
 | 
			
		||||
    forwardProposedTransaction(Json::Value const& jvObj) = 0;
 | 
			
		||||
    virtual void
 | 
			
		||||
    forwardProposedAccountTransaction(Json::Value const& jvObj) = 0;
 | 
			
		||||
 | 
			
		||||
    virtual void
 | 
			
		||||
    stateAccounting(Json::Value& obj) = 0;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
//------------------------------------------------------------------------------
 | 
			
		||||
 
 | 
			
		||||
@@ -27,6 +27,8 @@
 | 
			
		||||
#include <ripple/core/Pg.h>
 | 
			
		||||
#include <ripple/nodestore/impl/DatabaseRotatingImp.h>
 | 
			
		||||
 | 
			
		||||
#include <ripple/nodestore/Scheduler.h>
 | 
			
		||||
 | 
			
		||||
#include <boost/algorithm/string/predicate.hpp>
 | 
			
		||||
 | 
			
		||||
namespace ripple {
 | 
			
		||||
@@ -242,7 +244,11 @@ bool
 | 
			
		||||
SHAMapStoreImp::copyNode(std::uint64_t& nodeCount, SHAMapTreeNode const& node)
 | 
			
		||||
{
 | 
			
		||||
    // Copy a single record from node to dbRotating_
 | 
			
		||||
    dbRotating_->fetchNodeObject(node.getHash().as_uint256());
 | 
			
		||||
    dbRotating_->fetchNodeObject(
 | 
			
		||||
        node.getHash().as_uint256(),
 | 
			
		||||
        0,
 | 
			
		||||
        NodeStore::FetchType::synchronous,
 | 
			
		||||
        true);
 | 
			
		||||
    if (!(++nodeCount % checkHealthInterval_))
 | 
			
		||||
    {
 | 
			
		||||
        if (health())
 | 
			
		||||
 
 | 
			
		||||
@@ -26,6 +26,8 @@
 | 
			
		||||
#include <ripple/app/rdb/RelationalDBInterface_global.h>
 | 
			
		||||
#include <ripple/core/DatabaseCon.h>
 | 
			
		||||
#include <ripple/nodestore/DatabaseRotating.h>
 | 
			
		||||
 | 
			
		||||
#include <ripple/nodestore/Scheduler.h>
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <condition_variable>
 | 
			
		||||
@@ -197,7 +199,8 @@ private:
 | 
			
		||||
 | 
			
		||||
        for (auto const& key : cache.getKeys())
 | 
			
		||||
        {
 | 
			
		||||
            dbRotating_->fetchNodeObject(key);
 | 
			
		||||
            dbRotating_->fetchNodeObject(
 | 
			
		||||
                key, 0, NodeStore::FetchType::synchronous, true);
 | 
			
		||||
            if (!(++check % checkHealthInterval_) && health())
 | 
			
		||||
                return true;
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
@@ -35,6 +35,7 @@ class Journal;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
namespace ripple {
 | 
			
		||||
class Application;
 | 
			
		||||
namespace perf {
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
@@ -174,6 +175,7 @@ setup_PerfLog(Section const& section, boost::filesystem::path const& configDir);
 | 
			
		||||
std::unique_ptr<PerfLog>
 | 
			
		||||
make_PerfLog(
 | 
			
		||||
    PerfLog::Setup const& setup,
 | 
			
		||||
    Application& app,
 | 
			
		||||
    beast::Journal journal,
 | 
			
		||||
    std::function<void()>&& signalStop);
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,7 @@
 | 
			
		||||
#include <ripple/core/JobTypes.h>
 | 
			
		||||
#include <ripple/json/json_writer.h>
 | 
			
		||||
#include <ripple/json/to_string.h>
 | 
			
		||||
#include <ripple/nodestore/DatabaseShard.h>
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <cstdlib>
 | 
			
		||||
@@ -296,16 +297,23 @@ PerfLogImp::report()
 | 
			
		||||
    }
 | 
			
		||||
    report[jss::hostid] = hostname_;
 | 
			
		||||
    report[jss::counters] = counters_.countersJson();
 | 
			
		||||
    report[jss::nodestore] = Json::objectValue;
 | 
			
		||||
    if (app_.getShardStore())
 | 
			
		||||
        app_.getShardStore()->getCountsJson(report[jss::nodestore]);
 | 
			
		||||
    else
 | 
			
		||||
        app_.getNodeStore().getCountsJson(report[jss::nodestore]);
 | 
			
		||||
    report[jss::current_activities] = counters_.currentJson();
 | 
			
		||||
    app_.getOPs().stateAccounting(report);
 | 
			
		||||
 | 
			
		||||
    logFile_ << Json::Compact{std::move(report)} << std::endl;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
PerfLogImp::PerfLogImp(
 | 
			
		||||
    Setup const& setup,
 | 
			
		||||
    Application& app,
 | 
			
		||||
    beast::Journal journal,
 | 
			
		||||
    std::function<void()>&& signalStop)
 | 
			
		||||
    : setup_(setup), j_(journal), signalStop_(std::move(signalStop))
 | 
			
		||||
    : setup_(setup), app_(app), j_(journal), signalStop_(std::move(signalStop))
 | 
			
		||||
{
 | 
			
		||||
    openLog();
 | 
			
		||||
}
 | 
			
		||||
@@ -491,10 +499,12 @@ setup_PerfLog(Section const& section, boost::filesystem::path const& configDir)
 | 
			
		||||
std::unique_ptr<PerfLog>
 | 
			
		||||
make_PerfLog(
 | 
			
		||||
    PerfLog::Setup const& setup,
 | 
			
		||||
    Application& app,
 | 
			
		||||
    beast::Journal journal,
 | 
			
		||||
    std::function<void()>&& signalStop)
 | 
			
		||||
{
 | 
			
		||||
    return std::make_unique<PerfLogImp>(setup, journal, std::move(signalStop));
 | 
			
		||||
    return std::make_unique<PerfLogImp>(
 | 
			
		||||
        setup, app, journal, std::move(signalStop));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}  // namespace perf
 | 
			
		||||
 
 | 
			
		||||
@@ -123,6 +123,7 @@ class PerfLogImp : public PerfLog
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    Setup const setup_;
 | 
			
		||||
    Application& app_;
 | 
			
		||||
    beast::Journal const j_;
 | 
			
		||||
    std::function<void()> const signalStop_;
 | 
			
		||||
    Counters counters_{ripple::RPC::getHandlerNames(), JobTypes::instance()};
 | 
			
		||||
@@ -150,6 +151,7 @@ class PerfLogImp : public PerfLog
 | 
			
		||||
public:
 | 
			
		||||
    PerfLogImp(
 | 
			
		||||
        Setup const& setup,
 | 
			
		||||
        Application& app,
 | 
			
		||||
        beast::Journal journal,
 | 
			
		||||
        std::function<void()>&& signalStop);
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -271,6 +271,9 @@ public:
 | 
			
		||||
    // Enable the beta API version
 | 
			
		||||
    bool BETA_RPC_API = false;
 | 
			
		||||
 | 
			
		||||
    // First, attempt to load the latest ledger directly from disk.
 | 
			
		||||
    bool FAST_LOAD = false;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    Config();
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -426,6 +426,9 @@ Config::setup(
 | 
			
		||||
    std::string ledgerTxDbType;
 | 
			
		||||
    Section ledgerTxTablesSection = section("ledger_tx_tables");
 | 
			
		||||
    get_if_exists(ledgerTxTablesSection, "use_tx_tables", USE_TX_TABLES);
 | 
			
		||||
 | 
			
		||||
    Section& nodeDbSection{section(ConfigSection::nodeDatabase())};
 | 
			
		||||
    get_if_exists(nodeDbSection, "fast_load", FAST_LOAD);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
 
 | 
			
		||||
@@ -141,7 +141,8 @@ public:
 | 
			
		||||
    fetchNodeObject(
 | 
			
		||||
        uint256 const& hash,
 | 
			
		||||
        std::uint32_t ledgerSeq = 0,
 | 
			
		||||
        FetchType fetchType = FetchType::synchronous);
 | 
			
		||||
        FetchType fetchType = FetchType::synchronous,
 | 
			
		||||
        bool duplicate = false);
 | 
			
		||||
 | 
			
		||||
    /** Fetch an object without waiting.
 | 
			
		||||
        If I/O is required to determine whether or not the object is present,
 | 
			
		||||
@@ -375,7 +376,8 @@ private:
 | 
			
		||||
    fetchNodeObject(
 | 
			
		||||
        uint256 const& hash,
 | 
			
		||||
        std::uint32_t ledgerSeq,
 | 
			
		||||
        FetchReport& fetchReport) = 0;
 | 
			
		||||
        FetchReport& fetchReport,
 | 
			
		||||
        bool duplicate) = 0;
 | 
			
		||||
 | 
			
		||||
    /** Visit every object in the database
 | 
			
		||||
        This is usually called during import.
 | 
			
		||||
 
 | 
			
		||||
@@ -158,14 +158,17 @@ std::shared_ptr<NodeObject>
 | 
			
		||||
Database::fetchNodeObject(
 | 
			
		||||
    uint256 const& hash,
 | 
			
		||||
    std::uint32_t ledgerSeq,
 | 
			
		||||
    FetchType fetchType)
 | 
			
		||||
    FetchType fetchType,
 | 
			
		||||
    bool duplicate)
 | 
			
		||||
{
 | 
			
		||||
    FetchReport fetchReport(fetchType);
 | 
			
		||||
 | 
			
		||||
    using namespace std::chrono;
 | 
			
		||||
    auto const begin{steady_clock::now()};
 | 
			
		||||
 | 
			
		||||
    auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport)};
 | 
			
		||||
    auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
 | 
			
		||||
    auto dur = steady_clock::now() - begin;
 | 
			
		||||
    fetchDurationUs_ += duration_cast<microseconds>(dur).count();
 | 
			
		||||
    if (nodeObject)
 | 
			
		||||
    {
 | 
			
		||||
        ++fetchHitCount_;
 | 
			
		||||
@@ -173,8 +176,7 @@ Database::fetchNodeObject(
 | 
			
		||||
    }
 | 
			
		||||
    ++fetchTotalCount_;
 | 
			
		||||
 | 
			
		||||
    fetchReport.elapsed =
 | 
			
		||||
        duration_cast<milliseconds>(steady_clock::now() - begin);
 | 
			
		||||
    fetchReport.elapsed = duration_cast<milliseconds>(dur);
 | 
			
		||||
    scheduler_.onFetch(fetchReport);
 | 
			
		||||
    return nodeObject;
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -47,7 +47,8 @@ std::shared_ptr<NodeObject>
 | 
			
		||||
DatabaseNodeImp::fetchNodeObject(
 | 
			
		||||
    uint256 const& hash,
 | 
			
		||||
    std::uint32_t,
 | 
			
		||||
    FetchReport& fetchReport)
 | 
			
		||||
    FetchReport& fetchReport,
 | 
			
		||||
    bool duplicate)
 | 
			
		||||
{
 | 
			
		||||
    std::shared_ptr<NodeObject> nodeObject{
 | 
			
		||||
        cache_ ? cache_->fetch(hash) : nullptr};
 | 
			
		||||
@@ -70,13 +71,8 @@ DatabaseNodeImp::fetchNodeObject(
 | 
			
		||||
        switch (status)
 | 
			
		||||
        {
 | 
			
		||||
            case ok:
 | 
			
		||||
                ++fetchHitCount_;
 | 
			
		||||
                if (nodeObject)
 | 
			
		||||
                {
 | 
			
		||||
                    fetchSz_ += nodeObject->getData().size();
 | 
			
		||||
                    if (cache_)
 | 
			
		||||
                        cache_->canonicalize_replace_client(hash, nodeObject);
 | 
			
		||||
                }
 | 
			
		||||
                if (nodeObject && cache_)
 | 
			
		||||
                    cache_->canonicalize_replace_client(hash, nodeObject);
 | 
			
		||||
                break;
 | 
			
		||||
            case notFound:
 | 
			
		||||
                break;
 | 
			
		||||
 
 | 
			
		||||
@@ -140,7 +140,8 @@ private:
 | 
			
		||||
    fetchNodeObject(
 | 
			
		||||
        uint256 const& hash,
 | 
			
		||||
        std::uint32_t,
 | 
			
		||||
        FetchReport& fetchReport) override;
 | 
			
		||||
        FetchReport& fetchReport,
 | 
			
		||||
        bool duplicate) override;
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
 | 
			
		||||
 
 | 
			
		||||
@@ -125,7 +125,8 @@ std::shared_ptr<NodeObject>
 | 
			
		||||
DatabaseRotatingImp::fetchNodeObject(
 | 
			
		||||
    uint256 const& hash,
 | 
			
		||||
    std::uint32_t,
 | 
			
		||||
    FetchReport& fetchReport)
 | 
			
		||||
    FetchReport& fetchReport,
 | 
			
		||||
    bool duplicate)
 | 
			
		||||
{
 | 
			
		||||
    auto fetch = [&](std::shared_ptr<Backend> const& backend) {
 | 
			
		||||
        Status status;
 | 
			
		||||
@@ -143,10 +144,6 @@ DatabaseRotatingImp::fetchNodeObject(
 | 
			
		||||
        switch (status)
 | 
			
		||||
        {
 | 
			
		||||
            case ok:
 | 
			
		||||
                ++fetchHitCount_;
 | 
			
		||||
                if (nodeObject)
 | 
			
		||||
                    fetchSz_ += nodeObject->getData().size();
 | 
			
		||||
                break;
 | 
			
		||||
            case notFound:
 | 
			
		||||
                break;
 | 
			
		||||
            case dataCorrupt:
 | 
			
		||||
@@ -183,7 +180,8 @@ DatabaseRotatingImp::fetchNodeObject(
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            // Update writable backend with data from the archive backend
 | 
			
		||||
            writable->store(nodeObject);
 | 
			
		||||
            if (duplicate)
 | 
			
		||||
                writable->store(nodeObject);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -88,7 +88,8 @@ private:
 | 
			
		||||
    fetchNodeObject(
 | 
			
		||||
        uint256 const& hash,
 | 
			
		||||
        std::uint32_t,
 | 
			
		||||
        FetchReport& fetchReport) override;
 | 
			
		||||
        FetchReport& fetchReport,
 | 
			
		||||
        bool duplicate) override;
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override;
 | 
			
		||||
 
 | 
			
		||||
@@ -1384,7 +1384,8 @@ std::shared_ptr<NodeObject>
 | 
			
		||||
DatabaseShardImp::fetchNodeObject(
 | 
			
		||||
    uint256 const& hash,
 | 
			
		||||
    std::uint32_t ledgerSeq,
 | 
			
		||||
    FetchReport& fetchReport)
 | 
			
		||||
    FetchReport& fetchReport,
 | 
			
		||||
    bool duplicate)
 | 
			
		||||
{
 | 
			
		||||
    auto const shardIndex{seqToShardIndex(ledgerSeq)};
 | 
			
		||||
    std::shared_ptr<Shard> shard;
 | 
			
		||||
 
 | 
			
		||||
@@ -303,7 +303,8 @@ private:
 | 
			
		||||
    fetchNodeObject(
 | 
			
		||||
        uint256 const& hash,
 | 
			
		||||
        std::uint32_t ledgerSeq,
 | 
			
		||||
        FetchReport& fetchReport) override;
 | 
			
		||||
        FetchReport& fetchReport,
 | 
			
		||||
        bool duplicate) override;
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    for_each(std::function<void(std::shared_ptr<NodeObject>)> f) override
 | 
			
		||||
 
 | 
			
		||||
@@ -287,14 +287,15 @@ JSS(index);                 // in: LedgerEntry, DownloadShard
 | 
			
		||||
                            // out: STLedgerEntry,
 | 
			
		||||
                            //      LedgerEntry, TxHistory, LedgerData
 | 
			
		||||
JSS(info);                  // out: ServerInfo, ConsensusInfo, FetchInfo
 | 
			
		||||
JSS(internal_command);      // in: Internal
 | 
			
		||||
JSS(invalid_API_version);   // out: Many, when a request has an invalid
 | 
			
		||||
                            //      version
 | 
			
		||||
JSS(io_latency_ms);         // out: NetworkOPs
 | 
			
		||||
JSS(ip);                    // in: Connect, out: OverlayImpl
 | 
			
		||||
JSS(issuer);                // in: RipplePathFind, Subscribe,
 | 
			
		||||
                            //     Unsubscribe, BookOffers
 | 
			
		||||
                            // out: STPathSet, STAmount
 | 
			
		||||
JSS(initial_sync_duration_us);
 | 
			
		||||
JSS(internal_command);     // in: Internal
 | 
			
		||||
JSS(invalid_API_version);  // out: Many, when a request has an invalid
 | 
			
		||||
                           //      version
 | 
			
		||||
JSS(io_latency_ms);        // out: NetworkOPs
 | 
			
		||||
JSS(ip);                   // in: Connect, out: OverlayImpl
 | 
			
		||||
JSS(issuer);               // in: RipplePathFind, Subscribe,
 | 
			
		||||
                           //     Unsubscribe, BookOffers
 | 
			
		||||
                           // out: STPathSet, STAmount
 | 
			
		||||
JSS(job);
 | 
			
		||||
JSS(job_queue);
 | 
			
		||||
JSS(jobs);
 | 
			
		||||
 
 | 
			
		||||
@@ -328,6 +328,10 @@ public:
 | 
			
		||||
 | 
			
		||||
    void
 | 
			
		||||
    walkMap(std::vector<SHAMapMissingNode>& missingNodes, int maxMissing) const;
 | 
			
		||||
    void
 | 
			
		||||
    walkMapParallel(
 | 
			
		||||
        std::vector<SHAMapMissingNode>& missingNodes,
 | 
			
		||||
        int maxMissing) const;
 | 
			
		||||
    bool
 | 
			
		||||
    deepCompare(SHAMap& other) const;  // Intended for debug/test only
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -20,6 +20,10 @@
 | 
			
		||||
#include <ripple/basics/contract.h>
 | 
			
		||||
#include <ripple/shamap/SHAMap.h>
 | 
			
		||||
 | 
			
		||||
#include <array>
 | 
			
		||||
#include <stack>
 | 
			
		||||
#include <vector>
 | 
			
		||||
 | 
			
		||||
namespace ripple {
 | 
			
		||||
 | 
			
		||||
// This code is used to compare another node's transaction tree
 | 
			
		||||
@@ -286,4 +290,81 @@ SHAMap::walkMap(std::vector<SHAMapMissingNode>& missingNodes, int maxMissing)
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
SHAMap::walkMapParallel(
 | 
			
		||||
    std::vector<SHAMapMissingNode>& missingNodes,
 | 
			
		||||
    int maxMissing) const
 | 
			
		||||
{
 | 
			
		||||
    if (!root_->isInner())  // root_ is only node, and we have it
 | 
			
		||||
        return;
 | 
			
		||||
 | 
			
		||||
    using StackEntry = std::shared_ptr<SHAMapInnerNode>;
 | 
			
		||||
    std::array<std::shared_ptr<SHAMapTreeNode>, 16> topChildren;
 | 
			
		||||
    {
 | 
			
		||||
        auto const& innerRoot =
 | 
			
		||||
            std::static_pointer_cast<SHAMapInnerNode>(root_);
 | 
			
		||||
        for (int i = 0; i < 16; ++i)
 | 
			
		||||
        {
 | 
			
		||||
            if (!innerRoot->isEmptyBranch(i))
 | 
			
		||||
                topChildren[i] = descendNoStore(innerRoot, i);
 | 
			
		||||
        }
 | 
			
		||||
    }
 | 
			
		||||
    std::vector<std::thread> workers;
 | 
			
		||||
    workers.reserve(16);
 | 
			
		||||
 | 
			
		||||
    std::array<std::stack<StackEntry, std::vector<StackEntry>>, 16> nodeStacks;
 | 
			
		||||
 | 
			
		||||
    for (int rootChildIndex = 0; rootChildIndex < 16; ++rootChildIndex)
 | 
			
		||||
    {
 | 
			
		||||
        auto const& child = topChildren[rootChildIndex];
 | 
			
		||||
        if (!child || !child->isInner())
 | 
			
		||||
            continue;
 | 
			
		||||
 | 
			
		||||
        nodeStacks[rootChildIndex].push(
 | 
			
		||||
            std::static_pointer_cast<SHAMapInnerNode>(child));
 | 
			
		||||
 | 
			
		||||
        JLOG(journal_.debug()) << "starting worker " << rootChildIndex;
 | 
			
		||||
        std::mutex m;
 | 
			
		||||
        workers.push_back(std::thread(
 | 
			
		||||
            [&m, &missingNodes, &maxMissing, this](
 | 
			
		||||
                std::stack<StackEntry, std::vector<StackEntry>> nodeStack) {
 | 
			
		||||
                while (!nodeStack.empty())
 | 
			
		||||
                {
 | 
			
		||||
                    std::shared_ptr<SHAMapInnerNode> node =
 | 
			
		||||
                        std::move(nodeStack.top());
 | 
			
		||||
                    assert(node);
 | 
			
		||||
                    nodeStack.pop();
 | 
			
		||||
 | 
			
		||||
                    for (int i = 0; i < 16; ++i)
 | 
			
		||||
                    {
 | 
			
		||||
                        if (node->isEmptyBranch(i))
 | 
			
		||||
                            continue;
 | 
			
		||||
                        std::shared_ptr<SHAMapTreeNode> nextNode =
 | 
			
		||||
                            descendNoStore(node, i);
 | 
			
		||||
 | 
			
		||||
                        if (nextNode)
 | 
			
		||||
                        {
 | 
			
		||||
                            if (nextNode->isInner())
 | 
			
		||||
                                nodeStack.push(
 | 
			
		||||
                                    std::static_pointer_cast<SHAMapInnerNode>(
 | 
			
		||||
                                        nextNode));
 | 
			
		||||
                        }
 | 
			
		||||
                        else
 | 
			
		||||
                        {
 | 
			
		||||
                            std::lock_guard l{m};
 | 
			
		||||
                            missingNodes.emplace_back(
 | 
			
		||||
                                type_, node->getChildHash(i));
 | 
			
		||||
                            if (--maxMissing <= 0)
 | 
			
		||||
                                return;
 | 
			
		||||
                        }
 | 
			
		||||
                    }
 | 
			
		||||
                }
 | 
			
		||||
            },
 | 
			
		||||
            std::move(nodeStacks[rootChildIndex])));
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    for (std::thread& worker : workers)
 | 
			
		||||
        worker.join();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
}  // namespace ripple
 | 
			
		||||
 
 | 
			
		||||
@@ -49,10 +49,11 @@ class PerfLog_test : public beast::unit_test::suite
 | 
			
		||||
 | 
			
		||||
    struct Fixture
 | 
			
		||||
    {
 | 
			
		||||
        Application& app_;
 | 
			
		||||
        beast::Journal j_;
 | 
			
		||||
        bool stopSignaled{false};
 | 
			
		||||
 | 
			
		||||
        explicit Fixture(beast::Journal j) : j_(j)
 | 
			
		||||
        explicit Fixture(Application& app, beast::Journal j) : app_(app), j_(j)
 | 
			
		||||
        {
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
@@ -103,7 +104,7 @@ class PerfLog_test : public beast::unit_test::suite
 | 
			
		||||
            perf::PerfLog::Setup const setup{
 | 
			
		||||
                withFile == WithFile::no ? "" : logFile(), logInterval()};
 | 
			
		||||
            return perf::make_PerfLog(
 | 
			
		||||
                setup, j_, [this]() { return signalStop(); });
 | 
			
		||||
                setup, app_, j_, [this]() { return signalStop(); });
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        // Block until the log file has grown in size, indicating that the
 | 
			
		||||
@@ -192,7 +193,7 @@ public:
 | 
			
		||||
 | 
			
		||||
        {
 | 
			
		||||
            // Verify a PerfLog creates its file when constructed.
 | 
			
		||||
            Fixture fixture{j_};
 | 
			
		||||
            Fixture fixture{env_.app(), j_};
 | 
			
		||||
            BEAST_EXPECT(!exists(fixture.logFile()));
 | 
			
		||||
 | 
			
		||||
            auto perfLog{fixture.perfLog(WithFile::yes)};
 | 
			
		||||
@@ -204,7 +205,7 @@ public:
 | 
			
		||||
            // Create a file where PerfLog wants to put its directory.
 | 
			
		||||
            // Make sure that PerfLog tries to shutdown the server since it
 | 
			
		||||
            // can't open its file.
 | 
			
		||||
            Fixture fixture{j_};
 | 
			
		||||
            Fixture fixture{env_.app(), j_};
 | 
			
		||||
            if (!BEAST_EXPECT(!exists(fixture.logDir())))
 | 
			
		||||
                return;
 | 
			
		||||
 | 
			
		||||
@@ -238,7 +239,7 @@ public:
 | 
			
		||||
            // Put a write protected file where PerfLog wants to write its
 | 
			
		||||
            // file.  Make sure that PerfLog tries to shutdown the server
 | 
			
		||||
            // since it can't open its file.
 | 
			
		||||
            Fixture fixture{j_};
 | 
			
		||||
            Fixture fixture{env_.app(), j_};
 | 
			
		||||
            if (!BEAST_EXPECT(!exists(fixture.logDir())))
 | 
			
		||||
                return;
 | 
			
		||||
 | 
			
		||||
@@ -297,7 +298,7 @@ public:
 | 
			
		||||
    {
 | 
			
		||||
        // Exercise the rpc interfaces of PerfLog.
 | 
			
		||||
        // Start up the PerfLog that we'll use for testing.
 | 
			
		||||
        Fixture fixture{j_};
 | 
			
		||||
        Fixture fixture{env_.app(), j_};
 | 
			
		||||
        auto perfLog{fixture.perfLog(withFile)};
 | 
			
		||||
        perfLog->start();
 | 
			
		||||
 | 
			
		||||
@@ -502,7 +503,7 @@ public:
 | 
			
		||||
 | 
			
		||||
        // Exercise the jobs interfaces of PerfLog.
 | 
			
		||||
        // Start up the PerfLog that we'll use for testing.
 | 
			
		||||
        Fixture fixture{j_};
 | 
			
		||||
        Fixture fixture{env_.app(), j_};
 | 
			
		||||
        auto perfLog{fixture.perfLog(withFile)};
 | 
			
		||||
        perfLog->start();
 | 
			
		||||
 | 
			
		||||
@@ -849,7 +850,7 @@ public:
 | 
			
		||||
        // the PerLog behaves as well as possible if an invalid ID is passed.
 | 
			
		||||
 | 
			
		||||
        // Start up the PerfLog that we'll use for testing.
 | 
			
		||||
        Fixture fixture{j_};
 | 
			
		||||
        Fixture fixture{env_.app(), j_};
 | 
			
		||||
        auto perfLog{fixture.perfLog(withFile)};
 | 
			
		||||
        perfLog->start();
 | 
			
		||||
 | 
			
		||||
@@ -989,7 +990,7 @@ public:
 | 
			
		||||
        // the interface and see that it doesn't crash.
 | 
			
		||||
        using namespace boost::filesystem;
 | 
			
		||||
 | 
			
		||||
        Fixture fixture{j_};
 | 
			
		||||
        Fixture fixture{env_.app(), j_};
 | 
			
		||||
        BEAST_EXPECT(!exists(fixture.logDir()));
 | 
			
		||||
 | 
			
		||||
        auto perfLog{fixture.perfLog(withFile)};
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user