rippled
JobQueue.cpp
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #include <ripple/basics/PerfLog.h>
21 #include <ripple/basics/contract.h>
22 #include <ripple/core/JobQueue.h>
23 
24 namespace ripple {
25 
27  beast::insight::Collector::ptr const& collector,
28  Stoppable& parent,
29  beast::Journal journal,
30  Logs& logs,
31  perf::PerfLog& perfLog)
32  : Stoppable("JobQueue", parent)
33  , m_journal(journal)
34  , m_lastJob(0)
35  , m_invalidJobData(JobTypes::instance().getInvalid(), collector, logs)
36  , m_processCount(0)
37  , m_workers(*this, &perfLog, "JobQueue", 0)
38  , m_cancelCallback(std::bind(&Stoppable::isStopping, this))
39  , perfLog_(perfLog)
40  , m_collector(collector)
41 {
42  hook = m_collector->make_hook(std::bind(&JobQueue::collect, this));
43  job_count = m_collector->make_gauge("job_count");
44 
45  {
47 
48  for (auto const& x : JobTypes::instance())
49  {
50  JobTypeInfo const& jt = x.second;
51 
52  // And create dynamic information for all jobs
53  auto const result(m_jobData.emplace(
54  std::piecewise_construct,
57  assert(result.second == true);
58  (void)result.second;
59  }
60  }
61 }
62 
64 {
65  // Must unhook before destroying
67 }
68 
69 void
71 {
73  job_count = m_jobSet.size();
74 }
75 
76 bool
78  JobType type,
79  std::string const& name,
80  JobFunction const& func)
81 {
82  assert(type != jtINVALID);
83 
84  auto iter(m_jobData.find(type));
85  assert(iter != m_jobData.end());
86  if (iter == m_jobData.end())
87  return false;
88 
89  JobTypeData& data(iter->second);
90 
91  // FIXME: Workaround incorrect client shutdown ordering
92  // do not add jobs to a queue with no threads
93  assert(type == jtCLIENT || m_workers.getNumberOfThreads() > 0);
94 
95  {
97 
98  // If this goes off it means that a child didn't follow
99  // the Stoppable API rules. A job may only be added if:
100  //
101  // - The JobQueue has NOT stopped
102  // AND
103  // * We are currently processing jobs
104  // OR
105  // * We have have pending jobs
106  // OR
107  // * Not all children are stopped
108  //
109  assert(
110  !isStopped() &&
111  (m_processCount > 0 || !m_jobSet.empty() || !areChildrenStopped()));
112 
113  std::pair<std::set<Job>::iterator, bool> result(m_jobSet.insert(
114  Job(type, name, ++m_lastJob, data.load(), func, m_cancelCallback)));
115  queueJob(*result.first, lock);
116  }
117  return true;
118 }
119 
120 int
122 {
123  std::lock_guard lock(m_mutex);
124 
125  JobDataMap::const_iterator c = m_jobData.find(t);
126 
127  return (c == m_jobData.end()) ? 0 : c->second.waiting;
128 }
129 
130 int
132 {
133  std::lock_guard lock(m_mutex);
134 
135  JobDataMap::const_iterator c = m_jobData.find(t);
136 
137  return (c == m_jobData.end()) ? 0 : (c->second.waiting + c->second.running);
138 }
139 
140 int
142 {
143  // return the number of jobs at this priority level or greater
144  int ret = 0;
145 
146  std::lock_guard lock(m_mutex);
147 
148  for (auto const& x : m_jobData)
149  {
150  if (x.first >= t)
151  ret += x.second.waiting;
152  }
153 
154  return ret;
155 }
156 
157 void
158 JobQueue::setThreadCount(int c, bool const standaloneMode)
159 {
160  if (standaloneMode)
161  {
162  c = 1;
163  }
164  else if (c == 0)
165  {
166  c = static_cast<int>(std::thread::hardware_concurrency());
167  c = 2 + std::min(c, 4); // I/O will bottleneck
168  JLOG(m_journal.info()) << "Auto-tuning to " << c
169  << " validation/transaction/proposal threads.";
170  }
171  else
172  {
173  JLOG(m_journal.info()) << "Configured " << c
174  << " validation/transaction/proposal threads.";
175  }
176 
178 }
179 
182 {
183  JobDataMap::iterator iter(m_jobData.find(t));
184  assert(iter != m_jobData.end());
185 
186  if (iter == m_jobData.end())
187  return {};
188 
189  return std::make_unique<LoadEvent>(iter->second.load(), name, true);
190 }
191 
192 void
194 {
195  if (isStopped())
196  LogicError("JobQueue::addLoadEvents() called after JobQueue stopped");
197 
198  JobDataMap::iterator iter(m_jobData.find(t));
199  assert(iter != m_jobData.end());
200  iter->second.load().addSamples(count, elapsed);
201 }
202 
203 bool
205 {
206  int count = 0;
207 
208  for (auto& x : m_jobData)
209  {
210  if (x.second.load().isOver())
211  ++count;
212  }
213 
214  return count > 0;
215 }
216 
219 {
220  using namespace std::chrono_literals;
222 
223  ret["threads"] = m_workers.getNumberOfThreads();
224 
225  Json::Value priorities = Json::arrayValue;
226 
227  std::lock_guard lock(m_mutex);
228 
229  for (auto& x : m_jobData)
230  {
231  assert(x.first != jtINVALID);
232 
233  if (x.first == jtGENERIC)
234  continue;
235 
236  JobTypeData& data(x.second);
237 
238  LoadMonitor::Stats stats(data.stats());
239 
240  int waiting(data.waiting);
241  int running(data.running);
242 
243  if ((stats.count != 0) || (waiting != 0) ||
244  (stats.latencyPeak != 0ms) || (running != 0))
245  {
246  Json::Value& pri = priorities.append(Json::objectValue);
247 
248  pri["job_type"] = data.name();
249 
250  if (stats.isOverloaded)
251  pri["over_target"] = true;
252 
253  if (waiting != 0)
254  pri["waiting"] = waiting;
255 
256  if (stats.count != 0)
257  pri["per_second"] = static_cast<int>(stats.count);
258 
259  if (stats.latencyPeak != 0ms)
260  pri["peak_time"] = static_cast<int>(stats.latencyPeak.count());
261 
262  if (stats.latencyAvg != 0ms)
263  pri["avg_time"] = static_cast<int>(stats.latencyAvg.count());
264 
265  if (running != 0)
266  pri["in_progress"] = running;
267  }
268  }
269 
270  ret["job_types"] = priorities;
271 
272  return ret;
273 }
274 
275 void
277 {
279  cv_.wait(lock, [&] { return m_processCount == 0 && m_jobSet.empty(); });
280 }
281 
284 {
285  JobDataMap::iterator c(m_jobData.find(type));
286  assert(c != m_jobData.end());
287 
288  // NIKB: This is ugly and I hate it. We must remove jtINVALID completely
289  // and use something sane.
290  if (c == m_jobData.end())
291  return m_invalidJobData;
292 
293  return c->second;
294 }
295 
296 void
298 {
299  // onStop must be defined and empty here,
300  // otherwise the base class will do the wrong thing.
301 }
302 
303 void
305 {
306  // We are stopped when all of the following are true:
307  //
308  // 1. A stop notification was received
309  // 2. All Stoppable children have stopped
310  // 3. There are no executing calls to processTask
311  // 4. There are no remaining Jobs in the job set
312  // 5. There are no suspended coroutines
313  //
314  if (isStopping() && areChildrenStopped() && (m_processCount == 0) &&
315  m_jobSet.empty() && nSuspend_ == 0)
316  {
317  stopped();
318  }
319 }
320 
321 void
323 {
324  JobType const type(job.getType());
325  assert(type != jtINVALID);
326  assert(m_jobSet.find(job) != m_jobSet.end());
327  perfLog_.jobQueue(type);
328 
329  JobTypeData& data(getJobTypeData(type));
330 
331  if (data.waiting + data.running < getJobLimit(type))
332  {
333  m_workers.addTask();
334  }
335  else
336  {
337  // defer the task until we go below the limit
338  //
339  ++data.deferred;
340  }
341  ++data.waiting;
342 }
343 
344 void
346 {
347  assert(!m_jobSet.empty());
348 
350  for (iter = m_jobSet.begin(); iter != m_jobSet.end(); ++iter)
351  {
352  JobTypeData& data(getJobTypeData(iter->getType()));
353 
354  assert(data.running <= getJobLimit(data.type()));
355 
356  // Run this job if we're running below the limit.
357  if (data.running < getJobLimit(data.type()))
358  {
359  assert(data.waiting > 0);
360  break;
361  }
362  }
363 
364  assert(iter != m_jobSet.end());
365 
366  JobType const type = iter->getType();
367  JobTypeData& data(getJobTypeData(type));
368 
369  assert(type != jtINVALID);
370 
371  job = *iter;
372  m_jobSet.erase(iter);
373 
374  --data.waiting;
375  ++data.running;
376 }
377 
378 void
380 {
381  assert(type != jtINVALID);
382 
383  JobTypeData& data = getJobTypeData(type);
384 
385  // Queue a deferred task if possible
386  if (data.deferred > 0)
387  {
388  assert(data.running + data.waiting >= getJobLimit(type));
389 
390  --data.deferred;
391  m_workers.addTask();
392  }
393 
394  --data.running;
395 }
396 
397 void
399 {
400  JobType type;
401 
402  {
403  using namespace std::chrono;
404  Job::clock_type::time_point const start_time(Job::clock_type::now());
405  {
406  Job job;
407  {
408  std::lock_guard lock(m_mutex);
409  getNextJob(job);
410  ++m_processCount;
411  }
412  type = job.getType();
413  JobTypeData& data(getJobTypeData(type));
414  JLOG(m_journal.trace()) << "Doing " << data.name() << "job";
415 
416  // The amount of time that the job was in the queue
417  auto const q_time =
418  date::ceil<microseconds>(start_time - job.queue_time());
419  perfLog_.jobStart(type, q_time, start_time, instance);
420 
421  job.doJob();
422 
423  // The amount of time it took to execute the job
424  auto const x_time =
425  date::ceil<microseconds>(Job::clock_type::now() - start_time);
426 
427  if (x_time >= 10ms || q_time >= 10ms)
428  {
429  getJobTypeData(type).dequeue.notify(q_time);
430  getJobTypeData(type).execute.notify(x_time);
431  }
432  perfLog_.jobFinish(type, x_time, instance);
433  }
434  }
435 
436  {
437  std::lock_guard lock(m_mutex);
438  // Job should be destroyed before calling checkStopped
439  // otherwise destructors with side effects can access
440  // parent objects that are already destroyed.
441  finishJob(type);
442  if (--m_processCount == 0 && m_jobSet.empty())
443  cv_.notify_all();
444  checkStopped(lock);
445  }
446 
447  // Note that when Job::~Job is called, the last reference
448  // to the associated LoadEvent object (in the Job) may be destroyed.
449 }
450 
451 int
453 {
454  JobTypeInfo const& j(JobTypes::instance().get(type));
455  assert(j.type() != jtINVALID);
456 
457  return j.limit();
458 }
459 
460 void
462 {
463  std::lock_guard lock(m_mutex);
464  checkStopped(lock);
465 }
466 
467 } // namespace ripple
ripple::JobQueue::finishJob
void finishJob(JobType type)
Definition: JobQueue.cpp:379
ripple::JobQueue::m_jobSet
std::set< Job > m_jobSet
Definition: JobQueue.h:239
ripple::JobQueue::nSuspend_
int nSuspend_
Definition: JobQueue.h:247
std::bind
T bind(T... args)
std::string
STL class.
std::shared_ptr< Collector >
ripple::jtCLIENT
@ jtCLIENT
Definition: Job.h:48
ripple::Logs
Manages partitions for logging.
Definition: Log.h:48
ripple::Stoppable::stopped
void stopped()
Called by derived classes to indicate that the stoppable has stopped.
Definition: Stoppable.cpp:72
beast::Journal::trace
Stream trace() const
Severity stream access functions.
Definition: Journal.h:309
ripple::JobTypes
Definition: JobTypes.h:32
Json::arrayValue
@ arrayValue
array value (ordered list)
Definition: json_value.h:42
std::pair
ripple::JobTypeData::execute
beast::insight::Event execute
Definition: JobTypeData.h:52
std::map::find
T find(T... args)
ripple::Workers::getNumberOfThreads
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition: Workers.cpp:53
ripple::JobQueue::JobQueue
JobQueue(beast::insight::Collector::ptr const &collector, Stoppable &parent, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition: JobQueue.cpp:26
ripple::JobQueue::getJobLimit
int getJobLimit(JobType type)
Definition: JobQueue.cpp:452
std::chrono::milliseconds
ripple::JobQueue::getJson
Json::Value getJson(int c=0)
Definition: JobQueue.cpp:218
ripple::LoadMonitor::Stats::count
std::uint64_t count
Definition: LoadMonitor.h:60
ripple::LoadMonitor::Stats::isOverloaded
bool isOverloaded
Definition: LoadMonitor.h:63
ripple::JobQueue::checkStopped
void checkStopped(std::lock_guard< std::mutex > const &lock)
Definition: JobQueue.cpp:304
std::map::emplace
T emplace(T... args)
ripple::Workers::setNumberOfThreads
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition: Workers.cpp:63
ripple::Job::queue_time
clock_type::time_point const & queue_time() const
Returns the time when the job was queued.
Definition: Job.cpp:65
std::lock_guard
STL class.
ripple::perf::PerfLog
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition: PerfLog.h:46
ripple::JobQueue::addRefCountedJob
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition: JobQueue.cpp:77
std::function
ripple::JobQueue::m_journal
beast::Journal m_journal
Definition: JobQueue.h:236
ripple::JobTypeInfo::limit
int limit() const
Definition: JobTypeInfo.h:77
ripple::JobQueue::onStop
void onStop() override
Override called when the stop notification is issued.
Definition: JobQueue.cpp:297
ripple::JobQueue::getJobCount
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition: JobQueue.cpp:121
ripple::JobQueue::m_invalidJobData
JobTypeData m_invalidJobData
Definition: JobQueue.h:241
ripple::JobTypeData::dequeue
beast::insight::Event dequeue
Definition: JobTypeData.h:51
ripple::JobQueue::getJobCountTotal
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition: JobQueue.cpp:131
ripple::Stoppable::isStopped
bool isStopped() const
Returns true if the requested stop has completed.
Definition: Stoppable.cpp:60
Json::Value::append
Value & append(const Value &value)
Append value to array at the end.
Definition: json_value.cpp:882
ripple::LoadMonitor::Stats
Definition: LoadMonitor.h:56
ripple::Stoppable
Provides an interface for starting and stopping.
Definition: Stoppable.h:201
Json::objectValue
@ objectValue
object value (collection of name/value pairs).
Definition: json_value.h:43
ripple::JobQueue::~JobQueue
~JobQueue()
Definition: JobQueue.cpp:63
ripple::JobQueue::hook
beast::insight::Hook hook
Definition: JobQueue.h:256
std::thread::hardware_concurrency
T hardware_concurrency(T... args)
ripple::JobQueue::m_mutex
std::mutex m_mutex
Definition: JobQueue.h:237
ripple::JobTypeInfo::type
JobType type() const
Definition: JobTypeInfo.h:65
ripple::JobQueue::m_collector
beast::insight::Collector::ptr m_collector
Definition: JobQueue.h:254
ripple::JobQueue::isOverloaded
bool isOverloaded()
Definition: JobQueue.cpp:204
std::unique_lock
STL class.
ripple::JobTypeInfo
Holds all the 'static' information about a job, which does not change.
Definition: JobTypeInfo.h:28
ripple::jtGENERIC
@ jtGENERIC
Definition: Job.h:74
ripple::Stoppable::areChildrenStopped
bool areChildrenStopped() const
Returns true if all children have stopped.
Definition: Stoppable.cpp:66
ripple::jtINVALID
@ jtINVALID
Definition: Job.h:35
ripple::JobTypes::instance
static JobTypes const & instance()
Definition: JobTypes.h:101
ripple::JobQueue::processTask
void processTask(int instance) override
Perform a task.
Definition: JobQueue.cpp:398
ripple::JobQueue::onChildrenStopped
void onChildrenStopped() override
Override called when all children have stopped.
Definition: JobQueue.cpp:461
ripple::JobQueue::m_jobData
JobDataMap m_jobData
Definition: JobQueue.h:240
beast::Journal::info
Stream info() const
Definition: Journal.h:321
ripple::JobTypeData
Definition: JobTypeData.h:29
ripple::Job
Definition: Job.h:82
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::condition_variable::wait
T wait(T... args)
ripple::JobQueue::getJobTypeData
JobTypeData & getJobTypeData(JobType type)
Definition: JobQueue.cpp:283
ripple::JobQueue::m_lastJob
std::uint64_t m_lastJob
Definition: JobQueue.h:238
ripple::perf::PerfLog::jobQueue
virtual void jobQueue(JobType const type)=0
Log queued job.
ripple::perf::PerfLog::jobStart
virtual void jobStart(JobType const type, microseconds dur, steady_time_point startTime, int instance)=0
Log job executing.
std::forward_as_tuple
T forward_as_tuple(T... args)
ripple::JobQueue::rendezvous
void rendezvous()
Block until no tasks running.
Definition: JobQueue.cpp:276
ripple::JobQueue::m_cancelCallback
Job::CancelCallback m_cancelCallback
Definition: JobQueue.h:250
std::min
T min(T... args)
ripple::JobQueue::getJobCountGE
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition: JobQueue.cpp:141
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::JobQueue::queueJob
void queueJob(Job const &job, std::lock_guard< std::mutex > const &lock)
Definition: JobQueue.cpp:322
ripple::JobQueue::perfLog_
perf::PerfLog & perfLog_
Definition: JobQueue.h:253
std
STL namespace.
ripple::LogicError
void LogicError(std::string const &how) noexcept
Called when faulty logic causes a broken invariant.
Definition: contract.cpp:48
ripple::JobQueue::setThreadCount
void setThreadCount(int c, bool const standaloneMode)
Set the number of threads serving the job queue to precisely this number.
Definition: JobQueue.cpp:158
std::chrono::milliseconds::count
T count(T... args)
ripple::Workers::addTask
void addTask()
Add a task to be performed.
Definition: Workers.cpp:126
ripple::LoadMonitor::Stats::latencyAvg
std::chrono::milliseconds latencyAvg
Definition: LoadMonitor.h:61
ripple::JobType
JobType
Definition: Job.h:33
ripple::JobQueue::job_count
beast::insight::Gauge job_count
Definition: JobQueue.h:255
std::map::end
T end(T... args)
ripple::Job::getType
JobType getType() const
Definition: Job.cpp:52
ripple::JobQueue::m_processCount
int m_processCount
Definition: JobQueue.h:244
ripple::JobQueue::cv_
std::condition_variable cv_
Definition: JobQueue.h:258
ripple::JobQueue::makeLoadEvent
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:181
ripple::JobQueue::m_workers
Workers m_workers
Definition: JobQueue.h:249
ripple::JobQueue::addLoadEvents
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition: JobQueue.cpp:193
ripple::Job::doJob
void doJob()
Definition: Job.cpp:79
std::unique_ptr
STL class.
ripple::LoadMonitor::Stats::latencyPeak
std::chrono::milliseconds latencyPeak
Definition: LoadMonitor.h:62
beast::insight::Hook
A reference to a handler for performing polled collection.
Definition: Hook.h:31
std::condition_variable::notify_all
T notify_all(T... args)
std::set
STL class.
ripple::JobQueue::getNextJob
void getNextJob(Job &job)
Definition: JobQueue.cpp:345
ripple::perf::PerfLog::jobFinish
virtual void jobFinish(JobType const type, microseconds dur, int instance)=0
Log job finishing.
Json::Value
Represents a JSON value.
Definition: json_value.h:145
beast::insight::Event::notify
void notify(std::chrono::duration< Rep, Period > const &value) const
Push an event notification.
Definition: Event.h:66
ripple::get
T & get(EitherAmount &amt)
Definition: AmountSpec.h:116
ripple::JobQueue::collect
void collect()
Definition: JobQueue.cpp:70
ripple::Stoppable::isStopping
bool isStopping() const
Returns true if the stoppable should stop.
Definition: Stoppable.cpp:54
std::chrono
std::chrono::steady_clock::now
T now(T... args)