rippled
Loading...
Searching...
No Matches
Workers.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/core/detail/Workers.h>
21#include <xrpld/perflog/PerfLog.h>
22#include <xrpl/beast/core/CurrentThreadName.h>
23#include <xrpl/beast/utility/instrumentation.h>
24
25namespace ripple {
26
28 Callback& callback,
29 perf::PerfLog* perfLog,
30 std::string const& threadNames,
31 int numberOfThreads)
32 : m_callback(callback)
33 , perfLog_(perfLog)
34 , m_threadNames(threadNames)
35 , m_allPaused(true)
36 , m_semaphore(0)
37 , m_numberOfThreads(0)
38 , m_activeCount(0)
39 , m_pauseCount(0)
40 , m_runningTaskCount(0)
41{
42 setNumberOfThreads(numberOfThreads);
43}
44
46{
47 stop();
48
50}
51
52int
54{
55 return m_numberOfThreads;
56}
57
58// VFALCO NOTE if this function is called quickly to reduce then
59// increase the number of threads, it could result in
60// more paused threads being created than expected.
61//
62void
63Workers::setNumberOfThreads(int numberOfThreads)
64{
65 static int instance{0};
66 if (m_numberOfThreads == numberOfThreads)
67 return;
68
69 if (perfLog_)
70 perfLog_->resizeJobs(numberOfThreads);
71
72 if (numberOfThreads > m_numberOfThreads)
73 {
74 // Increasing the number of working threads
75 int const amount = numberOfThreads - m_numberOfThreads;
76
77 for (int i = 0; i < amount; ++i)
78 {
79 // See if we can reuse a paused worker
80 Worker* worker = m_paused.pop_front();
81
82 if (worker != nullptr)
83 {
84 // If we got here then the worker thread is at [1]
85 // This will unblock their call to wait()
86 //
87 worker->notify();
88 }
89 else
90 {
91 worker = new Worker(*this, m_threadNames, instance++);
92 m_everyone.push_front(worker);
93 }
94 }
95 }
96 else
97 {
98 // Decreasing the number of working threads
99 int const amount = m_numberOfThreads - numberOfThreads;
100
101 for (int i = 0; i < amount; ++i)
102 {
103 ++m_pauseCount;
104
105 // Pausing a thread counts as one "internal task"
107 }
108 }
109
110 m_numberOfThreads = numberOfThreads;
111}
112
113void
115{
117
119 m_cv.wait(lk, [this] { return m_allPaused; });
120 lk.unlock();
121
122 XRPL_ASSERT(
124 "ripple::Workers::stop : zero running tasks");
125}
126
127void
129{
131}
132
133int
135{
136 return m_runningTaskCount.load();
137}
138
139void
141{
142 for (;;)
143 {
144 Worker* const worker = stack.pop_front();
145
146 if (worker != nullptr)
147 {
148 // This call blocks until the thread orderly exits
149 delete worker;
150 }
151 else
152 {
153 break;
154 }
155 }
156}
157
158//------------------------------------------------------------------------------
159
161 Workers& workers,
162 std::string const& threadName,
163 int const instance)
164 : m_workers{workers}
165 , threadName_{threadName}
166 , instance_{instance}
167 , wakeCount_{0}
168 , shouldExit_{false}
169{
171}
172
174{
175 {
176 std::lock_guard lock{mutex_};
177 ++wakeCount_;
178 shouldExit_ = true;
179 }
180
181 wakeup_.notify_one();
182 thread_.join();
183}
184
185void
187{
188 std::lock_guard lock{mutex_};
189 ++wakeCount_;
190 wakeup_.notify_one();
191}
192
193void
195{
196 bool shouldExit = true;
197 do
198 {
199 // Increment the count of active workers, and if
200 // we are the first one then reset the "all paused" event
201 //
202 if (++m_workers.m_activeCount == 1)
203 {
204 std::lock_guard lk{m_workers.m_mut};
205 m_workers.m_allPaused = false;
206 }
207
208 for (;;)
209 {
210 // Put the name back in case the callback changed it
211 beast::setCurrentThreadName(threadName_);
212
213 // Acquire a task or "internal task."
214 //
215 m_workers.m_semaphore.wait();
216
217 // See if there's a pause request. This
218 // counts as an "internal task."
219 //
220 int pauseCount = m_workers.m_pauseCount.load();
221
222 if (pauseCount > 0)
223 {
224 // Try to decrement
225 pauseCount = --m_workers.m_pauseCount;
226
227 if (pauseCount >= 0)
228 {
229 // We got paused
230 break;
231 }
232 else
233 {
234 // Undo our decrement
235 ++m_workers.m_pauseCount;
236 }
237 }
238
239 // We couldn't pause so we must have gotten
240 // unblocked in order to process a task.
241 //
242 ++m_workers.m_runningTaskCount;
243 m_workers.m_callback.processTask(instance_);
244 --m_workers.m_runningTaskCount;
245 }
246
247 // Any worker that goes into the paused list must
248 // guarantee that it will eventually block on its
249 // event object.
250 //
251 m_workers.m_paused.push_front(this);
252
253 // Decrement the count of active workers, and if we
254 // are the last one then signal the "all paused" event.
255 //
256 if (--m_workers.m_activeCount == 0)
257 {
258 std::lock_guard lk{m_workers.m_mut};
259 m_workers.m_allPaused = true;
260 m_workers.m_cv.notify_all();
261 }
262
263 // Set inactive thread name.
264 beast::setCurrentThreadName("(" + threadName_ + ")");
265
266 // [1] We will be here when the paused list is popped
267 //
268 // We block on our condition_variable, wakeup_, a requirement of being
269 // put into the paused list.
270 //
271 // wakeup_ will get signaled by either Worker::notify() or ~Worker.
272 {
273 std::unique_lock<std::mutex> lock{mutex_};
274 wakeup_.wait(lock, [this] { return this->wakeCount_ > 0; });
275
276 shouldExit = shouldExit_;
277 --wakeCount_;
278 }
279 } while (!shouldExit);
280}
281
282} // namespace ripple
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
Element * pop_front()
Pop an element off the stack.
std::thread thread_
Definition: Workers.h:206
Worker(Workers &workers, std::string const &threadName, int const instance)
Definition: Workers.cpp:160
Workers is effectively a thread pool.
Definition: Workers.h:80
std::mutex m_mut
Definition: Workers.h:222
Workers(Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
Definition: Workers.cpp:27
std::condition_variable m_cv
Definition: Workers.h:221
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition: Workers.cpp:53
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition: Workers.cpp:140
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition: Workers.h:232
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition: Workers.cpp:134
void addTask()
Add a task to be performed.
Definition: Workers.cpp:128
int m_numberOfThreads
Definition: Workers.h:225
std::string m_threadNames
Definition: Workers.h:220
beast::LockFreeStack< Worker > m_everyone
Definition: Workers.h:230
bool m_allPaused
Definition: Workers.h:223
std::atomic< int > m_pauseCount
Definition: Workers.h:227
void stop()
Pause all threads and wait until they are paused.
Definition: Workers.cpp:114
perf::PerfLog * perfLog_
Definition: Workers.h:219
std::atomic< int > m_runningTaskCount
Definition: Workers.h:229
semaphore m_semaphore
Definition: Workers.h:224
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition: Workers.cpp:63
void notify()
Increment the count and unblock one waiting thread.
Definition: semaphore.h:48
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition: PerfLog.h:49
virtual void resizeJobs(int const resize)=0
Ensure enough room to store each currently executing job.
T join(T... args)
T load(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
Called to perform tasks as needed.
Definition: Workers.h:84