rippled
Loading...
Searching...
No Matches
JobQueue.cpp
1#include <xrpl/basics/contract.h>
2#include <xrpl/core/JobQueue.h>
3#include <xrpl/core/PerfLog.h>
4
5#include <mutex>
6
7namespace xrpl {
8
// JobQueue::JobQueue constructor. The opening `JobQueue::JobQueue(` line
// (listing line 9) is missing from this extract; the full parameter list is
// shown in the symbol index at the end of the page.
// Initializes the members, logs the thread count, registers an insight hook
// and gauge, then creates one m_jobData entry per known job type.
10 int threadCount,
11 beast::insight::Collector::ptr const& collector,
12 beast::Journal journal,
13 Logs& logs,
14 perf::PerfLog& perfLog)
15 : m_journal(journal)
16 , m_lastJob(0)
17 , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
18 , m_processCount(0)
19 , m_workers(*this, &perfLog, "JobQueue", threadCount)
20 , perfLog_(perfLog)
21 , m_collector(collector)
22{
23 JLOG(m_journal.info()) << "Using " << threadCount << " threads";
24
// The hook is bound to JobQueue::collect on this instance; the gauge is
// created under the name "job_count".
25 hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
26 job_count = m_collector->make_gauge("job_count");
27
28 {
// NOTE(review): listing line 29 is missing from this extract; presumably it
// acquires m_mutex for the scope below - confirm against the original file.
30
31 for (auto const& x : JobTypes::instance())
32 {
33 JobTypeInfo const& jt = x.second;
34
35 // And create dynamic information for all jobs
// NOTE(review): listing lines 37-39 (the arguments to emplace) are missing
// from this extract, so the call below is incomplete as shown.
36 auto const result(m_jobData.emplace(
40 XRPL_ASSERT(result.second == true, "xrpl::JobQueue::JobQueue : jobs added");
// Explicitly references `result` - presumably to avoid an unused-variable
// warning when the assertion compiles to nothing; confirm.
41 (void)result.second;
42 }
43 }
44}
45
// Presumably the JobQueue destructor: the signature (listing line 46) and the
// only statement (listing line 49) are missing from this extract - confirm
// against the original file.
47{
48 // Must unhook before destroying
50}
51
52void
58
// JobQueue::addRefCountedJob(JobType type, std::string const& name,
// JobFunction const& func) - signature taken from the symbol index; the
// signature line itself (listing line 60) is missing from this extract.
//
// Inserts a new job into m_jobSet and updates the per-type counters.
// Returns false when `type` has no entry in m_jobData, true otherwise.
59bool
61{
62 XRPL_ASSERT(type != jtINVALID, "xrpl::JobQueue::addRefCountedJob : valid input job type");
63
64 auto iter(m_jobData.find(type));
65 XRPL_ASSERT(iter != m_jobData.end(), "xrpl::JobQueue::addRefCountedJob : job type found in jobs");
// Runtime fallback for the assertion above: reject unknown job types.
66 if (iter == m_jobData.end())
67 return false;
68
69 JLOG(m_journal.debug()) << __func__ << " : Adding job : " << name << " : " << type;
70 JobTypeData& data(iter->second);
71
72 // FIXME: Workaround incorrect client shutdown ordering
73 // do not add jobs to a queue with no threads
74 XRPL_ASSERT(
75 (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) || m_workers.getNumberOfThreads() > 0,
76 "xrpl::JobQueue::addRefCountedJob : threads available or job "
77 "requires no threads");
78
79 {
// NOTE(review): listing line 80 is missing from this extract; presumably it
// acquires m_mutex for this scope - confirm.
// The job is numbered with the pre-incremented m_lastJob counter.
81 auto result = m_jobSet.emplace(type, name, ++m_lastJob, data.load(), func);
82 auto const& job = *result.first;
83
// `type` here (and `data` below) are new locals that shadow the outer
// variables of the same name within this inner scope.
84 JobType const type(job.getType());
85 XRPL_ASSERT(type != jtINVALID, "xrpl::JobQueue::addRefCountedJob : has valid job type");
86 XRPL_ASSERT(m_jobSet.find(job) != m_jobSet.end(), "xrpl::JobQueue::addRefCountedJob : job found");
87 perfLog_.jobQueue(type);
88
89 JobTypeData& data(getJobTypeData(type));
90
// Below the per-type limit the job can be scheduled now; otherwise it is
// counted as deferred. Either way it is counted as waiting afterwards.
91 if (data.waiting + data.running < getJobLimit(type))
92 {
// NOTE(review): listing line 93 is missing from this extract; presumably it
// hands a task to m_workers (e.g. addTask()) - confirm.
94 }
95 else
96 {
97 // defer the task until we go below the limit
98 ++data.deferred;
99 }
100 ++data.waiting;
101 }
102 return true;
103}
104
// JobQueue::getJobCount(JobType t) const - signature from the symbol index;
// the signature line (listing line 106) is missing from this extract.
// Returns the `waiting` counter for job type `t`, or 0 if `t` has no entry
// in m_jobData.
105int
107{
// NOTE(review): listing line 108 is missing from this extract; presumably it
// acquires m_mutex - confirm.
109
110 JobDataMap::const_iterator c = m_jobData.find(t);
111
112 return (c == m_jobData.end()) ? 0 : c->second.waiting;
113}
114
// JobQueue::getJobCountTotal(JobType t) const - signature from the symbol
// index; the signature line (listing line 116) is missing from this extract.
// Returns waiting + running for job type `t`, or 0 if `t` has no entry in
// m_jobData.
115int
117{
// NOTE(review): listing line 118 is missing from this extract; presumably it
// acquires m_mutex - confirm.
119
120 JobDataMap::const_iterator c = m_jobData.find(t);
121
122 return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
123}
124
// JobQueue::getJobCountGE(JobType t) const - signature from the symbol index;
// the signature line (listing line 126) is missing from this extract.
// Sums the `waiting` counters of every m_jobData entry whose key is >= `t`.
125int
127{
128 // return the number of jobs at this priority level or greater
129 int ret = 0;
130
// NOTE(review): listing line 131 is missing from this extract; presumably it
// acquires m_mutex - confirm.
132
133 for (auto const& x : m_jobData)
134 {
135 if (x.first >= t)
136 ret += x.second.waiting;
137 }
138
139 return ret;
140}
141
// JobQueue::makeLoadEvent(JobType t, std::string const& name), returning
// std::unique_ptr<LoadEvent> - signature from the symbol index; the return
// type and signature lines (listing lines 142-143) are missing from this
// extract.
// Returns a LoadEvent built from the load monitor of job type `t`, or an
// empty result when `t` has no entry in m_jobData.
144{
145 JobDataMap::iterator iter(m_jobData.find(t));
146 XRPL_ASSERT(iter != m_jobData.end(), "xrpl::JobQueue::makeLoadEvent : valid job type input");
147
// Runtime fallback for the assertion above.
148 if (iter == m_jobData.end())
149 return {};
150
151 return std::make_unique<LoadEvent>(iter->second.load(), name, true);
152}
153
// JobQueue::addLoadEvents(JobType t, int count,
// std::chrono::milliseconds elapsed) - signature from the symbol index; the
// signature line (listing line 155) is missing from this extract.
// Adds `count` samples with total duration `elapsed` to the load monitor of
// job type `t`. Calls LogicError if the queue reports itself stopped.
154void
156{
157 if (isStopped())
158 LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
159
160 JobDataMap::iterator iter(m_jobData.find(t));
161 XRPL_ASSERT(iter != m_jobData.end(), "xrpl::JobQueue::addLoadEvents : valid job type input");
// NOTE(review): unlike makeLoadEvent, there is no runtime check for
// iter == m_jobData.end() before the dereference below; only the assertion
// guards it.
162 iter->second.load().addSamples(count, elapsed);
163}
164
// JobQueue::isOverloaded() - signature from the symbol index; the signature
// line (listing line 166) is missing from this extract.
// Returns true if the load monitor of any m_jobData entry reports isOver().
165bool
167{
168 return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) { return entry.second.load().isOver(); });
169}
170
// JobQueue::getJson(int c = 0), returning Json::Value - signature from the
// symbol index; the return type and signature lines (listing lines 171-172)
// are missing from this extract.
// Builds a JSON object with a "threads" count and a "job_types" array holding
// one object per job type that has any activity to report.
173{
174 using namespace std::chrono_literals;
// NOTE(review): listing line 175 is missing from this extract; presumably it
// declares `ret` - confirm.
176
177 ret["threads"] = m_workers.getNumberOfThreads();
178
179 Json::Value priorities = Json::arrayValue;
180
// NOTE(review): listing line 181 is missing from this extract; presumably it
// acquires m_mutex - confirm.
182
183 for (auto& x : m_jobData)
184 {
185 XRPL_ASSERT(x.first != jtINVALID, "xrpl::JobQueue::getJson : valid job type");
186
// jtGENERIC is never reported.
187 if (x.first == jtGENERIC)
188 continue;
189
190 JobTypeData& data(x.second);
191
192 LoadMonitor::Stats stats(data.stats());
193
194 int waiting(data.waiting);
195 int running(data.running);
196
// Emit an entry only when at least one statistic is non-zero; each field
// below is likewise written only when it has a non-zero / true value.
197 if ((stats.count != 0) || (waiting != 0) || (stats.latencyPeak != 0ms) || (running != 0))
198 {
199 Json::Value& pri = priorities.append(Json::objectValue);
200
201 pri["job_type"] = data.name();
202
203 if (stats.isOverloaded)
204 pri["over_target"] = true;
205
206 if (waiting != 0)
207 pri["waiting"] = waiting;
208
209 if (stats.count != 0)
210 pri["per_second"] = static_cast<int>(stats.count);
211
212 if (stats.latencyPeak != 0ms)
213 pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
214
215 if (stats.latencyAvg != 0ms)
216 pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
217
218 if (running != 0)
219 pri["in_progress"] = running;
220 }
221 }
222
223 ret["job_types"] = priorities;
224
225 return ret;
226}
227
// JobQueue::rendezvous() - signature from the symbol index; the signature
// line (listing line 229) is missing from this extract.
// Blocks on cv_ until m_processCount is 0 and m_jobSet is empty.
228void
230{
// NOTE(review): listing line 231 is missing from this extract; presumably it
// declares `lock` over m_mutex - confirm.
232 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
233}
234
// JobQueue::getJobTypeData(JobType type), returning JobTypeData& - signature
// from the symbol index; the return type and signature lines (listing lines
// 235-236) are missing from this extract.
// Returns the m_jobData entry for `type`, or m_invalidJobData when `type`
// has no entry.
237{
238 JobDataMap::iterator c(m_jobData.find(type));
239 XRPL_ASSERT(c != m_jobData.end(), "xrpl::JobQueue::getJobTypeData : valid job type input");
240
241 // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
242 // and use something sane.
243 if (c == m_jobData.end())
244 return m_invalidJobData;
245
246 return c->second;
247}
248
// JobQueue::stop() - name taken from the assertion messages below; the
// signature line (listing line 250) is missing from this extract.
// Sets stopping_, joins jobCounter_, waits until no task is being processed
// and m_jobSet is empty, then sets stopped_.
249void
251{
252 stopping_ = true;
253 using namespace std::chrono_literals;
// The 1s argument is the `wait` parameter of JobCounter::join (see the
// symbol index); its exact meaning is not shown in this extract.
254 jobCounter_.join("JobQueue", 1s, m_journal);
255 {
256 // After the JobCounter is joined, all jobs have finished executing
257 // (i.e. returned from `Job::doJob`) and no more are being accepted,
258 // but there may still be some threads between the return of
259 // `Job::doJob` and the return of `JobQueue::processTask`. That is why
260 // we must wait on the condition variable to make these assertions.
// NOTE(review): listing line 261 is missing from this extract; presumably it
// declares `lock` over m_mutex - confirm.
262 cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
263 XRPL_ASSERT(m_processCount == 0, "xrpl::JobQueue::stop : all processes completed");
264 XRPL_ASSERT(m_jobSet.empty(), "xrpl::JobQueue::stop : all jobs completed");
265 XRPL_ASSERT(nSuspend_ == 0, "xrpl::JobQueue::stop : no coros suspended");
266 stopped_ = true;
267 }
268}
269
// JobQueue::isStopped() const - signature from the symbol index; the
// signature line (listing line 271) is missing from this extract.
// Returns the current value of the stopped_ flag.
270bool
272{
273 return stopped_;
274}
275
// JobQueue::getNextJob(Job& job) - signature from the symbol index; the
// signature line (listing line 277) is missing from this extract.
// Scans m_jobSet from the beginning for the first job whose type is running
// below its limit, moves one count from `waiting` to `running` for that type,
// copies the job into `job`, and erases it from the set.
276void
278{
279 XRPL_ASSERT(!m_jobSet.empty(), "xrpl::JobQueue::getNextJob : non-empty jobs");
280
// NOTE(review): listing line 281 is missing from this extract; presumably it
// declares `iter` - confirm.
282 for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
283 {
284 JobType const type = iter->getType();
285 XRPL_ASSERT(type != jtINVALID, "xrpl::JobQueue::getNextJob : valid job type");
286
287 JobTypeData& data(getJobTypeData(type));
288 XRPL_ASSERT(data.running <= getJobLimit(type), "xrpl::JobQueue::getNextJob : maximum jobs running");
289
290 // Run this job if we're running below the limit.
291 if (data.running < getJobLimit(data.type()))
292 {
293 XRPL_ASSERT(data.waiting > 0, "xrpl::JobQueue::getNextJob : positive data waiting");
294 --data.waiting;
295 ++data.running;
296 break;
297 }
298 }
299
// NOTE(review): if no job qualified, `iter` equals end() here and is
// dereferenced below; only the assertion guards this case.
300 XRPL_ASSERT(iter != m_jobSet.end(), "xrpl::JobQueue::getNextJob : found next job");
301 job = *iter;
302 m_jobSet.erase(iter);
303}
304
// JobQueue::finishJob(JobType type) - signature from the symbol index; the
// signature line (listing line 306) is missing from this extract.
// Updates the per-type counters after a job of `type` has run: consumes one
// deferred slot if any exist, then decrements `running`.
305void
307{
308 XRPL_ASSERT(type != jtINVALID, "xrpl::JobQueue::finishJob : valid input job type");
309
310 JobTypeData& data = getJobTypeData(type);
311
312 // Queue a deferred task if possible
313 if (data.deferred > 0)
314 {
315 XRPL_ASSERT(data.running + data.waiting >= getJobLimit(type), "xrpl::JobQueue::finishJob : job limit");
316
317 --data.deferred;
// NOTE(review): listing line 318 is missing from this extract; presumably it
// hands a task to m_workers (e.g. addTask()) - confirm.
319 }
320
321 --data.running;
322}
323
// JobQueue::processTask(int instance) override - signature from the symbol
// index; the signature line (listing line 325) is missing from this extract.
// Takes the next runnable job, executes it, reports queue and execution
// times, then updates the counters and notifies cv_ waiters when the queue
// has drained.
324void
326{
// Declared outside the scopes below so the job type is still available to
// finishJob() after the Job object has gone out of scope.
327 JobType type;
328
329 {
330 using namespace std::chrono;
331 Job::clock_type::time_point const start_time(Job::clock_type::now());
332 {
333 Job job;
334 {
// NOTE(review): listing lines 335 and 337 are missing from this extract;
// presumably a lock on m_mutex and a second statement - confirm.
336 getNextJob(job);
338 }
339 type = job.getType();
340 JobTypeData& data(getJobTypeData(type));
341 JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
342
343 // The amount of time that the job was in the queue
344 auto const q_time = ceil<microseconds>(start_time - job.queue_time());
345 perfLog_.jobStart(type, q_time, start_time, instance);
346
347 job.doJob();
348
349 // The amount of time it took to execute the job
350 auto const x_time = ceil<microseconds>(Job::clock_type::now() - start_time);
351
// The dequeue/execute events are notified only when either duration reaches
// 10ms; shorter jobs are not reported to them.
352 if (x_time >= 10ms || q_time >= 10ms)
353 {
354 getJobTypeData(type).dequeue.notify(q_time);
355 getJobTypeData(type).execute.notify(x_time);
356 }
357 perfLog_.jobFinish(type, x_time, instance);
358 }
359 }
360
361 {
// NOTE(review): listing line 362 is missing from this extract; presumably it
// acquires m_mutex for this scope - confirm.
363 // Job should be destroyed before stopping
364 // otherwise destructors with side effects can access
365 // parent objects that are already destroyed.
366 finishJob(type);
// Wake rendezvous()/stop() waiters once no task is in flight and the set is
// empty - the same condition those functions wait on.
367 if (--m_processCount == 0 && m_jobSet.empty())
368 cv_.notify_all();
369 }
370
371 // Note that when Job::~Job is called, the last reference
372 // to the associated LoadEvent object (in the Job) may be destroyed.
373}
374
// JobQueue::getJobLimit(JobType type) - signature from the symbol index; the
// signature line (listing line 376) is missing from this extract.
// Returns the limit() recorded in the static JobTypeInfo for `type`.
375int
377{
378 JobTypeInfo const& j(JobTypes::instance().get(type));
379 XRPL_ASSERT(j.type() != jtINVALID, "xrpl::JobQueue::getJobLimit : valid job type");
380
381 return j.limit();
382}
383
384} // namespace xrpl
T any_of(T... args)
T begin(T... args)
T bind(T... args)
Represents a JSON value.
Definition json_value.h:131
Value & append(Value const &value)
Append value to array at the end.
A generic endpoint for log messages.
Definition Journal.h:41
Stream debug() const
Definition Journal.h:301
Stream info() const
Definition Journal.h:307
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition Event.h:45
A reference to a handler for performing polled collection.
Definition Hook.h:13
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:126
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:325
beast::Journal m_journal
Definition JobQueue.h:224
std::mutex m_mutex
Definition JobQueue.h:225
JobCounter jobCounter_
Definition JobQueue.h:228
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:236
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:116
JobTypeData m_invalidJobData
Definition JobQueue.h:232
beast::insight::Gauge job_count
Definition JobQueue.h:245
bool isStopped() const
Definition JobQueue.cpp:271
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:229
Workers m_workers
Definition JobQueue.h:240
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition JobQueue.cpp:9
std::atomic_bool stopping_
Definition JobQueue.h:229
JobDataMap m_jobData
Definition JobQueue.h:231
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:106
std::set< Job > m_jobSet
Definition JobQueue.h:227
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:60
bool isOverloaded()
Definition JobQueue.cpp:166
std::atomic_bool stopped_
Definition JobQueue.h:230
std::condition_variable cv_
Definition JobQueue.h:248
int getJobLimit(JobType type)
Definition JobQueue.cpp:376
std::uint64_t m_lastJob
Definition JobQueue.h:226
beast::insight::Hook hook
Definition JobQueue.h:246
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:155
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:143
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:244
perf::PerfLog & perfLog_
Definition JobQueue.h:243
void finishJob(JobType type)
Definition JobQueue.cpp:306
Json::Value getJson(int c=0)
Definition JobQueue.cpp:172
void getNextJob(Job &job)
Definition JobQueue.cpp:277
Holds all the 'static' information about a job, which does not change.
Definition JobTypeInfo.h:10
int limit() const
Definition JobTypeInfo.h:53
JobType type() const
Definition JobTypeInfo.h:41
static JobTypes const & instance()
Definition JobTypes.h:98
JobType getType() const
Definition Job.cpp:21
void doJob()
Definition Job.cpp:33
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition Job.cpp:27
Manages partitions for logging.
Definition Log.h:33
void addTask()
Add a task to be performed.
Definition Workers.cpp:103
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition Workers.cpp:30
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:32
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
virtual void jobQueue(JobType const type)=0
Log queued job.
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
T emplace(T... args)
T end(T... args)
T erase(T... args)
T find(T... args)
T forward_as_tuple(T... args)
T is_same_v
@ arrayValue
array value (ordered list)
Definition json_value.h:26
@ objectValue
object value (collection of name/value pairs).
Definition json_value.h:27
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
JobType
Definition Job.h:15
@ jtINVALID
Definition Job.h:17
@ jtCLIENT_WEBSOCKET
Definition Job.h:31
@ jtCLIENT
Definition Job.h:25
@ jtGENERIC
Definition Job.h:67
T piecewise_construct
beast::insight::Event dequeue
Definition JobTypeData.h:32
beast::insight::Event execute
Definition JobTypeData.h:33
std::chrono::milliseconds latencyPeak
Definition LoadMonitor.h:42
std::chrono::milliseconds latencyAvg
Definition LoadMonitor.h:41