rippled
Loading...
Searching...
No Matches
PerfLogImp.cpp
1#include <xrpld/perflog/detail/PerfLogImp.h>
2
3#include <xrpl/basics/BasicConfig.h>
4#include <xrpl/beast/core/CurrentThreadName.h>
5#include <xrpl/beast/utility/Journal.h>
6#include <xrpl/core/JobTypes.h>
7#include <xrpl/json/json_writer.h>
8
9#include <atomic>
10#include <cstdint>
11#include <cstdlib>
12#include <iterator>
13#include <mutex>
14#include <optional>
15#include <sstream>
16#include <stdexcept>
17#include <string>
18#include <unordered_map>
19#include <utility>
20
21namespace xrpl {
22namespace perf {
23
25{
26 {
27 // populateRpc
28 rpc_.reserve(labels.size());
29 for (std::string const label : labels)
30 {
31 auto const inserted = rpc_.emplace(label, Rpc()).second;
32 if (!inserted)
33 {
34 // Ensure that no other function populates this entry.
35 // LCOV_EXCL_START
36 UNREACHABLE(
37 "xrpl::perf::PerfLogImp::Counters::Counters : failed to "
38 "insert label");
39 // LCOV_EXCL_STOP
40 }
41 }
42 }
43 {
44 // populateJq
45 jq_.reserve(jobTypes.size());
46 for (auto const& [jobType, _] : jobTypes)
47 {
48 auto const inserted = jq_.emplace(jobType, Jq()).second;
49 if (!inserted)
50 {
51 // Ensure that no other function populates this entry.
52 // LCOV_EXCL_START
53 UNREACHABLE(
54 "xrpl::perf::PerfLogImp::Counters::Counters : failed to "
55 "insert job type");
56 // LCOV_EXCL_STOP
57 }
58 }
59 }
60}
61
64{
66 // totalRpc represents all rpc methods. All that started, finished, etc.
67 Rpc totalRpc;
68 for (auto const& proc : rpc_)
69 {
70 Rpc value;
71 {
72 std::lock_guard lock(proc.second.mutex);
73 if (!proc.second.value.started && !proc.second.value.finished && !proc.second.value.errored)
74 {
75 continue;
76 }
77 value = proc.second.value;
78 }
79
81 p[jss::started] = std::to_string(value.started);
82 totalRpc.started += value.started;
83 p[jss::finished] = std::to_string(value.finished);
84 totalRpc.finished += value.finished;
85 p[jss::errored] = std::to_string(value.errored);
86 totalRpc.errored += value.errored;
87 p[jss::duration_us] = std::to_string(value.duration.count());
88 totalRpc.duration += value.duration;
89 rpcobj[proc.first] = p;
90 }
91
92 if (totalRpc.started)
93 {
94 Json::Value totalRpcJson(Json::objectValue);
95 totalRpcJson[jss::started] = std::to_string(totalRpc.started);
96 totalRpcJson[jss::finished] = std::to_string(totalRpc.finished);
97 totalRpcJson[jss::errored] = std::to_string(totalRpc.errored);
98 totalRpcJson[jss::duration_us] = std::to_string(totalRpc.duration.count());
99 rpcobj[jss::total] = totalRpcJson;
100 }
101
102 Json::Value jobQueueObj(Json::objectValue);
103 // totalJq represents all jobs. All enqueued, started, finished, etc.
104 Jq totalJq;
105 for (auto const& proc : jq_)
106 {
107 Jq value;
108 {
109 std::lock_guard lock(proc.second.mutex);
110 if (!proc.second.value.queued && !proc.second.value.started && !proc.second.value.finished)
111 {
112 continue;
113 }
114 value = proc.second.value;
115 }
116
118 j[jss::queued] = std::to_string(value.queued);
119 totalJq.queued += value.queued;
120 j[jss::started] = std::to_string(value.started);
121 totalJq.started += value.started;
122 j[jss::finished] = std::to_string(value.finished);
123 totalJq.finished += value.finished;
124 j[jss::queued_duration_us] = std::to_string(value.queuedDuration.count());
125 totalJq.queuedDuration += value.queuedDuration;
126 j[jss::running_duration_us] = std::to_string(value.runningDuration.count());
127 totalJq.runningDuration += value.runningDuration;
128 jobQueueObj[JobTypes::name(proc.first)] = j;
129 }
130
131 if (totalJq.queued)
132 {
133 Json::Value totalJqJson(Json::objectValue);
134 totalJqJson[jss::queued] = std::to_string(totalJq.queued);
135 totalJqJson[jss::started] = std::to_string(totalJq.started);
136 totalJqJson[jss::finished] = std::to_string(totalJq.finished);
137 totalJqJson[jss::queued_duration_us] = std::to_string(totalJq.queuedDuration.count());
138 totalJqJson[jss::running_duration_us] = std::to_string(totalJq.runningDuration.count());
139 jobQueueObj[jss::total] = totalJqJson;
140 }
141
143 // Be kind to reporting tools and let them expect rpc and jq objects
144 // even if empty.
145 counters[jss::rpc] = rpcobj;
146 counters[jss::job_queue] = jobQueueObj;
147 return counters;
148}
149
152{
153 auto const present = steady_clock::now();
154
155 Json::Value jobsArray(Json::arrayValue);
156 auto const jobs = [this] {
157 std::lock_guard lock(jobsMutex_);
158 return jobs_;
159 }();
160
161 for (auto const& j : jobs)
162 {
163 if (j.first == jtINVALID)
164 continue;
166 jobj[jss::job] = JobTypes::name(j.first);
167 jobj[jss::duration_us] = std::to_string(std::chrono::duration_cast<microseconds>(present - j.second).count());
168 jobsArray.append(jobj);
169 }
170
171 Json::Value methodsArray(Json::arrayValue);
173 {
174 std::lock_guard lock(methodsMutex_);
175 methods.reserve(methods_.size());
176 for (auto const& m : methods_)
177 methods.push_back(m.second);
178 }
179 for (auto m : methods)
180 {
182 methodobj[jss::method] = m.first;
183 methodobj[jss::duration_us] =
184 std::to_string(std::chrono::duration_cast<microseconds>(present - m.second).count());
185 methodsArray.append(methodobj);
186 }
187
189 current[jss::jobs] = jobsArray;
190 current[jss::methods] = methodsArray;
191 return current;
192}
193
194//-----------------------------------------------------------------------------
195
196void
198{
199 if (setup_.perfLog.empty())
200 return;
201
202 if (logFile_.is_open())
203 logFile_.close();
204
205 auto logDir = setup_.perfLog.parent_path();
206 if (!boost::filesystem::is_directory(logDir))
207 {
208 boost::system::error_code ec;
209 boost::filesystem::create_directories(logDir, ec);
210 if (ec)
211 {
212 JLOG(j_.fatal()) << "Unable to create performance log "
213 "directory "
214 << logDir << ": " << ec.message();
215 signalStop_();
216 return;
217 }
218 }
219
220 logFile_.open(setup_.perfLog.c_str(), std::ios::out | std::ios::app);
221
222 if (!logFile_)
223 {
224 JLOG(j_.fatal()) << "Unable to open performance log " << setup_.perfLog << ".";
225 signalStop_();
226 }
227}
228
229void
231{
234
235 while (true)
236 {
237 {
239 if (cond_.wait_until(lock, lastLog_ + setup_.logInterval, [&] { return stop_; }))
240 {
241 return;
242 }
243 if (rotate_)
244 {
245 openLog();
246 rotate_ = false;
247 }
248 }
249 report();
250 }
251}
252
253void
255{
256 if (!logFile_)
257 // If logFile_ is not writable do no further work.
258 return;
259
260 auto const present = system_clock::now();
261 if (present < lastLog_ + setup_.logInterval)
262 return;
263 lastLog_ = present;
264
266 report[jss::time] = to_string(std::chrono::floor<microseconds>(present));
267 {
269 report[jss::workers] = static_cast<unsigned int>(counters_.jobs_.size());
270 }
271 report[jss::hostid] = hostname_;
272 report[jss::counters] = counters_.countersJson();
273 report[jss::nodestore] = Json::objectValue;
274 app_.getNodeStore().getCountsJson(report[jss::nodestore]);
275 report[jss::current_activities] = counters_.currentJson();
277
278 logFile_ << Json::Compact{std::move(report)} << std::endl;
279}
280
281PerfLogImp::PerfLogImp(Setup const& setup, Application& app, beast::Journal journal, std::function<void()>&& signalStop)
282 : setup_(setup), app_(app), j_(journal), signalStop_(std::move(signalStop))
283{
284 openLog();
285}
286
288{
289 stop();
290}
291
292void
293PerfLogImp::rpcStart(std::string const& method, std::uint64_t const requestId)
294{
295 auto counter = counters_.rpc_.find(method);
296 if (counter == counters_.rpc_.end())
297 {
298 // LCOV_EXCL_START
299 UNREACHABLE("xrpl::perf::PerfLogImp::rpcStart : valid method input");
300 return;
301 // LCOV_EXCL_STOP
302 }
303
304 {
305 std::lock_guard lock(counter->second.mutex);
306 ++counter->second.value.started;
307 }
309 counters_.methods_[requestId] = {counter->first.c_str(), steady_clock::now()};
310}
311
312void
313PerfLogImp::rpcEnd(std::string const& method, std::uint64_t const requestId, bool finish)
314{
315 auto counter = counters_.rpc_.find(method);
316 if (counter == counters_.rpc_.end())
317 {
318 // LCOV_EXCL_START
319 UNREACHABLE("xrpl::perf::PerfLogImp::rpcEnd : valid method input");
320 return;
321 // LCOV_EXCL_STOP
322 }
323 steady_time_point startTime;
324 {
326 auto const e = counters_.methods_.find(requestId);
327 if (e != counters_.methods_.end())
328 {
329 startTime = e->second.second;
330 counters_.methods_.erase(e);
331 }
332 else
333 {
334 // LCOV_EXCL_START
335 UNREACHABLE("xrpl::perf::PerfLogImp::rpcEnd : valid requestId input");
336 // LCOV_EXCL_STOP
337 }
338 }
339 std::lock_guard lock(counter->second.mutex);
340 if (finish)
341 ++counter->second.value.finished;
342 else
343 ++counter->second.value.errored;
344 counter->second.value.duration += std::chrono::duration_cast<microseconds>(steady_clock::now() - startTime);
345}
346
347void
349{
350 auto counter = counters_.jq_.find(type);
351 if (counter == counters_.jq_.end())
352 {
353 // LCOV_EXCL_START
354 UNREACHABLE("xrpl::perf::PerfLogImp::jobQueue : valid job type input");
355 return;
356 // LCOV_EXCL_STOP
357 }
358 std::lock_guard lock(counter->second.mutex);
359 ++counter->second.value.queued;
360}
361
362void
363PerfLogImp::jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)
364{
365 auto counter = counters_.jq_.find(type);
366 if (counter == counters_.jq_.end())
367 {
368 // LCOV_EXCL_START
369 UNREACHABLE("xrpl::perf::PerfLogImp::jobStart : valid job type input");
370 return;
371 // LCOV_EXCL_STOP
372 }
373
374 {
375 std::lock_guard lock(counter->second.mutex);
376 ++counter->second.value.started;
377 counter->second.value.queuedDuration += dur;
378 }
380 if (instance >= 0 && instance < counters_.jobs_.size())
381 counters_.jobs_[instance] = {type, startTime};
382}
383
384void
385PerfLogImp::jobFinish(JobType const type, microseconds dur, int instance)
386{
387 auto counter = counters_.jq_.find(type);
388 if (counter == counters_.jq_.end())
389 {
390 // LCOV_EXCL_START
391 UNREACHABLE("xrpl::perf::PerfLogImp::jobFinish : valid job type input");
392 return;
393 // LCOV_EXCL_STOP
394 }
395
396 {
397 std::lock_guard lock(counter->second.mutex);
398 ++counter->second.value.finished;
399 counter->second.value.runningDuration += dur;
400 }
402 if (instance >= 0 && instance < counters_.jobs_.size())
404}
405
406void
407PerfLogImp::resizeJobs(int const resize)
408{
410 if (resize > counters_.jobs_.size())
412}
413
414void
416{
417 if (setup_.perfLog.empty())
418 return;
419
421 rotate_ = true;
423}
424
425void
427{
428 if (setup_.perfLog.size())
430}
431
432void
434{
435 if (thread_.joinable())
436 {
437 {
439 stop_ = true;
441 }
442 thread_.join();
443 }
444}
445
446//-----------------------------------------------------------------------------
447
449setup_PerfLog(Section const& section, boost::filesystem::path const& configDir)
450{
451 PerfLog::Setup setup;
452 std::string perfLog;
453 set(perfLog, "perf_log", section);
454 if (perfLog.size())
455 {
456 setup.perfLog = boost::filesystem::path(perfLog);
457 if (setup.perfLog.is_relative())
458 {
459 setup.perfLog = boost::filesystem::absolute(setup.perfLog, configDir);
460 }
461 }
462
463 std::uint64_t logInterval;
464 if (get_if_exists(section, "log_interval", logInterval))
465 setup.logInterval = std::chrono::seconds(logInterval);
466 return setup;
467}
468
470make_PerfLog(PerfLog::Setup const& setup, Application& app, beast::Journal journal, std::function<void()>&& signalStop)
471{
472 return std::make_unique<PerfLogImp>(setup, app, journal, std::move(signalStop));
473}
474
475} // namespace perf
476} // namespace xrpl
Decorator for streaming out compact json.
Represents a JSON value.
Definition json_value.h:131
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:41
Stream fatal() const
Definition Journal.h:325
Map::size_type size() const
Definition JobTypes.h:129
static std::string const & name(JobType jt)
Definition JobTypes.h:105
virtual void stateAccounting(Json::Value &obj)=0
void getCountsJson(Json::Value &obj)
Definition Database.cpp:225
Holds a collection of configuration values.
Definition BasicConfig.h:25
virtual NetworkOPs & getOPs()=0
virtual NodeStore::Database & getNodeStore()=0
std::condition_variable cond_
Definition PerfLogImp.h:110
void jobFinish(JobType const type, microseconds dur, int instance) override
Log job finishing.
void start() override
std::ofstream logFile_
Definition PerfLogImp.h:107
void rpcStart(std::string const &method, std::uint64_t const requestId) override
Log start of RPC call.
void jobQueue(JobType const type) override
Log queued job.
void resizeJobs(int const resize) override
Ensure enough room to store each currently executing job.
system_time_point lastLog_
Definition PerfLogImp.h:111
beast::Journal const j_
Definition PerfLogImp.h:104
void rpcEnd(std::string const &method, std::uint64_t const requestId, bool finish)
void stop() override
void rotate() override
Rotate perf log file.
std::function< void()> const signalStop_
Definition PerfLogImp.h:105
PerfLogImp(Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance) override
Log job executing.
std::string const hostname_
Definition PerfLogImp.h:112
std::chrono::time_point< steady_clock > steady_time_point
Definition PerfLog.h:36
T close(T... args)
T endl(T... args)
T is_open(T... args)
T is_same_v
T join(T... args)
T joinable(T... args)
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
std::unique_ptr< PerfLog > make_PerfLog(PerfLog::Setup const &setup, Application &app, beast::Journal journal, std::function< void()> &&signalStop)
PerfLog::Setup setup_PerfLog(Section const &section, boost::filesystem::path const &configDir)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
bool set(T &target, std::string const &name, Section const &section)
Set a value from a configuration Section If the named value is not found or doesn't parse as a T,...
std::string to_string(base_uint< Bits, Tag > const &a)
Definition base_uint.h:598
@ current
This was a new validation and was added.
JobType
Definition Job.h:15
@ jtINVALID
Definition Job.h:17
bool get_if_exists(Section const &section, std::string const &name, T &v)
T open(T... args)
T push_back(T... args)
T reserve(T... args)
T resize(T... args)
T size(T... args)
Job Queue task performance counters.
Definition PerfLogImp.h:75
RPC performance counters.
Definition PerfLogImp.h:61
std::unordered_map< JobType, Locked< Jq > > jq_
Definition PerfLogImp.h:89
std::vector< std::pair< JobType, steady_time_point > > jobs_
Definition PerfLogImp.h:90
Json::Value currentJson() const
std::unordered_map< std::uint64_t, MethodStart > methods_
Definition PerfLogImp.h:92
std::unordered_map< std::string, Locked< Rpc > > rpc_
Definition PerfLogImp.h:88
Json::Value countersJson() const
Counters(std::set< char const * > const &labels, JobTypes const &jobTypes)
Configuration from [perf] section of xrpld.cfg.
Definition PerfLog.h:46
boost::filesystem::path perfLog
Definition PerfLog.h:47
milliseconds logInterval
Definition PerfLog.h:49
T to_string(T... args)