diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp index 33e558e20e..b4fa54ec9e 100644 --- a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp +++ b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp @@ -17,9 +17,111 @@ */ //============================================================================== +Workers::Workers (Callback& callback, int numberOfThreads) + : m_callback (callback) + , m_allPaused (true, true) + , m_semaphore (0) + , m_numberOfThreads (0) +{ + setNumberOfThreads (numberOfThreads); +} + +Workers::~Workers () +{ + pauseAllThreadsAndWait (); + + deleteWorkers (m_everyone); +} + +int Workers::getNumberOfThreads () const noexcept +{ + return m_numberOfThreads; +} + +// VFALCO NOTE if this function is called quickly to reduce then +// increase the number of threads, it could result in +// more paused threads being created than expected. // -// Workers::Worker -// +void Workers::setNumberOfThreads (int numberOfThreads) +{ + if (m_numberOfThreads != numberOfThreads) + { + if (numberOfThreads > m_numberOfThreads) + { + // Increasing the number of working threads + + int const amount = numberOfThreads - m_numberOfThreads; + + for (int i = 0; i < amount; ++i) + { + // See if we can reuse a paused worker + Worker* worker = m_paused.pop_front (); + + if (worker != nullptr) + { + // If we got here then the worker thread is at [1] + // This will unblock their call to wait() + // + worker->notify (); + } + else + { + worker = new Worker (*this); + } + + m_everyone.push_front (worker); + } + } + else if (numberOfThreads < m_numberOfThreads) + { + // Decreasing the number of working threads + + int const amount = m_numberOfThreads - numberOfThreads; + + for (int i = 0; i < amount; ++i) + { + ++m_pauseCount; + + // Pausing a thread counts as one "internal task" + m_semaphore.signal (); + } + } + + m_numberOfThreads = numberOfThreads; + } +} + +void Workers::pauseAllThreadsAndWait () +{ + setNumberOfThreads (0); + + m_allPaused.wait (); +} + +void Workers::addTask () +{ + m_semaphore.signal (); +} + +void Workers::deleteWorkers (LockFreeStack & stack) +{ + for (;;) + { + Worker* const worker = stack.pop_front (); + + if (worker != nullptr) + { + // This call blocks until the thread orderly exits + delete worker; + } + else + { + break; + } + } +} + +//------------------------------------------------------------------------------ Workers::Worker::Worker (Workers& workers) : Thread ("Worker") @@ -37,139 +139,138 @@ void Workers::Worker::run () { while (! threadShouldExit ()) { - m_workers.m_allPaused.reset (); - - ++m_workers.m_activeCount; + // Increment the count of active workers, and if + // we are the first one then reset the "all paused" event + // + if (++m_workers.m_activeCount == 1) + m_workers.m_allPaused.reset (); for (;;) { + // Acquire a task or "internal task." + // m_workers.m_semaphore.wait (); - // See if we should pause + // See if there's a pause request. This + // counts as an "internal task." + // int pauseCount = m_workers.m_pauseCount.get (); - + if (pauseCount > 0) { // Try to decrement pauseCount = --m_workers.m_pauseCount; - // Did we get paused> if (pauseCount >= 0) { - // Yes, so signal again if we need more threads to pause - if (pauseCount > 0) - m_workers.addTask (); - + // We got paused break; } else { - // Not paused, undo the decrement + // Undo our decrement ++m_workers.m_pauseCount; } } + // We couldn't pause so we must have gotten + // unblocked in order to process a task. + // m_workers.m_callback.processTask (); } - // must happen before decrementing m_activeCount + // Any worker that goes into the paused list must + // guarantee that it will eventually block on its + // event object. + // m_workers.m_paused.push_front (this); + // Decrement the count of active workers, and if we + // are the last one then signal the "all paused" event. + // if (--m_workers.m_activeCount == 0) m_workers.m_allPaused.signal (); - // If we get here then this thread became sidelined via - // a call to setNumberOfThreads. We block on the thread event - // instead of exiting the thread, because it is bad form for - // a server process to constantly create and destroy threads. + // [1] We will be here when the paused list is popped // - // The thread event is signaled either to make the thread - // resume participating in tasks, or to make it exit. + // We block on our event object, a requirement of being + // put into the paused list. + // + // This will get signaled on either a reactivate or a stopThread() // - wait (); } } //------------------------------------------------------------------------------ -// -// Workers -// - -Workers::Workers (Callback& callback, int numberOfThreads) - : m_callback (callback) - , m_semaphore (0) - , m_numberOfThreads (0) +class WorkersTests : public UnitTest { - setNumberOfThreads (numberOfThreads); -} - -Workers::~Workers () -{ - setNumberOfThreads (0); - - m_allPaused.wait (); - - deleteWorkers (m_active); - deleteWorkers (m_paused); -} - -void Workers::setNumberOfThreads (int numberOfThreads) -{ - if (numberOfThreads > m_numberOfThreads) +public: + WorkersTests () : UnitTest ("Workers", "beast") { - int const amount = numberOfThreads - m_numberOfThreads; - - for (int i = 0; i < amount; ++i) - { - Worker* worker = m_paused.pop_front (); - - if (worker != nullptr) - { - worker->notify (); - } - else - { - worker = new Worker (*this); - } - - m_active.push_front (worker); - } } - else if (numberOfThreads < m_numberOfThreads) + + struct TestCallback : Workers::Callback { - int const amount = m_numberOfThreads - numberOfThreads; - - for (int i = 0; i < amount; ++i) + explicit TestCallback (int count_) + : finished (false, count_ == 0) + , count (count_) { - ++m_pauseCount; } - // pausing threads counts as an "internal task" - m_semaphore.signal (); - } -} + void processTask () + { + if (--count == 0) + finished.signal (); + } -void Workers::addTask () -{ - m_semaphore.signal (); -} + WaitableEvent finished; + Atomic count; + }; -void Workers::deleteWorkers (LockFreeStack & stack) -{ - for (;;) + void testThreads (int const threadCount) { - Worker* worker = stack.pop_front (); + String s; + s << "threadCount = " << String (threadCount); + beginTestCase (s); - if (worker != nullptr) - { - // This call blocks until the thread orderly exits - delete worker; - } - else - { - break; - } + TestCallback cb (threadCount); + + Workers w (cb, 0); + expect (w.getNumberOfThreads () == 0); + + w.setNumberOfThreads (threadCount); + expect (w.getNumberOfThreads () == threadCount); + + for (int i = 0; i < threadCount; ++i) + w.addTask (); + + // 10 seconds should be enough to finish on any system + // + bool signaled = cb.finished.wait (10 * 1000); + + expect (signaled, "timed out"); + + w.pauseAllThreadsAndWait (); + + int const count (cb.count.get ()); + + expectEquals (count, 0); } -} + + void runTest () + { + testThreads (0); + testThreads (1); + testThreads (2); + testThreads (4); + testThreads (16); + testThreads (64); + testThreads (128); + testThreads (256); + testThreads (512); + } +}; + +static WorkersTests workersTests; diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h index 404139d712..22e72bb816 100644 --- a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h +++ b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h @@ -43,16 +43,28 @@ public: ~Workers (); - /** Set the desired number of threads. + /** Retrieve the desired number of threads. + This just returns the number of active threads that were requested. If + there was a recent call to setNumberOfThreads, the actual number of active + threads may be temporarily different from what was last requested. + + @note This function is not thread-safe. + */ + int getNumberOfThreads () const noexcept; + + /** Set the desired number of threads. @note This function is not thread-safe. */ void setNumberOfThreads (int numberOfThreads); + /** Pause and wait for all threads. + @note This function is not thread-safe. + */ + void pauseAllThreadsAndWait (); + /** Increment the number of tasks. - The callback will be called for each task. - @note This function is thread-safe. */ void addTask (); @@ -60,8 +72,20 @@ public: //-------------------------------------------------------------------------- private: + struct PausedTag { }; + + /* A Worker executes tasks on its provided thread. + + These are the states: + + Active: Running the task processing loop. + Idle: Active, but blocked on waiting for a task. + Pausd: Blocked waiting to exit or become active. + */ + class Worker : public LockFreeStack ::Node + , public LockFreeStack ::Node , public Thread { public: @@ -81,14 +105,13 @@ private: private: Callback& m_callback; - Semaphore m_semaphore; - int m_numberOfThreads; - - WaitableEvent m_allPaused; - Atomic m_activeCount; - Atomic m_pauseCount; - LockFreeStack m_active; - LockFreeStack m_paused; + WaitableEvent m_allPaused; // signaled when all threads paused + Semaphore m_semaphore; // each pending task is 1 resource + int m_numberOfThreads; // how many we want active now + Atomic m_activeCount; // to know when all are paused + Atomic m_pauseCount; // how many threads need to pause now + LockFreeStack m_everyone; // holds all created workers + LockFreeStack m_paused; // holds just paused workers }; #endif diff --git a/TODO.txt b/TODO.txt index b138053fdf..8d7c502fda 100644 --- a/TODO.txt +++ b/TODO.txt @@ -6,9 +6,8 @@ Vinnie's List: Changes day to day, descending priority (Items marked '*' can be handled by others.) - Show summary for text output of unit test results -- Make ProofOfWorkTests manual since they aren't used - * Make everyone check GitHub Issues every day +- Make ProofOfWorkTests manual since they aren't used - Do something about the throw() reporting weaknesses: * Make sure all Sconstruct and .pro builds have debug symbols in release * Replace all throw with beast::Throw()