Reduce nesting and remove dead code

This commit is contained in:
John Freeman
2020-12-14 19:49:31 -06:00
committed by manojsdoshi
parent 7e97bfce10
commit 0a1fb4e6ca
8 changed files with 163 additions and 187 deletions

View File

@@ -21,6 +21,7 @@
#include <ripple/basics/impl/PerfLogImp.h>
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/core/JobTypes.h>
#include <ripple/json/json_writer.h>
#include <ripple/json/to_string.h>
#include <boost/optional.hpp>
@@ -29,6 +30,7 @@
#include <cstdlib>
#include <iostream>
#include <iterator>
#include <mutex>
#include <sstream>
#include <stdexcept>
#include <string>
@@ -58,10 +60,9 @@ PerfLogImp::Counters::Counters(
{
// populateJq
jq_.reserve(jobTypes.size());
for (auto const& [jobType, jobTypeInfo] : jobTypes)
for (auto const& [jobType, _] : jobTypes)
{
auto const inserted =
jq_.emplace(jobType, Jq(jobTypeInfo.name())).second;
auto const inserted = jq_.emplace(jobType, Jq()).second;
if (!inserted)
{
// Ensure that no other function populates this entry.
@@ -79,89 +80,82 @@ PerfLogImp::Counters::countersJson() const
Rpc totalRpc;
for (auto const& proc : rpc_)
{
Json::Value p(Json::objectValue);
Rpc value;
{
auto const sync =
[&proc]() -> boost::optional<Counters::Rpc::Sync> {
std::lock_guard lock(proc.second.mut);
if (!proc.second.sync.started && !proc.second.sync.finished &&
!proc.second.sync.errored)
{
return boost::none;
}
return proc.second.sync;
}();
if (!sync)
std::lock_guard lock(proc.second.mutex);
if (!proc.second.value.started && !proc.second.value.finished &&
!proc.second.value.errored)
{
continue;
p[jss::started] = std::to_string(sync->started);
totalRpc.sync.started += sync->started;
p[jss::finished] = std::to_string(sync->finished);
totalRpc.sync.finished += sync->finished;
p[jss::errored] = std::to_string(sync->errored);
totalRpc.sync.errored += sync->errored;
p[jss::duration_us] = std::to_string(sync->duration.count());
totalRpc.sync.duration += sync->duration;
}
value = proc.second.value;
}
Json::Value p(Json::objectValue);
p[jss::started] = std::to_string(value.started);
totalRpc.started += value.started;
p[jss::finished] = std::to_string(value.finished);
totalRpc.finished += value.finished;
p[jss::errored] = std::to_string(value.errored);
totalRpc.errored += value.errored;
p[jss::duration_us] = std::to_string(value.duration.count());
totalRpc.duration += value.duration;
rpcobj[proc.first] = p;
}
if (totalRpc.sync.started)
if (totalRpc.started)
{
Json::Value totalRpcJson(Json::objectValue);
totalRpcJson[jss::started] = std::to_string(totalRpc.sync.started);
totalRpcJson[jss::finished] = std::to_string(totalRpc.sync.finished);
totalRpcJson[jss::errored] = std::to_string(totalRpc.sync.errored);
totalRpcJson[jss::started] = std::to_string(totalRpc.started);
totalRpcJson[jss::finished] = std::to_string(totalRpc.finished);
totalRpcJson[jss::errored] = std::to_string(totalRpc.errored);
totalRpcJson[jss::duration_us] =
std::to_string(totalRpc.sync.duration.count());
std::to_string(totalRpc.duration.count());
rpcobj[jss::total] = totalRpcJson;
}
Json::Value jqobj(Json::objectValue);
// totalJq represents all jobs. All enqueued, started, finished, etc.
Jq totalJq("total");
Jq totalJq;
for (auto const& proc : jq_)
{
Json::Value j(Json::objectValue);
Jq value;
{
auto const sync = [&proc]() -> boost::optional<Counters::Jq::Sync> {
std::lock_guard lock(proc.second.mut);
if (!proc.second.sync.queued && !proc.second.sync.started &&
!proc.second.sync.finished)
{
return boost::none;
}
return proc.second.sync;
}();
if (!sync)
std::lock_guard lock(proc.second.mutex);
if (!proc.second.value.queued && !proc.second.value.started &&
!proc.second.value.finished)
{
continue;
j[jss::queued] = std::to_string(sync->queued);
totalJq.sync.queued += sync->queued;
j[jss::started] = std::to_string(sync->started);
totalJq.sync.started += sync->started;
j[jss::finished] = std::to_string(sync->finished);
totalJq.sync.finished += sync->finished;
j[jss::queued_duration_us] =
std::to_string(sync->queuedDuration.count());
totalJq.sync.queuedDuration += sync->queuedDuration;
j[jss::running_duration_us] =
std::to_string(sync->runningDuration.count());
totalJq.sync.runningDuration += sync->runningDuration;
}
value = proc.second.value;
}
jqobj[proc.second.label] = j;
Json::Value j(Json::objectValue);
j[jss::queued] = std::to_string(value.queued);
totalJq.queued += value.queued;
j[jss::started] = std::to_string(value.started);
totalJq.started += value.started;
j[jss::finished] = std::to_string(value.finished);
totalJq.finished += value.finished;
j[jss::queued_duration_us] =
std::to_string(value.queuedDuration.count());
totalJq.queuedDuration += value.queuedDuration;
j[jss::running_duration_us] =
std::to_string(value.runningDuration.count());
totalJq.runningDuration += value.runningDuration;
jqobj[JobTypes::name(proc.first)] = j;
}
if (totalJq.sync.queued)
if (totalJq.queued)
{
Json::Value totalJqJson(Json::objectValue);
totalJqJson[jss::queued] = std::to_string(totalJq.sync.queued);
totalJqJson[jss::started] = std::to_string(totalJq.sync.started);
totalJqJson[jss::finished] = std::to_string(totalJq.sync.finished);
totalJqJson[jss::queued] = std::to_string(totalJq.queued);
totalJqJson[jss::started] = std::to_string(totalJq.started);
totalJqJson[jss::finished] = std::to_string(totalJq.finished);
totalJqJson[jss::queued_duration_us] =
std::to_string(totalJq.sync.queuedDuration.count());
std::to_string(totalJq.queuedDuration.count());
totalJqJson[jss::running_duration_us] =
std::to_string(totalJq.sync.runningDuration.count());
std::to_string(totalJq.runningDuration.count());
jqobj[jss::total] = totalJqJson;
}
@@ -189,14 +183,7 @@ PerfLogImp::Counters::currentJson() const
if (j.first == jtINVALID)
continue;
Json::Value jobj(Json::objectValue);
auto const e = jq_.find(j.first);
if (e == jq_.end())
{
assert(false);
continue;
}
// label is const and created before multi-threading so needs no lock.
jobj[jss::job] = e->second.label;
jobj[jss::job] = JobTypes::name(j.first);
jobj[jss::duration_us] = std::to_string(
std::chrono::duration_cast<microseconds>(present - j.second)
.count());
@@ -232,35 +219,35 @@ PerfLogImp::Counters::currentJson() const
void
PerfLogImp::openLog()
{
if (!setup_.perfLog.empty())
if (setup_.perfLog.empty())
return;
if (logFile_.is_open())
logFile_.close();
auto logDir = setup_.perfLog.parent_path();
if (!boost::filesystem::is_directory(logDir))
{
if (logFile_.is_open())
logFile_.close();
auto logDir = setup_.perfLog.parent_path();
if (!boost::filesystem::is_directory(logDir))
boost::system::error_code ec;
boost::filesystem::create_directories(logDir, ec);
if (ec)
{
boost::system::error_code ec;
boost::filesystem::create_directories(logDir, ec);
if (ec)
{
JLOG(j_.fatal()) << "Unable to create performance log "
"directory "
<< logDir << ": " << ec.message();
signalStop_();
return;
}
}
logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app);
if (!logFile_)
{
JLOG(j_.fatal())
<< "Unable to open performance log " << setup_.perfLog << ".";
JLOG(j_.fatal()) << "Unable to create performance log "
"directory "
<< logDir << ": " << ec.message();
signalStop_();
return;
}
}
logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app);
if (!logFile_)
{
JLOG(j_.fatal()) << "Unable to open performance log " << setup_.perfLog
<< ".";
signalStop_();
}
}
void
@@ -273,7 +260,8 @@ PerfLogImp::run()
{
{
std::unique_lock<std::mutex> lock(mutex_);
if (stop_)
if (cond_.wait_until(
lock, lastLog_ + setup_.logInterval, [&] { return stop_; }))
{
return;
}
@@ -282,7 +270,6 @@ PerfLogImp::run()
openLog();
rotate_ = false;
}
cond_.wait_until(lock, lastLog_ + setup_.logInterval);
}
report();
}
@@ -302,10 +289,13 @@ PerfLogImp::report()
Json::Value report(Json::objectValue);
report[jss::time] = to_string(date::floor<microseconds>(present));
report[jss::workers] = counters_.workers_;
{
std::lock_guard lock{counters_.jobsMutex_};
report[jss::workers] =
static_cast<unsigned int>(counters_.jobs_.size());
}
report[jss::hostid] = hostname_;
report[jss::counters] = counters_.countersJson();
auto cur = counters_.currentJson();
report[jss::current_activities] = counters_.currentJson();
logFile_ << Json::Compact{std::move(report)} << std::endl;
@@ -340,8 +330,8 @@ PerfLogImp::rpcStart(std::string const& method, std::uint64_t const requestId)
}
{
std::lock_guard lock(counter->second.mut);
++counter->second.sync.started;
std::lock_guard lock(counter->second.mutex);
++counter->second.value.started;
}
std::lock_guard lock(counters_.methodsMutex_);
counters_.methods_[requestId] = {
@@ -374,12 +364,12 @@ PerfLogImp::rpcEnd(
assert(false);
}
}
std::lock_guard lock(counter->second.mut);
std::lock_guard lock(counter->second.mutex);
if (finish)
++counter->second.sync.finished;
++counter->second.value.finished;
else
++counter->second.sync.errored;
counter->second.sync.duration += std::chrono::duration_cast<microseconds>(
++counter->second.value.errored;
counter->second.value.duration += std::chrono::duration_cast<microseconds>(
steady_clock::now() - startTime);
}
@@ -392,8 +382,8 @@ PerfLogImp::jobQueue(JobType const type)
assert(false);
return;
}
std::lock_guard lock(counter->second.mut);
++counter->second.sync.queued;
std::lock_guard lock(counter->second.mutex);
++counter->second.value.queued;
}
void
@@ -410,9 +400,9 @@ PerfLogImp::jobStart(
return;
}
{
std::lock_guard lock(counter->second.mut);
++counter->second.sync.started;
counter->second.sync.queuedDuration += dur;
std::lock_guard lock(counter->second.mutex);
++counter->second.value.started;
counter->second.value.queuedDuration += dur;
}
std::lock_guard lock(counters_.jobsMutex_);
if (instance >= 0 && instance < counters_.jobs_.size())
@@ -429,9 +419,9 @@ PerfLogImp::jobFinish(JobType const type, microseconds dur, int instance)
return;
}
{
std::lock_guard lock(counter->second.mut);
++counter->second.sync.finished;
counter->second.sync.runningDuration += dur;
std::lock_guard lock(counter->second.mutex);
++counter->second.value.finished;
counter->second.value.runningDuration += dur;
}
std::lock_guard lock(counters_.jobsMutex_);
if (instance >= 0 && instance < counters_.jobs_.size())
@@ -442,7 +432,6 @@ void
PerfLogImp::resizeJobs(int const resize)
{
std::lock_guard lock(counters_.jobsMutex_);
counters_.workers_ = resize;
if (resize > counters_.jobs_.size())
counters_.jobs_.resize(resize, {jtINVALID, steady_time_point()});
}