rippled
Loading...
Searching...
No Matches
JobQueue.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/core/JobQueue.h>
21#include <xrpld/perflog/PerfLog.h>
22#include <xrpl/basics/contract.h>
23
24#include <mutex>
25
26namespace ripple {
27
// JobQueue constructor: stores the journal, collector and perf log, builds the
// worker pool with `threadCount` threads, and creates one m_jobData entry per
// job type known to JobTypes::instance().
// NOTE(review): listing line 28 (the `JobQueue::JobQueue(` opener) is absent
// from this extract; the parameter list below continues from it.
29 int threadCount,
30 beast::insight::Collector::ptr const& collector,
31 beast::Journal journal,
32 Logs& logs,
33 perf::PerfLog& perfLog)
34 : m_journal(journal)
35 , m_lastJob(0)
36 , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
37 , m_processCount(0)
38 , m_workers(*this, &perfLog, "JobQueue", threadCount)
39 , perfLog_(perfLog)
40 , m_collector(collector)
41{
42 JLOG(m_journal.info()) << "Using " << threadCount << " threads";
43
// Register a collection hook bound to JobQueue::collect and create the
// "job_count" gauge.
44 hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
45 job_count = m_collector->make_gauge("job_count");
46
47 {
// NOTE(review): listing line 48 is absent here - presumably a lock on
// m_mutex; confirm against the upstream file.
49
50 for (auto const& x : JobTypes::instance())
51 {
52 JobTypeInfo const& jt = x.second;
53
54 // And create dynamic information for all jobs
55 auto const result(m_jobData.emplace(
56 std::piecewise_construct,
// NOTE(review): listing lines 57-58 (the remaining emplace arguments and the
// end of the call) are absent from this extract.
59 XRPL_ASSERT(
60 result.second == true,
61 "ripple::JobQueue::JobQueue : jobs added");
// Silences an unused-variable warning when XRPL_ASSERT compiles to nothing
// (presumed purpose - TODO confirm).
62 (void)result.second;
63 }
64 }
65}
66
// NOTE(review): presumably the JobQueue destructor. Its signature (listing
// line 67) and the single statement on listing line 70 are absent from this
// extract; only the original comment survives.
68{
69 // Must unhook before destroying
71}
72
// Publishes the current number of entries in m_jobSet to the job_count gauge.
// NOTE(review): listing line 74 (the function name - presumably
// JobQueue::collect, which the constructor binds to the hook) and listing
// line 76 (presumably a lock on m_mutex) are absent from this extract.
73void
75{
77 job_count = m_jobSet.size();
78}
79
// Adds a job to m_jobSet and updates the per-type counters.
//
// type: the job type; must not be jtINVALID and must have an entry in m_jobData.
// name: label stored with the job and written to the debug log.
// func: the callable stored in the job.
// Returns false when `type` has no entry in m_jobData, true otherwise.
//
// NOTE(review): listing line 81 (the `JobQueue::addRefCountedJob(` opener) is
// absent from this extract.
80bool
82 JobType type,
83 std::string const& name,
84 JobFunction const& func)
85{
86 XRPL_ASSERT(
87 type != jtINVALID,
88 "ripple::JobQueue::addRefCountedJob : valid input job type");
89
90 auto iter(m_jobData.find(type));
91 XRPL_ASSERT(
92 iter != m_jobData.end(),
93 "ripple::JobQueue::addRefCountedJob : job type found in jobs");
// Runtime guard that still applies if the assertion above is compiled out.
94 if (iter == m_jobData.end())
95 return false;
96
97 JLOG(m_journal.debug())
98 << __func__ << " : Adding job : " << name << " : " << type;
99 JobTypeData& data(iter->second);
100
101 // FIXME: Workaround incorrect client shutdown ordering
102 // do not add jobs to a queue with no threads
103 XRPL_ASSERT(
104 (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) ||
// NOTE(review): listing line 105 (the second operand of the `||`) is absent
// from this extract.
106 "ripple::JobQueue::addRefCountedJob : threads available or job "
107 "requires no threads");
108
109 {
// NOTE(review): listing line 110 is absent here - presumably a lock on
// m_mutex guarding the block; confirm against the upstream file.
111 auto result =
112 m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
113 auto const& job = *result.first;
114
// From here on `type` and `data` are new locals that shadow the outer ones;
// they are derived from the job that was just inserted.
115 JobType const type(job.getType());
116 XRPL_ASSERT(
117 type != jtINVALID,
118 "ripple::JobQueue::addRefCountedJob : has valid job type");
119 XRPL_ASSERT(
120 m_jobSet.find(job) != m_jobSet.end(),
121 "ripple::JobQueue::addRefCountedJob : job found");
122 perfLog_.jobQueue(type);
123
124 JobTypeData& data(getJobTypeData(type));
125
126 if (data.waiting + data.running < getJobLimit(type))
127 {
// NOTE(review): listing line 128 is absent - presumably the call that hands a
// task to the worker pool (m_workers.addTask()); confirm upstream.
129 }
130 else
131 {
132 // defer the task until we go below the limit
133 ++data.deferred;
134 }
// The job counts as waiting whether or not it was deferred.
135 ++data.waiting;
136 }
137 return true;
138}
139
// Returns the `waiting` counter for job type `t`, or 0 when `t` has no entry
// in m_jobData.
// NOTE(review): listing lines 141 (the function-name line) and 143 (presumably
// a lock on m_mutex) are absent from this extract.
140int
142{
144
145 JobDataMap::const_iterator c = m_jobData.find(t);
146
147 return (c == m_jobData.end()) ? 0 : c->second.waiting;
148}
149
// Returns waiting + running for job type `t`, or 0 when `t` has no entry in
// m_jobData.
// NOTE(review): listing lines 151 (the function-name line) and 153 (presumably
// a lock on m_mutex) are absent from this extract.
150int
152{
154
155 JobDataMap::const_iterator c = m_jobData.find(t);
156
157 return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
158}
159
// Sums the `waiting` counters of every m_jobData entry whose key is >= `t`.
// NOTE(review): listing lines 161 (the function-name line) and 166 (presumably
// a lock on m_mutex) are absent from this extract.
160int
162{
163 // return the number of jobs at this priority level or greater
164 int ret = 0;
165
167
168 for (auto const& x : m_jobData)
169 {
170 if (x.first >= t)
171 ret += x.second.waiting;
172 }
173
174 return ret;
175}
176
// Creates a LoadEvent built from the load monitor of job type `t`, the given
// `name`, and `true` as the third constructor argument.
// Returns an empty result when `t` has no entry in m_jobData.
// NOTE(review): listing lines 177-178 (return type and function-name line) are
// absent from this extract.
179{
180 JobDataMap::iterator iter(m_jobData.find(t));
181 XRPL_ASSERT(
182 iter != m_jobData.end(),
183 "ripple::JobQueue::makeLoadEvent : valid job type input");
184
// Runtime guard that still applies if the assertion above is compiled out.
185 if (iter == m_jobData.end())
186 return {};
187
188 return std::make_unique<LoadEvent>(iter->second.load(), name, true);
189}
190
// Forwards `count` samples with the given `elapsed` time to the load monitor
// of job type `t`. Calls LogicError if the queue reports isStopped().
// NOTE(review): listing line 192 (the function-name line) is absent from this
// extract.
191void
193{
194 if (isStopped())
195 LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
196
197 JobDataMap::iterator iter(m_jobData.find(t));
198 XRPL_ASSERT(
199 iter != m_jobData.end(),
200 "ripple::JobQueue::addLoadEvents : valid job type input");
// NOTE(review): unlike makeLoadEvent and addRefCountedJob, there is no runtime
// end() check before this dereference; it relies on the assertion alone.
201 iter->second.load().addSamples(count, elapsed);
202}
203
// Returns true when the load monitor of at least one m_jobData entry reports
// isOver().
// NOTE(review): listing line 205 (the function-name line) is absent from this
// extract.
204bool
206{
207 return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) {
208 return entry.second.load().isOver();
209 });
210}
211
// Builds a JSON summary: "threads" holds the worker pool's thread count and
// "job_types" holds an array with one object per job type that shows activity.
// NOTE(review): listing lines 212-213 (return type and function-name line),
// 216 (presumably the declaration of `ret`) and 222 (presumably a lock on
// m_mutex) are absent from this extract.
214{
215 using namespace std::chrono_literals;
217
218 ret["threads"] = m_workers.getNumberOfThreads();
219
220 Json::Value priorities = Json::arrayValue;
221
223
224 for (auto& x : m_jobData)
225 {
226 XRPL_ASSERT(
227 x.first != jtINVALID, "ripple::JobQueue::getJson : valid job type");
228
// jtGENERIC is never included in the output.
229 if (x.first == jtGENERIC)
230 continue;
231
232 JobTypeData& data(x.second);
233
234 LoadMonitor::Stats stats(data.stats());
235
236 int waiting(data.waiting);
237 int running(data.running);
238
// Only emit an entry for a type with at least one non-zero figure.
239 if ((stats.count != 0) || (waiting != 0) ||
240 (stats.latencyPeak != 0ms) || (running != 0))
241 {
242 Json::Value& pri = priorities.append(Json::objectValue);
243
244 pri["job_type"] = data.name();
245
// Each optional field below is written only when its value is non-zero/true.
246 if (stats.isOverloaded)
247 pri["over_target"] = true;
248
249 if (waiting != 0)
250 pri["waiting"] = waiting;
251
252 if (stats.count != 0)
253 pri["per_second"] = static_cast<int>(stats.count);
254
255 if (stats.latencyPeak != 0ms)
256 pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
257
258 if (stats.latencyAvg != 0ms)
259 pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
260
261 if (running != 0)
262 pri["in_progress"] = running;
263 }
264 }
265
266 ret["job_types"] = priorities;
267
268 return ret;
269}
270
// Blocks on cv_ until m_processCount is 0 and m_jobSet is empty.
// NOTE(review): listing lines 272 (the function-name line) and 274 (presumably
// the declaration of `lock` over m_mutex) are absent from this extract.
271void
273{
275 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
276}
277
// Looks up the mutable per-type data for `type` in m_jobData.
// Returns m_invalidJobData when `type` has no entry.
// NOTE(review): listing lines 278-279 (return type and function-name line) are
// absent from this extract.
280{
281 JobDataMap::iterator c(m_jobData.find(type));
282 XRPL_ASSERT(
283 c != m_jobData.end(),
284 "ripple::JobQueue::getJobTypeData : valid job type input");
285
286 // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
287 // and use something sane.
288 if (c == m_jobData.end())
289 return m_invalidJobData;
290
291 return c->second;
292}
293
// Shuts the queue down: sets stopping_, joins jobCounter_, waits on cv_ until
// m_processCount is 0 and m_jobSet is empty, then sets stopped_.
// NOTE(review): listing lines 295 (the function-name line) and 306 (presumably
// the declaration of `lock` over m_mutex) are absent from this extract.
294void
296{
297 stopping_ = true;
298 using namespace std::chrono_literals;
// The 1s argument is the `wait` parameter of JobCounter::join; its exact
// meaning is not visible here - TODO confirm.
299 jobCounter_.join("JobQueue", 1s, m_journal);
300 {
301 // After the JobCounter is joined, all jobs have finished executing
302 // (i.e. returned from `Job::doJob`) and no more are being accepted,
303 // but there may still be some threads between the return of
304 // `Job::doJob` and the return of `JobQueue::processTask`. That is why
305 // we must wait on the condition variable to make these assertions.
307 cv_.wait(
308 lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
309 XRPL_ASSERT(
310 m_processCount == 0,
311 "ripple::JobQueue::stop : all processes completed");
312 XRPL_ASSERT(
313 m_jobSet.empty(), "ripple::JobQueue::stop : all jobs completed");
314 XRPL_ASSERT(
315 nSuspend_ == 0, "ripple::JobQueue::stop : no coros suspended");
316 stopped_ = true;
317 }
318}
319
// Returns the current value of the stopped_ flag.
// NOTE(review): listing line 321 (the function-name line) is absent from this
// extract.
320bool
322{
323 return stopped_;
324}
325
// Selects the first job in m_jobSet whose type is running below its limit,
// moves that type's count from waiting to running, copies the job into `job`
// and erases it from the set. m_jobSet must not be empty on entry.
// NOTE(review): listing lines 327 (the function-name line) and 332 (presumably
// the declaration of `iter`) are absent from this extract.
326void
328{
329 XRPL_ASSERT(
330 !m_jobSet.empty(), "ripple::JobQueue::getNextJob : non-empty jobs");
331
333 for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
334 {
335 JobType const type = iter->getType();
336 XRPL_ASSERT(
337 type != jtINVALID, "ripple::JobQueue::getNextJob : valid job type");
338
339 JobTypeData& data(getJobTypeData(type));
340 XRPL_ASSERT(
341 data.running <= getJobLimit(type),
342 "ripple::JobQueue::getNextJob : maximum jobs running");
343
344 // Run this job if we're running below the limit.
345 if (data.running < getJobLimit(data.type()))
346 {
347 XRPL_ASSERT(
348 data.waiting > 0,
349 "ripple::JobQueue::getNextJob : positive data waiting");
350 --data.waiting;
351 ++data.running;
352 break;
353 }
354 }
355
// The loop is expected to always break on a runnable job; if it did not,
// `iter` would equal end() and the dereference below would be invalid.
356 XRPL_ASSERT(
357 iter != m_jobSet.end(),
358 "ripple::JobQueue::getNextJob : found next job");
359 job = *iter;
360 m_jobSet.erase(iter);
361}
362
// Bookkeeping after a job of `type` has run: consumes one deferred slot when
// any exist, then decrements the running counter.
// NOTE(review): listing line 364 (the function-name line) is absent from this
// extract.
363void
365{
366 XRPL_ASSERT(
367 type != jtINVALID,
368 "ripple::JobQueue::finishJob : valid input job type");
369
370 JobTypeData& data = getJobTypeData(type);
371
372 // Queue a deferred task if possible
373 if (data.deferred > 0)
374 {
375 XRPL_ASSERT(
376 data.running + data.waiting >= getJobLimit(type),
377 "ripple::JobQueue::finishJob : job limit");
378
379 --data.deferred;
// NOTE(review): listing line 380 is absent - presumably the call that hands a
// task to the worker pool (m_workers.addTask()); confirm upstream.
381 }
382
383 --data.running;
384}
385
// Runs one queued job: fetches it with getNextJob, reports start and finish to
// perfLog_, executes it, and then updates the counters through finishJob.
// `instance` is passed through unchanged to perfLog_.jobStart/jobFinish.
//
// Fix: the trace message now has a space before "job", so it reads
// "Doing <name> job" instead of "Doing <name>job".
//
// NOTE(review): listing lines 387 (the function-name line), 397 and 399
// (inside the getNextJob scope) and 426 (start of the final scope) are absent
// from this extract - presumably lock acquisition and the m_processCount
// increment; confirm against the upstream file.
386void
388{
389 JobType type;
390
391 {
392 using namespace std::chrono;
393 Job::clock_type::time_point const start_time(Job::clock_type::now());
394 {
// `job` lives only inside this scope, so it is destroyed before finishJob
// runs below.
395 Job job;
396 {
398 getNextJob(job);
400 }
401 type = job.getType();
402 JobTypeData& data(getJobTypeData(type));
403 JLOG(m_journal.trace()) << "Doing " << data.name() << " job";
404
405 // The amount of time that the job was in the queue
406 auto const q_time =
407 ceil<microseconds>(start_time - job.queue_time());
408 perfLog_.jobStart(type, q_time, start_time, instance);
409
410 job.doJob();
411
412 // The amount of time it took to execute the job
413 auto const x_time =
414 ceil<microseconds>(Job::clock_type::now() - start_time);
415
// Only report timings to the insight events when either the queue time or
// the execution time reached 10ms.
416 if (x_time >= 10ms || q_time >= 10ms)
417 {
418 getJobTypeData(type).dequeue.notify(q_time);
419 getJobTypeData(type).execute.notify(x_time);
420 }
421 perfLog_.jobFinish(type, x_time, instance);
422 }
423 }
424
425 {
427 // Job should be destroyed before stopping
428 // otherwise destructors with side effects can access
429 // parent objects that are already destroyed.
430 finishJob(type);
// Wake waiters on cv_ (rendezvous/stop) once nothing is running or queued.
431 if (--m_processCount == 0 && m_jobSet.empty())
432 cv_.notify_all();
433 }
434
435 // Note that when Job::~Job is called, the last reference
436 // to the associated LoadEvent object (in the Job) may be destroyed.
437}
438
// Returns the limit() recorded in the JobTypeInfo that JobTypes::instance()
// holds for `type`.
// NOTE(review): listing line 440 (the function-name line) is absent from this
// extract.
439int
441{
442 JobTypeInfo const& j(JobTypes::instance().get(type));
443 XRPL_ASSERT(
444 j.type() != jtINVALID,
445 "ripple::JobQueue::getJobLimit : valid job type");
446
447 return j.limit();
448}
449
450} // namespace ripple
T any_of(T... args)
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition: json_value.h:148
Value & append(const Value &value)
Append value to array at the end.
Definition: json_value.cpp:897
A generic endpoint for log messages.
Definition: Journal.h:60
Stream debug() const
Definition: Journal.h:328
Stream info() const
Definition: Journal.h:334
Stream trace() const
Severity stream access functions.
Definition: Journal.h:322
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition: Event.h:64
A reference to a handler for performing polled collection.
Definition: Hook.h:32
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
JobTypeData m_invalidJobData
Definition: JobQueue.h:252
int getJobLimit(JobType type)
Definition: JobQueue.cpp:440
std::atomic_bool stopped_
Definition: JobQueue.h:250
std::uint64_t m_lastJob
Definition: JobQueue.h:246
JobDataMap m_jobData
Definition: JobQueue.h:251
void rendezvous()
Block until no jobs running.
Definition: JobQueue.cpp:272
JobCounter jobCounter_
Definition: JobQueue.h:248
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition: JobQueue.cpp:151
bool isStopped() const
Definition: JobQueue.cpp:321
bool isOverloaded()
Definition: JobQueue.cpp:205
perf::PerfLog & perfLog_
Definition: JobQueue.h:263
Workers m_workers
Definition: JobQueue.h:260
JobTypeData & getJobTypeData(JobType type)
Definition: JobQueue.cpp:279
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition: JobQueue.cpp:161
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition: JobQueue.cpp:192
Json::Value getJson(int c=0)
Definition: JobQueue.cpp:213
beast::insight::Collector::ptr m_collector
Definition: JobQueue.h:264
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition: JobQueue.cpp:81
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:178
std::set< Job > m_jobSet
Definition: JobQueue.h:247
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition: JobQueue.cpp:141
beast::insight::Hook hook
Definition: JobQueue.h:266
beast::Journal m_journal
Definition: JobQueue.h:244
std::mutex m_mutex
Definition: JobQueue.h:245
beast::insight::Gauge job_count
Definition: JobQueue.h:265
void processTask(int instance) override
Perform a task.
Definition: JobQueue.cpp:387
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition: JobQueue.cpp:28
std::atomic_bool stopping_
Definition: JobQueue.h:249
void finishJob(JobType type)
Definition: JobQueue.cpp:364
void getNextJob(Job &job)
Definition: JobQueue.cpp:327
std::condition_variable cv_
Definition: JobQueue.h:268
Holds all the 'static' information about a job, which does not change.
Definition: JobTypeInfo.h:29
JobType type() const
Definition: JobTypeInfo.h:64
int limit() const
Definition: JobTypeInfo.h:76
static JobTypes const & instance()
Definition: JobTypes.h:128
void doJob()
Definition: Job.cpp:61
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition: Job.cpp:55
JobType getType() const
Definition: Job.cpp:49
Manages partitions for logging.
Definition: Log.h:51
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition: Workers.cpp:53
void addTask()
Add a task to be performed.
Definition: Workers.cpp:128
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log file.
Definition: PerfLog.h:51
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
virtual void jobQueue(JobType const type)=0
Log queued job.
T emplace(T... args)
T end(T... args)
T find(T... args)
T forward_as_tuple(T... args)
@ arrayValue
array value (ordered list)
Definition: json_value.h:43
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:44
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
Definition: BasicConfig.h:355
JobType
Definition: Job.h:34
@ jtINVALID
Definition: Job.h:36
@ jtGENERIC
Definition: Job.h:86
@ jtCLIENT
Definition: Job.h:44
@ jtCLIENT_WEBSOCKET
Definition: Job.h:50
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
Definition: contract.cpp:50
beast::insight::Event execute
Definition: JobTypeData.h:52
beast::insight::Event dequeue
Definition: JobTypeData.h:51
std::chrono::milliseconds latencyAvg
Definition: LoadMonitor.h:62
std::chrono::milliseconds latencyPeak
Definition: LoadMonitor.h:63