diff --git a/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj b/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj index a7f7a585f1..227c1f85f6 100644 --- a/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj +++ b/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj @@ -92,6 +92,7 @@ + @@ -352,6 +353,12 @@ true true + + true + true + true + true + true diff --git a/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj.filters b/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj.filters index 5954681eb9..709802caa9 100644 --- a/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj.filters +++ b/Subtrees/beast/Builds/VisualStudio2012/beast.vcxproj.filters @@ -728,6 +728,9 @@ beast_core\text + + beast_basics\threads + @@ -1135,6 +1138,9 @@ beast_core\text + + beast_basics\threads + diff --git a/Subtrees/beast/modules/beast_basics/beast_basics.cpp b/Subtrees/beast/modules/beast_basics/beast_basics.cpp index 37eeb256cf..099cbe6027 100644 --- a/Subtrees/beast/modules/beast_basics/beast_basics.cpp +++ b/Subtrees/beast/modules/beast_basics/beast_basics.cpp @@ -56,6 +56,7 @@ namespace beast #include "threads/beast_ReadWriteMutex.cpp" #include "threads/beast_ThreadGroup.cpp" #include "threads/beast_ThreadWithCallQueue.cpp" +#include "threads/beast_Workers.cpp" } diff --git a/Subtrees/beast/modules/beast_basics/beast_basics.h b/Subtrees/beast/modules/beast_basics/beast_basics.h index 782bfa387c..d9a5b78ba7 100644 --- a/Subtrees/beast/modules/beast_basics/beast_basics.h +++ b/Subtrees/beast/modules/beast_basics/beast_basics.h @@ -268,6 +268,7 @@ namespace beast #include "threads/beast_ManualCallQueue.h" #include "threads/beast_ParallelFor.h" #include "threads/beast_ThreadWithCallQueue.h" +#include "threads/beast_Workers.h" } diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp new file mode 100644 index 0000000000..33e558e20e --- /dev/null +++ b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp @@ -0,0 +1,175 @@ +//------------------------------------------------------------------------------ +/* + This file is part of Beast: https://github.com/vinniefalco/Beast + Copyright 2013, Vinnie Falco + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +// +// Workers::Worker +// + +Workers::Worker::Worker (Workers& workers) + : Thread ("Worker") + , m_workers (workers) +{ + startThread (); +} + +Workers::Worker::~Worker () +{ + stopThread (); +} + +void Workers::Worker::run () +{ + while (! threadShouldExit ()) + { + m_workers.m_allPaused.reset (); + + ++m_workers.m_activeCount; + + for (;;) + { + m_workers.m_semaphore.wait (); + + // See if we should pause + int pauseCount = m_workers.m_pauseCount.get (); + + if (pauseCount > 0) + { + // Try to decrement + pauseCount = --m_workers.m_pauseCount; + + // Did we get paused> + if (pauseCount >= 0) + { + // Yes, so signal again if we need more threads to pause + if (pauseCount > 0) + m_workers.addTask (); + + break; + } + else + { + // Not paused, undo the decrement + ++m_workers.m_pauseCount; + } + } + + m_workers.m_callback.processTask (); + } + + // must happen before decrementing m_activeCount + m_workers.m_paused.push_front (this); + + if (--m_workers.m_activeCount == 0) + m_workers.m_allPaused.signal (); + + // If we get here then this thread became sidelined via + // a call to setNumberOfThreads. We block on the thread event + // instead of exiting the thread, because it is bad form for + // a server process to constantly create and destroy threads. + // + // The thread event is signaled either to make the thread + // resume participating in tasks, or to make it exit. + // + + wait (); + } +} + +//------------------------------------------------------------------------------ + +// +// Workers +// + +Workers::Workers (Callback& callback, int numberOfThreads) + : m_callback (callback) + , m_semaphore (0) + , m_numberOfThreads (0) +{ + setNumberOfThreads (numberOfThreads); +} + +Workers::~Workers () +{ + setNumberOfThreads (0); + + m_allPaused.wait (); + + deleteWorkers (m_active); + deleteWorkers (m_paused); +} + +void Workers::setNumberOfThreads (int numberOfThreads) +{ + if (numberOfThreads > m_numberOfThreads) + { + int const amount = numberOfThreads - m_numberOfThreads; + + for (int i = 0; i < amount; ++i) + { + Worker* worker = m_paused.pop_front (); + + if (worker != nullptr) + { + worker->notify (); + } + else + { + worker = new Worker (*this); + } + + m_active.push_front (worker); + } + } + else if (numberOfThreads < m_numberOfThreads) + { + int const amount = m_numberOfThreads - numberOfThreads; + + for (int i = 0; i < amount; ++i) + { + ++m_pauseCount; + } + + // pausing threads counts as an "internal task" + m_semaphore.signal (); + } +} + +void Workers::addTask () +{ + m_semaphore.signal (); +} + +void Workers::deleteWorkers (LockFreeStack & stack) +{ + for (;;) + { + Worker* worker = stack.pop_front (); + + if (worker != nullptr) + { + // This call blocks until the thread orderly exits + delete worker; + } + else + { + break; + } + } +} diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h new file mode 100644 index 0000000000..404139d712 --- /dev/null +++ b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h @@ -0,0 +1,94 @@ +//------------------------------------------------------------------------------ +/* + This file is part of Beast: https://github.com/vinniefalco/Beast + Copyright 2013, Vinnie Falco + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef BEAST_WORKERS_H_INCLUDED +#define BEAST_WORKERS_H_INCLUDED + +/** A group of threads that process tasks. +*/ +class Workers +{ +public: + /** Called to perform tasks as needed. */ + struct Callback + { + /** Perform a task. + The call is made on a thread owned by Workers. + */ + virtual void processTask () = 0; + }; + + /** Create the object. + + A number of initial threads may be optionally specified. The + default is to create one thread per CPU. + */ + explicit Workers (Callback& callback, int numberOfThreads = SystemStats::getNumCpus ()); + + ~Workers (); + + /** Set the desired number of threads. + + @note This function is not thread-safe. + */ + void setNumberOfThreads (int numberOfThreads); + + /** Increment the number of tasks. + + The callback will be called for each task. + + @note This function is thread-safe. + */ + void addTask (); + + //-------------------------------------------------------------------------- + +private: + class Worker + : public LockFreeStack ::Node + , public Thread + { + public: + explicit Worker (Workers& workers); + + ~Worker (); + + private: + void run (); + + private: + Workers& m_workers; + }; + +private: + static void deleteWorkers (LockFreeStack & stack); + +private: + Callback& m_callback; + Semaphore m_semaphore; + int m_numberOfThreads; + + WaitableEvent m_allPaused; + Atomic m_activeCount; + Atomic m_pauseCount; + LockFreeStack m_active; + LockFreeStack m_paused; +}; + +#endif