Convert Workers to std::thread (RIPD-1189)

This commit is contained in:
Scott Schurr
2016-12-19 18:53:12 -08:00
committed by Nik Bougalis
parent 1989b1028f
commit be9c955506
4 changed files with 94 additions and 47 deletions

View File

@@ -80,9 +80,8 @@ void Workers::setNumberOfThreads (int numberOfThreads)
else else
{ {
worker = new Worker (*this, m_threadNames); worker = new Worker (*this, m_threadNames);
m_everyone.push_front (worker);
} }
m_everyone.push_front (worker);
} }
} }
else if (numberOfThreads < m_numberOfThreads) else if (numberOfThreads < m_numberOfThreads)
@@ -96,7 +95,7 @@ void Workers::setNumberOfThreads (int numberOfThreads)
++m_pauseCount; ++m_pauseCount;
// Pausing a thread counts as one "internal task" // Pausing a thread counts as one "internal task"
m_semaphore.signal (); m_semaphore.notify ();
} }
} }
@@ -115,7 +114,7 @@ void Workers::pauseAllThreadsAndWait ()
void Workers::addTask () void Workers::addTask ()
{ {
m_semaphore.signal (); m_semaphore.notify ();
} }
int Workers::numberOfCurrentlyRunningTasks () const noexcept int Workers::numberOfCurrentlyRunningTasks () const noexcept
@@ -144,27 +143,48 @@ void Workers::deleteWorkers (beast::LockFreeStack <Worker>& stack)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
Workers::Worker::Worker (Workers& workers, std::string const& threadName) Workers::Worker::Worker (Workers& workers, std::string const& threadName)
: Thread (threadName) : m_workers {workers}
, m_workers (workers) , threadName_ {threadName}
, wakeCount_ {0}
, shouldExit_ {false}
{ {
startThread (); thread_ = std::thread {&Workers::Worker::run, this};
} }
Workers::Worker::~Worker () Workers::Worker::~Worker ()
{ {
stopThread (); {
std::lock_guard <std::mutex> lock {mutex_};
++wakeCount_;
shouldExit_ = true;
}
wakeup_.notify_one();
thread_.join();
}
void Workers::Worker::notify ()
{
std::lock_guard <std::mutex> lock {mutex_};
++wakeCount_;
wakeup_.notify_one();
}
static void setInactiveThreadName (std::string const& threadName)
{
beast::Thread::setCurrentThreadName ("(" + threadName + ")");
} }
void Workers::Worker::run () void Workers::Worker::run ()
{ {
// Call runImpl() and report if any exceptions escape runImpl. setInactiveThreadName (threadName_);
threadEntry (this, &Workers::Worker::runImpl, threadEntry (this, &Workers::Worker::runImpl, threadName_);
"Workers::Worker::run(); Thread: " + Thread::getThreadName());
} }
void Workers::Worker::runImpl () void Workers::Worker::runImpl ()
{ {
while (! threadShouldExit ()) bool shouldExit = true;
do
{ {
// Increment the count of active workers, and if // Increment the count of active workers, and if
// we are the first one then reset the "all paused" event // we are the first one then reset the "all paused" event
@@ -208,7 +228,7 @@ void Workers::Worker::runImpl ()
--m_workers.m_runningTaskCount; --m_workers.m_runningTaskCount;
// Put the name back in case the callback changed it // Put the name back in case the callback changed it
Thread::setCurrentThreadName (Thread::getThreadName()); beast::Thread::setCurrentThreadName (threadName_);
} }
// Any worker that goes into the paused list must // Any worker that goes into the paused list must
@@ -223,17 +243,22 @@ void Workers::Worker::runImpl ()
if (--m_workers.m_activeCount == 0) if (--m_workers.m_activeCount == 0)
m_workers.m_allPaused.signal (); m_workers.m_allPaused.signal ();
Thread::setCurrentThreadName ("(" + getThreadName() + ")"); setInactiveThreadName (threadName_);
// [1] We will be here when the paused list is popped // [1] We will be here when the paused list is popped
// //
// We block on our event object, a requirement of being // We block on our condition_variable, wakeup_, a requirement of being
// put into the paused list. // put into the paused list.
// //
// This will get signaled on either a reactivate or a stopThread() // wakeup_ will get signaled by either Worker::notify() or ~Worker.
// {
wait (); std::unique_lock <std::mutex> lock {mutex_};
} wakeup_.wait (lock, [this] {return this->wakeCount_ > 0;});
shouldExit = shouldExit_;
--wakeCount_;
}
} while (! shouldExit);
} }
} // ripple } // ripple

View File

@@ -24,6 +24,8 @@
#include <ripple/beast/core/Thread.h> #include <ripple/beast/core/Thread.h>
#include <ripple/beast/core/LockFreeStack.h> #include <ripple/beast/core/LockFreeStack.h>
#include <atomic> #include <atomic>
#include <condition_variable>
#include <mutex>
#include <string> #include <string>
#include <thread> #include <thread>
@@ -119,19 +121,27 @@ private:
class Worker class Worker
: public beast::LockFreeStack <Worker>::Node : public beast::LockFreeStack <Worker>::Node
, public beast::LockFreeStack <Worker, PausedTag>::Node , public beast::LockFreeStack <Worker, PausedTag>::Node
, public beast::Thread
{ {
public: public:
Worker (Workers& workers, std::string const& threadName); Worker (Workers& workers, std::string const& threadName);
~Worker (); ~Worker ();
void notify ();
private: private:
void run () override; void run ();
void runImpl (); void runImpl ();
private: private:
Workers& m_workers; Workers& m_workers;
std::string const threadName_;
std::thread thread_;
std::mutex mutex_;
std::condition_variable wakeup_;
int wakeCount_; // how many times to un-pause
bool shouldExit_;
}; };
private: private:

View File

@@ -54,9 +54,6 @@ public:
m_cond.notify_one (); m_cond.notify_one ();
} }
// Deprecated, for backward compatibility
void signal () { notify (); }
/** Block until notify is called. */ /** Block until notify is called. */
void wait () void wait ()
{ {

View File

@@ -29,9 +29,9 @@ class Workers_test : public beast::unit_test::suite
public: public:
struct TestCallback : Workers::Callback struct TestCallback : Workers::Callback
{ {
explicit TestCallback(int count_) TestCallback()
: finished(false, count_ == 0) : finished(false, false)
, count(count_) , count(0)
{ {
} }
@@ -45,26 +45,41 @@ public:
std::atomic <int> count; std::atomic <int> count;
}; };
void testThreads(int const threadCount) void testThreads(int const tc1, int const tc2, int const tc3)
{ {
testcase("threadCount = " + std::to_string(threadCount)); testcase("threadCounts: " + std::to_string(tc1) +
" -> " + std::to_string(tc2) + " -> " + std::to_string(tc3));
TestCallback cb(threadCount); TestCallback cb;
Workers w(cb, "Test", 0); Workers w(cb, "Test", tc1);
BEAST_EXPECT(w.getNumberOfThreads() == 0); BEAST_EXPECT(w.getNumberOfThreads() == tc1);
w.setNumberOfThreads(threadCount); auto testForThreadCount = [this, &cb, &w] (int const threadCount)
BEAST_EXPECT(w.getNumberOfThreads() == threadCount); {
// Prepare the callback.
cb.count = threadCount;
if (threadCount == 0)
cb.finished.signal();
else
cb.finished.reset();
for (int i = 0; i < threadCount; ++i) // Execute the test.
w.addTask(); w.setNumberOfThreads(threadCount);
BEAST_EXPECT(w.getNumberOfThreads() == threadCount);
// 10 seconds should be enough to finish on any system for (int i = 0; i < threadCount; ++i)
// w.addTask();
bool signaled = cb.finished.wait(10 * 1000);
BEAST_EXPECT(signaled);
// 10 seconds should be enough to finish on any system
//
bool const signaled = cb.finished.wait(10 * 1000);
BEAST_EXPECT(signaled);
BEAST_EXPECT(cb.count.load() == 0);
};
testForThreadCount (tc1);
testForThreadCount (tc2);
testForThreadCount (tc3);
w.pauseAllThreadsAndWait(); w.pauseAllThreadsAndWait();
// We had better finished all our work! // We had better finished all our work!
@@ -73,15 +88,15 @@ public:
void run() void run()
{ {
testThreads(0); testThreads( 0, 0, 0);
testThreads(1); testThreads( 1, 0, 1);
testThreads(2); testThreads( 2, 1, 2);
testThreads(4); testThreads( 4, 3, 5);
testThreads(16); testThreads(16, 4, 15);
testThreads(64); testThreads(64, 3, 65);
} }
}; };
BEAST_DEFINE_TESTSUITE(Workers, core, ripple); BEAST_DEFINE_TESTSUITE(Workers, core, ripple);
} }