rippled
Loading...
Searching...
No Matches
JobQueue.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/core/JobQueue.h>
21#include <xrpld/perflog/PerfLog.h>
22
23#include <xrpl/basics/contract.h>
24
25#include <mutex>
26
27namespace ripple {
28
// JobQueue constructor: stores the journal, collector and PerfLog, starts the
// worker pool with `threadCount` threads, registers an insight hook bound to
// JobQueue::collect plus a "job_count" gauge, and creates one m_jobData entry
// per job type known to JobTypes::instance().
// NOTE(review): the opening signature line (listing line 29) is absent from
// this extract.
30 int threadCount,
31 beast::insight::Collector::ptr const& collector,
32 beast::Journal journal,
33 Logs& logs,
34 perf::PerfLog& perfLog)
35 : m_journal(journal)
36 , m_lastJob(0)
37 , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
38 , m_processCount(0)
39 , m_workers(*this, &perfLog, "JobQueue", threadCount)
40 , perfLog_(perfLog)
41 , m_collector(collector)
42{
43 JLOG(m_journal.info()) << "Using " << threadCount << " threads";
44
// Register the polled-collection hook and the gauge with the collector.
45 hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
46 job_count = m_collector->make_gauge("job_count");
47
48 {
// NOTE(review): listing line 49 is absent from this extract.
50
51 for (auto const& x : JobTypes::instance())
52 {
53 JobTypeInfo const& jt = x.second;
54
55 // And create dynamic information for all jobs
// NOTE(review): the emplace arguments (listing lines 57-59) are absent from
// this extract.
56 auto const result(m_jobData.emplace(
60 XRPL_ASSERT(
61 result.second == true,
62 "ripple::JobQueue::JobQueue : jobs added");
// Presumably keeps `result` referenced when XRPL_ASSERT compiles to nothing.
63 (void)result.second;
64 }
65 }
66}
67
// NOTE(review): presumably the JobQueue destructor body. Its signature line and
// listing line 71 (the statement the comment below describes) are absent from
// this extract.
69{
70 // Must unhook before destroying
72}
73
74void
80
// Queues a job of the given type.
//
// Returns false when `type` has no entry in m_jobData; otherwise inserts a new
// Job into m_jobSet, updates the per-type counters, and returns true.
// NOTE(review): the function-name line (listing line 82) is absent from this
// extract; the assertion messages below name the function as
// JobQueue::addRefCountedJob.
81bool
83 JobType type,
84 std::string const& name,
85 JobFunction const& func)
86{
87 XRPL_ASSERT(
88 type != jtINVALID,
89 "ripple::JobQueue::addRefCountedJob : valid input job type");
90
91 auto iter(m_jobData.find(type));
92 XRPL_ASSERT(
93 iter != m_jobData.end(),
94 "ripple::JobQueue::addRefCountedJob : job type found in jobs");
// Runtime guard that remains even when the assertion above is compiled out.
95 if (iter == m_jobData.end())
96 return false;
97
98 JLOG(m_journal.debug())
99 << __func__ << " : Adding job : " << name << " : " << type;
100 JobTypeData& data(iter->second);
101
102 // FIXME: Workaround incorrect client shutdown ordering
103 // do not add jobs to a queue with no threads
// NOTE(review): the second operand of this assertion (listing line 106) is
// absent from this extract.
104 XRPL_ASSERT(
105 (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) ||
107 "ripple::JobQueue::addRefCountedJob : threads available or job "
108 "requires no threads");
109
110 {
// NOTE(review): listing line 111 is absent from this extract.
// Each new job gets the next sequence number from m_lastJob.
112 auto result =
113 m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
114 auto const& job = *result.first;
115
// NOTE(review): this `type` shadows the function parameter of the same name.
116 JobType const type(job.getType());
117 XRPL_ASSERT(
118 type != jtINVALID,
119 "ripple::JobQueue::addRefCountedJob : has valid job type");
120 XRPL_ASSERT(
121 m_jobSet.find(job) != m_jobSet.end(),
122 "ripple::JobQueue::addRefCountedJob : job found");
123 perfLog_.jobQueue(type);
124
// NOTE(review): this `data` shadows the reference declared at listing line 100.
125 JobTypeData& data(getJobTypeData(type));
126
// Below the per-type limit the job is handled by the (missing) statement at
// listing line 129; otherwise it is only counted as deferred.
127 if (data.waiting + data.running < getJobLimit(type))
128 {
130 }
131 else
132 {
133 // defer the task until we go below the limit
134 ++data.deferred;
135 }
// The job counts as waiting in both branches.
136 ++data.waiting;
137 }
138 return true;
139}
140
// Returns the `waiting` counter stored for job type `t`, or 0 when `t` has no
// entry in m_jobData.
// NOTE(review): the signature (listing line 142) and listing line 144 are
// absent from this extract.
141int
143{
145
146 JobDataMap::const_iterator c = m_jobData.find(t);
147
148 return (c == m_jobData.end()) ? 0 : c->second.waiting;
149}
150
// Returns `waiting + running` for job type `t`, or 0 when `t` has no entry in
// m_jobData.
// NOTE(review): the signature (listing line 152) and listing line 154 are
// absent from this extract.
151int
153{
155
156 JobDataMap::const_iterator c = m_jobData.find(t);
157
158 return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
159}
160
// Sums the `waiting` counters of every m_jobData entry whose job type compares
// greater than or equal to `t`.
// NOTE(review): the signature (listing line 162) and listing line 167 are
// absent from this extract.
161int
163{
164 // return the number of jobs at this priority level or greater
165 int ret = 0;
166
168
169 for (auto const& x : m_jobData)
170 {
171 if (x.first >= t)
172 ret += x.second.waiting;
173 }
174
175 return ret;
176}
177
180{
181 JobDataMap::iterator iter(m_jobData.find(t));
182 XRPL_ASSERT(
183 iter != m_jobData.end(),
184 "ripple::JobQueue::makeLoadEvent : valid job type input");
185
186 if (iter == m_jobData.end())
187 return {};
188
189 return std::make_unique<LoadEvent>(iter->second.load(), name, true);
190}
191
192void
194{
195 if (isStopped())
196 LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
197
198 JobDataMap::iterator iter(m_jobData.find(t));
199 XRPL_ASSERT(
200 iter != m_jobData.end(),
201 "ripple::JobQueue::addLoadEvents : valid job type input");
202 iter->second.load().addSamples(count, elapsed);
203}
204
// Returns true when the load monitor of at least one m_jobData entry reports
// isOver(); returns false when none do.
// NOTE(review): the signature (listing line 206) is absent from this extract.
205bool
207{
208 return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) {
209 return entry.second.load().isOver();
210 });
211}
212
// Builds a JSON summary of the queue: the worker thread count under "threads"
// and, under "job_types", one object per job type that has any activity.
// NOTE(review): the signature (listing lines 213-214), the declaration of
// `ret` (listing line 217) and listing line 223 are absent from this extract.
215{
216 using namespace std::chrono_literals;
218
219 ret["threads"] = m_workers.getNumberOfThreads();
220
221 Json::Value priorities = Json::arrayValue;
222
224
225 for (auto& x : m_jobData)
226 {
227 XRPL_ASSERT(
228 x.first != jtINVALID, "ripple::JobQueue::getJson : valid job type");
229
// jtGENERIC is never reported.
230 if (x.first == jtGENERIC)
231 continue;
232
233 JobTypeData& data(x.second);
234
235 LoadMonitor::Stats stats(data.stats());
236
237 int waiting(data.waiting);
238 int running(data.running);
239
// Emit an entry only if the type has samples, queued or running jobs, or a
// non-zero peak latency.
240 if ((stats.count != 0) || (waiting != 0) ||
241 (stats.latencyPeak != 0ms) || (running != 0))
242 {
243 Json::Value& pri = priorities.append(Json::objectValue);
244
245 pri["job_type"] = data.name();
246
// Every field below is optional and written only when non-zero / true.
247 if (stats.isOverloaded)
248 pri["over_target"] = true;
249
250 if (waiting != 0)
251 pri["waiting"] = waiting;
252
253 if (stats.count != 0)
254 pri["per_second"] = static_cast<int>(stats.count);
255
256 if (stats.latencyPeak != 0ms)
257 pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
258
259 if (stats.latencyAvg != 0ms)
260 pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
261
262 if (running != 0)
263 pri["in_progress"] = running;
264 }
265 }
266
267 ret["job_types"] = priorities;
268
269 return ret;
270}
271
// Blocks on cv_ until m_processCount is zero and m_jobSet is empty.
// NOTE(review): the signature (listing line 273) and the declaration of `lock`
// (listing line 275) are absent from this extract.
272void
274{
276 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
277}
278
281{
282 JobDataMap::iterator c(m_jobData.find(type));
283 XRPL_ASSERT(
284 c != m_jobData.end(),
285 "ripple::JobQueue::getJobTypeData : valid job type input");
286
287 // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
288 // and use something sane.
289 if (c == m_jobData.end())
290 return m_invalidJobData;
291
292 return c->second;
293}
294
// Stops the queue: sets stopping_, joins jobCounter_, waits on cv_ until no
// task is being processed and m_jobSet is empty, then sets stopped_.
// NOTE(review): the signature (listing line 296) and the declaration of `lock`
// (listing line 307) are absent from this extract.
295void
297{
298 stopping_ = true;
299 using namespace std::chrono_literals;
300 jobCounter_.join("JobQueue", 1s, m_journal);
301 {
302 // After the JobCounter is joined, all jobs have finished executing
303 // (i.e. returned from `Job::doJob`) and no more are being accepted,
304 // but there may still be some threads between the return of
305 // `Job::doJob` and the return of `JobQueue::processTask`. That is why
306 // we must wait on the condition variable to make these assertions.
308 cv_.wait(
309 lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
// The first two assertions restate the wait predicate; the third also requires
// that nSuspend_ is zero.
310 XRPL_ASSERT(
311 m_processCount == 0,
312 "ripple::JobQueue::stop : all processes completed");
313 XRPL_ASSERT(
314 m_jobSet.empty(), "ripple::JobQueue::stop : all jobs completed");
315 XRPL_ASSERT(
316 nSuspend_ == 0, "ripple::JobQueue::stop : no coros suspended");
317 stopped_ = true;
318 }
319}
320
321bool
323{
324 return stopped_;
325}
326
// Removes from m_jobSet the first job (in set order) whose type is running
// below its limit, copies it into `job`, and moves one unit of that type's
// count from `waiting` to `running`.
// NOTE(review): the signature (listing line 328) and the declaration of `iter`
// (listing line 333) are absent from this extract.
// NOTE(review): if no job qualifies, `iter` equals m_jobSet.end() and is
// dereferenced whenever the final assertion is compiled out; presumably
// callers guarantee a runnable job exists - confirm.
327void
329{
330 XRPL_ASSERT(
331 !m_jobSet.empty(), "ripple::JobQueue::getNextJob : non-empty jobs");
332
334 for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
335 {
336 JobType const type = iter->getType();
337 XRPL_ASSERT(
338 type != jtINVALID, "ripple::JobQueue::getNextJob : valid job type");
339
340 JobTypeData& data(getJobTypeData(type));
341 XRPL_ASSERT(
342 data.running <= getJobLimit(type),
343 "ripple::JobQueue::getNextJob : maximum jobs running");
344
345 // Run this job if we're running below the limit.
346 if (data.running < getJobLimit(data.type()))
347 {
348 XRPL_ASSERT(
349 data.waiting > 0,
350 "ripple::JobQueue::getNextJob : positive data waiting");
351 --data.waiting;
352 ++data.running;
353 break;
354 }
355 }
356
357 XRPL_ASSERT(
358 iter != m_jobSet.end(),
359 "ripple::JobQueue::getNextJob : found next job");
// Copy the job out before erasing it from the set.
360 job = *iter;
361 m_jobSet.erase(iter);
362}
363
// Bookkeeping after a job of `type` has run: consumes one deferred slot if any
// exist, then decrements the type's `running` counter.
// NOTE(review): the signature (listing line 365) and the statement at listing
// line 381 (inside the deferred branch) are absent from this extract.
364void
366{
367 XRPL_ASSERT(
368 type != jtINVALID,
369 "ripple::JobQueue::finishJob : valid input job type");
370
371 JobTypeData& data = getJobTypeData(type);
372
373 // Queue a deferred task if possible
374 if (data.deferred > 0)
375 {
// A deferred job implies the type was at or over its limit when it was queued.
376 XRPL_ASSERT(
377 data.running + data.waiting >= getJobLimit(type),
378 "ripple::JobQueue::finishJob : job limit");
379
380 --data.deferred;
382 }
383
384 --data.running;
385}
386
// Runs one queued job: takes the next runnable job, reports queue and
// execution time to perfLog_ (and to the per-type insight events when either
// reaches 10ms), then updates the counters and wakes waiters on cv_ once no
// task is in flight and m_jobSet is empty.
// NOTE(review): the signature (listing line 388) and listing lines 398, 400
// and 427 are absent from this extract; `instance` used below is declared on
// the missing signature line.
387void
389{
390 JobType type;
391
392 {
393 using namespace std::chrono;
394 Job::clock_type::time_point const start_time(Job::clock_type::now());
395 {
396 Job job;
397 {
399 getNextJob(job);
401 }
402 type = job.getType();
403 JobTypeData& data(getJobTypeData(type));
// NOTE(review): the "job" literal has no leading space, so the message reads
// "Doing <name>job".
404 JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
405
406 // The amount of time that the job was in the queue
407 auto const q_time =
408 ceil<microseconds>(start_time - job.queue_time());
409 perfLog_.jobStart(type, q_time, start_time, instance);
410
411 job.doJob();
412
413 // The amount of time it took to execute the job
414 auto const x_time =
415 ceil<microseconds>(Job::clock_type::now() - start_time);
416
// The insight events are notified only when queue or execution time reaches
// 10ms.
417 if (x_time >= 10ms || q_time >= 10ms)
418 {
419 getJobTypeData(type).dequeue.notify(q_time);
420 getJobTypeData(type).execute.notify(x_time);
421 }
422 perfLog_.jobFinish(type, x_time, instance);
423 }
424 }
425
426 {
428 // Job should be destroyed before stopping
429 // otherwise destructors with side effects can access
430 // parent objects that are already destroyed.
431 finishJob(type);
// Wake anything waiting on cv_ (rendezvous and stop both wait on it) once
// nothing is running or queued.
432 if (--m_processCount == 0 && m_jobSet.empty())
433 cv_.notify_all();
434 }
435
436 // Note that when Job::~Job is called, the last reference
437 // to the associated LoadEvent object (in the Job) may be destroyed.
438}
439
440int
442{
443 JobTypeInfo const& j(JobTypes::instance().get(type));
444 XRPL_ASSERT(
445 j.type() != jtINVALID,
446 "ripple::JobQueue::getJobLimit : valid job type");
447
448 return j.limit();
449}
450
451} // namespace ripple
T any_of(T... args)
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:149
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:60
Stream debug() const
Definition Journal.h:328
Stream info() const
Definition Journal.h:334
Stream trace() const
Severity stream access functions.
Definition Journal.h:322
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition Event.h:64
A reference to a handler for performing polled collection.
Definition Hook.h:32
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
JobTypeData m_invalidJobData
Definition JobQueue.h:255
int getJobLimit(JobType type)
Definition JobQueue.cpp:441
std::atomic_bool stopped_
Definition JobQueue.h:253
std::uint64_t m_lastJob
Definition JobQueue.h:249
JobDataMap m_jobData
Definition JobQueue.h:254
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:273
JobCounter jobCounter_
Definition JobQueue.h:251
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:152
bool isStopped() const
Definition JobQueue.cpp:322
perf::PerfLog & perfLog_
Definition JobQueue.h:266
Workers m_workers
Definition JobQueue.h:263
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:280
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:162
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:193
Json::Value getJson(int c=0)
Definition JobQueue.cpp:214
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:267
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:82
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:179
std::set< Job > m_jobSet
Definition JobQueue.h:250
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:142
beast::insight::Hook hook
Definition JobQueue.h:269
beast::Journal m_journal
Definition JobQueue.h:247
std::mutex m_mutex
Definition JobQueue.h:248
beast::insight::Gauge job_count
Definition JobQueue.h:268
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:388
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition JobQueue.cpp:29
std::atomic_bool stopping_
Definition JobQueue.h:252
void finishJob(JobType type)
Definition JobQueue.cpp:365
void getNextJob(Job &job)
Definition JobQueue.cpp:328
std::condition_variable cv_
Definition JobQueue.h:271
Holds all the 'static' information about a job, which does not change.
Definition JobTypeInfo.h:29
JobType type() const
Definition JobTypeInfo.h:64
int limit() const
Definition JobTypeInfo.h:76
static JobTypes const & instance()
Definition JobTypes.h:128
void doJob()
Definition Job.cpp:62
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition Job.cpp:56
JobType getType() const
Definition Job.cpp:50
Manages partitions for logging.
Definition Log.h:52
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:54
void addTask()
Add a task to be performed.
Definition Workers.cpp:129
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log file.
Definition PerfLog.h:52
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
virtual void jobQueue(JobType const type)=0
Log queued job.
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T forward_as_tuple(T... args)
T is_same_v
@ arrayValue
array value (ordered list)
Definition json_value.h:44
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:45
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
JobType
Definition Job.h:35
@ jtINVALID
Definition Job.h:37
@ jtGENERIC
Definition Job.h:87
@ jtCLIENT
Definition Job.h:45
@ jtCLIENT_WEBSOCKET
Definition Job.h:51
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
T piecewise_construct
beast::insight::Event execute
Definition JobTypeData.h:53
beast::insight::Event dequeue
Definition JobTypeData.h:52
std::chrono::milliseconds latencyAvg
Definition LoadMonitor.h:63
std::chrono::milliseconds latencyPeak
Definition LoadMonitor.h:64