From be9c9555066183a8ced2b5390d2ddcd9774c5280 Mon Sep 17 00:00:00 2001 From: Scott Schurr Date: Mon, 19 Dec 2016 18:53:12 -0800 Subject: [PATCH] Convert Workers to std::thread (RIPD-1189) --- src/ripple/core/impl/Workers.cpp | 63 ++++++++++++++++++++++---------- src/ripple/core/impl/Workers.h | 14 ++++++- src/ripple/core/impl/semaphore.h | 3 -- src/test/core/Workers_test.cpp | 61 +++++++++++++++++++------------ 4 files changed, 94 insertions(+), 47 deletions(-) diff --git a/src/ripple/core/impl/Workers.cpp b/src/ripple/core/impl/Workers.cpp index de27265ac..f9a363477 100644 --- a/src/ripple/core/impl/Workers.cpp +++ b/src/ripple/core/impl/Workers.cpp @@ -80,9 +80,8 @@ void Workers::setNumberOfThreads (int numberOfThreads) else { worker = new Worker (*this, m_threadNames); + m_everyone.push_front (worker); } - - m_everyone.push_front (worker); } } else if (numberOfThreads < m_numberOfThreads) @@ -96,7 +95,7 @@ void Workers::setNumberOfThreads (int numberOfThreads) ++m_pauseCount; // Pausing a thread counts as one "internal task" - m_semaphore.signal (); + m_semaphore.notify (); } } @@ -115,7 +114,7 @@ void Workers::pauseAllThreadsAndWait () void Workers::addTask () { - m_semaphore.signal (); + m_semaphore.notify (); } int Workers::numberOfCurrentlyRunningTasks () const noexcept @@ -144,27 +143,48 @@ void Workers::deleteWorkers (beast::LockFreeStack & stack) //------------------------------------------------------------------------------ Workers::Worker::Worker (Workers& workers, std::string const& threadName) - : Thread (threadName) - , m_workers (workers) + : m_workers {workers} + , threadName_ {threadName} + , wakeCount_ {0} + , shouldExit_ {false} { - startThread (); + thread_ = std::thread {&Workers::Worker::run, this}; } Workers::Worker::~Worker () { - stopThread (); + { + std::lock_guard lock {mutex_}; + ++wakeCount_; + shouldExit_ = true; + } + + wakeup_.notify_one(); + thread_.join(); +} + +void Workers::Worker::notify () +{ + std::lock_guard lock {mutex_}; + ++wakeCount_; + wakeup_.notify_one(); +} + +static void setInactiveThreadName (std::string const& threadName) +{ + beast::Thread::setCurrentThreadName ("(" + threadName + ")"); } void Workers::Worker::run () { - // Call runImpl() and report if any exceptions escape runImpl. - threadEntry (this, &Workers::Worker::runImpl, - "Workers::Worker::run(); Thread: " + Thread::getThreadName()); + setInactiveThreadName (threadName_); + threadEntry (this, &Workers::Worker::runImpl, threadName_); } void Workers::Worker::runImpl () { - while (! threadShouldExit ()) + bool shouldExit = true; + do { // Increment the count of active workers, and if // we are the first one then reset the "all paused" event @@ -208,7 +228,7 @@ void Workers::Worker::runImpl () --m_workers.m_runningTaskCount; // Put the name back in case the callback changed it - Thread::setCurrentThreadName (Thread::getThreadName()); + beast::Thread::setCurrentThreadName (threadName_); } // Any worker that goes into the paused list must @@ -223,17 +243,22 @@ void Workers::Worker::runImpl () if (--m_workers.m_activeCount == 0) m_workers.m_allPaused.signal (); - Thread::setCurrentThreadName ("(" + getThreadName() + ")"); + setInactiveThreadName (threadName_); // [1] We will be here when the paused list is popped // - // We block on our event object, a requirement of being + // We block on our condition_variable, wakeup_, a requirement of being // put into the paused list. // - // This will get signaled on either a reactivate or a stopThread() - // - wait (); - } + // wakeup_ will get signaled by either Worker::notify() or ~Worker. + { + std::unique_lock lock {mutex_}; + wakeup_.wait (lock, [this] {return this->wakeCount_ > 0;}); + + shouldExit = shouldExit_; + --wakeCount_; + } + } while (! shouldExit); } } // ripple diff --git a/src/ripple/core/impl/Workers.h b/src/ripple/core/impl/Workers.h index f9af11483..0a475f0cd 100644 --- a/src/ripple/core/impl/Workers.h +++ b/src/ripple/core/impl/Workers.h @@ -24,6 +24,8 @@ #include #include #include +#include +#include #include #include @@ -119,19 +121,27 @@ private: class Worker : public beast::LockFreeStack ::Node , public beast::LockFreeStack ::Node - , public beast::Thread { public: Worker (Workers& workers, std::string const& threadName); ~Worker (); + void notify (); + private: - void run () override; + void run (); void runImpl (); private: Workers& m_workers; + std::string const threadName_; + + std::thread thread_; + std::mutex mutex_; + std::condition_variable wakeup_; + int wakeCount_; // how many times to un-pause + bool shouldExit_; }; private: diff --git a/src/ripple/core/impl/semaphore.h b/src/ripple/core/impl/semaphore.h index eceff0dd7..4b4175251 100644 --- a/src/ripple/core/impl/semaphore.h +++ b/src/ripple/core/impl/semaphore.h @@ -54,9 +54,6 @@ public: m_cond.notify_one (); } - // Deprecated, for backward compatibility - void signal () { notify (); } - /** Block until notify is called. */ void wait () { diff --git a/src/test/core/Workers_test.cpp b/src/test/core/Workers_test.cpp index 2a1582e21..31aaf5088 100644 --- a/src/test/core/Workers_test.cpp +++ b/src/test/core/Workers_test.cpp @@ -29,9 +29,9 @@ class Workers_test : public beast::unit_test::suite public: struct TestCallback : Workers::Callback { - explicit TestCallback(int count_) - : finished(false, count_ == 0) - , count(count_) + TestCallback() + : finished(false, false) + , count(0) { } @@ -45,26 +45,41 @@ public: std::atomic count; }; - void testThreads(int const threadCount) + void testThreads(int const tc1, int const tc2, int const tc3) { - testcase("threadCount = " + std::to_string(threadCount)); + testcase("threadCounts: " + std::to_string(tc1) + + " -> " + std::to_string(tc2) + " -> " + std::to_string(tc3)); - TestCallback cb(threadCount); + TestCallback cb; - Workers w(cb, "Test", 0); - BEAST_EXPECT(w.getNumberOfThreads() == 0); + Workers w(cb, "Test", tc1); + BEAST_EXPECT(w.getNumberOfThreads() == tc1); - w.setNumberOfThreads(threadCount); - BEAST_EXPECT(w.getNumberOfThreads() == threadCount); + auto testForThreadCount = [this, &cb, &w] (int const threadCount) + { + // Prepare the callback. + cb.count = threadCount; + if (threadCount == 0) + cb.finished.signal(); + else + cb.finished.reset(); - for (int i = 0; i < threadCount; ++i) - w.addTask(); + // Execute the test. + w.setNumberOfThreads(threadCount); + BEAST_EXPECT(w.getNumberOfThreads() == threadCount); - // 10 seconds should be enough to finish on any system - // - bool signaled = cb.finished.wait(10 * 1000); - BEAST_EXPECT(signaled); + for (int i = 0; i < threadCount; ++i) + w.addTask(); + // 10 seconds should be enough to finish on any system + // + bool const signaled = cb.finished.wait(10 * 1000); + BEAST_EXPECT(signaled); + BEAST_EXPECT(cb.count.load() == 0); + }; + testForThreadCount (tc1); + testForThreadCount (tc2); + testForThreadCount (tc3); w.pauseAllThreadsAndWait(); // We had better finished all our work! @@ -73,15 +88,15 @@ public: void run() { - testThreads(0); - testThreads(1); - testThreads(2); - testThreads(4); - testThreads(16); - testThreads(64); + testThreads( 0, 0, 0); + testThreads( 1, 0, 1); + testThreads( 2, 1, 2); + testThreads( 4, 3, 5); + testThreads(16, 4, 15); + testThreads(64, 3, 65); } }; BEAST_DEFINE_TESTSUITE(Workers, core, ripple); -} \ No newline at end of file +}