rippled
JobQueue.h
1 //------------------------------------------------------------------------------
2 /*
3  This file is part of rippled: https://github.com/ripple/rippled
4  Copyright (c) 2012, 2013 Ripple Labs Inc.
5 
6  Permission to use, copy, modify, and/or distribute this software for any
7  purpose with or without fee is hereby granted, provided that the above
8  copyright notice and this permission notice appear in all copies.
9 
10  THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11  WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12  MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13  ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14  WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15  ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16  OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17 */
18 //==============================================================================
19 
20 #ifndef RIPPLE_CORE_JOBQUEUE_H_INCLUDED
21 #define RIPPLE_CORE_JOBQUEUE_H_INCLUDED
22 
23 #include <ripple/basics/LocalValue.h>
24 #include <ripple/core/JobTypeData.h>
25 #include <ripple/core/JobTypes.h>
26 #include <ripple/core/Stoppable.h>
27 #include <ripple/core/impl/Workers.h>
28 #include <ripple/json/json_value.h>
29 #include <boost/coroutine/all.hpp>
30 #include <boost/range/begin.hpp> // workaround for boost 1.72 bug
31 #include <boost/range/end.hpp> // workaround for boost 1.72 bug
32 
33 namespace ripple {
34 
35 namespace perf {
36 class PerfLog;
37 }
38 
39 class Logs;
41 {
42  explicit Coro_create_t() = default;
43 };
44 
55 class JobQueue : public Stoppable, private Workers::Callback
56 {
57 public:
59  class Coro : public std::enable_shared_from_this<Coro>
60  {
61  private:
66  bool running_;
70  boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
71  boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
72 #ifndef NDEBUG
73  bool finished_ = false;
74 #endif
75 
76  public:
77  // Private: Used in the implementation
78  template <class F>
80 
81  // Not copy-constructible or assignable
82  Coro(Coro const&) = delete;
83  Coro&
84  operator=(Coro const&) = delete;
85 
86  ~Coro();
87 
97  void
98  yield() const;
99 
113  bool
114  post();
115 
125  void
126  resume();
127 
129  bool
130  runnable() const;
131 
133  void
134  expectEarlyExit();
135 
137  void
138  join();
139  };
140 
141  using JobFunction = std::function<void(Job&)>;
142 
143  JobQueue(
144  beast::insight::Collector::ptr const& collector,
145  Stoppable& parent,
146  beast::Journal journal,
147  Logs& logs,
148  perf::PerfLog& perfLog);
149  ~JobQueue();
150 
160  template <
161  typename JobHandler,
162  typename = std::enable_if_t<std::is_same<
163  decltype(std::declval<JobHandler&&>()(std::declval<Job&>())),
164  void>::value>>
165  bool
166  addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
167  {
168  if (auto optionalCountedJob = Stoppable::jobCounter().wrap(
169  std::forward<JobHandler>(jobHandler)))
170  {
171  return addRefCountedJob(type, name, std::move(*optionalCountedJob));
172  }
173  return false;
174  }
175 
185  template <class F>
187  postCoro(JobType t, std::string const& name, F&& f);
188 
191  int
192  getJobCount(JobType t) const;
193 
196  int
197  getJobCountTotal(JobType t) const;
198 
201  int
202  getJobCountGE(JobType t) const;
203 
206  void
207  setThreadCount(int c, bool const standaloneMode);
208 
212  makeLoadEvent(JobType t, std::string const& name);
213 
216  void
217  addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed);
218 
219  // Cannot be const because LoadMonitor has no const methods.
220  bool
221  isOverloaded();
222 
223  // Cannot be const because LoadMonitor has no const methods.
225  getJson(int c = 0);
226 
228  void
229  rendezvous();
230 
231 private:
232  friend class Coro;
233 
235 
242 
243  // The number of jobs currently in processTask()
245 
246  // The number of suspended coroutines
247  int nSuspend_ = 0;
248 
251 
252  // Statistics tracking
257 
259 
260  void
261  collect();
262  JobTypeData&
263  getJobTypeData(JobType type);
264 
265  void
266  onStop() override;
267 
268  // Signals the service stopped if the stopped condition is met.
269  void
271 
272  // Adds a reference counted job to the JobQueue.
273  //
274  // param type The type of job.
275  // param name Name of the job.
276  // param func std::function with signature void (Job&). Called when the
277  // job is executed.
278  //
279  // return true if func added to queue.
280  bool
282  JobType type,
283  std::string const& name,
284  JobFunction const& func);
285 
286  // Signals an added Job for processing.
287  //
288  // Pre-conditions:
289  // The JobType must be valid.
290  // The Job must exist in mJobSet.
291  // The Job must not have previously been queued.
292  //
293  // Post-conditions:
294  // Count of waiting jobs of that type will be incremented.
295  // If JobQueue exists, and has at least one thread, Job will eventually
296  // run.
297  //
298  // Invariants:
299  // The calling thread owns the JobLock
300  void
301  queueJob(Job const& job, std::lock_guard<std::mutex> const& lock);
302 
303  // Returns the next Job we should run now.
304  //
305  // RunnableJob:
306  // A Job in the JobSet whose slots count for its type is greater than zero.
307  //
308  // Pre-conditions:
309  // mJobSet must not be empty.
310  // mJobSet holds at least one RunnableJob
311  //
312  // Post-conditions:
313  // job is a valid Job object.
314  // job is removed from mJobQueue.
315  // Waiting job count of its type is decremented
316  // Running job count of its type is incremented
317  //
318  // Invariants:
319  // The calling thread owns the JobLock
320  void
321  getNextJob(Job& job);
322 
323  // Indicates that a running Job has completed its task.
324  //
325  // Pre-conditions:
326  // Job must not exist in mJobSet.
327  // The JobType must not be invalid.
328  //
329  // Post-conditions:
330  // The running count of that JobType is decremented
331  // A new task is signaled if there are more waiting Jobs than the limit, if
332  // any.
333  //
334  // Invariants:
335  // <none>
336  void
337  finishJob(JobType type);
338 
339  // Runs the next appropriate waiting Job.
340  //
341  // Pre-conditions:
342  // A RunnableJob must exist in the JobSet
343  //
344  // Post-conditions:
345  // The chosen RunnableJob will have Job::doJob() called.
346  //
347  // Invariants:
348  // <none>
349  void
350  processTask(int instance) override;
351 
352  // Returns the limit of running jobs for the given job type.
353  // For jobs with no limit, we return the largest int. Hopefully that
354  // will be enough.
355  int
356  getJobLimit(JobType type);
357 
358  void
359  onChildrenStopped() override;
360 };
361 
362 /*
363  An RPC command is received and is handled via ServerHandler(HTTP) or
364  Handler(websocket), depending on the connection type. The handler then calls
365  the JobQueue::postCoro() method to create a coroutine and run it at a later
366  point. This frees up the handler thread and allows it to continue handling
367  other requests while the RPC command completes its work asynchronously.
368 
369  postCoro() creates a Coro object. When the Coro ctor is called, and its
370  coro_ member is initialized (a boost::coroutines::pull_type), execution
371  automatically passes to the coroutine, which we don't want at this point,
372  since we are still in the handler thread context. It's important to note
373  here that construction of a boost pull_type automatically passes execution to
374  the coroutine. A pull_type object automatically generates a push_type that is
375  passed as a parameter (do_yield) in the signature of the function the
376  pull_type was created with. This function is immediately called during coro_
377  construction and within it, Coro::yield_ is assigned the push_type
378  parameter (do_yield) address and called (yield()) so we can return execution
379  back to the caller's stack.
380 
381  postCoro() then calls Coro::post(), which schedules a job on the job
382  queue to continue execution of the coroutine in a JobQueue worker thread at
383  some later time. When the job runs, we lock on the Coro::mutex_ and call
384 coro_, which continues where we had left off. Since the last thing we did
385 in coro_ was call yield(), the next thing we continue with is calling the
386 function param f that was passed into the Coro ctor. It is within this
387 function body that the caller specifies what they would like to do while
388 running in the coroutine, and where they can suspend and resume execution.
389  A task that relies on other events to complete, such as path finding, calls
390  Coro::yield() to suspend its execution while waiting on those events to
391  complete and continue when signaled via the Coro::post() method.
392 
393 There is a potential race condition here: post() can be called before
394 yield() after f is called. Technically the problem only occurs if the
395 job that post() scheduled is executed before yield() is called. If that
396 job were to run before yield(), undefined behavior would occur. The lock
397 ensures that coro_ is not called again until we exit the coroutine, at
398 which point a scheduled resume() job waiting on the lock would gain entry,
399 harmlessly call coro_, and immediately return because we have already
400 completed the coroutine.
401 
402  The race condition occurs as follows:
403 
404  1- The coroutine is running.
405  2- The coroutine is about to suspend, but before it can do so, it must
406  arrange for some event to wake it up.
407  3- The coroutine arranges for some event to wake it up.
408 4- Before the coroutine can suspend, that event occurs and the
409 resumption of the coroutine is scheduled on the job queue.
410 5- Again, before the coroutine can suspend, the resumption of the
411 coroutine is dispatched.
412 6- Again, before the coroutine can suspend, the resumption code runs the
413 coroutine. The coroutine is now running in two threads.
414 
415 The lock prevents this from happening, as step 6 will block until the
416 lock is released, which only happens once the coroutine has suspended (yielded) or completed.
417 */
418 
419 } // namespace ripple
420 
421 #include <ripple/core/Coro.ipp>
422 
423 namespace ripple {
424 
425 template <class F>
427 JobQueue::postCoro(JobType t, std::string const& name, F&& f)
428 {
429  /* First param is a detail type to make construction private.
430  Last param is the function the coroutine runs. Signature of
431  void(std::shared_ptr<Coro>).
432  */
433  auto coro = std::make_shared<Coro>(
434  Coro_create_t{}, *this, t, name, std::forward<F>(f));
435  if (!coro->post())
436  {
437  // The Coro was not successfully posted. Disable it so it's destructor
438  // can run with no negative side effects. Then destroy it.
439  coro->expectEarlyExit();
440  coro.reset();
441  }
442  return coro;
443 }
444 
445 } // namespace ripple
446 
447 #endif
ripple::Coro_create_t::Coro_create_t
Coro_create_t()=default
ripple::JobQueue::finishJob
void finishJob(JobType type)
Definition: JobQueue.cpp:381
std::is_same
ripple::JobQueue::m_jobSet
std::set< Job > m_jobSet
Definition: JobQueue.h:239
ripple::JobQueue::nSuspend_
int nSuspend_
Definition: JobQueue.h:247
std::string
STL class.
std::shared_ptr< Collector >
ripple::JobQueue::postCoro
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition: JobQueue.h:427
ripple::Logs
Manages partitions for logging.
Definition: Log.h:48
ripple::JobQueue::Coro::post
bool post()
Schedule coroutine execution.
ripple::JobQueue::JobQueue
JobQueue(beast::insight::Collector::ptr const &collector, Stoppable &parent, beast::Journal journal, Logs &logs, perf::PerfLog &perfLog)
Definition: JobQueue.cpp:26
ripple::JobQueue::getJobLimit
int getJobLimit(JobType type)
Definition: JobQueue.cpp:454
std::chrono::milliseconds
ripple::JobQueue::Coro::coro_
boost::coroutines::asymmetric_coroutine< void >::pull_type coro_
Definition: JobQueue.h:70
ripple::JobQueue::Coro::expectEarlyExit
void expectEarlyExit()
Once called, the Coro allows early exit without an assert.
ripple::JobQueue::getJson
Json::Value getJson(int c=0)
Definition: JobQueue.cpp:220
ripple::JobQueue::checkStopped
void checkStopped(std::lock_guard< std::mutex > const &lock)
Definition: JobQueue.cpp:306
std::lock_guard
STL class.
ripple::perf::PerfLog
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition: PerfLog.h:46
ripple::JobQueue::addRefCountedJob
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition: JobQueue.cpp:77
ripple::JobQueue::addJob
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition: JobQueue.h:166
std::function
ripple::JobQueue::Coro::jq_
JobQueue & jq_
Definition: JobQueue.h:63
ripple::Workers::Callback
Called to perform tasks as needed.
Definition: Workers.h:43
ripple::JobQueue::m_journal
beast::Journal m_journal
Definition: JobQueue.h:236
ripple::JobQueue::onStop
void onStop() override
Override called when the stop notification is issued.
Definition: JobQueue.cpp:299
ripple::JobQueue::Coro
Coroutines must run to completion.
Definition: JobQueue.h:59
ripple::JobQueue::Coro::~Coro
~Coro()
ripple::JobQueue::getJobCount
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition: JobQueue.cpp:123
ripple::JobQueue::m_invalidJobData
JobTypeData m_invalidJobData
Definition: JobQueue.h:241
ripple::JobQueue::getJobCountTotal
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition: JobQueue.cpp:133
ripple::JobQueue::Coro::resume
void resume()
Resume coroutine execution.
ripple::Stoppable
Provides an interface for starting and stopping.
Definition: Stoppable.h:201
ripple::JobQueue::Coro::lvs_
detail::LocalValues lvs_
Definition: JobQueue.h:62
ripple::JobQueue::Coro::Coro
Coro(Coro_create_t, JobQueue &, JobType, std::string const &, F &&)
ripple::JobQueue::~JobQueue
~JobQueue()
Definition: JobQueue.cpp:63
ripple::JobQueue::hook
beast::insight::Hook hook
Definition: JobQueue.h:256
std::enable_if_t
ripple::JobQueue::Coro::cv_
std::condition_variable cv_
Definition: JobQueue.h:69
ripple::JobQueue::m_mutex
std::mutex m_mutex
Definition: JobQueue.h:237
ripple::JobQueue::m_collector
beast::insight::Collector::ptr m_collector
Definition: JobQueue.h:254
ripple::JobQueue::isOverloaded
bool isOverloaded()
Definition: JobQueue.cpp:206
ripple::Stoppable::jobCounter
JobCounter & jobCounter()
Definition: Stoppable.h:420
ripple::JobQueue::processTask
void processTask(int instance) override
Perform a task.
Definition: JobQueue.cpp:400
std::enable_shared_from_this
ripple::JobQueue::onChildrenStopped
void onChildrenStopped() override
Override called when all children have stopped.
Definition: JobQueue.cpp:463
ripple::JobQueue::m_jobData
JobDataMap m_jobData
Definition: JobQueue.h:240
ripple::JobTypeData
Definition: JobTypeData.h:29
ripple::Job
Definition: Job.h:84
beast::Journal
A generic endpoint for log messages.
Definition: Journal.h:58
std::uint64_t
ripple::JobQueue::getJobTypeData
JobTypeData & getJobTypeData(JobType type)
Definition: JobQueue.cpp:285
ripple::JobQueue::m_lastJob
std::uint64_t m_lastJob
Definition: JobQueue.h:238
ripple::JobQueue::Coro::yield
void yield() const
Suspend coroutine execution.
std::map< JobType, JobTypeData >
ripple::JobQueue::rendezvous
void rendezvous()
Block until no tasks running.
Definition: JobQueue.cpp:278
ripple::JobQueue::Coro::join
void join()
Waits until coroutine returns from the user function.
ripple::JobQueue::Coro::type_
JobType type_
Definition: JobQueue.h:64
ripple::Workers
A group of threads that process tasks.
Definition: Workers.h:39
beast::insight::Gauge
A metric for measuring an integral value.
Definition: Gauge.h:39
ripple::JobQueue::m_cancelCallback
Job::CancelCallback m_cancelCallback
Definition: JobQueue.h:250
ripple::JobQueue
A pool of threads to perform work.
Definition: JobQueue.h:55
ripple::JobQueue::getJobCountGE
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition: JobQueue.cpp:143
ripple::JobQueue::Coro::name_
std::string name_
Definition: JobQueue.h:65
ripple
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: RCLCensorshipDetector.h:29
ripple::JobQueue::Coro::yield_
boost::coroutines::asymmetric_coroutine< void >::push_type * yield_
Definition: JobQueue.h:71
ripple::JobQueue::queueJob
void queueJob(Job const &job, std::lock_guard< std::mutex > const &lock)
Definition: JobQueue.cpp:324
ripple::JobQueue::perfLog_
perf::PerfLog & perfLog_
Definition: JobQueue.h:253
ripple::JobQueue::setThreadCount
void setThreadCount(int c, bool const standaloneMode)
Set the number of threads serving the job queue to precisely this number.
Definition: JobQueue.cpp:160
ripple::JobQueue::Coro::finished_
bool finished_
Definition: JobQueue.h:73
std::condition_variable
ripple::Coro_create_t
Definition: JobQueue.h:40
std::mutex
STL class.
ripple::JobType
JobType
Definition: Job.h:33
ripple::JobQueue::job_count
beast::insight::Gauge job_count
Definition: JobQueue.h:255
ripple::JobQueue::Coro::mutex_run_
std::mutex mutex_run_
Definition: JobQueue.h:68
ripple::JobQueue::Coro::mutex_
std::mutex mutex_
Definition: JobQueue.h:67
ripple::JobQueue::m_processCount
int m_processCount
Definition: JobQueue.h:244
ripple::JobQueue::cv_
std::condition_variable cv_
Definition: JobQueue.h:258
ripple::JobQueue::makeLoadEvent
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition: JobQueue.cpp:183
ripple::JobQueue::m_workers
Workers m_workers
Definition: JobQueue.h:249
ripple::JobQueue::addLoadEvents
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition: JobQueue.cpp:195
ripple::detail::LocalValues
Definition: LocalValue.h:31
std::unique_ptr
STL class.
beast::insight::Hook
A reference to a handler for performing polled collection.
Definition: Hook.h:31
std::set
STL class.
ripple::JobQueue::getNextJob
void getNextJob(Job &job)
Definition: JobQueue.cpp:347
Json::Value
Represents a JSON value.
Definition: json_value.h:145
ripple::JobQueue::Coro::operator=
Coro & operator=(Coro const &)=delete
ripple::JobQueue::Coro::running_
bool running_
Definition: JobQueue.h:66
ripple::JobQueue::collect
void collect()
Definition: JobQueue.cpp:70
ripple::JobQueue::Coro::runnable
bool runnable() const
Returns true if the Coro is still runnable (has not returned).