From 0a1fb4e6cad12c6939c96497f835537ff60cc219 Mon Sep 17 00:00:00 2001 From: John Freeman Date: Mon, 14 Dec 2020 19:49:31 -0600 Subject: [PATCH] Reduce nesting and remove dead code --- src/ripple/basics/impl/PerfLogImp.cpp | 213 +++++++++---------- src/ripple/basics/impl/PerfLogImp.h | 88 ++++---- src/ripple/core/JobTypes.h | 6 + src/ripple/nodestore/DummyScheduler.h | 2 - src/ripple/nodestore/impl/DummyScheduler.cpp | 5 - src/ripple/nodestore/impl/ManagerImp.cpp | 13 -- src/test/basics/PerfLog_test.cpp | 2 +- src/test/nodestore/Timing_test.cpp | 21 +- 8 files changed, 163 insertions(+), 187 deletions(-) diff --git a/src/ripple/basics/impl/PerfLogImp.cpp b/src/ripple/basics/impl/PerfLogImp.cpp index f3bb986d25..f553b38138 100644 --- a/src/ripple/basics/impl/PerfLogImp.cpp +++ b/src/ripple/basics/impl/PerfLogImp.cpp @@ -21,6 +21,7 @@ #include #include #include +#include #include #include #include @@ -29,6 +30,7 @@ #include #include #include +#include #include #include #include @@ -58,10 +60,9 @@ PerfLogImp::Counters::Counters( { // populateJq jq_.reserve(jobTypes.size()); - for (auto const& [jobType, jobTypeInfo] : jobTypes) + for (auto const& [jobType, _] : jobTypes) { - auto const inserted = - jq_.emplace(jobType, Jq(jobTypeInfo.name())).second; + auto const inserted = jq_.emplace(jobType, Jq()).second; if (!inserted) { // Ensure that no other function populates this entry. @@ -79,89 +80,82 @@ PerfLogImp::Counters::countersJson() const Rpc totalRpc; for (auto const& proc : rpc_) { - Json::Value p(Json::objectValue); + Rpc value; { - auto const sync = - [&proc]() -> boost::optional { - std::lock_guard lock(proc.second.mut); - if (!proc.second.sync.started && !proc.second.sync.finished && - !proc.second.sync.errored) - { - return boost::none; - } - return proc.second.sync; - }(); - if (!sync) + std::lock_guard lock(proc.second.mutex); + if (!proc.second.value.started && !proc.second.value.finished && + !proc.second.value.errored) + { continue; - - p[jss::started] = std::to_string(sync->started); - totalRpc.sync.started += sync->started; - p[jss::finished] = std::to_string(sync->finished); - totalRpc.sync.finished += sync->finished; - p[jss::errored] = std::to_string(sync->errored); - totalRpc.sync.errored += sync->errored; - p[jss::duration_us] = std::to_string(sync->duration.count()); - totalRpc.sync.duration += sync->duration; + } + value = proc.second.value; } + + Json::Value p(Json::objectValue); + p[jss::started] = std::to_string(value.started); + totalRpc.started += value.started; + p[jss::finished] = std::to_string(value.finished); + totalRpc.finished += value.finished; + p[jss::errored] = std::to_string(value.errored); + totalRpc.errored += value.errored; + p[jss::duration_us] = std::to_string(value.duration.count()); + totalRpc.duration += value.duration; rpcobj[proc.first] = p; } - if (totalRpc.sync.started) + if (totalRpc.started) { Json::Value totalRpcJson(Json::objectValue); - totalRpcJson[jss::started] = std::to_string(totalRpc.sync.started); - totalRpcJson[jss::finished] = std::to_string(totalRpc.sync.finished); - totalRpcJson[jss::errored] = std::to_string(totalRpc.sync.errored); + totalRpcJson[jss::started] = std::to_string(totalRpc.started); + totalRpcJson[jss::finished] = std::to_string(totalRpc.finished); + totalRpcJson[jss::errored] = std::to_string(totalRpc.errored); totalRpcJson[jss::duration_us] = - std::to_string(totalRpc.sync.duration.count()); + std::to_string(totalRpc.duration.count()); rpcobj[jss::total] = totalRpcJson; } Json::Value jqobj(Json::objectValue); // totalJq represents all jobs. All enqueued, started, finished, etc. - Jq totalJq("total"); + Jq totalJq; for (auto const& proc : jq_) { - Json::Value j(Json::objectValue); + Jq value; { - auto const sync = [&proc]() -> boost::optional { - std::lock_guard lock(proc.second.mut); - if (!proc.second.sync.queued && !proc.second.sync.started && - !proc.second.sync.finished) - { - return boost::none; - } - return proc.second.sync; - }(); - if (!sync) + std::lock_guard lock(proc.second.mutex); + if (!proc.second.value.queued && !proc.second.value.started && + !proc.second.value.finished) + { continue; - - j[jss::queued] = std::to_string(sync->queued); - totalJq.sync.queued += sync->queued; - j[jss::started] = std::to_string(sync->started); - totalJq.sync.started += sync->started; - j[jss::finished] = std::to_string(sync->finished); - totalJq.sync.finished += sync->finished; - j[jss::queued_duration_us] = - std::to_string(sync->queuedDuration.count()); - totalJq.sync.queuedDuration += sync->queuedDuration; - j[jss::running_duration_us] = - std::to_string(sync->runningDuration.count()); - totalJq.sync.runningDuration += sync->runningDuration; + } + value = proc.second.value; } - jqobj[proc.second.label] = j; + + Json::Value j(Json::objectValue); + j[jss::queued] = std::to_string(value.queued); + totalJq.queued += value.queued; + j[jss::started] = std::to_string(value.started); + totalJq.started += value.started; + j[jss::finished] = std::to_string(value.finished); + totalJq.finished += value.finished; + j[jss::queued_duration_us] = + std::to_string(value.queuedDuration.count()); + totalJq.queuedDuration += value.queuedDuration; + j[jss::running_duration_us] = + std::to_string(value.runningDuration.count()); + totalJq.runningDuration += value.runningDuration; + jqobj[JobTypes::name(proc.first)] = j; } - if (totalJq.sync.queued) + if (totalJq.queued) { Json::Value totalJqJson(Json::objectValue); - totalJqJson[jss::queued] = std::to_string(totalJq.sync.queued); - totalJqJson[jss::started] = std::to_string(totalJq.sync.started); - totalJqJson[jss::finished] = std::to_string(totalJq.sync.finished); + totalJqJson[jss::queued] = std::to_string(totalJq.queued); + totalJqJson[jss::started] = std::to_string(totalJq.started); + totalJqJson[jss::finished] = std::to_string(totalJq.finished); totalJqJson[jss::queued_duration_us] = - std::to_string(totalJq.sync.queuedDuration.count()); + std::to_string(totalJq.queuedDuration.count()); totalJqJson[jss::running_duration_us] = - std::to_string(totalJq.sync.runningDuration.count()); + std::to_string(totalJq.runningDuration.count()); jqobj[jss::total] = totalJqJson; } @@ -189,14 +183,7 @@ PerfLogImp::Counters::currentJson() const if (j.first == jtINVALID) continue; Json::Value jobj(Json::objectValue); - auto const e = jq_.find(j.first); - if (e == jq_.end()) - { - assert(false); - continue; - } - // label is const and created before multi-threading so needs no lock. - jobj[jss::job] = e->second.label; + jobj[jss::job] = JobTypes::name(j.first); jobj[jss::duration_us] = std::to_string( std::chrono::duration_cast(present - j.second) .count()); @@ -232,35 +219,35 @@ PerfLogImp::Counters::currentJson() const void PerfLogImp::openLog() { - if (!setup_.perfLog.empty()) + if (setup_.perfLog.empty()) + return; + + if (logFile_.is_open()) + logFile_.close(); + + auto logDir = setup_.perfLog.parent_path(); + if (!boost::filesystem::is_directory(logDir)) { - if (logFile_.is_open()) - logFile_.close(); - - auto logDir = setup_.perfLog.parent_path(); - if (!boost::filesystem::is_directory(logDir)) + boost::system::error_code ec; + boost::filesystem::create_directories(logDir, ec); + if (ec) { - boost::system::error_code ec; - boost::filesystem::create_directories(logDir, ec); - if (ec) - { - JLOG(j_.fatal()) << "Unable to create performance log " - "directory " - << logDir << ": " << ec.message(); - signalStop_(); - return; - } - } - - logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app); - - if (!logFile_) - { - JLOG(j_.fatal()) - << "Unable to open performance log " << setup_.perfLog << "."; + JLOG(j_.fatal()) << "Unable to create performance log " + "directory " + << logDir << ": " << ec.message(); signalStop_(); + return; } } + + logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app); + + if (!logFile_) + { + JLOG(j_.fatal()) << "Unable to open performance log " << setup_.perfLog + << "."; + signalStop_(); + } } void @@ -273,7 +260,8 @@ PerfLogImp::run() { { std::unique_lock lock(mutex_); - if (stop_) + if (cond_.wait_until( + lock, lastLog_ + setup_.logInterval, [&] { return stop_; })) { return; } @@ -282,7 +270,6 @@ PerfLogImp::run() openLog(); rotate_ = false; } - cond_.wait_until(lock, lastLog_ + setup_.logInterval); } report(); } @@ -302,10 +289,13 @@ PerfLogImp::report() Json::Value report(Json::objectValue); report[jss::time] = to_string(date::floor(present)); - report[jss::workers] = counters_.workers_; + { + std::lock_guard lock{counters_.jobsMutex_}; + report[jss::workers] = + static_cast(counters_.jobs_.size()); + } report[jss::hostid] = hostname_; report[jss::counters] = counters_.countersJson(); - auto cur = counters_.currentJson(); report[jss::current_activities] = counters_.currentJson(); logFile_ << Json::Compact{std::move(report)} << std::endl; @@ -340,8 +330,8 @@ PerfLogImp::rpcStart(std::string const& method, std::uint64_t const requestId) } { - std::lock_guard lock(counter->second.mut); - ++counter->second.sync.started; + std::lock_guard lock(counter->second.mutex); + ++counter->second.value.started; } std::lock_guard lock(counters_.methodsMutex_); counters_.methods_[requestId] = { @@ -374,12 +364,12 @@ PerfLogImp::rpcEnd( assert(false); } } - std::lock_guard lock(counter->second.mut); + std::lock_guard lock(counter->second.mutex); if (finish) - ++counter->second.sync.finished; + ++counter->second.value.finished; else - ++counter->second.sync.errored; - counter->second.sync.duration += std::chrono::duration_cast( + ++counter->second.value.errored; + counter->second.value.duration += std::chrono::duration_cast( steady_clock::now() - startTime); } @@ -392,8 +382,8 @@ PerfLogImp::jobQueue(JobType const type) assert(false); return; } - std::lock_guard lock(counter->second.mut); - ++counter->second.sync.queued; + std::lock_guard lock(counter->second.mutex); + ++counter->second.value.queued; } void @@ -410,9 +400,9 @@ PerfLogImp::jobStart( return; } { - std::lock_guard lock(counter->second.mut); - ++counter->second.sync.started; - counter->second.sync.queuedDuration += dur; + std::lock_guard lock(counter->second.mutex); + ++counter->second.value.started; + counter->second.value.queuedDuration += dur; } std::lock_guard lock(counters_.jobsMutex_); if (instance >= 0 && instance < counters_.jobs_.size()) @@ -429,9 +419,9 @@ PerfLogImp::jobFinish(JobType const type, microseconds dur, int instance) return; } { - std::lock_guard lock(counter->second.mut); - ++counter->second.sync.finished; - counter->second.sync.runningDuration += dur; + std::lock_guard lock(counter->second.mutex); + ++counter->second.value.finished; + counter->second.value.runningDuration += dur; } std::lock_guard lock(counters_.jobsMutex_); if (instance >= 0 && instance < counters_.jobs_.size()) @@ -442,7 +432,6 @@ void PerfLogImp::resizeJobs(int const resize) { std::lock_guard lock(counters_.jobsMutex_); - counters_.workers_ = resize; if (resize > counters_.jobs_.size()) counters_.jobs_.resize(resize, {jtINVALID, steady_time_point()}); } diff --git a/src/ripple/basics/impl/PerfLogImp.h b/src/ripple/basics/impl/PerfLogImp.h index 8fa430ec16..0edf6b5989 100644 --- a/src/ripple/basics/impl/PerfLogImp.h +++ b/src/ripple/basics/impl/PerfLogImp.h @@ -42,6 +42,28 @@ namespace ripple { namespace perf { +/** A box coupling data with a mutex for locking access to it. */ +template +struct Locked +{ + T value; + mutable std::mutex mutex; + + Locked() = default; + Locked(T const& value) : value(value) + { + } + Locked(T&& value) : value(std::move(value)) + { + } + Locked(Locked const& rhs) : value(rhs.value) + { + } + Locked(Locked&& rhs) : value(std::move(rhs.value)) + { + } +}; + /** * Implementation class for PerfLog. */ @@ -59,27 +81,13 @@ class PerfLogImp : public PerfLog, Stoppable */ struct Rpc { - // Keep all items that need to be synchronized in one place - // to minimize copy overhead while locked. - struct Sync - { - // Counters for each time a method starts and then either - // finishes successfully or with an exception. - std::uint64_t started{0}; - std::uint64_t finished{0}; - std::uint64_t errored{0}; - // Cumulative duration of all finished and errored method calls. - microseconds duration{0}; - }; - - Sync sync; - mutable std::mutex mut; - - Rpc() = default; - - Rpc(Rpc const& orig) : sync(orig.sync) - { - } + // Counters for each time a method starts and then either + // finishes successfully or with an exception. + std::uint64_t started{0}; + std::uint64_t finished{0}; + std::uint64_t errored{0}; + // Cumulative duration of all finished and errored method calls. + microseconds duration{0}; }; /** @@ -87,39 +95,21 @@ class PerfLogImp : public PerfLog, Stoppable */ struct Jq { - // Keep all items that need to be synchronized in one place - // to minimize copy overhead while locked. - struct Sync - { - // Counters for each time a job is enqueued, begins to run, - // finishes. - std::uint64_t queued{0}; - std::uint64_t started{0}; - std::uint64_t finished{0}; - // Cumulative duration of all jobs' queued and running times. - microseconds queuedDuration{0}; - microseconds runningDuration{0}; - }; - - Sync sync; - std::string const label; - mutable std::mutex mut; - - Jq(std::string const& labelArg) : label(labelArg) - { - } - - Jq(Jq const& orig) : sync(orig.sync), label(orig.label) - { - } + // Counters for each time a job is enqueued, begins to run, + // finishes. + std::uint64_t queued{0}; + std::uint64_t started{0}; + std::uint64_t finished{0}; + // Cumulative duration of all jobs' queued and running times. + microseconds queuedDuration{0}; + microseconds runningDuration{0}; }; // rpc_ and jq_ do not need mutex protection because all // keys and values are created before more threads are started. - std::unordered_map rpc_; - std::unordered_map, Jq> jq_; + std::unordered_map> rpc_; + std::unordered_map> jq_; std::vector> jobs_; - int workers_{0}; mutable std::mutex jobsMutex_; std::unordered_map methods_; mutable std::mutex methodsMutex_; diff --git a/src/ripple/core/JobTypes.h b/src/ripple/core/JobTypes.h index 684c7883cd..3cd7794f6f 100644 --- a/src/ripple/core/JobTypes.h +++ b/src/ripple/core/JobTypes.h @@ -106,6 +106,12 @@ public: return types; } + static std::string const& + name(JobType jt) + { + return instance().get(jt).name(); + } + JobTypeInfo const& get(JobType jt) const { diff --git a/src/ripple/nodestore/DummyScheduler.h b/src/ripple/nodestore/DummyScheduler.h index d5c13b8084..cfe7c3fd5d 100644 --- a/src/ripple/nodestore/DummyScheduler.h +++ b/src/ripple/nodestore/DummyScheduler.h @@ -34,8 +34,6 @@ public: void scheduleTask(Task& task) override; void - scheduledTasksStopped(); - void onFetch(FetchReport const& report) override; void onBatchWrite(BatchWriteReport const& report) override; diff --git a/src/ripple/nodestore/impl/DummyScheduler.cpp b/src/ripple/nodestore/impl/DummyScheduler.cpp index b15ec25eb6..ee44e02fb8 100644 --- a/src/ripple/nodestore/impl/DummyScheduler.cpp +++ b/src/ripple/nodestore/impl/DummyScheduler.cpp @@ -29,11 +29,6 @@ DummyScheduler::scheduleTask(Task& task) task.performScheduledTask(); } -void -DummyScheduler::scheduledTasksStopped() -{ -} - void DummyScheduler::onFetch(const FetchReport& report) { diff --git a/src/ripple/nodestore/impl/ManagerImp.cpp b/src/ripple/nodestore/impl/ManagerImp.cpp index f27812f2ac..4a823ea825 100644 --- a/src/ripple/nodestore/impl/ManagerImp.cpp +++ b/src/ripple/nodestore/impl/ManagerImp.cpp @@ -130,18 +130,5 @@ Manager::instance() return ManagerImp::instance(); } -//------------------------------------------------------------------------------ - -std::unique_ptr -make_Backend( - Section const& config, - std::size_t burstSize, - Scheduler& scheduler, - beast::Journal journal) -{ - return Manager::instance().make_Backend( - config, burstSize, scheduler, journal); -} - } // namespace NodeStore } // namespace ripple diff --git a/src/test/basics/PerfLog_test.cpp b/src/test/basics/PerfLog_test.cpp index 452057ef15..e259fc7b7c 100644 --- a/src/test/basics/PerfLog_test.cpp +++ b/src/test/basics/PerfLog_test.cpp @@ -415,7 +415,7 @@ public: } { // Examine current PerfLog::counterJson() values. - Json::Value const countersJson{perfLog->countersJson()["rpc"]}; + Json::Value const countersJson{perfLog->countersJson()[jss::rpc]}; BEAST_EXPECT(countersJson.size() == labels.size() + 1); for (auto& label : labels) { diff --git a/src/test/nodestore/Timing_test.cpp b/src/test/nodestore/Timing_test.cpp index c3dfd79b6f..8694631a0c 100644 --- a/src/test/nodestore/Timing_test.cpp +++ b/src/test/nodestore/Timing_test.cpp @@ -18,6 +18,7 @@ //============================================================================== #include +#include #include #include #include @@ -48,6 +49,16 @@ namespace ripple { namespace NodeStore { +std::unique_ptr +make_Backend( + Section const& config, + Scheduler& scheduler, + beast::Journal journal) +{ + return Manager::instance().make_Backend( + config, megabytes(4), scheduler, journal); +} + // Fill memory with random bits template static void @@ -261,7 +272,7 @@ public: beast::Journal journal) { DummyScheduler scheduler; - auto backend = make_Backend(config, megabytes(4), scheduler, journal); + auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->open(); @@ -318,7 +329,7 @@ public: beast::Journal journal) { DummyScheduler scheduler; - auto backend = make_Backend(config, megabytes(4), scheduler, journal); + auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->open(); @@ -389,7 +400,7 @@ public: beast::Journal journal) { DummyScheduler scheduler; - auto backend = make_Backend(config, megabytes(4), scheduler, journal); + auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->open(); @@ -462,7 +473,7 @@ public: beast::Journal journal) { DummyScheduler scheduler; - auto backend = make_Backend(config, megabytes(4), scheduler, journal); + auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->open(); @@ -551,7 +562,7 @@ public: do_work(Section const& config, Params const& params, beast::Journal journal) { DummyScheduler scheduler; - auto backend = make_Backend(config, megabytes(4), scheduler, journal); + auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->setDeletePath(); backend->open();