rippled
Loading...
Searching...
No Matches
Workers.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/core/detail/Workers.h>
21#include <xrpld/perflog/PerfLog.h>
22
23#include <xrpl/beast/core/CurrentThreadName.h>
24#include <xrpl/beast/utility/instrumentation.h>
25
26namespace ripple {
27
// Workers constructor (parameter list and body).
// NOTE(review): the opening line of this definition (original line 28) is
// absent from this listing; restore it from the upstream file.
//
// Stores the callback reference, the perf-log pointer and the thread-name
// string, starts every counter at zero with m_allPaused == true, and then
// requests the initial thread count through setNumberOfThreads().
29 Callback& callback,
30 perf::PerfLog* perfLog,
31 std::string const& threadNames,
32 int numberOfThreads)
33 : m_callback(callback)
34 , perfLog_(perfLog)
35 , m_threadNames(threadNames)
// No worker is active yet (m_activeCount is 0 below), so the pool begins in
// the "all paused" state.
36 , m_allPaused(true)
37 , m_semaphore(0)
// Starts at 0 so that the setNumberOfThreads() call in the body sees a
// difference and acts on it; a request for 0 threads is a no-op there.
38 , m_numberOfThreads(0)
39 , m_activeCount(0)
40 , m_pauseCount(0)
41 , m_runningTaskCount(0)
42{
43 setNumberOfThreads(numberOfThreads);
44}
45
// Presumably the body of Workers::~Workers() -- the signature (original
// line 46) is absent from this listing; confirm against the upstream file.
47{
// First call stop(), which (per its visible body) waits on m_cv until
// m_allPaused is true.
48 stop();
49
// NOTE(review): original line 50 is absent from this listing; whatever
// statement followed stop() must be restored from the upstream file.
51}
52
// Returns m_numberOfThreads, i.e. the value most recently stored by
// setNumberOfThreads().
// NOTE(review): the signature line (original line 54) is absent from this
// listing.
53int
55{
56 return m_numberOfThreads;
57}
58
59// VFALCO NOTE If this function is called in quick succession, first to
60// reduce and then to increase the number of threads, more
61// paused threads may be created than expected.
62//
// Change the number of active worker threads to numberOfThreads.
//
// Growing:   for each additional thread, reuse a Worker popped from m_paused
//            (waking it with notify()) if one is available; otherwise
//            allocate a new Worker and push it onto m_everyone.
// Shrinking: increment m_pauseCount once per thread to retire; the workers'
//            run loop checks m_pauseCount after each semaphore wait.
// Calling with the current value returns immediately.
//
// @param numberOfThreads  the new target thread count
63void
64Workers::setNumberOfThreads(int numberOfThreads)
65{
// Function-local static: the counter is shared by every call, and each newly
// created Worker receives the next value as its instance number.
// NOTE(review): it is incremented without any synchronization visible here --
// confirm that callers never run this function concurrently.
66 static int instance{0};
67 if (m_numberOfThreads == numberOfThreads)
68 return;
69
// perfLog_ is a pointer that may be null; it is only told about the new size
// when present.
70 if (perfLog_)
71 perfLog_->resizeJobs(numberOfThreads);
72
73 if (numberOfThreads > m_numberOfThreads)
74 {
75 // Increasing the number of working threads
76 int const amount = numberOfThreads - m_numberOfThreads;
77
78 for (int i = 0; i < amount; ++i)
79 {
80 // See if we can reuse a paused worker
81 Worker* worker = m_paused.pop_front();
82
83 if (worker != nullptr)
84 {
85 // If we got here then the worker thread is at [1]
86 // This will unblock their call to wait()
87 //
88 worker->notify();
89 }
90 else
91 {
// No paused worker to reuse: allocate a new one. The raw pointer is kept on
// the m_everyone stack.
92 worker = new Worker(*this, m_threadNames, instance++);
93 m_everyone.push_front(worker);
94 }
95 }
96 }
97 else
98 {
99 // Decreasing the number of working threads
100 int const amount = m_numberOfThreads - numberOfThreads;
101
102 for (int i = 0; i < amount; ++i)
103 {
104 ++m_pauseCount;
105
106 // Pausing a thread counts as one "internal task"
// NOTE(review): original line 107 is absent from this listing. The comment
// above implies that a statement followed it; restore it from the upstream
// file before relying on this code.
108 }
109 }
110
111 m_numberOfThreads = numberOfThreads;
112}
113
// stop(): waits on m_cv until m_allPaused is true, then asserts with the
// message "ripple::Workers::stop : zero running tasks".
// NOTE(review): original lines 115 (signature), 117, 119 and 124 are absent
// from this listing. The lock object `lk` used below and the condition passed
// to XRPL_ASSERT are presumably on those lines; restore them from the
// upstream file.
114void
116{
118
// Blocks until the predicate holds. m_allPaused is set to true, and m_cv is
// notified, by the last worker to become inactive in the worker run loop.
120 m_cv.wait(lk, [this] { return m_allPaused; });
121 lk.unlock();
122
123 XRPL_ASSERT(
125 "ripple::Workers::stop : zero running tasks");
126}
127
// NOTE(review): both the signature (original line 129) and the only body
// line (original line 131) are absent from this listing. The Doxygen index
// at the end of this file places Workers::addTask() at Workers.cpp:129;
// restore both lines from the upstream file.
128void
130{
132}
133
// Returns the current value of the atomic m_runningTaskCount, which the
// worker run loop increments before and decrements after each
// processTask() call.
// NOTE(review): the signature line (original line 135) is absent from this
// listing.
134int
136{
137 return m_runningTaskCount.load();
138}
139
// Pops every Worker off `stack` and deletes it; the loop ends when
// pop_front() returns nullptr.
// NOTE(review): the signature line (original line 141) is absent from this
// listing; `stack` is its parameter.
140void
142{
143 for (;;)
144 {
145 Worker* const worker = stack.pop_front();
146
147 if (worker != nullptr)
148 {
149 // This call blocks until the thread orderly exits
150 delete worker;
151 }
152 else
153 {
// pop_front() returned nullptr, so there is nothing left to delete.
154 break;
155 }
156 }
157}
158
159//------------------------------------------------------------------------------
160
// Worker constructor (parameter list and member initializers).
// NOTE(review): the opening line of this definition (original line 161) and
// the only body line (original line 171) are absent from this listing.
// Presumably the body starts thread_ -- confirm against the upstream file.
162 Workers& workers,
163 std::string const& threadName,
164 int const instance)
165 : m_workers{workers}
166 , threadName_{threadName}
167 , instance_{instance}
// No wake-up is pending and no exit has been requested at construction.
168 , wakeCount_{0}
169 , shouldExit_{false}
170{
172}
173
// Presumably the body of Workers::Worker::~Worker() -- the signature
// (original line 174) is absent from this listing; confirm.
//
// Requests that the worker thread exit and waits for it to finish.
175{
176 {
// Both fields are changed under mutex_, the same mutex the thread holds when
// it evaluates its wait predicate (wakeCount_ > 0) and reads shouldExit_.
177 std::lock_guard lock{mutex_};
178 ++wakeCount_;
179 shouldExit_ = true;
180 }
181
// Notify after the lock has been released, then block until the thread has
// finished.
182 wakeup_.notify_one();
183 thread_.join();
184}
185
// Presumably Workers::Worker::notify() -- the signature (original line 187)
// is absent from this listing; confirm.
//
// Records one pending wake-up and signals wakeup_, so that a thread blocked
// on the predicate wakeCount_ > 0 can proceed. Unlike the destructor path,
// shouldExit_ is left untouched.
186void
188{
189 std::lock_guard lock{mutex_};
190 ++wakeCount_;
// The notification is issued while mutex_ is still held.
191 wakeup_.notify_one();
192}
193
// Presumably Workers::Worker::run(), the worker thread's main loop -- the
// signature (original line 195) is absent from this listing; confirm.
//
// Each pass of the outer do/while is one "active period":
//   1. register as active (clearing m_allPaused if first to become active);
//   2. loop on the semaphore, running processTask() for each acquisition
//      until a pause request is successfully claimed;
//   3. push this worker onto m_paused and deregister as active (setting
//      m_allPaused and notifying m_cv if last to become inactive);
//   4. block on wakeup_ until wakeCount_ > 0.
// The loop ends only when shouldExit_ was found set at step 4.
194void
196{
197 bool shouldExit = true;
198 do
199 {
200 // Increment the count of active workers, and if
201 // we are the first one then reset the "all paused" event
202 //
203 if (++m_workers.m_activeCount == 1)
204 {
205 std::lock_guard lk{m_workers.m_mut};
206 m_workers.m_allPaused = false;
207 }
208
209 for (;;)
210 {
211 // Put the name back in case the callback changed it
212 beast::setCurrentThreadName(threadName_);
213
214 // Acquire a task or "internal task."
215 //
216 m_workers.m_semaphore.wait();
217
218 // See if there's a pause request. This
219 // counts as an "internal task."
220 //
// The load is only a pre-check; the result of the atomic pre-decrement below
// is what decides whether this worker claimed a pause request.
221 int pauseCount = m_workers.m_pauseCount.load();
222
223 if (pauseCount > 0)
224 {
225 // Try to decrement
226 pauseCount = --m_workers.m_pauseCount;
227
228 if (pauseCount >= 0)
229 {
230 // We got paused
231 break;
232 }
233 else
234 {
// A negative result means the count had already reached zero by the time of
// our decrement, so no pause request was claimed.
235 // Undo our decrement
236 ++m_workers.m_pauseCount;
237 }
238 }
239
240 // We couldn't pause so we must have gotten
241 // unblocked in order to process a task.
242 //
// m_runningTaskCount brackets the callback so that it reflects the number of
// processTask() calls currently in progress.
243 ++m_workers.m_runningTaskCount;
244 m_workers.m_callback.processTask(instance_);
245 --m_workers.m_runningTaskCount;
246 }
247
248 // Any worker that goes into the paused list must
249 // guarantee that it will eventually block on its
250 // event object.
251 //
252 m_workers.m_paused.push_front(this);
253
254 // Decrement the count of active workers, and if we
255 // are the last one then signal the "all paused" event.
256 //
257 if (--m_workers.m_activeCount == 0)
258 {
259 std::lock_guard lk{m_workers.m_mut};
260 m_workers.m_allPaused = true;
261 m_workers.m_cv.notify_all();
262 }
263
264 // Set inactive thread name.
265 beast::setCurrentThreadName("(" + threadName_ + ")");
266
267 // [1] We will be here when the paused list is popped
268 //
269 // We block on our condition_variable, wakeup_, a requirement of being
270 // put into the paused list.
271 //
272 // wakeup_ will get signaled by either Worker::notify() or ~Worker.
273 {
274 std::unique_lock<std::mutex> lock{mutex_};
275 wakeup_.wait(lock, [this] { return this->wakeCount_ > 0; });
276
// Still holding mutex_: copy the exit flag and consume exactly one pending
// wake-up.
277 shouldExit = shouldExit_;
278 --wakeCount_;
279 }
280 } while (!shouldExit);
281}
282
283} // namespace ripple
Multiple Producer, Multiple Consumer (MPMC) intrusive stack.
Element * pop_front()
Pop an element off the stack.
std::thread thread_
Definition: Workers.h:208
Worker(Workers &workers, std::string const &threadName, int const instance)
Definition: Workers.cpp:161
Workers is effectively a thread pool.
Definition: Workers.h:82
std::mutex m_mut
Definition: Workers.h:224
Workers(Callback &callback, perf::PerfLog *perfLog, std::string const &threadNames="Worker", int numberOfThreads=static_cast< int >(std::thread::hardware_concurrency()))
Create the object.
Definition: Workers.cpp:28
std::condition_variable m_cv
Definition: Workers.h:223
int getNumberOfThreads() const noexcept
Retrieve the desired number of threads.
Definition: Workers.cpp:54
static void deleteWorkers(beast::LockFreeStack< Worker > &stack)
Definition: Workers.cpp:141
beast::LockFreeStack< Worker, PausedTag > m_paused
Definition: Workers.h:234
int numberOfCurrentlyRunningTasks() const noexcept
Get the number of currently executing calls of Callback::processTask.
Definition: Workers.cpp:135
void addTask()
Add a task to be performed.
Definition: Workers.cpp:129
int m_numberOfThreads
Definition: Workers.h:227
std::string m_threadNames
Definition: Workers.h:222
beast::LockFreeStack< Worker > m_everyone
Definition: Workers.h:232
bool m_allPaused
Definition: Workers.h:225
std::atomic< int > m_pauseCount
Definition: Workers.h:229
void stop()
Pause all threads and wait until they are paused.
Definition: Workers.cpp:115
perf::PerfLog * perfLog_
Definition: Workers.h:221
std::atomic< int > m_runningTaskCount
Definition: Workers.h:231
semaphore m_semaphore
Definition: Workers.h:226
void setNumberOfThreads(int numberOfThreads)
Set the desired number of threads.
Definition: Workers.cpp:64
void notify()
Increment the count and unblock one waiting thread.
Definition: semaphore.h:48
Singleton class that maintains performance counters and optionally writes Json-formatted data to a di...
Definition: PerfLog.h:52
virtual void resizeJobs(int const resize)=0
Ensure enough room to store each currently executing job.
T join(T... args)
T load(T... args)
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:25
Called to perform tasks as needed.
Definition: Workers.h:86