rippled
Loading...
Searching...
No Matches
JobQueue.cpp
1#include <xrpl/basics/contract.h>
2#include <xrpl/core/JobQueue.h>
3#include <xrpl/core/PerfLog.h>
4
5#include <mutex>
6
7namespace ripple {
8
10 int threadCount,
11 beast::insight::Collector::ptr const& collector,
12 beast::Journal journal,
13 Logs& logs,
14 perf::PerfLog& perfLog)
15 : m_journal(journal)
16 , m_lastJob(0)
17 , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
18 , m_processCount(0)
19 , m_workers(*this, &perfLog, "JobQueue", threadCount)
20 , perfLog_(perfLog)
21 , m_collector(collector)
22{
23 JLOG(m_journal.info()) << "Using " << threadCount << " threads";
24
25 hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
26 job_count = m_collector->make_gauge("job_count");
27
28 {
30
31 for (auto const& x : JobTypes::instance())
32 {
33 JobTypeInfo const& jt = x.second;
34
35 // And create dynamic information for all jobs
36 auto const result(m_jobData.emplace(
40 XRPL_ASSERT(
41 result.second == true,
42 "ripple::JobQueue::JobQueue : jobs added");
43 (void)result.second;
44 }
45 }
46}
47
49{
50 // Must unhook before destroying
52}
53
54void
60
61bool
63 JobType type,
64 std::string const& name,
65 JobFunction const& func)
66{
67 XRPL_ASSERT(
68 type != jtINVALID,
69 "ripple::JobQueue::addRefCountedJob : valid input job type");
70
71 auto iter(m_jobData.find(type));
72 XRPL_ASSERT(
73 iter != m_jobData.end(),
74 "ripple::JobQueue::addRefCountedJob : job type found in jobs");
75 if (iter == m_jobData.end())
76 return false;
77
78 JLOG(m_journal.debug())
79 << __func__ << " : Adding job : " << name << " : " << type;
80 JobTypeData& data(iter->second);
81
82 // FIXME: Workaround incorrect client shutdown ordering
83 // do not add jobs to a queue with no threads
84 XRPL_ASSERT(
85 (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) ||
87 "ripple::JobQueue::addRefCountedJob : threads available or job "
88 "requires no threads");
89
90 {
92 auto result =
93 m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
94 auto const& job = *result.first;
95
96 JobType const type(job.getType());
97 XRPL_ASSERT(
98 type != jtINVALID,
99 "ripple::JobQueue::addRefCountedJob : has valid job type");
100 XRPL_ASSERT(
101 m_jobSet.find(job) != m_jobSet.end(),
102 "ripple::JobQueue::addRefCountedJob : job found");
103 perfLog_.jobQueue(type);
104
105 JobTypeData& data(getJobTypeData(type));
106
107 if (data.waiting + data.running < getJobLimit(type))
108 {
110 }
111 else
112 {
113 // defer the task until we go below the limit
114 ++data.deferred;
115 }
116 ++data.waiting;
117 }
118 return true;
119}
120
121int
123{
125
126 JobDataMap::const_iterator c = m_jobData.find(t);
127
128 return (c == m_jobData.end()) ? 0 : c->second.waiting;
129}
130
131int
133{
135
136 JobDataMap::const_iterator c = m_jobData.find(t);
137
138 return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
139}
140
141int
143{
144 // return the number of jobs at this priority level or greater
145 int ret = 0;
146
148
149 for (auto const& x : m_jobData)
150 {
151 if (x.first >= t)
152 ret += x.second.waiting;
153 }
154
155 return ret;
156}
157
160{
161 JobDataMap::iterator iter(m_jobData.find(t));
162 XRPL_ASSERT(
163 iter != m_jobData.end(),
164 "ripple::JobQueue::makeLoadEvent : valid job type input");
165
166 if (iter == m_jobData.end())
167 return {};
168
169 return std::make_unique<LoadEvent>(iter->second.load(), name, true);
170}
171
172void
174{
175 if (isStopped())
176 LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
177
178 JobDataMap::iterator iter(m_jobData.find(t));
179 XRPL_ASSERT(
180 iter != m_jobData.end(),
181 "ripple::JobQueue::addLoadEvents : valid job type input");
182 iter->second.load().addSamples(count, elapsed);
183}
184
185bool
187{
188 return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) {
189 return entry.second.load().isOver();
190 });
191}
192
195{
196 using namespace std::chrono_literals;
198
199 ret["threads"] = m_workers.getNumberOfThreads();
200
201 Json::Value priorities = Json::arrayValue;
202
204
205 for (auto& x : m_jobData)
206 {
207 XRPL_ASSERT(
208 x.first != jtINVALID, "ripple::JobQueue::getJson : valid job type");
209
210 if (x.first == jtGENERIC)
211 continue;
212
213 JobTypeData& data(x.second);
214
215 LoadMonitor::Stats stats(data.stats());
216
217 int waiting(data.waiting);
218 int running(data.running);
219
220 if ((stats.count != 0) || (waiting != 0) ||
221 (stats.latencyPeak != 0ms) || (running != 0))
222 {
223 Json::Value& pri = priorities.append(Json::objectValue);
224
225 pri["job_type"] = data.name();
226
227 if (stats.isOverloaded)
228 pri["over_target"] = true;
229
230 if (waiting != 0)
231 pri["waiting"] = waiting;
232
233 if (stats.count != 0)
234 pri["per_second"] = static_cast<int>(stats.count);
235
236 if (stats.latencyPeak != 0ms)
237 pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
238
239 if (stats.latencyAvg != 0ms)
240 pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
241
242 if (running != 0)
243 pri["in_progress"] = running;
244 }
245 }
246
247 ret["job_types"] = priorities;
248
249 return ret;
250}
251
252void
254{
256 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
257}
258
261{
262 JobDataMap::iterator c(m_jobData.find(type));
263 XRPL_ASSERT(
264 c != m_jobData.end(),
265 "ripple::JobQueue::getJobTypeData : valid job type input");
266
267 // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
268 // and use something sane.
269 if (c == m_jobData.end())
270 return m_invalidJobData;
271
272 return c->second;
273}
274
275void
277{
278 stopping_ = true;
279 using namespace std::chrono_literals;
280 jobCounter_.join("JobQueue", 1s, m_journal);
281 {
282 // After the JobCounter is joined, all jobs have finished executing
283 // (i.e. returned from `Job::doJob`) and no more are being accepted,
284 // but there may still be some threads between the return of
285 // `Job::doJob` and the return of `JobQueue::processTask`. That is why
286 // we must wait on the condition variable to make these assertions.
288 cv_.wait(
289 lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
290 XRPL_ASSERT(
291 m_processCount == 0,
292 "ripple::JobQueue::stop : all processes completed");
293 XRPL_ASSERT(
294 m_jobSet.empty(), "ripple::JobQueue::stop : all jobs completed");
295 XRPL_ASSERT(
296 nSuspend_ == 0, "ripple::JobQueue::stop : no coros suspended");
297 stopped_ = true;
298 }
299}
300
301bool
303{
304 return stopped_;
305}
306
307void
309{
310 XRPL_ASSERT(
311 !m_jobSet.empty(), "ripple::JobQueue::getNextJob : non-empty jobs");
312
314 for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
315 {
316 JobType const type = iter->getType();
317 XRPL_ASSERT(
318 type != jtINVALID, "ripple::JobQueue::getNextJob : valid job type");
319
320 JobTypeData& data(getJobTypeData(type));
321 XRPL_ASSERT(
322 data.running <= getJobLimit(type),
323 "ripple::JobQueue::getNextJob : maximum jobs running");
324
325 // Run this job if we're running below the limit.
326 if (data.running < getJobLimit(data.type()))
327 {
328 XRPL_ASSERT(
329 data.waiting > 0,
330 "ripple::JobQueue::getNextJob : positive data waiting");
331 --data.waiting;
332 ++data.running;
333 break;
334 }
335 }
336
337 XRPL_ASSERT(
338 iter != m_jobSet.end(),
339 "ripple::JobQueue::getNextJob : found next job");
340 job = *iter;
341 m_jobSet.erase(iter);
342}
343
344void
346{
347 XRPL_ASSERT(
348 type != jtINVALID,
349 "ripple::JobQueue::finishJob : valid input job type");
350
351 JobTypeData& data = getJobTypeData(type);
352
353 // Queue a deferred task if possible
354 if (data.deferred > 0)
355 {
356 XRPL_ASSERT(
357 data.running + data.waiting >= getJobLimit(type),
358 "ripple::JobQueue::finishJob : job limit");
359
360 --data.deferred;
362 }
363
364 --data.running;
365}
366
367void
369{
370 JobType type;
371
372 {
373 using namespace std::chrono;
374 Job::clock_type::time_point const start_time(Job::clock_type::now());
375 {
376 Job job;
377 {
379 getNextJob(job);
381 }
382 type = job.getType();
383 JobTypeData& data(getJobTypeData(type));
384 JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
385
386 // The amount of time that the job was in the queue
387 auto const q_time =
388 ceil<microseconds>(start_time - job.queue_time());
389 perfLog_.jobStart(type, q_time, start_time, instance);
390
391 job.doJob();
392
393 // The amount of time it took to execute the job
394 auto const x_time =
395 ceil<microseconds>(Job::clock_type::now() - start_time);
396
397 if (x_time >= 10ms || q_time >= 10ms)
398 {
399 getJobTypeData(type).dequeue.notify(q_time);
400 getJobTypeData(type).execute.notify(x_time);
401 }
402 perfLog_.jobFinish(type, x_time, instance);
403 }
404 }
405
406 {
408 // Job should be destroyed before stopping
409 // otherwise destructors with side effects can access
410 // parent objects that are already destroyed.
411 finishJob(type);
412 if (--m_processCount == 0 && m_jobSet.empty())
413 cv_.notify_all();
414 }
415
416 // Note that when Job::~Job is called, the last reference
417 // to the associated LoadEvent object (in the Job) may be destroyed.
418}
419
420int
422{
423 JobTypeInfo const& j(JobTypes::instance().get(type));
424 XRPL_ASSERT(
425 j.type() != jtINVALID,
426 "ripple::JobQueue::getJobLimit : valid job type");
427
428 return j.limit();
429}
430
431} // namespace ripple
T any_of(T... args)
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:131
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:41
Stream debug() const
Definition Journal.h:309
Stream info() const
Definition Journal.h:315
Stream trace() const
Severity stream access functions.
Definition Journal.h:303
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition Event.h:45
A reference to a handler for performing polled collection.
Definition Hook.h:13
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
JobTypeData m_invalidJobData
Definition JobQueue.h:235
int getJobLimit(JobType type)
Definition JobQueue.cpp:421
std::atomic_bool stopped_
Definition JobQueue.h:233
std::uint64_t m_lastJob
Definition JobQueue.h:229
JobDataMap m_jobData
Definition JobQueue.h:234
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:253
JobCounter jobCounter_
Definition JobQueue.h:231
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:132
bool isStopped() const
Definition JobQueue.cpp:302
perf::PerfLog & perfLog_
Definition JobQueue.h:246
Workers m_workers
Definition JobQueue.h:243
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:260
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:142
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:173
Json::Value getJson(int c=0)
Definition JobQueue.cpp:194
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:247
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:62
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:159
std::set< Job > m_jobSet
Definition JobQueue.h:230
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:122
beast::insight::Hook hook
Definition JobQueue.h:249
beast::Journal m_journal
Definition JobQueue.h:227
std::mutex m_mutex
Definition JobQueue.h:228
beast::insight::Gauge job_count
Definition JobQueue.h:248
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:368
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition JobQueue.cpp:9
std::atomic_bool stopping_
Definition JobQueue.h:232
void finishJob(JobType type)
Definition JobQueue.cpp:345
void getNextJob(Job &job)
Definition JobQueue.cpp:308
std::condition_variable cv_
Definition JobQueue.h:251
Holds all the 'static' information about a job, which does not change.
Definition JobTypeInfo.h:10
JobType type() const
Definition JobTypeInfo.h:45
int limit() const
Definition JobTypeInfo.h:57
static JobTypes const & instance()
Definition JobTypes.h:109
void doJob()
Definition Job.cpp:42
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition Job.cpp:36
JobType getType() const
Definition Job.cpp:30
Manages partitions for logging.
Definition Log.h:33
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:34
void addTask()
Add a task to be performed.
Definition Workers.cpp:109
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:32
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
virtual void jobQueue(JobType const type)=0
Log queued job.
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T forward_as_tuple(T... args)
T is_same_v
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
JobType
Definition Job.h:15
@ jtINVALID
Definition Job.h:17
@ jtGENERIC
Definition Job.h:67
@ jtCLIENT
Definition Job.h:25
@ jtCLIENT_WEBSOCKET
Definition Job.h:31
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
T piecewise_construct
beast::insight::Event execute
Definition JobTypeData.h:33
beast::insight::Event dequeue
Definition JobTypeData.h:32
std::chrono::milliseconds latencyAvg
Definition LoadMonitor.h:43
std::chrono::milliseconds latencyPeak
Definition LoadMonitor.h:44