Add class Workers

This commit is contained in:
Vinnie Falco
2013-07-29 08:36:22 -07:00
parent cfa5a3b9ca
commit 9c3f13d66c
3 changed files with 225 additions and 102 deletions

View File

@@ -17,9 +17,111 @@
*/ */
//============================================================================== //==============================================================================
Workers::Workers (Callback& callback, int numberOfThreads)
: m_callback (callback)
, m_allPaused (true, true)
, m_semaphore (0)
, m_numberOfThreads (0)
{
setNumberOfThreads (numberOfThreads);
}
Workers::~Workers ()
{
pauseAllThreadsAndWait ();
deleteWorkers (m_everyone);
}
int Workers::getNumberOfThreads () const noexcept
{
return m_numberOfThreads;
}
// VFALCO NOTE if this function is called quickly to reduce then
// increase the number of threads, it could result in
// more paused threads being created than expected.
// //
// Workers::Worker void Workers::setNumberOfThreads (int numberOfThreads)
// {
if (m_numberOfThreads != numberOfThreads)
{
if (numberOfThreads > m_numberOfThreads)
{
// Increasing the number of working threads
int const amount = numberOfThreads - m_numberOfThreads;
for (int i = 0; i < amount; ++i)
{
// See if we can reuse a paused worker
Worker* worker = m_paused.pop_front ();
if (worker != nullptr)
{
// If we got here then the worker thread is at [1]
// This will unblock their call to wait()
//
worker->notify ();
}
else
{
worker = new Worker (*this);
}
m_everyone.push_front (worker);
}
}
else if (numberOfThreads < m_numberOfThreads)
{
// Decreasing the number of working threads
int const amount = m_numberOfThreads - numberOfThreads;
for (int i = 0; i < amount; ++i)
{
++m_pauseCount;
// Pausing a thread counts as one "internal task"
m_semaphore.signal ();
}
}
m_numberOfThreads = numberOfThreads;
}
}
void Workers::pauseAllThreadsAndWait ()
{
setNumberOfThreads (0);
m_allPaused.wait ();
}
void Workers::addTask ()
{
m_semaphore.signal ();
}
void Workers::deleteWorkers (LockFreeStack <Worker>& stack)
{
for (;;)
{
Worker* const worker = stack.pop_front ();
if (worker != nullptr)
{
// This call blocks until the thread orderly exits
delete worker;
}
else
{
break;
}
}
}
//------------------------------------------------------------------------------
Workers::Worker::Worker (Workers& workers) Workers::Worker::Worker (Workers& workers)
: Thread ("Worker") : Thread ("Worker")
@@ -37,139 +139,138 @@ void Workers::Worker::run ()
{ {
while (! threadShouldExit ()) while (! threadShouldExit ())
{ {
m_workers.m_allPaused.reset (); // Increment the count of active workers, and if
// we are the first one then reset the "all paused" event
++m_workers.m_activeCount; //
if (++m_workers.m_activeCount == 1)
m_workers.m_allPaused.reset ();
for (;;) for (;;)
{ {
// Acquire a task or "internal task."
//
m_workers.m_semaphore.wait (); m_workers.m_semaphore.wait ();
// See if we should pause // See if there's a pause request. This
// counts as an "internal task."
//
int pauseCount = m_workers.m_pauseCount.get (); int pauseCount = m_workers.m_pauseCount.get ();
if (pauseCount > 0) if (pauseCount > 0)
{ {
// Try to decrement // Try to decrement
pauseCount = --m_workers.m_pauseCount; pauseCount = --m_workers.m_pauseCount;
// Did we get paused>
if (pauseCount >= 0) if (pauseCount >= 0)
{ {
// Yes, so signal again if we need more threads to pause // We got paused
if (pauseCount > 0)
m_workers.addTask ();
break; break;
} }
else else
{ {
// Not paused, undo the decrement // Undo our decrement
++m_workers.m_pauseCount; ++m_workers.m_pauseCount;
} }
} }
// We couldn't pause so we must have gotten
// unblocked in order to process a task.
//
m_workers.m_callback.processTask (); m_workers.m_callback.processTask ();
} }
// must happen before decrementing m_activeCount // Any worker that goes into the paused list must
// guarantee that it will eventually block on its
// event object.
//
m_workers.m_paused.push_front (this); m_workers.m_paused.push_front (this);
// Decrement the count of active workers, and if we
// are the last one then signal the "all paused" event.
//
if (--m_workers.m_activeCount == 0) if (--m_workers.m_activeCount == 0)
m_workers.m_allPaused.signal (); m_workers.m_allPaused.signal ();
// If we get here then this thread became sidelined via // [1] We will be here when the paused list is popped
// a call to setNumberOfThreads. We block on the thread event
// instead of exiting the thread, because it is bad form for
// a server process to constantly create and destroy threads.
// //
// The thread event is signaled either to make the thread // We block on our event object, a requirement of being
// resume participating in tasks, or to make it exit. // put into the paused list.
//
// This will get signaled on either a reactivate or a stopThread()
// //
wait (); wait ();
} }
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// class WorkersTests : public UnitTest
// Workers
//
Workers::Workers (Callback& callback, int numberOfThreads)
: m_callback (callback)
, m_semaphore (0)
, m_numberOfThreads (0)
{ {
setNumberOfThreads (numberOfThreads); public:
} WorkersTests () : UnitTest ("Workers", "beast")
Workers::~Workers ()
{
setNumberOfThreads (0);
m_allPaused.wait ();
deleteWorkers (m_active);
deleteWorkers (m_paused);
}
void Workers::setNumberOfThreads (int numberOfThreads)
{
if (numberOfThreads > m_numberOfThreads)
{ {
int const amount = numberOfThreads - m_numberOfThreads;
for (int i = 0; i < amount; ++i)
{
Worker* worker = m_paused.pop_front ();
if (worker != nullptr)
{
worker->notify ();
}
else
{
worker = new Worker (*this);
}
m_active.push_front (worker);
}
} }
else if (numberOfThreads < m_numberOfThreads)
struct TestCallback : Workers::Callback
{ {
int const amount = m_numberOfThreads - numberOfThreads; explicit TestCallback (int count_)
: finished (false, count_ == 0)
for (int i = 0; i < amount; ++i) , count (count_)
{ {
++m_pauseCount;
} }
// pausing threads counts as an "internal task" void processTask ()
m_semaphore.signal (); {
} if (--count == 0)
} finished.signal ();
}
void Workers::addTask () WaitableEvent finished;
{ Atomic <int> count;
m_semaphore.signal (); };
}
void Workers::deleteWorkers (LockFreeStack <Worker>& stack) void testThreads (int const threadCount)
{
for (;;)
{ {
Worker* worker = stack.pop_front (); String s;
s << "threadCount = " << String (threadCount);
beginTestCase (s);
if (worker != nullptr) TestCallback cb (threadCount);
{
// This call blocks until the thread orderly exits Workers w (cb, 0);
delete worker; expect (w.getNumberOfThreads () == 0);
}
else w.setNumberOfThreads (threadCount);
{ expect (w.getNumberOfThreads () == threadCount);
break;
} for (int i = 0; i < threadCount; ++i)
w.addTask ();
// 10 seconds should be enough to finish on any system
//
bool signaled = cb.finished.wait (10 * 1000);
expect (signaled, "timed out");
w.pauseAllThreadsAndWait ();
int const count (cb.count.get ());
expectEquals (count, 0);
} }
}
void runTest ()
{
testThreads (0);
testThreads (1);
testThreads (2);
testThreads (4);
testThreads (16);
testThreads (64);
testThreads (128);
testThreads (256);
testThreads (512);
}
};
static WorkersTests workersTests;

View File

@@ -43,16 +43,28 @@ public:
~Workers (); ~Workers ();
/** Set the desired number of threads. /** Retrieve the desired number of threads.
This just returns the number of active threads that were requested. If
there was a recent call to setNumberOfThreads, the actual number of active
threads may be temporarily different from what was last requested.
@note This function is not thread-safe.
*/
int getNumberOfThreads () const noexcept;
/** Set the desired number of threads.
@note This function is not thread-safe. @note This function is not thread-safe.
*/ */
void setNumberOfThreads (int numberOfThreads); void setNumberOfThreads (int numberOfThreads);
/** Pause and wait for all threads.
@note This function is not thread-safe.
*/
void pauseAllThreadsAndWait ();
/** Increment the number of tasks. /** Increment the number of tasks.
The callback will be called for each task. The callback will be called for each task.
@note This function is thread-safe. @note This function is thread-safe.
*/ */
void addTask (); void addTask ();
@@ -60,8 +72,20 @@ public:
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
private: private:
struct PausedTag { };
/* A Worker executes tasks on its provided thread.
These are the states:
Active: Running the task processing loop.
Idle: Active, but blocked on waiting for a task.
Pausd: Blocked waiting to exit or become active.
*/
class Worker class Worker
: public LockFreeStack <Worker>::Node : public LockFreeStack <Worker>::Node
, public LockFreeStack <Worker, PausedTag>::Node
, public Thread , public Thread
{ {
public: public:
@@ -81,14 +105,13 @@ private:
private: private:
Callback& m_callback; Callback& m_callback;
Semaphore m_semaphore; WaitableEvent m_allPaused; // signaled when all threads paused
int m_numberOfThreads; Semaphore m_semaphore; // each pending task is 1 resource
int m_numberOfThreads; // how many we want active now
WaitableEvent m_allPaused; Atomic <int> m_activeCount; // to know when all are paused
Atomic <int> m_activeCount; Atomic <int> m_pauseCount; // how many threads need to pause now
Atomic <int> m_pauseCount; LockFreeStack <Worker> m_everyone; // holds all created workers
LockFreeStack <Worker> m_active; LockFreeStack <Worker, PausedTag> m_paused; // holds just paused workers
LockFreeStack <Worker> m_paused;
}; };
#endif #endif

View File

@@ -6,9 +6,8 @@ Vinnie's List: Changes day to day, descending priority
(Items marked '*' can be handled by others.) (Items marked '*' can be handled by others.)
- Show summary for text output of unit test results - Show summary for text output of unit test results
- Make ProofOfWorkTests manual since they aren't used
* Make everyone check GitHub Issues every day * Make everyone check GitHub Issues every day
- Make ProofOfWorkTests manual since they aren't used
- Do something about the throw() reporting weaknesses: - Do something about the throw() reporting weaknesses:
* Make sure all Sconstruct and .pro builds have debug symbols in release * Make sure all Sconstruct and .pro builds have debug symbols in release
* Replace all throw with beast::Throw() * Replace all throw with beast::Throw()