rippled
Loading...
Searching...
No Matches
JobQueue.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/core/JobQueue.h>
21#include <xrpld/perflog/PerfLog.h>
22#include <xrpl/basics/contract.h>
23#include <mutex>
24
25namespace ripple {
26
// JobQueue constructor (the trailing index places its definition at
// JobQueue.cpp:27). Initializes the members from the arguments, constructs
// m_workers under the name "JobQueue" with threadCount threads, logs the
// thread count, and then populates m_jobData from JobTypes::instance().
// NOTE(review): listing lines 27, 47 and 56-57 are absent from this extract,
// so the signature line and the tail of the emplace call are not shown.
28 int threadCount,
29 beast::insight::Collector::ptr const& collector,
30 beast::Journal journal,
31 Logs& logs,
32 perf::PerfLog& perfLog)
33 : m_journal(journal)
34 , m_lastJob(0)
35 , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
36 , m_processCount(0)
37 , m_workers(*this, &perfLog, "JobQueue", threadCount)
38 , perfLog_(perfLog)
39 , m_collector(collector)
40{
41 JLOG(m_journal.info()) << "Using " << threadCount << " threads";
42
// Bind collect() as the collector hook and create the "job_count" gauge.
43 hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
44 job_count = m_collector->make_gauge("job_count");
45
46 {
48
// One m_jobData entry is emplaced per entry of JobTypes::instance().
49 for (auto const& x : JobTypes::instance())
50 {
51 JobTypeInfo const& jt = x.second;
52
53 // And create dynamic information for all jobs
54 auto const result(m_jobData.emplace(
55 std::piecewise_construct,
58 XRPL_ASSERT(
59 result.second == true,
60 "ripple::JobQueue::JobQueue : jobs added");
// presumably keeps `result` referenced when XRPL_ASSERT compiles to nothing
// -- TODO confirm
61 (void)result.second;
62 }
63 }
64}
65
// Presumably JobQueue::~JobQueue -- TODO confirm against the real file.
// NOTE(review): the signature line (66) and the body statement (69) are
// absent from this extract; only the original comment survives.
67{
68 // Must unhook before destroying
70}
71
// Presumably JobQueue::collect, the callback bound to the collector hook in
// the constructor. Publishes the current size of m_jobSet to the job_count
// gauge.
// NOTE(review): listing lines 73 (signature) and 75 are absent from this
// extract.
72void
74{
76 job_count = m_jobSet.size();
77}
78
// JobQueue::addRefCountedJob (named by the assertion strings below and by the
// trailing index, "Definition: JobQueue.cpp:80").
// Emplaces a new job of the given type into m_jobSet, using ++m_lastJob as
// its index, and increments the waiting count for that type.
// Returns false when `type` has no entry in m_jobData, true otherwise.
// NOTE(review): listing lines 80, 104, 109 and 127 are absent from this
// extract.
79bool
81 JobType type,
82 std::string const& name,
83 JobFunction const& func)
84{
85 XRPL_ASSERT(
86 type != jtINVALID,
87 "ripple::JobQueue::addRefCountedJob : valid input job type");
88
89 auto iter(m_jobData.find(type));
90 XRPL_ASSERT(
91 iter != m_jobData.end(),
92 "ripple::JobQueue::addRefCountedJob : job type found in jobs");
// Runtime guard in addition to the assertion above.
93 if (iter == m_jobData.end())
94 return false;
95
96 JLOG(m_journal.debug())
97 << __func__ << " : Adding job : " << name << " : " << type;
98 JobTypeData& data(iter->second);
99
100 // FIXME: Workaround incorrect client shutdown ordering
101 // do not add jobs to a queue with no threads
102 XRPL_ASSERT(
103 (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) ||
105 "ripple::JobQueue::addRefCountedJob : threads available or job "
106 "requires no threads");
107
108 {
110 auto result =
111 m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
112 auto const& job = *result.first;
113
// This `type` shadows the parameter of the same name for the rest of the
// inner scope.
114 JobType const type(job.getType());
115 XRPL_ASSERT(
116 type != jtINVALID,
117 "ripple::JobQueue::addRefCountedJob : has valid job type");
118 XRPL_ASSERT(
119 m_jobSet.find(job) != m_jobSet.end(),
120 "ripple::JobQueue::addRefCountedJob : job found");
121 perfLog_.jobQueue(type);
122
// This `data` likewise shadows the outer `data` declared on line 98.
123 JobTypeData& data(getJobTypeData(type));
124
// Below the per-type limit. The statement on line 127 is missing here;
// presumably it hands a task to m_workers -- TODO confirm.
125 if (data.waiting + data.running < getJobLimit(type))
126 {
128 }
129 else
130 {
131 // defer the task until we go below the limit
132 ++data.deferred;
133 }
// The job is counted as waiting whether or not it was deferred.
134 ++data.waiting;
135 }
136 return true;
137}
138
// JobQueue::getJobCount (per the trailing index, "Definition:
// JobQueue.cpp:140"). Returns the waiting count recorded in m_jobData for job
// type t, or 0 when t has no entry.
// NOTE(review): listing lines 140 (signature) and 142 are absent from this
// extract.
139int
141{
143
144 JobDataMap::const_iterator c = m_jobData.find(t);
145
146 return (c == m_jobData.end()) ? 0 : c->second.waiting;
147}
148
// JobQueue::getJobCountTotal (per the trailing index, "Definition:
// JobQueue.cpp:150"). Returns waiting + running for job type t, or 0 when t
// has no entry in m_jobData.
// NOTE(review): listing lines 150 (signature) and 152 are absent from this
// extract.
149int
151{
153
154 JobDataMap::const_iterator c = m_jobData.find(t);
155
156 return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
157}
158
// JobQueue::getJobCountGE (per the trailing index, "Definition:
// JobQueue.cpp:160"). Sums the waiting counts of every m_jobData entry whose
// key compares >= t.
// NOTE(review): listing lines 160 (signature) and 165 are absent from this
// extract.
159int
161{
162 // return the number of jobs at this priority level or greater
163 int ret = 0;
164
166
167 for (auto const& x : m_jobData)
168 {
169 if (x.first >= t)
170 ret += x.second.waiting;
171 }
172
173 return ret;
174}
175
// JobQueue::makeLoadEvent (named by the assertion string below; the trailing
// index gives the signature
// std::unique_ptr<LoadEvent> makeLoadEvent(JobType t, std::string const& name)).
// Builds a LoadEvent from the load() of job type t and the given name.
// Returns an empty value when t has no entry in m_jobData.
// NOTE(review): listing lines 176-177 (return type and signature) are absent
// from this extract.
178{
179 JobDataMap::iterator iter(m_jobData.find(t));
180 XRPL_ASSERT(
181 iter != m_jobData.end(),
182 "ripple::JobQueue::makeLoadEvent : valid job type input");
183
// Runtime guard in addition to the assertion above.
184 if (iter == m_jobData.end())
185 return {};
186
187 return std::make_unique<LoadEvent>(iter->second.load(), name, true);
188}
189
// JobQueue::addLoadEvents (per the trailing index:
// void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)).
// Calls LogicError if the queue reports isStopped(); otherwise forwards
// (count, elapsed) to addSamples() on the load() of job type t.
// NOTE(review): listing line 191 (signature) is absent from this extract.
// NOTE(review): unlike makeLoadEvent, no runtime check follows the assertion,
// so `iter` is dereferenced even if it equals end() whenever XRPL_ASSERT does
// not abort -- confirm whether a guard is wanted here.
190void
192{
193 if (isStopped())
194 LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
195
196 JobDataMap::iterator iter(m_jobData.find(t));
197 XRPL_ASSERT(
198 iter != m_jobData.end(),
199 "ripple::JobQueue::addLoadEvents : valid job type input");
200 iter->second.load().addSamples(count, elapsed);
201}
202
// JobQueue::isOverloaded (per the trailing index, "Definition:
// JobQueue.cpp:204"). Returns true if load().isOver() is true for at least
// one entry of m_jobData.
// NOTE(review): listing line 204 (signature) is absent from this extract.
203bool
205{
206 return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) {
207 return entry.second.load().isOver();
208 });
209}
210
// JobQueue::getJson (named by the assertion string below; the trailing index
// gives the signature Json::Value getJson(int c=0)).
// Builds a JSON value with two members: "threads", taken from
// m_workers.getNumberOfThreads(), and "job_types", an array with one object
// per job type that has something to report.
// NOTE(review): listing lines 211-212 (signature), 215 (presumably the
// declaration of `ret`) and 221 are absent from this extract; the parameter
// `c` is not referenced in the visible lines.
213{
214 using namespace std::chrono_literals;
216
217 ret["threads"] = m_workers.getNumberOfThreads();
218
219 Json::Value priorities = Json::arrayValue;
220
222
223 for (auto& x : m_jobData)
224 {
225 XRPL_ASSERT(
226 x.first != jtINVALID, "ripple::JobQueue::getJson : valid job type");
227
// jtGENERIC is never reported.
228 if (x.first == jtGENERIC)
229 continue;
230
231 JobTypeData& data(x.second);
232
233 LoadMonitor::Stats stats(data.stats());
234
235 int waiting(data.waiting);
236 int running(data.running);
237
// Emit an object only if at least one of these values is non-zero.
238 if ((stats.count != 0) || (waiting != 0) ||
239 (stats.latencyPeak != 0ms) || (running != 0))
240 {
241 Json::Value& pri = priorities.append(Json::objectValue);
242
243 pri["job_type"] = data.name();
244
// Each of the remaining members is written only when it is set or non-zero.
245 if (stats.isOverloaded)
246 pri["over_target"] = true;
247
248 if (waiting != 0)
249 pri["waiting"] = waiting;
250
251 if (stats.count != 0)
252 pri["per_second"] = static_cast<int>(stats.count);
253
254 if (stats.latencyPeak != 0ms)
255 pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
256
257 if (stats.latencyAvg != 0ms)
258 pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
259
260 if (running != 0)
261 pri["in_progress"] = running;
262 }
263 }
264
265 ret["job_types"] = priorities;
266
267 return ret;
268}
269
// JobQueue::rendezvous (per the trailing index, "Definition:
// JobQueue.cpp:271"). Waits on cv_ until m_processCount is 0 and m_jobSet is
// empty.
// NOTE(review): listing lines 271 (signature) and 273 (presumably the
// declaration of `lock`) are absent from this extract.
270void
272{
274 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
275}
276
// JobQueue::getJobTypeData (named by the assertion string below; the trailing
// index gives the signature JobTypeData& getJobTypeData(JobType type)).
// Returns the m_jobData entry for `type`, or m_invalidJobData when there is
// no such entry.
// NOTE(review): listing lines 277-278 (return type and signature) are absent
// from this extract.
279{
280 JobDataMap::iterator c(m_jobData.find(type));
281 XRPL_ASSERT(
282 c != m_jobData.end(),
283 "ripple::JobQueue::getJobTypeData : valid job type input");
284
285 // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
286 // and use something sane.
287 if (c == m_jobData.end())
288 return m_invalidJobData;
289
290 return c->second;
291}
292
// JobQueue::stop (named by the assertion strings below).
// Sets stopping_, joins jobCounter_ with a 1s wait argument, waits on cv_
// until m_processCount is 0 and m_jobSet is empty, then sets stopped_.
// NOTE(review): listing lines 294 (signature) and 305 (presumably the
// declaration of `lock`) are absent from this extract.
293void
295{
296 stopping_ = true;
297 using namespace std::chrono_literals;
298 jobCounter_.join("JobQueue", 1s, m_journal);
299 {
300 // After the JobCounter is joined, all jobs have finished executing
301 // (i.e. returned from `Job::doJob`) and no more are being accepted,
302 // but there may still be some threads between the return of
303 // `Job::doJob` and the return of `JobQueue::processTask`. That is why
304 // we must wait on the condition variable to make these assertions.
306 cv_.wait(
307 lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
// The first two assertions restate the wait predicate above; the third
// additionally checks nSuspend_.
308 XRPL_ASSERT(
309 m_processCount == 0,
310 "ripple::JobQueue::stop : all processes completed");
311 XRPL_ASSERT(
312 m_jobSet.empty(), "ripple::JobQueue::stop : all jobs completed");
313 XRPL_ASSERT(
314 nSuspend_ == 0, "ripple::JobQueue::stop : no coros suspended");
315 stopped_ = true;
316 }
317}
318
// JobQueue::isStopped (per the trailing index: bool isStopped() const,
// "Definition: JobQueue.cpp:320"). Returns the current value of stopped_.
// NOTE(review): listing line 320 (signature) is absent from this extract.
319bool
321{
322 return stopped_;
323}
324
// JobQueue::getNextJob (named by the assertion strings below; the trailing
// index gives the signature void getNextJob(Job& job)).
// Walks m_jobSet from begin() and picks the first job whose type has fewer
// running jobs than getJobLimit allows. It moves one unit from that type's
// waiting count to its running count, copies the job into `job` and erases
// it from m_jobSet.
// NOTE(review): listing lines 326 (signature) and 331 (presumably the
// declaration of `iter`) are absent from this extract.
325void
327{
328 XRPL_ASSERT(
329 !m_jobSet.empty(), "ripple::JobQueue::getNextJob : non-empty jobs");
330
332 for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
333 {
334 JobType const type = iter->getType();
335 XRPL_ASSERT(
336 type != jtINVALID, "ripple::JobQueue::getNextJob : valid job type");
337
338 JobTypeData& data(getJobTypeData(type));
339 XRPL_ASSERT(
340 data.running <= getJobLimit(type),
341 "ripple::JobQueue::getNextJob : maximum jobs running");
342
343 // Run this job if we're running below the limit.
// The limit is looked up via data.type() here but via `type` in the
// assertion above; presumably the two are equal -- TODO confirm.
344 if (data.running < getJobLimit(data.type()))
345 {
346 XRPL_ASSERT(
347 data.waiting > 0,
348 "ripple::JobQueue::getNextJob : positive data waiting");
349 --data.waiting;
350 ++data.running;
351 break;
352 }
353 }
354
// NOTE(review): if the loop finds no job, `iter` equals end() and is still
// dereferenced below whenever XRPL_ASSERT does not abort -- confirm.
355 XRPL_ASSERT(
356 iter != m_jobSet.end(),
357 "ripple::JobQueue::getNextJob : found next job");
358 job = *iter;
359 m_jobSet.erase(iter);
360}
361
// JobQueue::finishJob (named by the assertion strings below; the trailing
// index gives the signature void finishJob(JobType type)).
// Decrements the running count for `type`; when that type has deferred work,
// it also decrements the deferred count.
// NOTE(review): listing lines 363 (signature) and 379 are absent from this
// extract.
362void
364{
365 XRPL_ASSERT(
366 type != jtINVALID,
367 "ripple::JobQueue::finishJob : valid input job type");
368
369 JobTypeData& data = getJobTypeData(type);
370
371 // Queue a deferred task if possible
372 if (data.deferred > 0)
373 {
374 XRPL_ASSERT(
375 data.running + data.waiting >= getJobLimit(type),
376 "ripple::JobQueue::finishJob : job limit");
377
378 --data.deferred;
// The statement on line 379 is missing here; presumably it hands the
// deferred task to m_workers -- TODO confirm.
380 }
381
382 --data.running;
383}
384
// JobQueue::processTask (per the trailing index:
// void processTask(int instance) override, "Definition: JobQueue.cpp:386").
// Takes the next job via getNextJob, runs it with doJob(), reports queue and
// execution times to perfLog_, then calls finishJob and notifies cv_ once
// m_processCount reaches 0 with an empty m_jobSet.
// NOTE(review): listing lines 386 (signature), 396, 398 and 425 are absent
// from this extract; the increment matching --m_processCount below is
// presumably on one of them -- TODO confirm.
385void
387{
388 JobType type;
389
390 {
391 using namespace std::chrono;
392 Job::clock_type::time_point const start_time(Job::clock_type::now());
393 {
394 Job job;
395 {
397 getNextJob(job);
399 }
400 type = job.getType();
401 JobTypeData& data(getJobTypeData(type));
// NOTE(review): "job" has no leading space, so the message reads
// "Doing <name>job" -- confirm whether that is intended.
402 JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
403
404 // The amount of time that the job was in the queue
405 auto const q_time =
406 ceil<microseconds>(start_time - job.queue_time());
407 perfLog_.jobStart(type, q_time, start_time, instance);
408
409 job.doJob();
410
411 // The amount of time it took to execute the job
412 auto const x_time =
413 ceil<microseconds>(Job::clock_type::now() - start_time);
414
// The dequeue/execute events are notified only when either time is >= 10ms.
415 if (x_time >= 10ms || q_time >= 10ms)
416 {
417 getJobTypeData(type).dequeue.notify(q_time);
418 getJobTypeData(type).execute.notify(x_time);
419 }
420 perfLog_.jobFinish(type, x_time, instance);
421 }
422 }
423
424 {
426 // Job should be destroyed before stopping
427 // otherwise destructors with side effects can access
428 // parent objects that are already destroyed.
429 finishJob(type);
430 if (--m_processCount == 0 && m_jobSet.empty())
431 cv_.notify_all();
432 }
433
434 // Note that when Job::~Job is called, the last reference
435 // to the associated LoadEvent object (in the Job) may be destroyed.
436}
437
// JobQueue::getJobLimit (named by the assertion string below; the trailing
// index gives the signature int getJobLimit(JobType type)).
// Looks up the JobTypeInfo for `type` in JobTypes::instance() and returns its
// limit().
// NOTE(review): listing line 439 (signature) is absent from this extract.
438int
440{
441 JobTypeInfo const& j(JobTypes::instance().get(type));
442 XRPL_ASSERT(
443 j.type() != jtINVALID,
444 "ripple::JobQueue::getJobLimit : valid job type");
445
446 return j.limit();
447}
448
449} // namespace ripple
T any_of(T... args)
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition: json_value.h:147
Value & append(const Value &value)
Append value to array at the end.
Definition: json_value.cpp:891
A generic endpoint for log messages.
Definition: Journal.h:59
Stream debug() const
Definition: Journal.h:327
Stream info() const
Definition: Journal.h:333
Stream trace() const
Severity stream access functions.
Definition: Journal.h:321
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition: Event.h:64
A reference to a handler for performing polled collection.
Definition: Hook.h:32
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
JobTypeData m_invalidJobData
Definition: JobQueue.h:253
int getJobLimit(JobType type)
Definition: JobQueue.cpp:439
std::atomic_bool stopped_
Definition: JobQueue.h:251
std::uint64_t m_lastJob
Definition: JobQueue.h:247
JobDataMap m_jobData
Definition: JobQueue.h:252
void rendezvous()
Block until no jobs running.
Definition: JobQueue.cpp:271
JobCounter jobCounter_
Definition: JobQueue.h:249
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition: JobQueue.cpp:150
bool isStopped() const
Definition: JobQueue.cpp:320
bool isOverloaded()
Definition: JobQueue.cpp:204
perf::PerfLog & perfLog_
Definition: JobQueue.h:264
Workers m_workers
Definition: JobQueue.h:261
JobTypeData & getJobTypeData(JobType type)
Definition: JobQueue.cpp:278
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition: JobQueue.cpp:160
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition: JobQueue.cpp:191
Json::Value getJson(int c=0)
Definition: JobQueue.cpp:212
beast::insight::Collector::ptr m_collector
Definition: JobQueue.h:265
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition: JobQueue.cpp:80
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:177
std::set< Job > m_jobSet
Definition: JobQueue.h:248
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition: JobQueue.cpp:140
beast::insight::Hook hook
Definition: JobQueue.h:267
beast::Journal m_journal
Definition: JobQueue.h:245
std::mutex m_mutex
Definition: JobQueue.h:246
beast::insight::Gauge job_count
Definition: JobQueue.h:266
void processTask(int instance) override
Perform a task.
Definition: JobQueue.cpp:386
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition: JobQueue.cpp:27
std::atomic_bool stopping_
Definition: JobQueue.h:250
void finishJob(JobType type)
Definition: JobQueue.cpp:363
void getNextJob(Job &job)
Definition: JobQueue.cpp:326
std::condition_variable cv_
Definition: JobQueue.h:269
Holds all the 'static' information about a job, which does not change.
Definition: JobTypeInfo.h:29
JobType type() const
Definition: JobTypeInfo.h:64
int limit() const
Definition: JobTypeInfo.h:76
static JobTypes const & instance()
Definition: JobTypes.h:129
void doJob()
Definition: Job.cpp:62
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition: Job.cpp:56
JobType getType() const
Definition: Job.cpp:50
Manages partitions for logging.
Definition: Log.h:49
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition: Workers.cpp:53
void addTask()
Add a task to be performed.
Definition: Workers.cpp:128
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log.
Definition: PerfLog.h:49
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
virtual void jobQueue(JobType const type)=0
Log queued job.
T emplace(T... args)
T end(T... args)
T find(T... args)
T forward_as_tuple(T... args)
@ arrayValue
array value (ordered list)
Definition: json_value.h:42
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
Definition: BasicConfig.h:356
JobType
Definition: Job.h:35
@ jtINVALID
Definition: Job.h:37
@ jtGENERIC
Definition: Job.h:87
@ jtCLIENT
Definition: Job.h:45
@ jtCLIENT_WEBSOCKET
Definition: Job.h:51
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
Definition: contract.cpp:48
beast::insight::Event execute
Definition: JobTypeData.h:52
beast::insight::Event dequeue
Definition: JobTypeData.h:51
std::chrono::milliseconds latencyAvg
Definition: LoadMonitor.h:61
std::chrono::milliseconds latencyPeak
Definition: LoadMonitor.h:62