Move many Thread related classes

This commit is contained in:
Vinnie Falco
2013-10-04 13:49:24 -07:00
parent 93e9d8622e
commit 6c7f5d093c
48 changed files with 1043 additions and 2158 deletions

View File

@@ -0,0 +1,133 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
Please visit http://www.juce.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "../../../modules/beast_core/beast_core.h" // for UnitTest
namespace beast {
class AtomicTests : public UnitTest
{
public:
AtomicTests() : UnitTest ("Atomic", "beast") {}
template <typename Type>
void testFloat ()
{
Atomic<Type> a, b;
a = (Type) 21;
memoryBarrier();
/* These are some simple test cases to check the atomics - let me know
if any of these assertions fail on your system!
*/
expect (a.get() == (Type) 21);
expect (a.compareAndSetValue ((Type) 100, (Type) 50) == (Type) 21);
expect (a.get() == (Type) 21);
expect (a.compareAndSetValue ((Type) 101, a.get()) == (Type) 21);
expect (a.get() == (Type) 101);
expect (! a.compareAndSetBool ((Type) 300, (Type) 200));
expect (a.get() == (Type) 101);
expect (a.compareAndSetBool ((Type) 200, a.get()));
expect (a.get() == (Type) 200);
expect (a.exchange ((Type) 300) == (Type) 200);
expect (a.get() == (Type) 300);
b = a;
expect (b.get() == a.get());
}
template <typename Type>
void testInteger ()
{
Atomic<Type> a, b;
a.set ((Type) 10);
expect (a.value == (Type) 10);
expect (a.get() == (Type) 10);
a += (Type) 15;
expect (a.get() == (Type) 25);
memoryBarrier();
a -= (Type) 5;
expect (a.get() == (Type) 20);
expect (++a == (Type) 21);
++a;
expect (--a == (Type) 21);
expect (a.get() == (Type) 21);
memoryBarrier();
testFloat <Type> ();
}
void runTest()
{
beginTestCase ("Misc");
char a1[7];
expect (numElementsInArray(a1) == 7);
int a2[3];
expect (numElementsInArray(a2) == 3);
expect (ByteOrder::swap ((uint16) 0x1122) == 0x2211);
expect (ByteOrder::swap ((uint32) 0x11223344) == 0x44332211);
expect (ByteOrder::swap ((uint64) literal64bit (0x1122334455667788)) == literal64bit (0x8877665544332211));
beginTestCase ("int");
testInteger <int> ();
beginTestCase ("unsigned int");
testInteger <unsigned int> ();
beginTestCase ("int32");
testInteger <int32> ();
beginTestCase ("uint32");
testInteger <uint32> ();
beginTestCase ("long");
testInteger <long> ();
beginTestCase ("void*");
testInteger <void*> ();
beginTestCase ("int*");
testInteger <int*> ();
beginTestCase ("float");
testFloat <float> ();
#if ! BEAST_64BIT_ATOMICS_UNAVAILABLE // 64-bit intrinsics aren't available on some old platforms
beginTestCase ("int64");
testInteger <int64> ();
beginTestCase ("uint64");
testInteger <uint64> ();
beginTestCase ("double");
testFloat <double> ();
#endif
}
};
static AtomicTests atomicTests;
}

View File

@@ -0,0 +1,111 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
Please visit http://www.juce.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "../RecursiveMutex.h"
#if BEAST_WINDOWS
#include "../StaticAssert.h"
#include <Windows.h>
#undef check
#undef direct
#undef max
#undef min
#undef TYPE_BOOL
namespace beast {
RecursiveMutex::RecursiveMutex()
{
// (just to check the MS haven't changed this structure and broken things...)
#if BEAST_VC7_OR_EARLIER
static_bassert (sizeof (CRITICAL_SECTION) <= 24);
#else
static_bassert (sizeof (CRITICAL_SECTION) <= sizeof (section));
#endif
InitializeCriticalSection ((CRITICAL_SECTION*) section);
}
RecursiveMutex::~RecursiveMutex()
{
DeleteCriticalSection ((CRITICAL_SECTION*) section);
}
void RecursiveMutex::lock() const
{
EnterCriticalSection ((CRITICAL_SECTION*) section);
}
void RecursiveMutex::unlock() const
{
LeaveCriticalSection ((CRITICAL_SECTION*) section);
}
bool RecursiveMutex::try_lock() const
{
return TryEnterCriticalSection ((CRITICAL_SECTION*) section) != FALSE;
}
}
#else
namespace beast {
RecursiveMutex::RecursiveMutex()
{
pthread_mutexattr_t atts;
pthread_mutexattr_init (&atts);
pthread_mutexattr_settype (&atts, PTHREAD_MUTEX_RECURSIVE);
#if ! BEAST_ANDROID
pthread_mutexattr_setprotocol (&atts, PTHREAD_PRIO_INHERIT);
#endif
pthread_mutex_init (&mutex, &atts);
pthread_mutexattr_destroy (&atts);
}
RecursiveMutex::~RecursiveMutex()
{
pthread_mutex_destroy (&mutex);
}
void RecursiveMutex::lock() const
{
pthread_mutex_lock (&mutex);
}
void RecursiveMutex::unlock() const
{
pthread_mutex_unlock (&mutex);
}
bool RecursiveMutex::try_lock() const
{
return pthread_mutex_trylock (&mutex) == 0;
}
}
#endif

View File

@@ -0,0 +1,465 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "../ServiceQueue.h"
#include "../../../modules/beast_core/beast_core.h" // for UnitTest
namespace beast {
namespace detail {
//------------------------------------------------------------------------------
class ServiceQueueBase::ScopedServiceThread : public List <ScopedServiceThread>::Node
{
public:
explicit ScopedServiceThread (ServiceQueueBase* queue)
: m_saved (ServiceQueueBase::s_service.get())
{
ServiceQueueBase::s_service.get() = queue;
}
~ScopedServiceThread()
{
ServiceQueueBase::s_service.get() = m_saved;
}
private:
ServiceQueueBase* m_saved;
};
//------------------------------------------------------------------------------
ServiceQueueBase::ServiceQueueBase()
{
}
ServiceQueueBase::~ServiceQueueBase()
{
}
std::size_t ServiceQueueBase::poll ()
{
CPUMeter::ScopedActiveTime interval (m_cpuMeter);
std::size_t total (0);
ScopedServiceThread thread (this);
for (;;)
{
std::size_t const n (dequeue());
if (! n)
break;
total += n;
}
return total;
}
std::size_t ServiceQueueBase::poll_one ()
{
CPUMeter::ScopedActiveTime interval (m_cpuMeter);
ScopedServiceThread thread (this);
return dequeue();
}
std::size_t ServiceQueueBase::run ()
{
std::size_t total (0);
ScopedServiceThread thread (this);
while (! stopped())
{
{
CPUMeter::ScopedActiveTime interval (m_cpuMeter);
total += poll ();
}
{
CPUMeter::ScopedIdleTime interval (m_cpuMeter);
wait ();
}
}
return total;
}
std::size_t ServiceQueueBase::run_one ()
{
std::size_t n;
ScopedServiceThread (this);
for (;;)
{
{
CPUMeter::ScopedActiveTime interval (m_cpuMeter);
n = poll_one();
if (n != 0)
break;
}
{
CPUMeter::ScopedIdleTime interval (m_cpuMeter);
wait();
}
}
return n;
}
void ServiceQueueBase::stop ()
{
SharedState::Access state (m_state);
m_stopped.set (1);
for(;;)
{
Waiter* waiting (state->waiting.pop_front());
if (waiting == nullptr)
break;
waiting->signal();
}
}
void ServiceQueueBase::reset()
{
// Must be stopped
bassert (m_stopped.get () != 0);
m_stopped.set (0);
}
// Block on the event if there are no items
// in the queue and we are not stopped.
//
void ServiceQueueBase::wait ()
{
Waiter* waiter (nullptr);
{
SharedState::Access state (m_state);
if (stopped ())
return;
if (! state->handlers.empty())
return;
waiter = state->unused.pop_front();
if (! waiter)
waiter = new_waiter();
state->waiting.push_front (waiter);
}
waiter->wait();
// Waiter got taken off the waiting list
{
SharedState::Access state (m_state);
state->unused.push_front (waiter);
}
}
void ServiceQueueBase::enqueue (Item* item)
{
Waiter* waiter;
{
SharedState::Access state (m_state);
state->handlers.push_back (*item);
// Signal a Waiter if one exists
waiter = state->waiting.pop_front();
}
if (waiter != nullptr)
waiter->signal();
}
// A thread can only be blocked on one ServiceQueue so we store the pointer
// to which ServiceQueue it is blocked on to determine if the thread belongs
// to that queue.
//
ThreadLocalValue <ServiceQueueBase*> ServiceQueueBase::s_service;
}
//------------------------------------------------------------------------------
namespace detail
{
//------------------------------------------------------------------------------
class ServiceQueueTimingTests
: public UnitTest
{
public:
class Stopwatch
{
public:
Stopwatch () { start(); }
void start () { m_startTime = Time::getHighResolutionTicks (); }
double getElapsed ()
{
int64 const now = Time::getHighResolutionTicks();
return Time::highResolutionTicksToSeconds (now - m_startTime);
}
private:
int64 m_startTime;
};
static int const callsPerThread = 50000;
//--------------------------------------------------------------------------
template <typename ServiceType>
struct Consumer : Thread
{
ServiceType& m_service;
Random m_random;
String m_string;
Consumer (int id, int64 seedValue, ServiceType& service)
: Thread ("C#" + String::fromNumber (id))
, m_service (service)
, m_random (seedValue)
{ startThread(); }
~Consumer ()
{ stopThread(); }
static Consumer*& thread()
{
static ThreadLocalValue <Consumer*> local;
return local.get();
}
static void stop_one ()
{ thread()->signalThreadShouldExit(); }
static void handler ()
{ thread()->do_handler(); }
void do_handler()
{
String const s (String::fromNumber (m_random.nextInt()));
m_string += s;
if (m_string.length() > 100)
m_string = String::empty;
}
void run ()
{
thread() = this;
while (! threadShouldExit())
m_service.run_one();
}
};
//--------------------------------------------------------------------------
template <typename ServiceType>
struct Producer : Thread
{
ServiceType& m_service;
Random m_random;
String m_string;
Producer (int id, int64 seedValue, ServiceType& service)
: Thread ("P#" + String::fromNumber (id))
, m_service (service)
, m_random (seedValue)
{ }
~Producer ()
{ stopThread(); }
void run ()
{
for (std::size_t i = 0; i < callsPerThread; ++i)
{
String const s (String::fromNumber (m_random.nextInt()));
m_string += s;
if (m_string.length() > 100)
m_string = String::empty;
m_service.dispatch (bind (&Consumer<ServiceType>::handler));
}
}
};
//--------------------------------------------------------------------------
template <typename Allocator>
void testThreads (std::size_t nConsumers, std::size_t nProducers)
{
beginTestCase (String::fromNumber (nConsumers) + " consumers, " +
String::fromNumber (nProducers) + " producers, " +
"Allocator = " + std::string(typeid(Allocator).name()));
typedef ServiceQueueType <Allocator> ServiceType;
ServiceType service (nConsumers);
std::vector <ScopedPointer <Consumer <ServiceType> > > consumers;
std::vector <ScopedPointer <Producer <ServiceType> > > producers;
consumers.reserve (nConsumers);
producers.reserve (nProducers);
for (std::size_t i = 0; i < nConsumers; ++i)
consumers.push_back (new Consumer <ServiceType> (i + 1,
random().nextInt64(), service));
for (std::size_t i = 0; i < nProducers; ++i)
producers.push_back (new Producer <ServiceType> (i + 1,
random().nextInt64(), service));
Stopwatch t;
for (std::size_t i = 0; i < producers.size(); ++i)
producers[i]->startThread();
for (std::size_t i = 0; i < producers.size(); ++i)
producers[i]->waitForThreadToExit();
for (std::size_t i = 0; i < consumers.size(); ++i)
service.dispatch (bind (&Consumer <ServiceType>::stop_one));
for (std::size_t i = 0; i < consumers.size(); ++i)
consumers[i]->waitForThreadToExit();
double const seconds (t.getElapsed());
logMessage (String (seconds, 2) + " seconds");
pass();
}
void runTest()
{
#if 1
testThreads <std::allocator<char> > (1, 1);
testThreads <std::allocator<char> > (1, 4);
testThreads <std::allocator<char> > (1, 16);
testThreads <std::allocator<char> > (4, 1);
testThreads <std::allocator<char> > (8, 16);
#endif
#if 0
testThreads <detail::ServiceQueueAllocator<char> > (1, 1);
testThreads <detail::ServiceQueueAllocator<char> > (1, 4);
testThreads <detail::ServiceQueueAllocator<char> > (1, 16);
testThreads <detail::ServiceQueueAllocator<char> > (4, 1);
testThreads <detail::ServiceQueueAllocator<char> > (8, 16);
#endif
}
ServiceQueueTimingTests () : UnitTest ("ServiceQueueTiming", "beast", runManual)
{
}
};
static ServiceQueueTimingTests serviceQueueTimingTests;
//------------------------------------------------------------------------------
class ServiceQueueTests
: public UnitTest
{
public:
struct ServiceThread : Thread
{
Random m_random;
ServiceQueue& m_service;
String m_string;
ServiceThread (int id, int64 seedValue,
ServiceQueue& service)
: Thread ("#" + String::fromNumber (id))
, m_random (seedValue)
, m_service (service)
{
startThread();
}
~ServiceThread ()
{
stopThread();
}
static ServiceThread*& thread()
{
static ThreadLocalValue <ServiceThread*> local;
return local.get();
}
static void stop_one ()
{
thread()->signalThreadShouldExit();
}
static void handler ()
{
thread()->do_handler();
}
void do_handler()
{
#if 1
String const s (String::fromNumber (m_random.nextInt()));
m_string += s;
if (m_string.length() > 100)
m_string = String::empty;
#endif
}
void run ()
{
thread() = this;
while (! threadShouldExit())
m_service.run_one();
}
};
static int const callsPerThread = 10000;
void testThreads (std::size_t n)
{
beginTestCase (String::fromNumber (n) + " threads");
ServiceQueue service (n);
std::vector <ScopedPointer <ServiceThread> > threads;
threads.reserve (n);
for (std::size_t i = 0; i < n; ++i)
threads.push_back (new ServiceThread (i + 1,
random().nextInt64(), service));
for (std::size_t i = n * callsPerThread; i; --i)
service.dispatch (bind (&ServiceThread::handler));
for (std::size_t i = 0; i < threads.size(); ++i)
service.dispatch (bind (&ServiceThread::stop_one));
for (std::size_t i = 0; i < threads.size(); ++i)
threads[i]->waitForThreadToExit();
pass();
}
void runTest()
{
testThreads (1);
testThreads (4);
testThreads (16);
}
ServiceQueueTests () : UnitTest ("ServiceQueue", "beast")
{
}
};
static ServiceQueueTests serviceQueueTests;
}
}

View File

@@ -0,0 +1,198 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "../Stoppable.h"
namespace beast {
Stoppable::Stoppable (char const* name, RootStoppable& root)
: m_name (name)
, m_root (root)
, m_child (this)
, m_stopped (false)
, m_childrenStopped (false)
{
}
Stoppable::Stoppable (char const* name, Stoppable& parent)
: m_name (name)
, m_root (parent.m_root)
, m_child (this)
, m_stopped (false)
, m_childrenStopped (false)
{
// Must not have stopping parent.
bassert (! parent.isStopping());
parent.m_children.push_front (&m_child);
}
Stoppable::~Stoppable ()
{
// Children must be stopped.
bassert (m_childrenStopped);
}
bool Stoppable::isStopping() const
{
return m_root.isStopping();
}
bool Stoppable::isStopped () const
{
return m_stopped;
}
bool Stoppable::areChildrenStopped () const
{
return m_childrenStopped;
}
void Stoppable::stopped ()
{
m_stoppedEvent.signal();
}
void Stoppable::onPrepare ()
{
}
void Stoppable::onStart ()
{
}
void Stoppable::onStop ()
{
stopped();
}
void Stoppable::onChildrenStopped ()
{
}
//------------------------------------------------------------------------------
void Stoppable::prepareRecursive ()
{
for (Children::const_iterator iter (m_children.cbegin ());
iter != m_children.cend(); ++iter)
iter->stoppable->prepareRecursive ();
onPrepare ();
}
void Stoppable::startRecursive ()
{
onStart ();
for (Children::const_iterator iter (m_children.cbegin ());
iter != m_children.cend(); ++iter)
iter->stoppable->startRecursive ();
}
void Stoppable::stopAsyncRecursive ()
{
onStop ();
for (Children::const_iterator iter (m_children.cbegin ());
iter != m_children.cend(); ++iter)
iter->stoppable->stopAsyncRecursive ();
}
void Stoppable::stopRecursive (Journal journal)
{
// Block on each child from the bottom of the tree up.
//
for (Children::const_iterator iter (m_children.cbegin ());
iter != m_children.cend(); ++iter)
iter->stoppable->stopRecursive (journal);
// if we get here then all children have stopped
//
memoryBarrier ();
m_childrenStopped = true;
onChildrenStopped ();
// Now block on this Stoppable.
//
bool const timedOut (! m_stoppedEvent.wait (1 * 1000)); // milliseconds
if (timedOut)
{
journal.warning << "Waiting for '" << m_name << "' to stop";
m_stoppedEvent.wait ();
}
// once we get here, we know the stoppable has stopped.
m_stopped = true;
}
//------------------------------------------------------------------------------
RootStoppable::RootStoppable (char const* name)
: Stoppable (name, *this)
{
}
RootStoppable::~RootStoppable ()
{
}
bool RootStoppable::isStopping() const
{
return m_calledStopAsync.get() != 0;
}
void RootStoppable::prepare ()
{
if (! m_prepared.compareAndSetBool (1, 0))
return;
prepareRecursive ();
}
void RootStoppable::start ()
{
// Courtesy call to prepare.
if (m_prepared.compareAndSetBool (1, 0))
prepareRecursive ();
if (! m_started.compareAndSetBool (1, 0))
return;
startRecursive ();
}
void RootStoppable::stop (Journal journal)
{
if (! m_calledStop.compareAndSetBool (1, 0))
{
journal.warning << "Stoppable::stop called again";
return;
}
stopAsync ();
stopRecursive (journal);
}
void RootStoppable::stopAsync ()
{
if (! m_calledStopAsync.compareAndSetBool (1, 0))
return;
stopAsyncRecursive ();
}
}

View File

@@ -0,0 +1,599 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
Please visit http://www.juce.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "../Thread.h"
namespace beast {
Thread::Thread (const String& threadName_)
: threadName (threadName_),
threadHandle (nullptr),
threadId (0),
threadPriority (5),
affinityMask (0),
shouldExit (false)
{
}
Thread::~Thread()
{
/* If your thread class's destructor has been called without first stopping the thread, that
means that this partially destructed object is still performing some work - and that's
probably a Bad Thing!
To avoid this type of nastiness, always make sure you call stopThread() before or during
your subclass's destructor.
*/
check_precondition (! isThreadRunning());
stopThread ();
}
//==============================================================================
// Use a ref-counted object to hold this shared data, so that it can outlive its static
// shared pointer when threads are still running during static shutdown.
struct CurrentThreadHolder : public SharedObject
{
CurrentThreadHolder() noexcept {}
typedef SharedPtr <CurrentThreadHolder> Ptr;
ThreadLocalValue<Thread*> value;
};
static char currentThreadHolderLock [sizeof (SpinLock)]; // (statically initialised to zeros).
static SpinLock* castToSpinLockWithoutAliasingWarning (void* s)
{
return static_cast<SpinLock*> (s);
}
static CurrentThreadHolder::Ptr getCurrentThreadHolder()
{
static CurrentThreadHolder::Ptr currentThreadHolder;
SpinLock::ScopedLockType lock (*castToSpinLockWithoutAliasingWarning (currentThreadHolderLock));
if (currentThreadHolder == nullptr)
currentThreadHolder = new CurrentThreadHolder();
return currentThreadHolder;
}
void Thread::threadEntryPoint()
{
const CurrentThreadHolder::Ptr currentThreadHolder (getCurrentThreadHolder());
currentThreadHolder->value = this;
if (threadName.isNotEmpty())
setCurrentThreadName (threadName);
if (startSuspensionEvent.wait (10000))
{
bassert (getCurrentThreadId() == threadId);
if (affinityMask != 0)
setCurrentThreadAffinityMask (affinityMask);
run();
}
currentThreadHolder->value.releaseCurrentThreadStorage();
closeThreadHandle();
}
// used to wrap the incoming call from the platform-specific code
void BEAST_API beast_threadEntryPoint (void* userData)
{
static_cast <Thread*> (userData)->threadEntryPoint();
}
//==============================================================================
void Thread::startThread()
{
const RecursiveMutex::ScopedLockType sl (startStopLock);
shouldExit = false;
if (threadHandle == nullptr)
{
launchThread();
setThreadPriority (threadHandle, threadPriority);
startSuspensionEvent.signal();
}
}
void Thread::startThread (const int priority)
{
const RecursiveMutex::ScopedLockType sl (startStopLock);
if (threadHandle == nullptr)
{
threadPriority = priority;
startThread();
}
else
{
setPriority (priority);
}
}
bool Thread::isThreadRunning() const
{
return threadHandle != nullptr;
}
Thread* Thread::getCurrentThread()
{
return getCurrentThreadHolder()->value.get();
}
//==============================================================================
void Thread::signalThreadShouldExit()
{
shouldExit = true;
}
bool Thread::waitForThreadToExit (const int timeOutMilliseconds) const
{
// Doh! So how exactly do you expect this thread to wait for itself to stop??
bassert (getThreadId() != getCurrentThreadId() || getCurrentThreadId() == 0);
const uint32 timeoutEnd = Time::getMillisecondCounter() + (uint32) timeOutMilliseconds;
while (isThreadRunning())
{
if (timeOutMilliseconds >= 0 && Time::getMillisecondCounter() > timeoutEnd)
return false;
sleep (2);
}
return true;
}
bool Thread::stopThread (const int timeOutMilliseconds)
{
bool cleanExit = true;
// agh! You can't stop the thread that's calling this method! How on earth
// would that work??
bassert (getCurrentThreadId() != getThreadId());
const RecursiveMutex::ScopedLockType sl (startStopLock);
if (isThreadRunning())
{
signalThreadShouldExit();
notify();
if (timeOutMilliseconds != 0)
{
cleanExit = waitForThreadToExit (timeOutMilliseconds);
}
if (isThreadRunning())
{
bassert (! cleanExit);
// very bad karma if this point is reached, as there are bound to be
// locks and events left in silly states when a thread is killed by force..
killThread();
threadHandle = nullptr;
threadId = 0;
cleanExit = false;
}
else
{
cleanExit = true;
}
}
return cleanExit;
}
void Thread::stopThreadAsync ()
{
const RecursiveMutex::ScopedLockType sl (startStopLock);
if (isThreadRunning())
{
signalThreadShouldExit();
notify();
}
}
//==============================================================================
bool Thread::setPriority (const int newPriority)
{
// NB: deadlock possible if you try to set the thread prio from the thread itself,
// so using setCurrentThreadPriority instead in that case.
if (getCurrentThreadId() == getThreadId())
return setCurrentThreadPriority (newPriority);
const RecursiveMutex::ScopedLockType sl (startStopLock);
if (setThreadPriority (threadHandle, newPriority))
{
threadPriority = newPriority;
return true;
}
return false;
}
bool Thread::setCurrentThreadPriority (const int newPriority)
{
return setThreadPriority (0, newPriority);
}
void Thread::setAffinityMask (const uint32 newAffinityMask)
{
affinityMask = newAffinityMask;
}
//==============================================================================
bool Thread::wait (const int timeOutMilliseconds) const
{
return defaultEvent.wait (timeOutMilliseconds);
}
void Thread::notify() const
{
defaultEvent.signal();
}
//==============================================================================
// This is here so we dont have circular includes
//
void SpinLock::enter() const noexcept
{
if (! tryEnter())
{
for (int i = 20; --i >= 0;)
if (tryEnter())
return;
while (! tryEnter())
Thread::yield();
}
}
}
//------------------------------------------------------------------------------
#if BEAST_WINDOWS
#include <windows.h>
#include <process.h>
#include <tchar.h>
namespace beast {
HWND beast_messageWindowHandle = 0; // (this is used by other parts of the codebase)
void BEAST_API beast_threadEntryPoint (void*);
static unsigned int __stdcall threadEntryProc (void* userData)
{
if (beast_messageWindowHandle != 0)
AttachThreadInput (GetWindowThreadProcessId (beast_messageWindowHandle, 0),
GetCurrentThreadId(), TRUE);
beast_threadEntryPoint (userData);
_endthreadex (0);
return 0;
}
void Thread::launchThread()
{
unsigned int newThreadId;
threadHandle = (void*) _beginthreadex (0, 0, &threadEntryProc, this, 0, &newThreadId);
threadId = (ThreadID) newThreadId;
}
void Thread::closeThreadHandle()
{
CloseHandle ((HANDLE) threadHandle);
threadId = 0;
threadHandle = 0;
}
void Thread::killThread()
{
if (threadHandle != 0)
{
#if BEAST_DEBUG
OutputDebugStringA ("** Warning - Forced thread termination **\n");
#endif
TerminateThread (threadHandle, 0);
}
}
void Thread::setCurrentThreadName (const String& name)
{
#if BEAST_DEBUG && BEAST_MSVC
struct
{
DWORD dwType;
LPCSTR szName;
DWORD dwThreadID;
DWORD dwFlags;
} info;
info.dwType = 0x1000;
info.szName = name.toUTF8();
info.dwThreadID = GetCurrentThreadId();
info.dwFlags = 0;
__try
{
RaiseException (0x406d1388 /*MS_VC_EXCEPTION*/, 0, sizeof (info) / sizeof (ULONG_PTR), (ULONG_PTR*) &info);
}
__except (EXCEPTION_CONTINUE_EXECUTION)
{}
#else
(void) name;
#endif
}
Thread::ThreadID Thread::getCurrentThreadId()
{
return (ThreadID) (pointer_sized_int) GetCurrentThreadId();
}
bool Thread::setThreadPriority (void* handle, int priority)
{
int pri = THREAD_PRIORITY_TIME_CRITICAL;
if (priority < 1) pri = THREAD_PRIORITY_IDLE;
else if (priority < 2) pri = THREAD_PRIORITY_LOWEST;
else if (priority < 5) pri = THREAD_PRIORITY_BELOW_NORMAL;
else if (priority < 7) pri = THREAD_PRIORITY_NORMAL;
else if (priority < 9) pri = THREAD_PRIORITY_ABOVE_NORMAL;
else if (priority < 10) pri = THREAD_PRIORITY_HIGHEST;
if (handle == 0)
handle = GetCurrentThread();
return SetThreadPriority (handle, pri) != FALSE;
}
void Thread::setCurrentThreadAffinityMask (const uint32 affinityMask)
{
SetThreadAffinityMask (GetCurrentThread(), affinityMask);
}
struct SleepEvent
{
SleepEvent() noexcept
: handle (CreateEvent (nullptr, FALSE, FALSE,
#if BEAST_DEBUG
_T("BEAST Sleep Event")))
#else
nullptr))
#endif
{}
~SleepEvent() noexcept
{
CloseHandle (handle);
handle = 0;
}
HANDLE handle;
};
static SleepEvent sleepEvent;
void BEAST_CALLTYPE Thread::sleep (const int millisecs)
{
if (millisecs >= 10 || sleepEvent.handle == 0)
{
Sleep ((DWORD) millisecs);
}
else
{
// unlike Sleep() this is guaranteed to return to the current thread after
// the time expires, so we'll use this for short waits, which are more likely
// to need to be accurate
WaitForSingleObject (sleepEvent.handle, (DWORD) millisecs);
}
}
void Thread::yield()
{
Sleep (0);
}
}
//------------------------------------------------------------------------------
#else
#include <time.h>
#if BEAST_BSD
// ???
#else
# include <sys/prctl.h>
#endif
namespace beast {
void BEAST_CALLTYPE Thread::sleep (int millisecs)
{
struct timespec time;
time.tv_sec = millisecs / 1000;
time.tv_nsec = (millisecs % 1000) * 1000000;
nanosleep (&time, nullptr);
}
void BEAST_API beast_threadEntryPoint (void*);
extern "C" void* threadEntryProc (void*);
extern "C" void* threadEntryProc (void* userData)
{
BEAST_AUTORELEASEPOOL
{
#if BEAST_ANDROID
struct AndroidThreadScope
{
AndroidThreadScope() { threadLocalJNIEnvHolder.attach(); }
~AndroidThreadScope() { threadLocalJNIEnvHolder.detach(); }
};
const AndroidThreadScope androidEnv;
#endif
beast_threadEntryPoint (userData);
}
return nullptr;
}
void Thread::launchThread()
{
threadHandle = 0;
pthread_t handle = 0;
if (pthread_create (&handle, 0, threadEntryProc, this) == 0)
{
pthread_detach (handle);
threadHandle = (void*) handle;
threadId = (ThreadID) threadHandle;
}
}
void Thread::closeThreadHandle()
{
threadId = 0;
threadHandle = 0;
}
void Thread::killThread()
{
if (threadHandle != 0)
{
#if BEAST_ANDROID
bassertfalse; // pthread_cancel not available!
#else
pthread_cancel ((pthread_t) threadHandle);
#endif
}
}
void Thread::setCurrentThreadName (const String& name)
{
#if BEAST_IOS || (BEAST_MAC && defined (MAC_OS_X_VERSION_10_5) && MAC_OS_X_VERSION_MIN_REQUIRED >= MAC_OS_X_VERSION_10_5)
BEAST_AUTORELEASEPOOL
{
[[NSThread currentThread] setName: beastStringToNS (name)];
}
#elif BEAST_LINUX
#if (__GLIBC__ * 1000 + __GLIBC_MINOR__) >= 2012
pthread_setname_np (pthread_self(), name.toRawUTF8());
#else
prctl (PR_SET_NAME, name.toRawUTF8(), 0, 0, 0);
#endif
#endif
}
bool Thread::setThreadPriority (void* handle, int priority)
{
struct sched_param param;
int policy;
priority = blimit (0, 10, priority);
if (handle == nullptr)
handle = (void*) pthread_self();
if (pthread_getschedparam ((pthread_t) handle, &policy, &param) != 0)
return false;
policy = priority == 0 ? SCHED_OTHER : SCHED_RR;
const int minPriority = sched_get_priority_min (policy);
const int maxPriority = sched_get_priority_max (policy);
param.sched_priority = ((maxPriority - minPriority) * priority) / 10 + minPriority;
return pthread_setschedparam ((pthread_t) handle, policy, &param) == 0;
}
Thread::ThreadID Thread::getCurrentThreadId()
{
return (ThreadID) pthread_self();
}
void Thread::yield()
{
sched_yield();
}
//==============================================================================
/* Remove this macro if you're having problems compiling the cpu affinity
calls (the API for these has changed about quite a bit in various Linux
versions, and a lot of distros seem to ship with obsolete versions)
*/
#if defined (CPU_ISSET) && ! defined (SUPPORT_AFFINITIES)
#define SUPPORT_AFFINITIES 1
#endif
void Thread::setCurrentThreadAffinityMask (const uint32 affinityMask)
{
#if SUPPORT_AFFINITIES
cpu_set_t affinity;
CPU_ZERO (&affinity);
for (int i = 0; i < 32; ++i)
if ((affinityMask & (1 << i)) != 0)
CPU_SET (i, &affinity);
/*
N.B. If this line causes a compile error, then you've probably not got the latest
version of glibc installed.
If you don't want to update your copy of glibc and don't care about cpu affinities,
then you can just disable all this stuff by setting the SUPPORT_AFFINITIES macro to 0.
*/
sched_setaffinity (getpid(), sizeof (cpu_set_t), &affinity);
sched_yield();
#else
/* affinities aren't supported because either the appropriate header files weren't found,
or the SUPPORT_AFFINITIES macro was turned off
*/
bassertfalse;
(void) affinityMask;
#endif
}
}
//------------------------------------------------------------------------------
#endif

View File

@@ -0,0 +1,168 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
Please visit http://www.juce.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "../WaitableEvent.h"
#if BEAST_WINDOWS
#include <Windows.h>
#undef check
#undef direct
#undef max
#undef min
#undef TYPE_BOOL
namespace beast {
WaitableEvent::WaitableEvent (const bool manualReset, bool initiallySignaled)
: handle (CreateEvent (0, manualReset ? TRUE : FALSE, initiallySignaled ? TRUE : FALSE, 0))
{
}
WaitableEvent::~WaitableEvent()
{
CloseHandle (handle);
}
void WaitableEvent::signal() const
{
SetEvent (handle);
}
void WaitableEvent::reset() const
{
ResetEvent (handle);
}
bool WaitableEvent::wait () const
{
return WaitForSingleObject (handle, INFINITE) == WAIT_OBJECT_0;
}
bool WaitableEvent::wait (const int timeOutMs) const
{
if (timeOutMs >= 0)
return WaitForSingleObject (handle,
(DWORD) timeOutMs) == WAIT_OBJECT_0;
return wait ();
}
}
#else
namespace beast {
WaitableEvent::WaitableEvent (const bool useManualReset, bool initiallySignaled)
: triggered (false), manualReset (useManualReset)
{
pthread_cond_init (&condition, 0);
pthread_mutexattr_t atts;
pthread_mutexattr_init (&atts);
#if ! BEAST_ANDROID
pthread_mutexattr_setprotocol (&atts, PTHREAD_PRIO_INHERIT);
#endif
pthread_mutex_init (&mutex, &atts);
if (initiallySignaled)
signal ();
}
WaitableEvent::~WaitableEvent()
{
pthread_cond_destroy (&condition);
pthread_mutex_destroy (&mutex);
}
bool WaitableEvent::wait () const
{
return wait (-1);
}
bool WaitableEvent::wait (const int timeOutMillisecs) const
{
pthread_mutex_lock (&mutex);
if (! triggered)
{
if (timeOutMillisecs < 0)
{
do
{
pthread_cond_wait (&condition, &mutex);
}
while (! triggered);
}
else
{
struct timeval now;
gettimeofday (&now, 0);
struct timespec time;
time.tv_sec = now.tv_sec + (timeOutMillisecs / 1000);
time.tv_nsec = (now.tv_usec + ((timeOutMillisecs % 1000) * 1000)) * 1000;
if (time.tv_nsec >= 1000000000)
{
time.tv_nsec -= 1000000000;
time.tv_sec++;
}
do
{
if (pthread_cond_timedwait (&condition, &mutex, &time) == ETIMEDOUT)
{
pthread_mutex_unlock (&mutex);
return false;
}
}
while (! triggered);
}
}
if (! manualReset)
triggered = false;
pthread_mutex_unlock (&mutex);
return true;
}
void WaitableEvent::signal() const
{
pthread_mutex_lock (&mutex);
triggered = true;
pthread_cond_broadcast (&condition);
pthread_mutex_unlock (&mutex);
}
void WaitableEvent::reset() const
{
pthread_mutex_lock (&mutex);
triggered = false;
pthread_mutex_unlock (&mutex);
}
}
#endif