Add class Workers

This commit is contained in:
Vinnie Falco
2013-07-29 08:36:22 -07:00
parent 933ba3c7d6
commit 47456a1723
6 changed files with 284 additions and 0 deletions

View File

@@ -92,6 +92,7 @@
<ClInclude Include="..\..\modules\beast_basics\threads\beast_SerialFor.h" />
<ClInclude Include="..\..\modules\beast_basics\threads\beast_ThreadGroup.h" />
<ClInclude Include="..\..\modules\beast_basics\threads\beast_ThreadWithCallQueue.h" />
<ClInclude Include="..\..\modules\beast_basics\threads\beast_Workers.h" />
<ClInclude Include="..\..\modules\beast_core\beast_core.h" />
<ClInclude Include="..\..\modules\beast_core\containers\beast_AbstractFifo.h" />
<ClInclude Include="..\..\modules\beast_core\containers\beast_Array.h" />
@@ -352,6 +353,12 @@
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\modules\beast_basics\threads\beast_Workers.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|Win32'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|x64'">true</ExcludedFromBuild>
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Release|x64'">true</ExcludedFromBuild>
</ClCompile>
<ClCompile Include="..\..\modules\beast_core\beast_core.cpp" />
<ClCompile Include="..\..\modules\beast_core\containers\beast_AbstractFifo.cpp">
<ExcludedFromBuild Condition="'$(Configuration)|$(Platform)'=='Debug|Win32'">true</ExcludedFromBuild>

View File

@@ -728,6 +728,9 @@
<ClInclude Include="..\..\modules\beast_core\text\beast_LexicalCast.h">
<Filter>beast_core\text</Filter>
</ClInclude>
<ClInclude Include="..\..\modules\beast_basics\threads\beast_Workers.h">
<Filter>beast_basics\threads</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\..\modules\beast_core\beast_core.cpp">
@@ -1135,6 +1138,9 @@
<ClCompile Include="..\..\modules\beast_core\text\beast_LexicalCast.cpp">
<Filter>beast_core\text</Filter>
</ClCompile>
<ClCompile Include="..\..\modules\beast_basics\threads\beast_Workers.cpp">
<Filter>beast_basics\threads</Filter>
</ClCompile>
</ItemGroup>
<ItemGroup>
<Text Include="..\..\TODO.txt" />

View File

@@ -56,6 +56,7 @@ namespace beast
#include "threads/beast_ReadWriteMutex.cpp"
#include "threads/beast_ThreadGroup.cpp"
#include "threads/beast_ThreadWithCallQueue.cpp"
#include "threads/beast_Workers.cpp"
}

View File

@@ -268,6 +268,7 @@ namespace beast
#include "threads/beast_ManualCallQueue.h"
#include "threads/beast_ParallelFor.h"
#include "threads/beast_ThreadWithCallQueue.h"
#include "threads/beast_Workers.h"
}

View File

@@ -0,0 +1,175 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
//
// Workers::Worker
//
Workers::Worker::Worker (Workers& workers)
: Thread ("Worker")
, m_workers (workers)
{
startThread ();
}
Workers::Worker::~Worker ()
{
stopThread ();
}
void Workers::Worker::run ()
{
while (! threadShouldExit ())
{
m_workers.m_allPaused.reset ();
++m_workers.m_activeCount;
for (;;)
{
m_workers.m_semaphore.wait ();
// See if we should pause
int pauseCount = m_workers.m_pauseCount.get ();
if (pauseCount > 0)
{
// Try to decrement
pauseCount = --m_workers.m_pauseCount;
// Did we get paused>
if (pauseCount >= 0)
{
// Yes, so signal again if we need more threads to pause
if (pauseCount > 0)
m_workers.addTask ();
break;
}
else
{
// Not paused, undo the decrement
++m_workers.m_pauseCount;
}
}
m_workers.m_callback.processTask ();
}
// must happen before decrementing m_activeCount
m_workers.m_paused.push_front (this);
if (--m_workers.m_activeCount == 0)
m_workers.m_allPaused.signal ();
// If we get here then this thread became sidelined via
// a call to setNumberOfThreads. We block on the thread event
// instead of exiting the thread, because it is bad form for
// a server process to constantly create and destroy threads.
//
// The thread event is signaled either to make the thread
// resume participating in tasks, or to make it exit.
//
wait ();
}
}
//------------------------------------------------------------------------------
//
// Workers
//
Workers::Workers (Callback& callback, int numberOfThreads)
: m_callback (callback)
, m_semaphore (0)
, m_numberOfThreads (0)
{
setNumberOfThreads (numberOfThreads);
}
Workers::~Workers ()
{
setNumberOfThreads (0);
m_allPaused.wait ();
deleteWorkers (m_active);
deleteWorkers (m_paused);
}
void Workers::setNumberOfThreads (int numberOfThreads)
{
if (numberOfThreads > m_numberOfThreads)
{
int const amount = numberOfThreads - m_numberOfThreads;
for (int i = 0; i < amount; ++i)
{
Worker* worker = m_paused.pop_front ();
if (worker != nullptr)
{
worker->notify ();
}
else
{
worker = new Worker (*this);
}
m_active.push_front (worker);
}
}
else if (numberOfThreads < m_numberOfThreads)
{
int const amount = m_numberOfThreads - numberOfThreads;
for (int i = 0; i < amount; ++i)
{
++m_pauseCount;
}
// pausing threads counts as an "internal task"
m_semaphore.signal ();
}
}
void Workers::addTask ()
{
m_semaphore.signal ();
}
void Workers::deleteWorkers (LockFreeStack <Worker>& stack)
{
for (;;)
{
Worker* worker = stack.pop_front ();
if (worker != nullptr)
{
// This call blocks until the thread orderly exits
delete worker;
}
else
{
break;
}
}
}

View File

@@ -0,0 +1,94 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_WORKERS_H_INCLUDED
#define BEAST_WORKERS_H_INCLUDED
/** A group of threads that process tasks.
*/
class Workers
{
public:
/** Called to perform tasks as needed. */
struct Callback
{
/** Perform a task.
The call is made on a thread owned by Workers.
*/
virtual void processTask () = 0;
};
/** Create the object.
A number of initial threads may be optionally specified. The
default is to create one thread per CPU.
*/
explicit Workers (Callback& callback, int numberOfThreads = SystemStats::getNumCpus ());
~Workers ();
/** Set the desired number of threads.
@note This function is not thread-safe.
*/
void setNumberOfThreads (int numberOfThreads);
/** Increment the number of tasks.
The callback will be called for each task.
@note This function is thread-safe.
*/
void addTask ();
//--------------------------------------------------------------------------
private:
class Worker
: public LockFreeStack <Worker>::Node
, public Thread
{
public:
explicit Worker (Workers& workers);
~Worker ();
private:
void run ();
private:
Workers& m_workers;
};
private:
static void deleteWorkers (LockFreeStack <Worker>& stack);
private:
Callback& m_callback;
Semaphore m_semaphore;
int m_numberOfThreads;
WaitableEvent m_allPaused;
Atomic <int> m_activeCount;
Atomic <int> m_pauseCount;
LockFreeStack <Worker> m_active;
LockFreeStack <Worker> m_paused;
};
#endif