From 7c12f0135897361398917ad2c8cda888249d42ae Mon Sep 17 00:00:00 2001 From: Mark Travis Date: Tue, 14 Dec 2021 17:08:59 -0800 Subject: [PATCH] Parallel ledger loader & I/O performance improvements: - Only duplicate records from archive to writable during online_delete. - Log duration of nodestore reads. - Include nodestore counters in perf_log output. - Remove gratuitous nodestore activity counting. - Report initial sync duration in server_info and perfLog. - Report state_accounting in perfLog. - Make state_accounting durations more accurate. - Parallel ledger loader. - Config parameter to load ledgers on start. --- Builds/levelization/results/loops.txt | 5 +- Builds/levelization/results/ordering.txt | 1 - cfg/rippled-example.cfg | 6 ++ src/ripple/app/ledger/Ledger.cpp | 7 +- src/ripple/app/ledger/Ledger.h | 2 +- src/ripple/app/main/Application.cpp | 17 +++- src/ripple/app/main/Main.cpp | 14 ++-- src/ripple/app/misc/NetworkOPs.cpp | 71 +++++++++------- src/ripple/app/misc/NetworkOPs.h | 3 + src/ripple/app/misc/SHAMapStoreImp.cpp | 8 +- src/ripple/app/misc/SHAMapStoreImp.h | 5 +- src/ripple/basics/PerfLog.h | 2 + src/ripple/basics/impl/PerfLogImp.cpp | 14 +++- src/ripple/basics/impl/PerfLogImp.h | 2 + src/ripple/core/Config.h | 3 + src/ripple/core/impl/Config.cpp | 3 + src/ripple/nodestore/Database.h | 6 +- src/ripple/nodestore/impl/Database.cpp | 10 ++- src/ripple/nodestore/impl/DatabaseNodeImp.cpp | 12 +-- src/ripple/nodestore/impl/DatabaseNodeImp.h | 3 +- .../nodestore/impl/DatabaseRotatingImp.cpp | 10 +-- .../nodestore/impl/DatabaseRotatingImp.h | 3 +- .../nodestore/impl/DatabaseShardImp.cpp | 3 +- src/ripple/nodestore/impl/DatabaseShardImp.h | 3 +- src/ripple/protocol/jss.h | 17 ++-- src/ripple/shamap/SHAMap.h | 4 + src/ripple/shamap/impl/SHAMapDelta.cpp | 81 +++++++++++++++++++ src/test/basics/PerfLog_test.cpp | 19 ++--- 28 files changed, 245 insertions(+), 89 deletions(-) diff --git a/Builds/levelization/results/loops.txt b/Builds/levelization/results/loops.txt index 7d7215441..c6c561721 100644 --- a/Builds/levelization/results/loops.txt +++ b/Builds/levelization/results/loops.txt @@ -8,7 +8,7 @@ Loop: ripple.app ripple.net ripple.app > ripple.net Loop: ripple.app ripple.nodestore - ripple.nodestore ~= ripple.app + ripple.app > ripple.nodestore Loop: ripple.app ripple.overlay ripple.overlay ~= ripple.app @@ -28,6 +28,9 @@ Loop: ripple.basics ripple.core Loop: ripple.basics ripple.json ripple.json ~= ripple.basics +Loop: ripple.basics ripple.nodestore + ripple.nodestore > ripple.basics + Loop: ripple.basics ripple.protocol ripple.protocol > ripple.basics diff --git a/Builds/levelization/results/ordering.txt b/Builds/levelization/results/ordering.txt index de0d6f7c0..65281daad 100644 --- a/Builds/levelization/results/ordering.txt +++ b/Builds/levelization/results/ordering.txt @@ -29,7 +29,6 @@ ripple.net > ripple.beast ripple.net > ripple.json ripple.net > ripple.protocol ripple.net > ripple.resource -ripple.nodestore > ripple.basics ripple.nodestore > ripple.beast ripple.nodestore > ripple.core ripple.nodestore > ripple.json diff --git a/cfg/rippled-example.cfg b/cfg/rippled-example.cfg index 4c8b22d50..b9407d0da 100644 --- a/cfg/rippled-example.cfg +++ b/cfg/rippled-example.cfg @@ -1073,6 +1073,12 @@ # Note: the cache will not be created if online_delete # is specified, or if shards are used. # +# fast_load Boolean. If set, load the last persisted ledger +# from disk upon process start before syncing to +# the network. This is likely to improve performance +# if sufficient IOPS capacity is available. +# Default 0. +# # Optional keys for NuDB or RocksDB: # # earliest_seq The default is 32570 to match the XRP ledger diff --git a/src/ripple/app/ledger/Ledger.cpp b/src/ripple/app/ledger/Ledger.cpp index 0757bb4f4..4a12a3999 100644 --- a/src/ripple/app/ledger/Ledger.cpp +++ b/src/ripple/app/ledger/Ledger.cpp @@ -760,7 +760,7 @@ Ledger::updateNegativeUNL() //------------------------------------------------------------------------------ bool -Ledger::walkLedger(beast::Journal j) const +Ledger::walkLedger(beast::Journal j, bool parallel) const { std::vector missingNodes1; std::vector missingNodes2; @@ -773,7 +773,10 @@ Ledger::walkLedger(beast::Journal j) const } else { - stateMap_->walkMap(missingNodes1, 32); + if (parallel) + stateMap_->walkMapParallel(missingNodes1, 32); + else + stateMap_->walkMap(missingNodes1, 32); } if (!missingNodes1.empty()) diff --git a/src/ripple/app/ledger/Ledger.h b/src/ripple/app/ledger/Ledger.h index d40e73feb..caf68b3ea 100644 --- a/src/ripple/app/ledger/Ledger.h +++ b/src/ripple/app/ledger/Ledger.h @@ -337,7 +337,7 @@ public: updateSkipList(); bool - walkLedger(beast::Journal j) const; + walkLedger(beast::Journal j, bool parallel = false) const; bool assertSensible(beast::Journal ledgerJ) const; diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 60381b158..8040decea 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -279,6 +279,7 @@ public: perf::setup_PerfLog( config_->section("perf"), config_->CONFIG_DIR), + *this, logs_->journal("PerfLog"), [this] { signalStop(); })) @@ -1312,6 +1313,7 @@ ApplicationImp::setup() Pathfinder::initPathTable(); auto const startUp = config_->START_UP; + JLOG(m_journal.debug()) << "startUp: " << startUp; if (!config_->reporting()) { if (startUp == Config::FRESH) @@ -1333,7 +1335,18 @@ ApplicationImp::setup() { JLOG(m_journal.error()) << "The specified ledger could not be loaded."; - return false; + if (config_->FAST_LOAD) + { + // Fall back to syncing from the network, such as + // when there's no existing data. + if (startUp == Config::NETWORK && !config_->standalone()) + m_networkOPs->setNeedNetworkLedger(); + startGenesisLedger(); + } + else + { + return false; + } } } else if (startUp == Config::NETWORK) @@ -2007,7 +2020,7 @@ ApplicationImp::loadOldLedger( return false; } - if (!loadLedger->walkLedger(journal("Ledger"))) + if (!loadLedger->walkLedger(journal("Ledger"), true)) { JLOG(m_journal.fatal()) << "Ledger is missing nodes."; assert(false); diff --git a/src/ripple/app/main/Main.cpp b/src/ripple/app/main/Main.cpp index 214233953..34ec2989b 100644 --- a/src/ripple/app/main/Main.cpp +++ b/src/ripple/app/main/Main.cpp @@ -632,17 +632,12 @@ run(int argc, char** argv) config->START_LEDGER = vm["ledgerfile"].as(); config->START_UP = Config::LOAD_FILE; } - else if (vm.count("load")) + else if (vm.count("load") || config->FAST_LOAD) { config->START_UP = Config::LOAD; } - if (vm.count("valid")) - { - config->START_VALID = true; - } - - if (vm.count("net")) + if (vm.count("net") && !config->FAST_LOAD) { if ((config->START_UP == Config::LOAD) || (config->START_UP == Config::REPLAY)) @@ -655,6 +650,11 @@ run(int argc, char** argv) config->START_UP = Config::NETWORK; } + if (vm.count("valid")) + { + config->START_VALID = true; + } + // Override the RPC destination IP address. This must // happen after the config file is loaded. if (vm.count("rpc_ip")) diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index ed93caa0f..574ce040b 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -134,15 +134,17 @@ class NetworkOPsImp final : public NetworkOPs { explicit Counters() = default; - std::uint32_t transitions = 0; + std::uint64_t transitions = 0; std::chrono::microseconds dur = std::chrono::microseconds(0); }; OperatingMode mode_ = OperatingMode::DISCONNECTED; std::array counters_; mutable std::mutex mutex_; - std::chrono::system_clock::time_point start_ = - std::chrono::system_clock::now(); + std::chrono::steady_clock::time_point start_ = + std::chrono::steady_clock::now(); + std::chrono::steady_clock::time_point const processStart_ = start_; + std::uint64_t initialSyncUs_{0}; static std::array const states_; public: @@ -161,33 +163,27 @@ class NetworkOPsImp final : public NetworkOPs void mode(OperatingMode om); - /** - * Json-formatted state accounting data. - * 1st member: state accounting object. - * 2nd member: duration in current state. - */ - using StateCountersJson = std::pair; - /** * Output state counters in JSON format. * - * @return JSON object. + * @obj Json object to which to add state accounting data. */ - StateCountersJson - json() const; + void + json(Json::Value& obj) const; struct CounterData { decltype(counters_) counters; decltype(mode_) mode; decltype(start_) start; + decltype(initialSyncUs_) initialSyncUs; }; CounterData getCounterData() const { std::lock_guard lock(mutex_); - return {counters_, mode_, start_}; + return {counters_, mode_, start_, initialSyncUs_}; } }; @@ -595,6 +591,9 @@ public: waitHandlerCounter_.join("NetworkOPs", 1s, m_journal); } + void + stateAccounting(Json::Value& obj) override; + private: void setTimer( @@ -2613,8 +2612,7 @@ NetworkOPsImp::getServerInfo(bool human, bool admin, bool counters) info[jss::published_ledger] = lpPublished->info().seq; } - std::tie(info[jss::state_accounting], info[jss::server_state_duration_us]) = - accounting_.json(); + accounting_.json(info); info[jss::uptime] = UptimeClock::now().time_since_epoch().count(); if (!app_.config().reporting()) { @@ -3925,6 +3923,12 @@ NetworkOPsImp::subValidations(InfoSub::ref isrListener) .second; } +void +NetworkOPsImp::stateAccounting(Json::Value& obj) +{ + accounting_.json(obj); +} + // <-- bool: true=erased, false=was not there bool NetworkOPsImp::unsubValidations(std::uint64_t uSeq) @@ -4360,9 +4364,9 @@ NetworkOPsImp::getBookPage( inline void NetworkOPsImp::collect_metrics() { - auto [counters, mode, start] = accounting_.getCounterData(); + auto [counters, mode, start, initialSync] = accounting_.getCounterData(); auto const current = std::chrono::duration_cast( - std::chrono::system_clock::now() - start); + std::chrono::steady_clock::now() - start); counters[static_cast(mode)].dur += current; std::lock_guard lock(m_statsMutex); @@ -4398,10 +4402,17 @@ NetworkOPsImp::collect_metrics() void NetworkOPsImp::StateAccounting::mode(OperatingMode om) { - auto now = std::chrono::system_clock::now(); + auto now = std::chrono::steady_clock::now(); std::lock_guard lock(mutex_); ++counters_[static_cast(om)].transitions; + if (om == OperatingMode::FULL && + counters_[static_cast(om)].transitions == 1) + { + initialSyncUs_ = std::chrono::duration_cast( + now - processStart_) + .count(); + } counters_[static_cast(mode_)].dur += std::chrono::duration_cast(now - start_); @@ -4409,27 +4420,27 @@ NetworkOPsImp::StateAccounting::mode(OperatingMode om) start_ = now; } -NetworkOPsImp::StateAccounting::StateCountersJson -NetworkOPsImp::StateAccounting::json() const +void +NetworkOPsImp::StateAccounting::json(Json::Value& obj) const { - auto [counters, mode, start] = getCounterData(); + auto [counters, mode, start, initialSync] = getCounterData(); auto const current = std::chrono::duration_cast( - std::chrono::system_clock::now() - start); + std::chrono::steady_clock::now() - start); counters[static_cast(mode)].dur += current; - Json::Value ret = Json::objectValue; - + obj[jss::state_accounting] = Json::objectValue; for (std::size_t i = static_cast(OperatingMode::DISCONNECTED); i <= static_cast(OperatingMode::FULL); ++i) { - ret[states_[i]] = Json::objectValue; - auto& state = ret[states_[i]]; - state[jss::transitions] = counters[i].transitions; + obj[jss::state_accounting][states_[i]] = Json::objectValue; + auto& state = obj[jss::state_accounting][states_[i]]; + state[jss::transitions] = std::to_string(counters[i].transitions); state[jss::duration_us] = std::to_string(counters[i].dur.count()); } - - return {ret, std::to_string(current.count())}; + obj[jss::server_state_duration_us] = std::to_string(current.count()); + if (initialSync) + obj[jss::initial_sync_duration_us] = std::to_string(initialSync); } //------------------------------------------------------------------------------ diff --git a/src/ripple/app/misc/NetworkOPs.h b/src/ripple/app/misc/NetworkOPs.h index 5e00c4cc6..1cf53f126 100644 --- a/src/ripple/app/misc/NetworkOPs.h +++ b/src/ripple/app/misc/NetworkOPs.h @@ -269,6 +269,9 @@ public: forwardProposedTransaction(Json::Value const& jvObj) = 0; virtual void forwardProposedAccountTransaction(Json::Value const& jvObj) = 0; + + virtual void + stateAccounting(Json::Value& obj) = 0; }; //------------------------------------------------------------------------------ diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 5fce4cd72..42a6bc188 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -27,6 +27,8 @@ #include #include +#include + #include namespace ripple { @@ -242,7 +244,11 @@ bool SHAMapStoreImp::copyNode(std::uint64_t& nodeCount, SHAMapTreeNode const& node) { // Copy a single record from node to dbRotating_ - dbRotating_->fetchNodeObject(node.getHash().as_uint256()); + dbRotating_->fetchNodeObject( + node.getHash().as_uint256(), + 0, + NodeStore::FetchType::synchronous, + true); if (!(++nodeCount % checkHealthInterval_)) { if (health()) diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h index 7119bd3af..e3528faaa 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.h +++ b/src/ripple/app/misc/SHAMapStoreImp.h @@ -26,6 +26,8 @@ #include #include #include + +#include #include #include #include @@ -197,7 +199,8 @@ private: for (auto const& key : cache.getKeys()) { - dbRotating_->fetchNodeObject(key); + dbRotating_->fetchNodeObject( + key, 0, NodeStore::FetchType::synchronous, true); if (!(++check % checkHealthInterval_) && health()) return true; } diff --git a/src/ripple/basics/PerfLog.h b/src/ripple/basics/PerfLog.h index f62fd4f78..3d1cb3717 100644 --- a/src/ripple/basics/PerfLog.h +++ b/src/ripple/basics/PerfLog.h @@ -35,6 +35,7 @@ class Journal; } namespace ripple { +class Application; namespace perf { /** @@ -174,6 +175,7 @@ setup_PerfLog(Section const& section, boost::filesystem::path const& configDir); std::unique_ptr make_PerfLog( PerfLog::Setup const& setup, + Application& app, beast::Journal journal, std::function&& signalStop); diff --git a/src/ripple/basics/impl/PerfLogImp.cpp b/src/ripple/basics/impl/PerfLogImp.cpp index c09545d5f..17d2242ec 100644 --- a/src/ripple/basics/impl/PerfLogImp.cpp +++ b/src/ripple/basics/impl/PerfLogImp.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -296,16 +297,23 @@ PerfLogImp::report() } report[jss::hostid] = hostname_; report[jss::counters] = counters_.countersJson(); + report[jss::nodestore] = Json::objectValue; + if (app_.getShardStore()) + app_.getShardStore()->getCountsJson(report[jss::nodestore]); + else + app_.getNodeStore().getCountsJson(report[jss::nodestore]); report[jss::current_activities] = counters_.currentJson(); + app_.getOPs().stateAccounting(report); logFile_ << Json::Compact{std::move(report)} << std::endl; } PerfLogImp::PerfLogImp( Setup const& setup, + Application& app, beast::Journal journal, std::function&& signalStop) - : setup_(setup), j_(journal), signalStop_(std::move(signalStop)) + : setup_(setup), app_(app), j_(journal), signalStop_(std::move(signalStop)) { openLog(); } @@ -491,10 +499,12 @@ setup_PerfLog(Section const& section, boost::filesystem::path const& configDir) std::unique_ptr make_PerfLog( PerfLog::Setup const& setup, + Application& app, beast::Journal journal, std::function&& signalStop) { - return std::make_unique(setup, journal, std::move(signalStop)); + return std::make_unique( + setup, app, journal, std::move(signalStop)); } } // namespace perf diff --git a/src/ripple/basics/impl/PerfLogImp.h b/src/ripple/basics/impl/PerfLogImp.h index 412f4e2e7..493c1dc1a 100644 --- a/src/ripple/basics/impl/PerfLogImp.h +++ b/src/ripple/basics/impl/PerfLogImp.h @@ -123,6 +123,7 @@ class PerfLogImp : public PerfLog }; Setup const setup_; + Application& app_; beast::Journal const j_; std::function const signalStop_; Counters counters_{ripple::RPC::getHandlerNames(), JobTypes::instance()}; @@ -150,6 +151,7 @@ class PerfLogImp : public PerfLog public: PerfLogImp( Setup const& setup, + Application& app, beast::Journal journal, std::function&& signalStop); diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index 25f160558..034f81e38 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -271,6 +271,9 @@ public: // Enable the beta API version bool BETA_RPC_API = false; + // First, attempt to load the latest ledger directly from disk. + bool FAST_LOAD = false; + public: Config(); diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index ae65709a1..50b487f8b 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -426,6 +426,9 @@ Config::setup( std::string ledgerTxDbType; Section ledgerTxTablesSection = section("ledger_tx_tables"); get_if_exists(ledgerTxTablesSection, "use_tx_tables", USE_TX_TABLES); + + Section& nodeDbSection{section(ConfigSection::nodeDatabase())}; + get_if_exists(nodeDbSection, "fast_load", FAST_LOAD); } void diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index c2c5b5b88..f9e8c2418 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -141,7 +141,8 @@ public: fetchNodeObject( uint256 const& hash, std::uint32_t ledgerSeq = 0, - FetchType fetchType = FetchType::synchronous); + FetchType fetchType = FetchType::synchronous, + bool duplicate = false); /** Fetch an object without waiting. If I/O is required to determine whether or not the object is present, @@ -375,7 +376,8 @@ private: fetchNodeObject( uint256 const& hash, std::uint32_t ledgerSeq, - FetchReport& fetchReport) = 0; + FetchReport& fetchReport, + bool duplicate) = 0; /** Visit every object in the database This is usually called during import. diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index de56c768c..da062a682 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -158,14 +158,17 @@ std::shared_ptr Database::fetchNodeObject( uint256 const& hash, std::uint32_t ledgerSeq, - FetchType fetchType) + FetchType fetchType, + bool duplicate) { FetchReport fetchReport(fetchType); using namespace std::chrono; auto const begin{steady_clock::now()}; - auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport)}; + auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)}; + auto dur = steady_clock::now() - begin; + fetchDurationUs_ += duration_cast(dur).count(); if (nodeObject) { ++fetchHitCount_; @@ -173,8 +176,7 @@ Database::fetchNodeObject( } ++fetchTotalCount_; - fetchReport.elapsed = - duration_cast(steady_clock::now() - begin); + fetchReport.elapsed = duration_cast(dur); scheduler_.onFetch(fetchReport); return nodeObject; } diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.cpp b/src/ripple/nodestore/impl/DatabaseNodeImp.cpp index e28144b4c..b5369f56d 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.cpp @@ -47,7 +47,8 @@ std::shared_ptr DatabaseNodeImp::fetchNodeObject( uint256 const& hash, std::uint32_t, - FetchReport& fetchReport) + FetchReport& fetchReport, + bool duplicate) { std::shared_ptr nodeObject{ cache_ ? cache_->fetch(hash) : nullptr}; @@ -70,13 +71,8 @@ DatabaseNodeImp::fetchNodeObject( switch (status) { case ok: - ++fetchHitCount_; - if (nodeObject) - { - fetchSz_ += nodeObject->getData().size(); - if (cache_) - cache_->canonicalize_replace_client(hash, nodeObject); - } + if (nodeObject && cache_) + cache_->canonicalize_replace_client(hash, nodeObject); break; case notFound: break; diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.h b/src/ripple/nodestore/impl/DatabaseNodeImp.h index ae0a99fdb..3eaecd897 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.h +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.h @@ -140,7 +140,8 @@ private: fetchNodeObject( uint256 const& hash, std::uint32_t, - FetchReport& fetchReport) override; + FetchReport& fetchReport, + bool duplicate) override; void for_each(std::function)> f) override diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp index 56ec0a35a..267b4ee58 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp @@ -125,7 +125,8 @@ std::shared_ptr DatabaseRotatingImp::fetchNodeObject( uint256 const& hash, std::uint32_t, - FetchReport& fetchReport) + FetchReport& fetchReport, + bool duplicate) { auto fetch = [&](std::shared_ptr const& backend) { Status status; @@ -143,10 +144,6 @@ DatabaseRotatingImp::fetchNodeObject( switch (status) { case ok: - ++fetchHitCount_; - if (nodeObject) - fetchSz_ += nodeObject->getData().size(); - break; case notFound: break; case dataCorrupt: @@ -183,7 +180,8 @@ DatabaseRotatingImp::fetchNodeObject( } // Update writable backend with data from the archive backend - writable->store(nodeObject); + if (duplicate) + writable->store(nodeObject); } } diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.h b/src/ripple/nodestore/impl/DatabaseRotatingImp.h index dc095adcc..b2807eeab 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.h +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.h @@ -88,7 +88,8 @@ private: fetchNodeObject( uint256 const& hash, std::uint32_t, - FetchReport& fetchReport) override; + FetchReport& fetchReport, + bool duplicate) override; void for_each(std::function)> f) override; diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 1dc5dab6c..32efaecdb 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -1384,7 +1384,8 @@ std::shared_ptr DatabaseShardImp::fetchNodeObject( uint256 const& hash, std::uint32_t ledgerSeq, - FetchReport& fetchReport) + FetchReport& fetchReport, + bool duplicate) { auto const shardIndex{seqToShardIndex(ledgerSeq)}; std::shared_ptr shard; diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 327ab4865..5cf1f3701 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -303,7 +303,8 @@ private: fetchNodeObject( uint256 const& hash, std::uint32_t ledgerSeq, - FetchReport& fetchReport) override; + FetchReport& fetchReport, + bool duplicate) override; void for_each(std::function)> f) override diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index 7283f16c9..a227af429 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -287,14 +287,15 @@ JSS(index); // in: LedgerEntry, DownloadShard // out: STLedgerEntry, // LedgerEntry, TxHistory, LedgerData JSS(info); // out: ServerInfo, ConsensusInfo, FetchInfo -JSS(internal_command); // in: Internal -JSS(invalid_API_version); // out: Many, when a request has an invalid - // version -JSS(io_latency_ms); // out: NetworkOPs -JSS(ip); // in: Connect, out: OverlayImpl -JSS(issuer); // in: RipplePathFind, Subscribe, - // Unsubscribe, BookOffers - // out: STPathSet, STAmount +JSS(initial_sync_duration_us); +JSS(internal_command); // in: Internal +JSS(invalid_API_version); // out: Many, when a request has an invalid + // version +JSS(io_latency_ms); // out: NetworkOPs +JSS(ip); // in: Connect, out: OverlayImpl +JSS(issuer); // in: RipplePathFind, Subscribe, + // Unsubscribe, BookOffers + // out: STPathSet, STAmount JSS(job); JSS(job_queue); JSS(jobs); diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index c4dde4ac9..f5a108523 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -328,6 +328,10 @@ public: void walkMap(std::vector& missingNodes, int maxMissing) const; + void + walkMapParallel( + std::vector& missingNodes, + int maxMissing) const; bool deepCompare(SHAMap& other) const; // Intended for debug/test only diff --git a/src/ripple/shamap/impl/SHAMapDelta.cpp b/src/ripple/shamap/impl/SHAMapDelta.cpp index b8db81c60..13f28ddf9 100644 --- a/src/ripple/shamap/impl/SHAMapDelta.cpp +++ b/src/ripple/shamap/impl/SHAMapDelta.cpp @@ -20,6 +20,10 @@ #include #include +#include +#include +#include + namespace ripple { // This code is used to compare another node's transaction tree @@ -286,4 +290,81 @@ SHAMap::walkMap(std::vector& missingNodes, int maxMissing) } } +void +SHAMap::walkMapParallel( + std::vector& missingNodes, + int maxMissing) const +{ + if (!root_->isInner()) // root_ is only node, and we have it + return; + + using StackEntry = std::shared_ptr; + std::array, 16> topChildren; + { + auto const& innerRoot = + std::static_pointer_cast(root_); + for (int i = 0; i < 16; ++i) + { + if (!innerRoot->isEmptyBranch(i)) + topChildren[i] = descendNoStore(innerRoot, i); + } + } + std::vector workers; + workers.reserve(16); + + std::array>, 16> nodeStacks; + + for (int rootChildIndex = 0; rootChildIndex < 16; ++rootChildIndex) + { + auto const& child = topChildren[rootChildIndex]; + if (!child || !child->isInner()) + continue; + + nodeStacks[rootChildIndex].push( + std::static_pointer_cast(child)); + + JLOG(journal_.debug()) << "starting worker " << rootChildIndex; + std::mutex m; + workers.push_back(std::thread( + [&m, &missingNodes, &maxMissing, this]( + std::stack> nodeStack) { + while (!nodeStack.empty()) + { + std::shared_ptr node = + std::move(nodeStack.top()); + assert(node); + nodeStack.pop(); + + for (int i = 0; i < 16; ++i) + { + if (node->isEmptyBranch(i)) + continue; + std::shared_ptr nextNode = + descendNoStore(node, i); + + if (nextNode) + { + if (nextNode->isInner()) + nodeStack.push( + std::static_pointer_cast( + nextNode)); + } + else + { + std::lock_guard l{m}; + missingNodes.emplace_back( + type_, node->getChildHash(i)); + if (--maxMissing <= 0) + return; + } + } + } + }, + std::move(nodeStacks[rootChildIndex]))); + } + + for (std::thread& worker : workers) + worker.join(); +} + } // namespace ripple diff --git a/src/test/basics/PerfLog_test.cpp b/src/test/basics/PerfLog_test.cpp index 0f9005d62..a79dded90 100644 --- a/src/test/basics/PerfLog_test.cpp +++ b/src/test/basics/PerfLog_test.cpp @@ -49,10 +49,11 @@ class PerfLog_test : public beast::unit_test::suite struct Fixture { + Application& app_; beast::Journal j_; bool stopSignaled{false}; - explicit Fixture(beast::Journal j) : j_(j) + explicit Fixture(Application& app, beast::Journal j) : app_(app), j_(j) { } @@ -103,7 +104,7 @@ class PerfLog_test : public beast::unit_test::suite perf::PerfLog::Setup const setup{ withFile == WithFile::no ? "" : logFile(), logInterval()}; return perf::make_PerfLog( - setup, j_, [this]() { return signalStop(); }); + setup, app_, j_, [this]() { return signalStop(); }); } // Block until the log file has grown in size, indicating that the @@ -192,7 +193,7 @@ public: { // Verify a PerfLog creates its file when constructed. - Fixture fixture{j_}; + Fixture fixture{env_.app(), j_}; BEAST_EXPECT(!exists(fixture.logFile())); auto perfLog{fixture.perfLog(WithFile::yes)}; @@ -204,7 +205,7 @@ public: // Create a file where PerfLog wants to put its directory. // Make sure that PerfLog tries to shutdown the server since it // can't open its file. - Fixture fixture{j_}; + Fixture fixture{env_.app(), j_}; if (!BEAST_EXPECT(!exists(fixture.logDir()))) return; @@ -238,7 +239,7 @@ public: // Put a write protected file where PerfLog wants to write its // file. Make sure that PerfLog tries to shutdown the server // since it can't open its file. - Fixture fixture{j_}; + Fixture fixture{env_.app(), j_}; if (!BEAST_EXPECT(!exists(fixture.logDir()))) return; @@ -297,7 +298,7 @@ public: { // Exercise the rpc interfaces of PerfLog. // Start up the PerfLog that we'll use for testing. - Fixture fixture{j_}; + Fixture fixture{env_.app(), j_}; auto perfLog{fixture.perfLog(withFile)}; perfLog->start(); @@ -502,7 +503,7 @@ public: // Exercise the jobs interfaces of PerfLog. // Start up the PerfLog that we'll use for testing. - Fixture fixture{j_}; + Fixture fixture{env_.app(), j_}; auto perfLog{fixture.perfLog(withFile)}; perfLog->start(); @@ -849,7 +850,7 @@ public: // the PerLog behaves as well as possible if an invalid ID is passed. // Start up the PerfLog that we'll use for testing. - Fixture fixture{j_}; + Fixture fixture{env_.app(), j_}; auto perfLog{fixture.perfLog(withFile)}; perfLog->start(); @@ -989,7 +990,7 @@ public: // the interface and see that it doesn't crash. using namespace boost::filesystem; - Fixture fixture{j_}; + Fixture fixture{env_.app(), j_}; BEAST_EXPECT(!exists(fixture.logDir())); auto perfLog{fixture.perfLog(withFile)};