rippled
Loading...
Searching...
No Matches
JobQueue.h
1#pragma once
2
3#include <xrpl/basics/LocalValue.h>
4#include <xrpl/core/ClosureCounter.h>
5#include <xrpl/core/JobTypeData.h>
6#include <xrpl/core/JobTypes.h>
7#include <xrpl/core/detail/Workers.h>
8#include <xrpl/json/json_value.h>
9
10#include <boost/coroutine/all.hpp>
11
12#include <set>
13
14namespace xrpl {
15
16namespace perf {
17class PerfLog;
18}
19
20class Logs;
22{
23 explicit Coro_create_t() = default;
24};
25
37{
38public:
41 {
42 private:
51 boost::coroutines::asymmetric_coroutine<void>::pull_type coro_;
52 boost::coroutines::asymmetric_coroutine<void>::push_type* yield_;
53#ifndef NDEBUG
54 bool finished_ = false;
55#endif
56
57 public:
58 // Private: Used in the implementation
59 template <class F>
61
62 // Not copy-constructible or assignable
63 Coro(Coro const&) = delete;
64 Coro&
65 operator=(Coro const&) = delete;
66
68
78 void
79 yield() const;
80
94 bool
96
106 void
108
110 bool
111 runnable() const;
112
114 void
116
118 void
120 };
121
122 using JobFunction = std::function<void()>;
123
124 JobQueue(
125 int threadCount,
126 beast::insight::Collector::ptr const& collector,
127 beast::Journal journal,
128 Logs& logs,
129 perf::PerfLog& perfLog);
130 ~JobQueue();
131
141 template <
142 typename JobHandler,
144 bool
145 addJob(JobType type, std::string const& name, JobHandler&& jobHandler)
146 {
147 if (auto optionalCountedJob = jobCounter_.wrap(std::forward<JobHandler>(jobHandler)))
148 {
149 return addRefCountedJob(type, name, std::move(*optionalCountedJob));
150 }
151 return false;
152 }
153
163 template <class F>
165 postCoro(JobType t, std::string const& name, F&& f);
166
169 int
170 getJobCount(JobType t) const;
171
174 int
175 getJobCountTotal(JobType t) const;
176
179 int
180 getJobCountGE(JobType t) const;
181
185 makeLoadEvent(JobType t, std::string const& name);
186
189 void
190 addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed);
191
192 // Cannot be const because LoadMonitor has no const methods.
193 bool
194 isOverloaded();
195
196 // Cannot be const because LoadMonitor has no const methods.
198 getJson(int c = 0);
199
201 void
202 rendezvous();
203
204 void
205 stop();
206
207 bool
209 {
210 return stopping_;
211 }
212
213 // We may be able to move away from this, but we can keep it during the
214 // transition.
215 bool
216 isStopped() const;
217
218private:
219 friend class Coro;
220
222
232
233 // The number of jobs currently in processTask()
235
236 // The number of suspended coroutines
237 int nSuspend_ = 0;
238
240
241 // Statistics tracking
246
248
249 void
250 collect();
253
254 // Adds a reference counted job to the JobQueue.
255 //
256 // param type The type of job.
257 // param name Name of the job.
258 // param func std::function with signature void(). Called when the
259 // job is executed.
260 //
261 // return true if func added to queue.
262 bool
263 addRefCountedJob(JobType type, std::string const& name, JobFunction const& func);
264
265 // Returns the next Job we should run now.
266 //
267 // RunnableJob:
268 // A Job in the JobSet whose slots count for its type is greater than zero.
269 //
270 // Pre-conditions:
271 // m_jobSet must not be empty.
272 // m_jobSet holds at least one RunnableJob
273 //
274 // Post-conditions:
275 // job is a valid Job object.
276 // job is removed from m_jobSet.
277 // Waiting job count of its type is decremented
278 // Running job count of its type is incremented
279 //
280 // Invariants:
281 // The calling thread owns the JobLock
282 void
283 getNextJob(Job& job);
284
285 // Indicates that a running Job has completed its task.
286 //
287 // Pre-conditions:
288 // Job must not exist in m_jobSet.
289 // The JobType must not be invalid.
290 //
291 // Post-conditions:
292 // The running count of that JobType is decremented
293 // A new task is signaled if there are more waiting Jobs than the limit, if
294 // any.
295 //
296 // Invariants:
297 // <none>
298 void
299 finishJob(JobType type);
300
301 // Runs the next appropriate waiting Job.
302 //
303 // Pre-conditions:
304 // A RunnableJob must exist in the JobSet
305 //
306 // Post-conditions:
307 // The chosen RunnableJob will have Job::doJob() called.
308 //
309 // Invariants:
310 // <none>
311 void
312 processTask(int instance) override;
313
314 // Returns the limit of running jobs for the given job type.
315 // For jobs with no limit, we return the largest int. Hopefully that
316 // will be enough.
317 int
318 getJobLimit(JobType type);
319};
320
321/*
322 An RPC command is received and is handled via ServerHandler(HTTP) or
323 Handler(websocket), depending on the connection type. The handler then calls
324 the JobQueue::postCoro() method to create a coroutine and run it at a later
325 point. This frees up the handler thread and allows it to continue handling
326 other requests while the RPC command completes its work asynchronously.
327
328 postCoro() creates a Coro object. When the Coro ctor is called, and its
329 coro_ member is initialized (a boost::coroutines::pull_type), execution
330 automatically passes to the coroutine, which we don't want at this point,
331 since we are still in the handler thread context. It's important to note
332 here that construction of a boost pull_type automatically passes execution to
333 the coroutine. A pull_type object automatically generates a push_type that is
334 passed as a parameter (do_yield) in the signature of the function the
335 pull_type was created with. This function is immediately called during coro_
336 construction and within it, Coro::yield_ is assigned the push_type
337 parameter (do_yield) address and called (yield()) so we can return execution
338 back to the caller's stack.
339
340 postCoro() then calls Coro::post(), which schedules a job on the job
341 queue to continue execution of the coroutine in a JobQueue worker thread at
342 some later time. When the job runs, we lock on the Coro::mutex_ and call
343 coro_ which continues where we had left off. Since the last thing we did
344 in coro_ was call yield(), the next thing we continue with is calling the
345 function param f, that was passed into the Coro ctor. It is within this
346 function body that the caller specifies what they would like to do while
347 running in the coroutine, and where they can suspend and resume execution.
348 A task that relies on other events to complete, such as path finding, calls
349 Coro::yield() to suspend its execution while waiting on those events to
350 complete and continue when signaled via the Coro::post() method.
351
352 There is a potential race condition that exists here where post() can get
353 called before yield() after f is called. Technically the problem only occurs
354 if the job that post() scheduled is executed before yield() is called.
355 If the post() job were to be executed before yield(), undefined behavior
356 would occur. The lock ensures that coro_ is not called again until we exit
357 the coroutine, at which point a scheduled resume() job waiting on the lock
358 would gain entry, harmlessly call coro_ and immediately return as we have
359 already completed the coroutine.
360
361 The race condition occurs as follows:
362
363 1- The coroutine is running.
364 2- The coroutine is about to suspend, but before it can do so, it must
365 arrange for some event to wake it up.
366 3- The coroutine arranges for some event to wake it up.
367 4- Before the coroutine can suspend, that event occurs and the
368 resumption of the coroutine is scheduled on the job queue.
369 5- Again, before the coroutine can suspend, the resumption of the coroutine is dispatched.
370 6- Again, before the coroutine can suspend, the resumption code runs the
371 coroutine.
372 The coroutine is now running in two threads.
373
374 The lock prevents this from happening as step 6 will block until the
375 lock is released which only happens after the coroutine completes.
376*/
377
378} // namespace xrpl
379
380#include <xrpl/core/Coro.ipp>
381
382namespace xrpl {
383
384template <class F>
387{
388 /* First param is a detail type to make construction private.
389 Last param is the function the coroutine runs. Signature of
390 void(std::shared_ptr<Coro>).
391 */
392 auto coro = std::make_shared<Coro>(Coro_create_t{}, *this, t, name, std::forward<F>(f));
393 if (!coro->post())
394 {
395 // The Coro was not successfully posted. Disable it so its destructor
396 // can run with no negative side effects. Then destroy it.
397 coro->expectEarlyExit();
398 coro.reset();
399 }
400 return coro;
401}
402
403} // namespace xrpl
Represents a JSON value.
Definition json_value.h:130
A generic endpoint for log messages.
Definition Journal.h:40
A metric for measuring an integral value.
Definition Gauge.h:20
A reference to a handler for performing polled collection.
Definition Hook.h:12
std::optional< Substitute< Closure > > wrap(Closure &&closure)
Wrap the passed closure with a reference counter.
Coroutines must run to completion.
Definition JobQueue.h:41
Coro(Coro const &)=delete
Coro(Coro_create_t, JobQueue &, JobType, std::string const &, F &&)
std::mutex mutex_
Definition JobQueue.h:48
bool post()
Schedule coroutine execution.
bool runnable() const
Returns true if the Coro is still runnable (has not returned).
void yield() const
Suspend coroutine execution.
std::mutex mutex_run_
Definition JobQueue.h:49
std::string name_
Definition JobQueue.h:46
void expectEarlyExit()
Once called, the Coro allows early exit without an assert.
std::condition_variable cv_
Definition JobQueue.h:50
boost::coroutines::asymmetric_coroutine< void >::push_type * yield_
Definition JobQueue.h:52
boost::coroutines::asymmetric_coroutine< void >::pull_type coro_
Definition JobQueue.h:51
void join()
Waits until coroutine returns from the user function.
Coro & operator=(Coro const &)=delete
void resume()
Resume coroutine execution.
detail::LocalValues lvs_
Definition JobQueue.h:43
A pool of threads to perform work.
Definition JobQueue.h:37
int getJobCountGE(JobType t) const
All waiting jobs at or greater than this priority.
Definition JobQueue.cpp:126
void processTask(int instance) override
Perform a task.
Definition JobQueue.cpp:325
beast::Journal m_journal
Definition JobQueue.h:223
std::mutex m_mutex
Definition JobQueue.h:224
JobCounter jobCounter_
Definition JobQueue.h:227
JobTypeData & getJobTypeData(JobType type)
Definition JobQueue.cpp:236
std::shared_ptr< Coro > postCoro(JobType t, std::string const &name, F &&f)
Creates a coroutine and adds a job to the queue which will run it.
Definition JobQueue.h:386
int getJobCountTotal(JobType t) const
Jobs waiting plus running at this priority.
Definition JobQueue.cpp:116
JobTypeData m_invalidJobData
Definition JobQueue.h:231
beast::insight::Gauge job_count
Definition JobQueue.h:244
bool isStopped() const
Definition JobQueue.cpp:271
void rendezvous()
Block until no jobs running.
Definition JobQueue.cpp:229
Workers m_workers
Definition JobQueue.h:239
std::atomic_bool stopping_
Definition JobQueue.h:228
JobDataMap m_jobData
Definition JobQueue.h:230
bool addJob(JobType type, std::string const &name, JobHandler &&jobHandler)
Adds a job to the JobQueue.
Definition JobQueue.h:145
int getJobCount(JobType t) const
Jobs waiting at this priority.
Definition JobQueue.cpp:106
std::set< Job > m_jobSet
Definition JobQueue.h:226
bool addRefCountedJob(JobType type, std::string const &name, JobFunction const &func)
Definition JobQueue.cpp:60
bool isOverloaded()
Definition JobQueue.cpp:166
std::atomic_bool stopped_
Definition JobQueue.h:229
bool isStopping() const
Definition JobQueue.h:208
std::condition_variable cv_
Definition JobQueue.h:247
int getJobLimit(JobType type)
Definition JobQueue.cpp:376
std::uint64_t m_lastJob
Definition JobQueue.h:225
beast::insight::Hook hook
Definition JobQueue.h:245
void addLoadEvents(JobType t, int count, std::chrono::milliseconds elapsed)
Add multiple load events.
Definition JobQueue.cpp:155
std::unique_ptr< LoadEvent > makeLoadEvent(JobType t, std::string const &name)
Return a scoped LoadEvent.
Definition JobQueue.cpp:143
beast::insight::Collector::ptr m_collector
Definition JobQueue.h:243
perf::PerfLog & perfLog_
Definition JobQueue.h:242
void finishJob(JobType type)
Definition JobQueue.cpp:306
Json::Value getJson(int c=0)
Definition JobQueue.cpp:172
void getNextJob(Job &job)
Definition JobQueue.cpp:277
Manages partitions for logging.
Definition Log.h:32
Workers is effectively a thread pool.
Definition Workers.h:61
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition PerfLog.h:31
T is_same_v
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
JobType
Definition Job.h:14
T reset(T... args)
Coro_create_t()=default
Called to perform tasks as needed.
Definition Workers.h:65