rippled
JobQueue.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/basics/PerfLog.h>
21 #include <ripple/basics/contract.h>
22 #include <ripple/core/JobQueue.h>
23 #include <mutex>
24 
25 namespace ripple {
26 
28  int threadCount,
29  beast::insight::Collector::ptr const& collector,
30  beast::Journal journal,
31  Logs& logs,
32  perf::PerfLog& perfLog)
33  : m_journal(journal)
34  , m_lastJob(0)
35  , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
36  , m_processCount(0)
37  , m_workers(*this, &perfLog, "JobQueue", threadCount)
38  , m_cancelCallback(std::bind(&JobQueue::isStopping, this))
39  , perfLog_(perfLog)
40  , m_collector(collector)
41 {
42  JLOG(m_journal.info()) << "Using " << threadCount << " threads";
43 
44  hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
45  job_count = m_collector->make_gauge("job_count");
46 
47  {
49 
50  for (auto const& x : JobTypes::instance())
51  {
52  JobTypeInfo const& jt = x.second;
53 
54  // And create dynamic information for all jobs
55  auto const result(m_jobData.emplace(
56  std::piecewise_construct,
59  assert(result.second == true);
60  (void)result.second;
61  }
62  }
63 }
64 
66 {
67  // Must unhook before destroying
69 }
70 
71 void
73 {
75  job_count = m_jobSet.size();
76 }
77 
78 bool
80  JobType type,
81  std::string const& name,
82  JobFunction const& func)
83 {
84  assert(type != jtINVALID);
85 
86  auto iter(m_jobData.find(type));
87  assert(iter != m_jobData.end());
88  if (iter == m_jobData.end())
89  return false;
90 
91  JLOG(m_journal.debug())
92  << __func__ << " : Adding job : " << name << " : " << type;
93  JobTypeData& data(iter->second);
94 
95  // FIXME: Workaround incorrect client shutdown ordering
96  // do not add jobs to a queue with no threads
97  assert(
98  (type >= jtCLIENT && type <= jtCLIENT_WEBSOCKET) ||
100 
101  {
102  std::lock_guard lock(m_mutex);
103  auto result = m_jobSet.emplace(
104  type, name, ++m_lastJob, data.load(), func, m_cancelCallback);
105  queueJob(*result.first, lock);
106  }
107  return true;
108 }
109 
110 int
112 {
113  std::lock_guard lock(m_mutex);
114 
115  JobDataMap::const_iterator c = m_jobData.find(t);
116 
117  return (c == m_jobData.end()) ? 0 : c->second.waiting;
118 }
119 
120 int
122 {
123  std::lock_guard lock(m_mutex);
124 
125  JobDataMap::const_iterator c = m_jobData.find(t);
126 
127  return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
128 }
129 
130 int
132 {
133  // return the number of jobs at this priority level or greater
134  int ret = 0;
135 
136  std::lock_guard lock(m_mutex);
137 
138  for (auto const& x : m_jobData)
139  {
140  if (x.first >= t)
141  ret += x.second.waiting;
142  }
143 
144  return ret;
145 }
146 
149 {
150  JobDataMap::iterator iter(m_jobData.find(t));
151  assert(iter != m_jobData.end());
152 
153  if (iter == m_jobData.end())
154  return {};
155 
156  return std::make_unique<LoadEvent>(iter->second.load(), name, true);
157 }
158 
159 void
161 {
162  if (isStopped())
163  LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
164 
165  JobDataMap::iterator iter(m_jobData.find(t));
166  assert(iter != m_jobData.end());
167  iter->second.load().addSamples(count, elapsed);
168 }
169 
170 bool
172 {
173  return std::any_of(m_jobData.begin(), m_jobData.end(), [](auto& entry) {
174  return entry.second.load().isOver();
175  });
176 }
177 
180 {
181  using namespace std::chrono_literals;
183 
184  ret["threads"] = m_workers.getNumberOfThreads();
185 
186  Json::Value priorities = Json::arrayValue;
187 
188  std::lock_guard lock(m_mutex);
189 
190  for (auto& x : m_jobData)
191  {
192  assert(x.first != jtINVALID);
193 
194  if (x.first == jtGENERIC)
195  continue;
196 
197  JobTypeData& data(x.second);
198 
199  LoadMonitor::Stats stats(data.stats());
200 
201  int waiting(data.waiting);
202  int running(data.running);
203 
204  if ((stats.count != 0) || (waiting != 0) ||
205  (stats.latencyPeak != 0ms) || (running != 0))
206  {
207  Json::Value& pri = priorities.append(Json::objectValue);
208 
209  pri["job_type"] = data.name();
210 
211  if (stats.isOverloaded)
212  pri["over_target"] = true;
213 
214  if (waiting != 0)
215  pri["waiting"] = waiting;
216 
217  if (stats.count != 0)
218  pri["per_second"] = static_cast<int>(stats.count);
219 
220  if (stats.latencyPeak != 0ms)
221  pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
222 
223  if (stats.latencyAvg != 0ms)
224  pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
225 
226  if (running != 0)
227  pri["in_progress"] = running;
228  }
229  }
230 
231  ret["job_types"] = priorities;
232 
233  return ret;
234 }
235 
236 void
238 {
240  cv_.wait(lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
241 }
242 
245 {
246  JobDataMap::iterator c(m_jobData.find(type));
247  assert(c != m_jobData.end());
248 
249  // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
250  // and use something sane.
251  if (c == m_jobData.end())
252  return m_invalidJobData;
253 
254  return c->second;
255 }
256 
257 void
259 {
260  stopping_ = true;
261  using namespace std::chrono_literals;
262  jobCounter_.join("JobQueue", 1s, m_journal);
263  {
264  // After the JobCounter is joined, all jobs have finished executing
265  // (i.e. returned from `Job::doJob`) and no more are being accepted,
266  // but there may still be some threads between the return of
267  // `Job::doJob` and the return of `JobQueue::processTask`. That is why
268  // we must wait on the condition variable to make these assertions.
270  cv_.wait(
271  lock, [this] { return m_processCount == 0 && m_jobSet.empty(); });
272  assert(m_processCount == 0);
273  assert(m_jobSet.empty());
274  assert(nSuspend_ == 0);
275  stopped_ = true;
276  }
277 }
278 
279 bool
281 {
282  return stopped_;
283 }
284 
285 void
287 {
288  JobType const type(job.getType());
289  assert(type != jtINVALID);
290  assert(m_jobSet.find(job) != m_jobSet.end());
291  perfLog_.jobQueue(type);
292 
293  JobTypeData& data(getJobTypeData(type));
294 
295  if (data.waiting + data.running < getJobLimit(type))
296  {
297  m_workers.addTask();
298  }
299  else
300  {
301  // defer the task until we go below the limit
302  //
303  ++data.deferred;
304  }
305  ++data.waiting;
306 }
307 
308 void
310 {
311  assert(!m_jobSet.empty());
312 
314  for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
315  {
316  JobTypeData& data(getJobTypeData(iter->getType()));
317 
318  assert(data.running <= getJobLimit(data.type()));
319 
320  // Run this job if we're running below the limit.
321  if (data.running < getJobLimit(data.type()))
322  {
323  assert(data.waiting > 0);
324  break;
325  }
326  }
327 
328  assert(iter != m_jobSet.end());
329 
330  JobType const type = iter->getType();
331  JobTypeData& data(getJobTypeData(type));
332 
333  assert(type != jtINVALID);
334 
335  job = *iter;
336  m_jobSet.erase(iter);
337 
338  --data.waiting;
339  ++data.running;
340 }
341 
342 void
344 {
345  assert(type != jtINVALID);
346 
347  JobTypeData& data = getJobTypeData(type);
348 
349  // Queue a deferred task if possible
350  if (data.deferred > 0)
351  {
352  assert(data.running + data.waiting >= getJobLimit(type));
353 
354  --data.deferred;
355  m_workers.addTask();
356  }
357 
358  --data.running;
359 }
360 
361 void
363 {
364  JobType type;
365 
366  {
367  using namespace std::chrono;
368  Job::clock_type::time_point const start_time(Job::clock_type::now());
369  {
370  Job job;
371  {
372  std::lock_guard lock(m_mutex);
373  getNextJob(job);
374  ++m_processCount;
375  }
376  type = job.getType();
377  JobTypeData& data(getJobTypeData(type));
378  JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
379 
380  // The amount of time that the job was in the queue
381  auto const q_time =
382  ceil<microseconds>(start_time - job.queue_time());
383  perfLog_.jobStart(type, q_time, start_time, instance);
384 
385  job.doJob();
386 
387  // The amount of time it took to execute the job
388  auto const x_time =
389  ceil<microseconds>(Job::clock_type::now() - start_time);
390 
391  if (x_time >= 10ms || q_time >= 10ms)
392  {
393  getJobTypeData(type).dequeue.notify(q_time);
394  getJobTypeData(type).execute.notify(x_time);
395  }
396  perfLog_.jobFinish(type, x_time, instance);
397  }
398  }
399 
400  {
401  std::lock_guard lock(m_mutex);
402  // Job should be destroyed before stopping
403  // otherwise destructors with side effects can access
404  // parent objects that are already destroyed.
405  finishJob(type);
406  if (--m_processCount == 0 && m_jobSet.empty())
407  cv_.notify_all();
408  }
409 
410  // Note that when Job::~Job is called, the last reference
411  // to the associated LoadEvent object (in the Job) may be destroyed.
412 }
413 
414 int
416 {
417  JobTypeInfo const& j(JobTypes::instance().get(type));
418  assert(j.type() != jtINVALID);
419 
420  return j.limit();
421 }
422 
423 } // namespace ripple
ripple::JobQueue::finishJob
void finishJob(JobType type)
Definition: JobQueue.cpp:343
ripple::JobQueue::m_jobSet
std::set< Job > m_jobSet
Definition: JobQueue.h:248
ripple::JobQueue::nSuspend_
int nSuspend_
Definition: JobQueue.h:259
std::bind
T bind(T... args)
std::string
STL class.
std::shared_ptr< Collector >
ripple::JobQueue::JobQueue
JobQueue(int threadCount, beast::insight::Collector::ptr const &collector, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition: JobQueue.cpp:27
ripple::jtCLIENT
@ jtCLIENT
Definition: Job.h:44
ripple::Logs
Manages partitions for logging.
Definition: Log.h:48
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::JobTypes
Definition: JobTypes.h:32
Json::arrayValue
@ arrayValue
array value (ordered list)
Definition: json_value.h:42
ripple::JobTypeData::execute
beast::insight::Event execute
Definition: JobTypeData.h:52
std::map::find
T find(T... args)
ripple::Workers::getNumberOfThreads
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition: Workers.cpp:53
ripple::JobQueue::getJobLimit
int getJobLimit(JobType type)
Definition: JobQueue.cpp:415
std::chrono::milliseconds
ripple::JobQueue::getJson
Json::Value getJson(int c=0)
Definition: JobQueue.cpp:179
ripple::LoadMonitor::Stats::count
std::uint64_t count
Definition: LoadMonitor.h:60
ripple::LoadMonitor::Stats::isOverloaded
bool isOverloaded
Definition: LoadMonitor.h:63
std::map::emplace
T emplace(T... args)
ripple::Job::queue_time
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition: Job.cpp:65
std::lock_guard
STL class.
ripple::perf::PerfLog
Singleton class that maintains performance counters and optionally writes Json-formatted data to a distinct log.
Definition: PerfLog.h:48
ripple::JobQueue::addRefCountedJob
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition: JobQueue.cpp:79
std::function
std::any_of
T any_of(T... args)
ripple::JobQueue::m_journal
beast::Journal m_journal
Definition: JobQueue.h:245
ripple::JobTypeInfo::limit
int limit() const
Definition: JobTypeInfo.h:76
ripple::JobQueue::getJobCount
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition: JobQueue.cpp:111
ripple::JobQueue::jobCounter_
JobCounter jobCounter_
Definition: JobQueue.h:249
ripple::JobQueue::m_invalidJobData
JobTypeData m_invalidJobData
Definition: JobQueue.h:253
ripple::JobTypeData::dequeue
beast::insight::Event dequeue
Definition: JobTypeData.h:51
ripple::jtCLIENT_WEBSOCKET
@ jtCLIENT_WEBSOCKET
Definition: Job.h:51
ripple::JobQueue::getJobCountTotal
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition: JobQueue.cpp:121
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
Definition: json_value.cpp:882
ripple::LoadMonitor::Stats
Definition: LoadMonitor.h:56
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::JobQueue::~JobQueue
~JobQueue()
Definition: JobQueue.cpp:65
ripple::JobQueue::hook
beast::insight::Hook hook
Definition: JobQueue.h:268
ripple::JobQueue::m_mutex
std::mutex m_mutex
Definition: JobQueue.h:246
ripple::JobTypeInfo::type
JobType type() const
Definition: JobTypeInfo.h:64
ripple::JobQueue::m_collector
beast::insight::Collector::ptr m_collector
Definition: JobQueue.h:266
ripple::JobQueue::isOverloaded
bool isOverloaded()
Definition: JobQueue.cpp:171
std::unique_lock
STL class.
ripple::JobTypeInfo
Holds all the 'static' information about a job, which does not change.
Definition: JobTypeInfo.h:28
ripple::jtGENERIC
@ jtGENERIC
Definition: Job.h:86
ripple::jtINVALID
@ jtINVALID
Definition: Job.h:36
ripple::JobTypes::instance
static JobTypes const & instance()
Definition: JobTypes.h:124
ripple::JobQueue::processTask
void processTask(int instance) override
Perform a task.
Definition: JobQueue.cpp:362
ripple::JobQueue::m_jobData
JobDataMap m_jobData
Definition: JobQueue.h:252
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::JobTypeData
Definition: JobTypeData.h:29
ripple::Job
Definition: Job.h:94
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::condition_variable::wait
T wait(T... args)
ripple::JobQueue::getJobTypeData
JobTypeData & getJobTypeData(JobType type)
Definition: JobQueue.cpp:244
ripple::JobQueue::m_lastJob
std::uint64_t m_lastJob
Definition: JobQueue.h:247
ripple::perf::PerfLog::jobQueue
virtual void jobQueue(JobType const type)=0
Log queued job.
ripple::perf::PerfLog::jobStart
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
ripple::JobQueue::isStopped
bool isStopped() const
Definition: JobQueue.cpp:280
std::forward_as_tuple
T forward_as_tuple(T... args)
ripple::JobQueue::stopping_
std::atomic_bool stopping_
Definition: JobQueue.h:250
ripple::JobQueue::stopped_
std::atomic_bool stopped_
Definition: JobQueue.h:251
ripple::JobQueue::rendezvous
void rendezvous()
Block until no jobs running.
Definition: JobQueue.cpp:237
ripple::JobQueue::m_cancelCallback
Job::CancelCallback m_cancelCallback
Definition: JobQueue.h:262
ripple::JobQueue
A pool of threads to perform work.
Definition: JobQueue.h:55
ripple::JobQueue::stop
void stop()
Definition: JobQueue.cpp:258
ripple::JobQueue::getJobCountGE
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition: JobQueue.cpp:131
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::JobQueue::queueJob
void queueJob(Job const &job, std::lock_guard< std::mutex > const &lock)
Definition: JobQueue.cpp:286
std::map::begin
T begin(T... args)
ripple::JobQueue::perfLog_
perf::PerfLog & perfLog_
Definition: JobQueue.h:265
std
STL namespace.
ripple::LogicError
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
Definition: contract.cpp:48
std::chrono::milliseconds::count
T count(T... args)
ripple::Workers::addTask
void addTask()
Add a task to be performed.
Definition: Workers.cpp:126
ripple::ClosureCounter::join
void join(char const *name, std::chrono::milliseconds wait, beast::Journal j)
Returns once all counted in-flight closures are destroyed.
Definition: ClosureCounter.h:166
mutex
ripple::LoadMonitor::Stats::latencyAvg
std::chrono::milliseconds latencyAvg
Definition: LoadMonitor.h:61
beast::Journal::debug
Stream debug() const
Definition: Journal.h:315
ripple::JobType
JobType
Definition: Job.h:34
ripple::JobQueue::job_count
beast::insight::Gauge job_count
Definition: JobQueue.h:267
std::map::end
T end(T... args)
ripple::Job::getType
JobType getType() const
Definition: Job.cpp:52
ripple::JobQueue::m_processCount
int m_processCount
Definition: JobQueue.h:256
ripple::JobQueue::cv_
std::condition_variable cv_
Definition: JobQueue.h:270
ripple::JobQueue::makeLoadEvent
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:148
ripple::JobQueue::m_workers
Workers m_workers
Definition: JobQueue.h:261
ripple::JobQueue::addLoadEvents
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition: JobQueue.cpp:160
ripple::Job::doJob
void doJob()
Definition: Job.cpp:79
std::unique_ptr
STL class.
ripple::LoadMonitor::Stats::latencyPeak
std::chrono::milliseconds latencyPeak
Definition: LoadMonitor.h:62
beast::insight::Hook
A reference to a handler for performing polled collection.
Definition: Hook.h:31
std::condition_variable::notify_all
T notify_all(T... args)
std::set
STL class.
ripple::JobQueue::getNextJob
void getNextJob(Job &job)
Definition: JobQueue.cpp:309
ripple::perf::PerfLog::jobFinish
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
Json::Value
Represents a JSON value.
Definition: json_value.h:145
beast::insight::Event::notify
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition: Event.h:64
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:118
ripple::JobQueue::collect
void collect()
Definition: JobQueue.cpp:72
std::chrono
std::chrono::steady_clock::now
T now(T... args)