Reduce Beast dependencies by leveraging C++11 features:

* Remove beast::Atomic (RIPD-728):
  * Use std-provided alternatives
  * Eliminate atomic variables where possible

* Cleanup beast::Thread interface:
  * Use std::string instead of beast::String
  * Remove unused functions and parameters

* Remove unused code:
  * beast::ThreadLocalValue
  * beast::ServiceQueue
This commit is contained in:
Nik Bougalis
2014-12-22 02:45:36 -08:00
parent c01b4e6baa
commit 1d6721d345
20 changed files with 47 additions and 2229 deletions

View File

@@ -1,429 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
Please visit http://www.juce.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_ATOMIC_H_INCLUDED
#define BEAST_ATOMIC_H_INCLUDED
#include <beast/Config.h>
#include <beast/utility/noexcept.h>
#include <cstdint>
namespace beast {
//==============================================================================
/**
Simple class to hold a primitive value and perform atomic operations on it.
The type used must be a 32 or 64 bit primitive, like an int, pointer, etc.
There are methods to perform most of the basic atomic operations.
*/
template <typename Type>
class Atomic
{
// This class can only be used for types which are 32 or 64 bits in size.
static_assert (sizeof (Type) == 4 || sizeof (Type) == 8,
"Atomic arguments must be 32- or 64-bit long primitive types.");
public:
/** Creates a new value, initialised to zero. */
inline Atomic() noexcept
: value (0)
{
}
/** Creates a new value, with a given initial value. */
inline Atomic (const Type initialValue) noexcept
: value (initialValue)
{
}
/** Copies another value (atomically). */
inline Atomic (const Atomic& other) noexcept
: value (other.get())
{
}
/** Destructor. */
inline ~Atomic() noexcept
{
}
/** Atomically reads and returns the current value. */
Type get() const noexcept;
/** Copies another value onto this one (atomically). */
Atomic& operator= (const Atomic& other) noexcept
{ exchange (other.get()); return *this; }
/** Copies another value onto this one (atomically). */
Atomic& operator= (const Type newValue) noexcept
{ exchange (newValue); return *this; }
/** Atomically sets the current value. */
void set (Type newValue) noexcept
{ exchange (newValue); }
/** Atomically sets the current value, returning the value that was replaced. */
Type exchange (Type value) noexcept;
/** Atomically adds a number to this value, returning the new value. */
Type operator+= (Type amountToAdd) noexcept;
/** Atomically subtracts a number from this value, returning the new value. */
Type operator-= (Type amountToSubtract) noexcept;
/** Atomically increments this value, returning the new value. */
Type operator++() noexcept;
/** Atomically decrements this value, returning the new value. */
Type operator--() noexcept;
/** Atomically compares this value with a target value, and if it is equal, sets
this to be equal to a new value.
This operation is the atomic equivalent of doing this:
@code
bool compareAndSetBool (Type newValue, Type valueToCompare)
{
if (get() == valueToCompare)
{
set (newValue);
return true;
}
return false;
}
@endcode
@returns true if the comparison was true and the value was replaced; false if
the comparison failed and the value was left unchanged.
@see compareAndSetValue
*/
bool compareAndSetBool (Type newValue, Type valueToCompare) noexcept;
/** Atomically compares this value with a target value, and if it is equal, sets
this to be equal to a new value.
This operation is the atomic equivalent of doing this:
@code
Type compareAndSetValue (Type newValue, Type valueToCompare)
{
Type oldValue = get();
if (oldValue == valueToCompare)
set (newValue);
return oldValue;
}
@endcode
@returns the old value before it was changed.
@see compareAndSetBool
*/
Type compareAndSetValue (Type newValue, Type valueToCompare) noexcept;
//==============================================================================
#if BEAST_64BIT
BEAST_ALIGN (8)
#else
BEAST_ALIGN (4)
#endif
/** The raw value that this class operates on.
This is exposed publically in case you need to manipulate it directly
for performance reasons.
*/
volatile Type value;
private:
template <typename Dest, typename Source>
static inline Dest castTo (Source value) noexcept { union { Dest d; Source s; } u; u.s = value; return u.d; }
static inline Type castFrom32Bit (std::int32_t value) noexcept { return castTo <Type, std::int32_t> (value); }
static inline Type castFrom64Bit (std::int64_t value) noexcept { return castTo <Type, std::int64_t> (value); }
static inline std::int32_t castTo32Bit (Type value) noexcept { return castTo <std::int32_t, Type> (value); }
static inline std::int64_t castTo64Bit (Type value) noexcept { return castTo <std::int64_t, Type> (value); }
Type operator++ (int); // better to just use pre-increment with atomics..
Type operator-- (int);
/** This templated negate function will negate pointers as well as integers */
template <typename ValueType>
inline ValueType negateValue (ValueType n) noexcept
{
return sizeof (ValueType) == 1 ? (ValueType) -(signed char) n
: (sizeof (ValueType) == 2 ? (ValueType) -(short) n
: (sizeof (ValueType) == 4 ? (ValueType) -(int) n
: ((ValueType) -(std::int64_t) n)));
}
/** This templated negate function will negate pointers as well as integers */
template <typename PointerType>
inline PointerType* negateValue (PointerType* n) noexcept
{
return reinterpret_cast <PointerType*> (-reinterpret_cast <std::intptr_t> (n));
}
};
//==============================================================================
/*
The following code is in the header so that the atomics can be inlined where possible...
*/
#if BEAST_IOS || (BEAST_MAC && (BEAST_PPC || BEAST_CLANG || __GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 2)))
#define BEAST_ATOMICS_MAC 1 // Older OSX builds using gcc4.1 or earlier
#if MAC_OS_X_VERSION_MIN_REQUIRED < MAC_OS_X_VERSION_10_5
#define BEAST_MAC_ATOMICS_VOLATILE
#else
#define BEAST_MAC_ATOMICS_VOLATILE volatile
#endif
#if BEAST_PPC || BEAST_IOS
// None of these atomics are available for PPC or for iOS 3.1 or earlier!!
template <typename Type> static Type OSAtomicAdd64Barrier (Type b, BEAST_MAC_ATOMICS_VOLATILE Type* a) noexcept { bassertfalse; return *a += b; }
template <typename Type> static Type OSAtomicIncrement64Barrier (BEAST_MAC_ATOMICS_VOLATILE Type* a) noexcept { bassertfalse; return ++*a; }
template <typename Type> static Type OSAtomicDecrement64Barrier (BEAST_MAC_ATOMICS_VOLATILE Type* a) noexcept { bassertfalse; return --*a; }
template <typename Type> static bool OSAtomicCompareAndSwap64Barrier (Type old, Type newValue, BEAST_MAC_ATOMICS_VOLATILE Type* value) noexcept
{ bassertfalse; if (old == *value) { *value = newValue; return true; } return false; }
#define BEAST_64BIT_ATOMICS_UNAVAILABLE 1
#endif
#elif BEAST_CLANG && BEAST_LINUX
#define BEAST_ATOMICS_GCC 1
//==============================================================================
#elif BEAST_GCC
#define BEAST_ATOMICS_GCC 1 // GCC with intrinsics
#if BEAST_IOS || BEAST_ANDROID // (64-bit ops will compile but not link on these mobile OSes)
#define BEAST_64BIT_ATOMICS_UNAVAILABLE 1
#endif
//==============================================================================
#else
#define BEAST_ATOMICS_WINDOWS 1 // Windows with intrinsics
#if BEAST_USE_INTRINSICS
#ifndef __INTEL_COMPILER
#pragma intrinsic (_InterlockedExchange, _InterlockedIncrement, _InterlockedDecrement, _InterlockedCompareExchange, \
_InterlockedCompareExchange64, _InterlockedExchangeAdd, _ReadWriteBarrier)
#endif
#define beast_InterlockedExchange(a, b) _InterlockedExchange(a, b)
#define beast_InterlockedIncrement(a) _InterlockedIncrement(a)
#define beast_InterlockedDecrement(a) _InterlockedDecrement(a)
#define beast_InterlockedExchangeAdd(a, b) _InterlockedExchangeAdd(a, b)
#define beast_InterlockedCompareExchange(a, b, c) _InterlockedCompareExchange(a, b, c)
#define beast_InterlockedCompareExchange64(a, b, c) _InterlockedCompareExchange64(a, b, c)
#define beast_MemoryBarrier _ReadWriteBarrier
#else
long beast_InterlockedExchange (volatile long* a, long b) noexcept;
long beast_InterlockedIncrement (volatile long* a) noexcept;
long beast_InterlockedDecrement (volatile long* a) noexcept;
long beast_InterlockedExchangeAdd (volatile long* a, long b) noexcept;
long beast_InterlockedCompareExchange (volatile long* a, long b, long c) noexcept;
__int64 beast_InterlockedCompareExchange64 (volatile __int64* a, __int64 b, __int64 c) noexcept;
inline void beast_MemoryBarrier() noexcept { long x = 0; beast_InterlockedIncrement (&x); }
#endif
#if BEAST_64BIT
#ifndef __INTEL_COMPILER
#pragma intrinsic (_InterlockedExchangeAdd64, _InterlockedExchange64, _InterlockedIncrement64, _InterlockedDecrement64)
#endif
#define beast_InterlockedExchangeAdd64(a, b) _InterlockedExchangeAdd64(a, b)
#define beast_InterlockedExchange64(a, b) _InterlockedExchange64(a, b)
#define beast_InterlockedIncrement64(a) _InterlockedIncrement64(a)
#define beast_InterlockedDecrement64(a) _InterlockedDecrement64(a)
#else
// None of these atomics are available in a 32-bit Windows build!!
template <typename Type> static Type beast_InterlockedExchangeAdd64 (volatile Type* a, Type b) noexcept { bassertfalse; Type old = *a; *a += b; return old; }
template <typename Type> static Type beast_InterlockedExchange64 (volatile Type* a, Type b) noexcept { bassertfalse; Type old = *a; *a = b; return old; }
template <typename Type> static Type beast_InterlockedIncrement64 (volatile Type* a) noexcept { bassertfalse; return ++*a; }
template <typename Type> static Type beast_InterlockedDecrement64 (volatile Type* a) noexcept { bassertfalse; return --*a; }
#define BEAST_64BIT_ATOMICS_UNAVAILABLE 1
#endif
#endif
#if BEAST_MSVC
#pragma warning (push)
#pragma warning (disable: 4311) // (truncation warning)
#endif
//==============================================================================
template <typename Type>
inline Type Atomic<Type>::get() const noexcept
{
#if BEAST_ATOMICS_MAC
return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) OSAtomicAdd32Barrier ((int32_t) 0, (BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value))
: castFrom64Bit ((std::int64_t) OSAtomicAdd64Barrier ((int64_t) 0, (BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value));
#elif BEAST_ATOMICS_WINDOWS
return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) beast_InterlockedExchangeAdd ((volatile long*) &value, (long) 0))
: castFrom64Bit ((std::int64_t) beast_InterlockedExchangeAdd64 ((volatile __int64*) &value, (__int64) 0));
#elif BEAST_ATOMICS_GCC
return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) __sync_add_and_fetch ((volatile std::int32_t*) &value, 0))
: castFrom64Bit ((std::int64_t) __sync_add_and_fetch ((volatile std::int64_t*) &value, 0));
#endif
}
template <typename Type>
inline Type Atomic<Type>::exchange (const Type newValue) noexcept
{
#if BEAST_ATOMICS_MAC || BEAST_ATOMICS_GCC
Type currentVal = value;
while (! compareAndSetBool (newValue, currentVal)) { currentVal = value; }
return currentVal;
#elif BEAST_ATOMICS_WINDOWS
return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) beast_InterlockedExchange ((volatile long*) &value, (long) castTo32Bit (newValue)))
: castFrom64Bit ((std::int64_t) beast_InterlockedExchange64 ((volatile __int64*) &value, (__int64) castTo64Bit (newValue)));
#endif
}
template <typename Type>
inline Type Atomic<Type>::operator+= (const Type amountToAdd) noexcept
{
#if BEAST_ATOMICS_MAC
# ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wint-to-void-pointer-cast"
# pragma clang diagnostic ignored "-Wint-to-pointer-cast"
# endif
return sizeof (Type) == 4 ? (Type) OSAtomicAdd32Barrier ((int32_t) castTo32Bit (amountToAdd), (BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value)
: (Type) OSAtomicAdd64Barrier ((int64_t) amountToAdd, (BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value);
# ifdef __clang__
# pragma clang diagnostic pop
# endif
#elif BEAST_ATOMICS_WINDOWS
return sizeof (Type) == 4 ? (Type) (beast_InterlockedExchangeAdd ((volatile long*) &value, (long) amountToAdd) + (long) amountToAdd)
: (Type) (beast_InterlockedExchangeAdd64 ((volatile __int64*) &value, (__int64) amountToAdd) + (__int64) amountToAdd);
#elif BEAST_ATOMICS_GCC
return (Type) __sync_add_and_fetch (&value, amountToAdd);
#endif
}
template <typename Type>
inline Type Atomic<Type>::operator-= (const Type amountToSubtract) noexcept
{
return operator+= (negateValue (amountToSubtract));
}
template <typename Type>
inline Type Atomic<Type>::operator++() noexcept
{
#if BEAST_ATOMICS_MAC
# ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wint-to-void-pointer-cast"
# pragma clang diagnostic ignored "-Wint-to-pointer-cast"
# endif
return sizeof (Type) == 4 ? (Type) OSAtomicIncrement32Barrier ((BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value)
: (Type) OSAtomicIncrement64Barrier ((BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value);
# ifdef __clang__
# pragma clang diagnostic pop
# endif
#elif BEAST_ATOMICS_WINDOWS
return sizeof (Type) == 4 ? (Type) beast_InterlockedIncrement ((volatile long*) &value)
: (Type) beast_InterlockedIncrement64 ((volatile __int64*) &value);
#elif BEAST_ATOMICS_GCC
return (Type) __sync_add_and_fetch (&value, (Type) 1);
#endif
}
template <typename Type>
inline Type Atomic<Type>::operator--() noexcept
{
#if BEAST_ATOMICS_MAC
# ifdef __clang__
# pragma clang diagnostic push
# pragma clang diagnostic ignored "-Wint-to-void-pointer-cast"
# pragma clang diagnostic ignored "-Wint-to-pointer-cast"
# endif
return sizeof (Type) == 4 ? (Type) OSAtomicDecrement32Barrier ((BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value)
: (Type) OSAtomicDecrement64Barrier ((BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value);
# ifdef __clang__
# pragma clang diagnostic pop
# endif
#elif BEAST_ATOMICS_WINDOWS
return sizeof (Type) == 4 ? (Type) beast_InterlockedDecrement ((volatile long*) &value)
: (Type) beast_InterlockedDecrement64 ((volatile __int64*) &value);
#elif BEAST_ATOMICS_GCC
return (Type) __sync_add_and_fetch (&value, (Type) -1);
#endif
}
template <typename Type>
inline bool Atomic<Type>::compareAndSetBool (const Type newValue, const Type valueToCompare) noexcept
{
#if BEAST_ATOMICS_MAC
return sizeof (Type) == 4 ? OSAtomicCompareAndSwap32Barrier ((int32_t) castTo32Bit (valueToCompare), (int32_t) castTo32Bit (newValue), (BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value)
: OSAtomicCompareAndSwap64Barrier ((int64_t) castTo64Bit (valueToCompare), (int64_t) castTo64Bit (newValue), (BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value);
#elif BEAST_ATOMICS_WINDOWS
return compareAndSetValue (newValue, valueToCompare) == valueToCompare;
#elif BEAST_ATOMICS_GCC
return sizeof (Type) == 4 ? __sync_bool_compare_and_swap ((volatile std::int32_t*) &value, castTo32Bit (valueToCompare), castTo32Bit (newValue))
: __sync_bool_compare_and_swap ((volatile std::int64_t*) &value, castTo64Bit (valueToCompare), castTo64Bit (newValue));
#endif
}
template <typename Type>
inline Type Atomic<Type>::compareAndSetValue (const Type newValue, const Type valueToCompare) noexcept
{
#if BEAST_ATOMICS_MAC
for (;;) // Annoying workaround for only having a bool CAS operation..
{
if (compareAndSetBool (newValue, valueToCompare))
return valueToCompare;
const Type result = value;
if (result != valueToCompare)
return result;
}
#elif BEAST_ATOMICS_WINDOWS
return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) beast_InterlockedCompareExchange ((volatile long*) &value, (long) castTo32Bit (newValue), (long) castTo32Bit (valueToCompare)))
: castFrom64Bit ((std::int64_t) beast_InterlockedCompareExchange64 ((volatile __int64*) &value, (__int64) castTo64Bit (newValue), (__int64) castTo64Bit (valueToCompare)));
#elif BEAST_ATOMICS_GCC
return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) __sync_val_compare_and_swap ((volatile std::int32_t*) &value, castTo32Bit (valueToCompare), castTo32Bit (newValue)))
: castFrom64Bit ((std::int64_t) __sync_val_compare_and_swap ((volatile std::int64_t*) &value, castTo64Bit (valueToCompare), castTo64Bit (newValue)));
#endif
}
inline void memoryBarrier() noexcept
{
#if BEAST_ATOMICS_MAC
OSMemoryBarrier();
#elif BEAST_ATOMICS_GCC
__sync_synchronize();
#elif BEAST_ATOMICS_WINDOWS
beast_MemoryBarrier();
#endif
}
#if BEAST_MSVC
#pragma warning (pop)
#endif
}
#endif

View File

@@ -25,11 +25,9 @@
#include <beast/threads/SharedLockGuard.h> #include <beast/threads/SharedLockGuard.h>
#include <beast/threads/SharedMutexAdapter.h> #include <beast/threads/SharedMutexAdapter.h>
#include <beast/threads/SharedData.h> #include <beast/threads/SharedData.h>
#include <beast/threads/ServiceQueue.h>
#include <beast/threads/SpinLock.h> #include <beast/threads/SpinLock.h>
#include <beast/threads/Stoppable.h> #include <beast/threads/Stoppable.h>
#include <beast/threads/Thread.h> #include <beast/threads/Thread.h>
#include <beast/threads/ThreadLocalValue.h>
#include <beast/threads/WaitableEvent.h> #include <beast/threads/WaitableEvent.h>
#include <beast/threads/ScopedWrapperContext.h> #include <beast/threads/ScopedWrapperContext.h>

View File

@@ -41,7 +41,6 @@
// New header-only library modeled more closely according to boost // New header-only library modeled more closely according to boost
#include <beast/SmartPtr.h> #include <beast/SmartPtr.h>
#include <beast/Atomic.h>
#include <beast/Arithmetic.h> #include <beast/Arithmetic.h>
#include <beast/ByteOrder.h> #include <beast/ByteOrder.h>
#include <beast/HeapBlock.h> #include <beast/HeapBlock.h>

View File

@@ -24,6 +24,8 @@
#include <beast/smart_ptr/SharedPtr.h> #include <beast/smart_ptr/SharedPtr.h>
#include <beast/module/core/time/AtExitHook.h> #include <beast/module/core/time/AtExitHook.h>
#include <atomic>
namespace beast namespace beast
{ {
@@ -99,7 +101,7 @@ public:
bassert (lifetime == SingletonLifetime::createOnDemand || ! staticData.destructorCalled); bassert (lifetime == SingletonLifetime::createOnDemand || ! staticData.destructorCalled);
staticData.instance = &staticData.object; staticData.instance = &staticData.object;
new (staticData.instance) SharedSingleton (lifetime); new (staticData.instance) SharedSingleton (lifetime);
memoryBarrier(); std::atomic_thread_fence (std::memory_order_seq_cst);
instance = staticData.instance; instance = staticData.instance;
} }
} }

View File

@@ -141,7 +141,7 @@ void Workers::deleteWorkers (LockFreeStack <Worker>& stack)
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
Workers::Worker::Worker (Workers& workers, String const& threadName) Workers::Worker::Worker (Workers& workers, std::string const& threadName)
: Thread (threadName) : Thread (threadName)
, m_workers (workers) , m_workers (workers)
{ {
@@ -232,7 +232,6 @@ void Workers::Worker::run ()
class Workers_test : public unit_test::suite class Workers_test : public unit_test::suite
{ {
public: public:
struct TestCallback : Workers::Callback struct TestCallback : Workers::Callback
{ {
explicit TestCallback (int count_) explicit TestCallback (int count_)
@@ -251,13 +250,6 @@ public:
std::atomic <int> count; std::atomic <int> count;
}; };
template <class T1, class T2>
bool
expectEquals (T1 const& t1, T2 const& t2)
{
return expect (t1 == t2);
}
void testThreads (int const threadCount) void testThreads (int const threadCount)
{ {
testcase ("threadCount = " + std::to_string (threadCount)); testcase ("threadCount = " + std::to_string (threadCount));
@@ -276,14 +268,12 @@ public:
// 10 seconds should be enough to finish on any system // 10 seconds should be enough to finish on any system
// //
bool signaled = cb.finished.wait (10 * 1000); bool signaled = cb.finished.wait (10 * 1000);
expect (signaled, "timed out"); expect (signaled, "timed out");
w.pauseAllThreadsAndWait (); w.pauseAllThreadsAndWait ();
int const count (cb.count.load ()); // We had better finished all our work!
expect (cb.count.load () == 0, "Did not complete task!");
expectEquals (count, 0);
} }
void run () void run ()

View File

@@ -122,7 +122,7 @@ private:
, public Thread , public Thread
{ {
public: public:
Worker (Workers& workers, String const& threadName); Worker (Workers& workers, std::string const& threadName);
~Worker (); ~Worker ();
@@ -138,7 +138,7 @@ private:
private: private:
Callback& m_callback; Callback& m_callback;
String m_threadNames; // The name to give each thread std::string m_threadNames; // The name to give each thread
WaitableEvent m_allPaused; // signaled when all threads paused WaitableEvent m_allPaused; // signaled when all threads paused
semaphore m_semaphore; // each pending task is 1 resource semaphore m_semaphore; // each pending task is 1 resource
int m_numberOfThreads; // how many we want active now int m_numberOfThreads; // how many we want active now

View File

@@ -247,31 +247,6 @@ std::uint32_t Time::getApproximateMillisecondCounter() noexcept
return TimeHelpers::lastMSCounterValue; return TimeHelpers::lastMSCounterValue;
} }
void Time::waitForMillisecondCounter (const std::uint32_t targetTime) noexcept
{
for (;;)
{
const std::uint32_t now = getMillisecondCounter();
if (now >= targetTime)
break;
const int toWait = (int) (targetTime - now);
if (toWait > 2)
{
Thread::sleep (std::min (20, toWait >> 1));
}
else
{
// xxx should consider using mutex_pause on the mac as it apparently
// makes it seem less like a spinlock and avoids lowering the thread pri.
for (int i = 10; --i >= 0;)
std::this_thread::yield();
}
}
}
//============================================================================== //==============================================================================
double Time::highResolutionTicksToSeconds (const std::int64_t ticks) noexcept double Time::highResolutionTicksToSeconds (const std::int64_t ticks) noexcept
{ {

View File

@@ -324,12 +324,6 @@ public:
*/ */
static double getMillisecondCounterHiRes() noexcept; static double getMillisecondCounterHiRes() noexcept;
/** Waits until the getMillisecondCounter() reaches a given value.
This will make the thread sleep as efficiently as it can while it's waiting.
*/
static void waitForMillisecondCounter (std::uint32_t targetTime) noexcept;
/** Less-accurate but faster version of getMillisecondCounter(). /** Less-accurate but faster version of getMillisecondCounter().
This will return the last value that getMillisecondCounter() returned, so doesn't This will return the last value that getMillisecondCounter() returned, so doesn't

View File

@@ -56,11 +56,10 @@ namespace beast {
Once a new SharedObject has been assigned to a pointer, be Once a new SharedObject has been assigned to a pointer, be
careful not to delete the object manually. careful not to delete the object manually.
This class uses an Atomic<int> value to hold the reference count, so that it This class uses an std::atomic<int> value to hold the reference count, so
the pointers can be passed between threads safely. For a faster but non-thread-safe that the pointers can be passed between threads safely.
version, use SingleThreadedSharedObject instead.
@see SharedPtr, SharedObjectArray, SingleThreadedSharedObject @see SharedPtr, SharedObjectArray
*/ */
class SharedObject class SharedObject
{ {

View File

@@ -25,10 +25,9 @@
#define BEAST_CHARPOINTER_UTF16_H_INCLUDED #define BEAST_CHARPOINTER_UTF16_H_INCLUDED
#include <beast/Config.h> #include <beast/Config.h>
#include <beast/Atomic.h>
#include <beast/strings/CharacterFunctions.h> #include <beast/strings/CharacterFunctions.h>
#include <atomic>
#include <cstdint> #include <cstdint>
namespace beast { namespace beast {
@@ -474,7 +473,7 @@ public:
/** Atomically swaps this pointer for a new value, returning the previous value. */ /** Atomically swaps this pointer for a new value, returning the previous value. */
CharPointer_UTF16 atomicSwap (const CharPointer_UTF16 newValue) CharPointer_UTF16 atomicSwap (const CharPointer_UTF16 newValue)
{ {
return CharPointer_UTF16 (reinterpret_cast <Atomic<CharType*>&> (data).exchange (newValue.data)); return CharPointer_UTF16 (reinterpret_cast <std::atomic<CharType*>&> (data).exchange (newValue.data));
} }
/** These values are the byte-order-mark (BOM) values for a UTF-16 stream. */ /** These values are the byte-order-mark (BOM) values for a UTF-16 stream. */

View File

@@ -25,10 +25,9 @@
#define BEAST_CHARPOINTER_UTF32_H_INCLUDED #define BEAST_CHARPOINTER_UTF32_H_INCLUDED
#include <beast/Config.h> #include <beast/Config.h>
#include <beast/Atomic.h>
#include <beast/strings/CharacterFunctions.h> #include <beast/strings/CharacterFunctions.h>
#include <atomic>
#include <cwchar> #include <cwchar>
namespace beast { namespace beast {
@@ -370,7 +369,7 @@ public:
/** Atomically swaps this pointer for a new value, returning the previous value. */ /** Atomically swaps this pointer for a new value, returning the previous value. */
CharPointer_UTF32 atomicSwap (const CharPointer_UTF32 newValue) CharPointer_UTF32 atomicSwap (const CharPointer_UTF32 newValue)
{ {
return CharPointer_UTF32 (reinterpret_cast <Atomic<CharType*>&> (data).exchange (newValue.data)); return CharPointer_UTF32 (reinterpret_cast <std::atomic<CharType*>&> (data).exchange (newValue.data));
} }
private: private:

View File

@@ -25,10 +25,9 @@
#define BEAST_CHARPOINTER_UTF8_H_INCLUDED #define BEAST_CHARPOINTER_UTF8_H_INCLUDED
#include <beast/Config.h> #include <beast/Config.h>
#include <beast/Atomic.h>
#include <beast/strings/CharacterFunctions.h> #include <beast/strings/CharacterFunctions.h>
#include <atomic>
#include <cstdlib> #include <cstdlib>
#include <cstring> #include <cstring>
@@ -552,7 +551,7 @@ public:
/** Atomically swaps this pointer for a new value, returning the previous value. */ /** Atomically swaps this pointer for a new value, returning the previous value. */
CharPointer_UTF8 atomicSwap (const CharPointer_UTF8 newValue) CharPointer_UTF8 atomicSwap (const CharPointer_UTF8 newValue)
{ {
return CharPointer_UTF8 (reinterpret_cast <Atomic<CharType*>&> (data).exchange (newValue.data)); return CharPointer_UTF8 (reinterpret_cast <std::atomic<CharType*>&> (data).exchange (newValue.data));
} }
/** These values are the byte-order mark (BOM) values for a UTF-8 stream. */ /** These values are the byte-order mark (BOM) values for a UTF-8 stream. */

View File

@@ -36,6 +36,7 @@
#include <stdarg.h> #include <stdarg.h>
#include <algorithm> #include <algorithm>
#include <atomic>
namespace beast { namespace beast {
@@ -86,8 +87,9 @@ public:
//============================================================================== //==============================================================================
static CharPointerType createUninitialisedBytes (const size_t numBytes) static CharPointerType createUninitialisedBytes (const size_t numBytes)
{ {
StringHolder* const s = reinterpret_cast <StringHolder*> (new char [sizeof (StringHolder) - sizeof (CharType) + numBytes]); StringHolder* const s = reinterpret_cast <StringHolder*> (
s->refCount.value = 0; new char [sizeof (StringHolder) - sizeof (CharType) + numBytes]);
s->refCount.store (0);
s->allocatedNumBytes = numBytes; s->allocatedNumBytes = numBytes;
return CharPointerType (s->text); return CharPointerType (s->text);
} }
@@ -198,7 +200,7 @@ public:
{ {
StringHolder* const b = bufferFromText (text); StringHolder* const b = bufferFromText (text);
if (b->refCount.get() <= 0) if (b->refCount.load() <= 0)
return text; return text;
CharPointerType newText (createUninitialisedBytes (b->allocatedNumBytes)); CharPointerType newText (createUninitialisedBytes (b->allocatedNumBytes));
@@ -212,7 +214,7 @@ public:
{ {
StringHolder* const b = bufferFromText (text); StringHolder* const b = bufferFromText (text);
if (b->refCount.get() <= 0 && b->allocatedNumBytes >= numBytes) if (b->refCount.load() <= 0 && b->allocatedNumBytes >= numBytes)
return text; return text;
CharPointerType newText (createUninitialisedBytes (std::max (b->allocatedNumBytes, numBytes))); CharPointerType newText (createUninitialisedBytes (std::max (b->allocatedNumBytes, numBytes)));
@@ -228,7 +230,7 @@ public:
} }
//============================================================================== //==============================================================================
Atomic<int> refCount; std::atomic<int> refCount;
size_t allocatedNumBytes; size_t allocatedNumBytes;
CharType text[1]; CharType text[1];

View File

@@ -1,623 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_THREADS_SERVICEQUEUE_H_INCLUDED
#define BEAST_THREADS_SERVICEQUEUE_H_INCLUDED
#include <beast/intrusive/List.h>
#include <beast/threads/SharedData.h>
#include <beast/threads/ThreadLocalValue.h>
#include <beast/threads/WaitableEvent.h>
#include <beast/threads/detail/DispatchedHandler.h>
namespace beast {
namespace detail {
//------------------------------------------------------------------------------
// VFALCO NOTE This allocator is a work in progress
#if 0
class ServiceQueueAllocatorArena : public SharedObject
{
public:
typedef std::size_t size_type;
class Page : public LockFreeStack <Page>::Node
{
public:
size_type const m_pageBytes;
Atomic <int> m_refs;
char* m_pos;
bool m_full;
size_type m_count;
static std::size_t overhead()
{ return sizeof (Page); }
static std::size_t pointer_overhead()
{ return sizeof (Page*); }
// pageBytes doesn't include the Page structure
explicit Page (size_type pageBytes)
: m_pageBytes (pageBytes)
, m_pos (begin())
, m_full (false)
, m_count (0)
{
}
~Page ()
{
// This means someone forgot to deallocate something
bassert (!m_full && m_refs.get() == 0);
}
static Page* create (size_type pageBytes)
{
return new (new std::uint8_t[pageBytes + overhead()]) Page (pageBytes);
}
static void destroy (Page* page)
{
page->~Page();
delete[] ((std::uint8_t*)page);
}
void reset ()
{
m_refs.set (0);
m_pos = begin();
m_full = false;
m_count = 0;
}
bool full() const
{
return m_full;
}
void* allocate (size_type n)
{
size_type const needed (n + pointer_overhead());
char* pos = m_pos + needed;
if (pos > end())
{
m_full = true;
return nullptr;
}
++m_refs;
void* p (m_pos + pointer_overhead());
get_page(p) = this;
m_pos = pos;
++m_count;
return p;
}
char* begin() const
{
return const_cast <char*> (
reinterpret_cast <char const*> (this) + overhead());
}
char const* end() const
{
return begin() + m_pageBytes;
}
// Returns true if the page can be recycled
bool deallocate (void* p, size_type)
{
bool const unused ((--m_refs) == 0);
return unused && m_full;
}
// Returns a reference to the per-allocation overhead area
static Page*& get_page (void* p)
{
return *reinterpret_cast <Page**>(
static_cast <char*>(p) - pointer_overhead());
}
};
struct State
{
State()
{
}
~State()
{
// If this goes off, someone forgot to call deallocate!
bassert (full.get() == 0);
destroy (active);
destroy (recycle);
}
void destroy (LockFreeStack <Page>& stack)
{
for(;;)
{
Page* const page (stack.pop_front());
if (page == nullptr)
break;
Page::destroy (page);
}
}
Atomic <int> full;
Atomic <int> page_count;
LockFreeStack <Page> active;
LockFreeStack <Page> recycle;
};
typedef SharedData <State> SharedState;
size_type const m_maxBytes;
SharedState m_state;
explicit ServiceQueueAllocatorArena (size_type maxBytes = 16 * 1024)
: m_maxBytes (maxBytes)
{
}
~ServiceQueueAllocatorArena()
{
}
void* allocate (size_type n)
{
SharedState::UnlockedAccess state (m_state);
// Loop until we satisfy the allocation from an
// active page, or we run out of active pages.
//
for (;;)
{
// Acquire ownership of an active page
// This prevents other threads from seeing it.
Page* page (state->active.pop_front());
if (page == nullptr)
break;
void* p = page->allocate (n);
if (p != nullptr)
{
// Put the page back so other threads can use it
state->active.push_front (page);
return p;
}
// Page is full, count it for diagnostics
++state->full;
}
// No active page, get a recycled page or create a new page.
//
Page* page (state->recycle.pop_front());
if (page == nullptr)
{
page = Page::create (std::max (m_maxBytes, n));
++state->page_count;
}
void* p = page->allocate (n);
bassert (p != nullptr);
// Throw page into the active list so other threads can use it
state->active.push_front (page);
return p;
}
void deallocate (void* p, size_type n)
{
SharedState::UnlockedAccess state (m_state);
Page* const page (Page::get_page(p));
if (page->deallocate (p, n))
{
--state->full;
page->reset();
Page::destroy (page);
//state->recycle.push_front (page);
}
}
};
//------------------------------------------------------------------------------
template <typename T>
struct ServiceQueueAllocator
{
typedef T value_type;
typedef T* pointer;
typedef T& reference;
typedef T const* const_pointer;
typedef T const& const_reference;
typedef std::size_t size_type;
typedef std::ptrdiff_t difference_type;
ServiceQueueAllocator ()
: m_arena (new ServiceQueueAllocatorArena)
{
}
ServiceQueueAllocator (ServiceQueueAllocator const& other)
: m_arena (other.m_arena)
{
}
template <typename U>
ServiceQueueAllocator (ServiceQueueAllocator <U> const& other)
: m_arena (other.m_arena)
{
}
template <typename U>
struct rebind
{
typedef ServiceQueueAllocator <U> other;
};
pointer address (reference x) const
{
return &x;
}
const_pointer address (const_reference x) const
{
return &x;
}
pointer allocate (size_type n,
std::allocator<void>::const_pointer = nullptr) const
{
size_type const bytes (n * sizeof (value_type));
pointer const p (static_cast <pointer> (
m_arena->allocate (bytes)));
return p;
}
void deallocate (pointer p, size_type n) const
{
size_type const bytes = (n * sizeof (value_type));
m_arena->deallocate (p, bytes);
}
size_type max_size () const
{
return std::numeric_limits <size_type>::max () / sizeof (value_type);
}
void construct (pointer p, const_reference val) const
{
new ((void *)p) value_type (val);
}
void destroy (pointer p) const
{
p->~value_type ();
}
private:
template <typename>
friend struct ServiceQueueAllocator;
SharedPtr <ServiceQueueAllocatorArena> m_arena;
};
#endif
}
//------------------------------------------------------------------------------
class ServiceQueueBase
{
public:
ServiceQueueBase();
~ServiceQueueBase();
std::size_t poll();
std::size_t poll_one();
std::size_t run();
std::size_t run_one();
void stop();
bool stopped() const
{ return m_stopped.get() != 0; }
void reset();
protected:
class Item;
class Waiter;
class ScopedServiceThread;
void wait();
virtual void enqueue (Item* item);
bool empty();
virtual std::size_t dequeue() = 0;
virtual Waiter* new_waiter() = 0;
//--------------------------------------------------------------------------
class Item : public List <Item>::Node
{
public:
virtual ~Item() { }
virtual void operator()() = 0;
virtual std::size_t size() const = 0;
};
//--------------------------------------------------------------------------
class Waiter : public List <Waiter>::Node
{
public:
Waiter()
{ }
void wait()
{ m_event.wait(); }
void signal()
{ m_event.signal(); }
private:
WaitableEvent m_event;
};
//--------------------------------------------------------------------------
struct State
{
// handlers
List <Item> handlers;
List <Waiter> waiting;
List <Waiter> unused;
};
typedef SharedData <State> SharedState;
SharedState m_state;
Atomic <int> m_stopped;
static ThreadLocalValue <ServiceQueueBase*> s_service;
};
//------------------------------------------------------------------------------
/** A queue for disatching function calls on other threads.
Handlers are guaranteed to be called only from threads that are currently
calling run, run_one, poll, or poll_one.
*/
template <class Allocator = std::allocator <char> >
class ServiceQueueType : public ServiceQueueBase
{
private:
using ServiceQueueBase::Item;
using ServiceQueueBase::Waiter;
template <typename Handler>
class ItemType : public Item
{
public:
explicit ItemType (BEAST_MOVE_ARG(Handler) handler)
: m_handler (BEAST_MOVE_CAST(Handler)(handler))
{ }
void operator() ()
{ m_handler(); }
std::size_t size() const
{ return sizeof (*this); }
private:
Handler m_handler;
};
public:
typedef Allocator allocator_type; // for std::uses_allocator<>
explicit ServiceQueueType (std::size_t expectedConcurrency = 1,
Allocator alloc = Allocator())
: m_alloc (alloc)
{
typename Allocator::template rebind <Waiter>::other a (m_alloc);
SharedState::Access state (m_state);
while (expectedConcurrency--)
state->unused.push_front (
*new (a.allocate (1)) Waiter);
}
~ServiceQueueType()
{
SharedState::Access state (m_state);
// Must be empty
//bassert (state->handlers.empty());
// Cannot destroy while threads are waiting
bassert (state->waiting.empty());
typename Allocator::template rebind <Waiter>::other a (m_alloc);
while (! state->unused.empty ())
{
Waiter* const waiter (&state->unused.front ());
state->unused.pop_front ();
a.destroy (waiter);
a.deallocate (waiter, 1);
}
}
/** Returns the allocator associated with the container. */
allocator_type get_allocator() const
{
return m_alloc;
}
/** Returns `true` if the current thread is processing events.
If the current thread of execution is inside a call to run,
run_one, poll, or poll_one, this function returns `true`.
*/
bool is_service_thread() const
{ return s_service.get() == this; }
/** Run the handler on a service thread.
If the current thread of execution is a service thread then this
function wil dispatch the handler on the caller's thread before
returning.
The function signature of the handler must be:
@code
void handler();
@endcode
*/
template <typename Handler>
void dispatch (BEAST_MOVE_ARG(Handler) handler)
{
if (is_service_thread())
{
handler();
}
else
{
typename Allocator::template rebind <ItemType <Handler> >::other a (m_alloc);
enqueue (new (a.allocate (1))
ItemType <Handler> (BEAST_MOVE_CAST(Handler)(handler)));
}
}
/** Request the handler to run on a service thread.
This returns immediately, even if the current thread of execution is
a service thread.
The function signature of the handler must be:
@code
void handler();
@endcode
*/
template <typename Handler>
void post (BEAST_MOVE_ARG(Handler) handler)
{
typename Allocator::template rebind <ItemType <Handler> >::other a (m_alloc);
enqueue (new (a.allocate (1))
ItemType <Handler> (BEAST_MOVE_CAST(Handler)(handler)));
}
/** Return a new handler that dispatches the wrapped handler on the queue. */
template <typename Handler>
detail::DispatchedHandler <ServiceQueueType&, Handler> wrap (
BEAST_MOVE_ARG(Handler) handler)
{
return detail::DispatchedHandler <ServiceQueueType&, Handler> (
*this, BEAST_MOVE_CAST(Handler)(handler));
}
/** Run the event loop to execute ready handlers.
This runs handlers that are ready to run, without blocking, until
there are no more handlers ready or the service queue has been stopped.
@return The number of handlers that were executed.
*/
std::size_t poll ()
{ return ServiceQueueBase::poll(); }
/** Run the event loop to execute at most one ready handler.
This will run zero or one handlers, without blocking, depending on
whether or not there is handler immediately ready to run.
@return The number of handlers that were executed.
*/
std::size_t poll_one ()
{ return ServiceQueueBase::poll_one(); }
/** Runs the queue's processing loop.
The current thread of execution becomes a service thread. This call
blocks until there is no more work remaining.
@return The number of handlers that were executed.
*/
std::size_t run ()
{ return ServiceQueueBase::run(); }
/** Runs the queue's processing loop to execute at most one handler.
@return The number of handlers that were executed.
*/
std::size_t run_one ()
{ return ServiceQueueBase::run_one(); }
/** Stop the queue's processing loop.
All threads executing run or run_one will return as soon as possible.
Future calls to run, run_one, poll, or poll_one will return immediately
until reset is called.
@see reset
*/
void stop()
{ return ServiceQueueBase::stop(); }
/** Returns `true` if the queue has been stopped.
When a queue is stopped, calls to run, run_one, poll, or poll_one will
return immediately without invoking any handlers.
*/
bool stopped() const
{ return ServiceQueueBase::stopped(); }
/** Reset the queue after a stop.
This allows the event loop to be restarted. This may not be called while
there are any threads currently executing the run, run_one, poll, or
poll_one functions, or undefined behavior will result.
*/
void reset()
{ return ServiceQueueBase::reset(); }
private:
// Dispatch a single queued handler if possible.
// Returns the number of handlers dispatched (0 or 1)
//
std::size_t dequeue ()
{
if (stopped())
return 0;
Item* item (nullptr);
{
SharedState::Access state (m_state);
if (state->handlers.empty())
return 0;
item = &state->handlers.front();
state->handlers.erase (
state->handlers.iterator_to (*item));
}
(*item)();
typename Allocator::template rebind <std::uint8_t>::other a (m_alloc);
std::size_t const size (item->size());
item->~Item();
a.deallocate (reinterpret_cast<std::uint8_t*>(item), size);
return 1;
}
// Create a new Waiter
Waiter* new_waiter()
{
typename Allocator::template rebind <Waiter>::other a (m_alloc);
return new (a.allocate (1)) Waiter;
}
Allocator m_alloc;
};
typedef ServiceQueueType <std::allocator <char> > ServiceQueue;
}
#endif

View File

@@ -24,11 +24,12 @@
#ifndef BEAST_THREADS_THREAD_H_INCLUDED #ifndef BEAST_THREADS_THREAD_H_INCLUDED
#define BEAST_THREADS_THREAD_H_INCLUDED #define BEAST_THREADS_THREAD_H_INCLUDED
#include <beast/strings/String.h>
#include <beast/utility/LeakChecked.h> #include <beast/utility/LeakChecked.h>
#include <beast/threads/RecursiveMutex.h> #include <beast/threads/RecursiveMutex.h>
#include <beast/threads/WaitableEvent.h> #include <beast/threads/WaitableEvent.h>
#include <string>
namespace beast { namespace beast {
//============================================================================== //==============================================================================
@@ -39,9 +40,6 @@ namespace beast {
do their business. The thread can then be started with the startThread() method do their business. The thread can then be started with the startThread() method
and controlled with various other methods. and controlled with various other methods.
This class also contains some thread-related static methods, such
as sleep(), yield(), getCurrentThreadId() etc.
@see CriticalSection, WaitableEvent, Process, ThreadWithProgressWindow, @see CriticalSection, WaitableEvent, Process, ThreadWithProgressWindow,
MessageManagerLock MessageManagerLock
*/ */
@@ -55,7 +53,7 @@ public:
When first created, the thread is not running. Use the startThread() When first created, the thread is not running. Use the startThread()
method to start it. method to start it.
*/ */
explicit Thread (const String& threadName); explicit Thread (std::string const& threadName);
Thread (Thread const&) = delete; Thread (Thread const&) = delete;
Thread& operator= (Thread const&) = delete; Thread& operator= (Thread const&) = delete;
@@ -89,36 +87,12 @@ public:
*/ */
void startThread(); void startThread();
/** Starts the thread with a given priority.
Launches the thread with a given priority, where 0 = lowest, 10 = highest.
If the thread is already running, its priority will be changed.
@see startThread, setPriority
*/
void startThread (int priority);
/** Attempts to stop the thread running. /** Attempts to stop the thread running.
This method will cause the threadShouldExit() method to return true This method will cause the threadShouldExit() method to return true
and call notify() in case the thread is currently waiting. and call notify() in case the thread is currently waiting.
Hopefully the thread will then respond to this by exiting cleanly, and
the stopThread method will wait for a given time-period for this to
happen.
If the thread is stuck and fails to respond after the time-out, it gets
forcibly killed, which is a very bad thing to happen, as it could still
be holding locks, etc. which are needed by other parts of your program.
@param timeOutMilliseconds The number of milliseconds to wait for the
thread to finish before killing it by force. A negative
value in here will wait forever.
@see signalThreadShouldExit, threadShouldExit, waitForThreadToExit, isThreadRunning
@returns true if the thread exits, or false if the timeout expires first.
*/ */
bool stopThread (int timeOutMilliseconds = -1); void stopThread ();
/** Stop the thread without blocking. /** Stop the thread without blocking.
This calls signalThreadShouldExit followed by notify. This calls signalThreadShouldExit followed by notify.
@@ -150,57 +124,13 @@ public:
@see signalThreadShouldExit @see signalThreadShouldExit
*/ */
inline bool threadShouldExit() const { return shouldExit; } inline bool threadShouldExit() const { return shouldExit; }
/** Waits for the thread to stop. /** Waits for the thread to stop.
This will waits until isThreadRunning() is false or until a timeout expires. This will waits until isThreadRunning() is false.
@param timeOutMilliseconds the time to wait, in milliseconds. If this value
is less than zero, it will wait forever.
@returns true if the thread exits, or false if the timeout expires first.
*/ */
bool waitForThreadToExit (int timeOutMilliseconds = -1) const; void waitForThreadToExit () const;
//==============================================================================
/** Changes the thread's priority.
May return false if for some reason the priority can't be changed.
@param priority the new priority, in the range 0 (lowest) to 10 (highest). A priority
of 5 is normal.
*/
bool setPriority (int priority);
/** Changes the priority of the caller thread.
Similar to setPriority(), but this static method acts on the caller thread.
May return false if for some reason the priority can't be changed.
@see setPriority
*/
static bool setCurrentThreadPriority (int priority);
//==============================================================================
/** Sets the affinity mask for the thread.
This will only have an effect next time the thread is started - i.e. if the
thread is already running when called, it'll have no effect.
@see setCurrentThreadAffinityMask
*/
void setAffinityMask (std::uint32_t affinityMask);
/** Changes the affinity mask for the caller thread.
This will change the affinity mask for the thread that calls this static method.
@see setAffinityMask
*/
static void setCurrentThreadAffinityMask (std::uint32_t affinityMask);
//==============================================================================
// this can be called from any thread that needs to pause..
static void sleep (int milliseconds);
//============================================================================== //==============================================================================
/** Makes the thread wait for a notification. /** Makes the thread wait for a notification.
@@ -223,59 +153,24 @@ public:
void notify() const; void notify() const;
//============================================================================== //==============================================================================
/** A value type used for thread IDs.
@see getCurrentThreadId(), getThreadId()
*/
typedef void* ThreadID;
/** Returns an id that identifies the caller thread.
To find the ID of a particular thread object, use getThreadId().
@returns a unique identifier that identifies the calling thread.
@see getThreadId
*/
static ThreadID getCurrentThreadId();
/** Finds the thread object that is currently running.
Note that the main UI thread (or other non-Beast threads) don't have a Thread
object associated with them, so this will return 0.
*/
static Thread* getCurrentThread();
/** Returns the ID of this thread.
That means the ID of this thread object - not of the thread that's calling the method.
This can change when the thread is started and stopped, and will be invalid if the
thread's not actually running.
@see getCurrentThreadId
*/
ThreadID getThreadId() const noexcept { return threadId; }
/** Returns the name of the thread. /** Returns the name of the thread.
This is the name that gets set in the constructor. This is the name that gets set in the constructor.
*/ */
const String& getThreadName() const { return threadName; } std::string const& getThreadName() const { return threadName; }
/** Changes the name of the caller thread. /** Changes the name of the caller thread.
Different OSes may place different length or content limits on this name. Different OSes may place different length or content limits on this name.
*/ */
static void setCurrentThreadName (const String& newThreadName); static void setCurrentThreadName (std::string const& newThreadName);
private: private:
//============================================================================== //==============================================================================
const String threadName; std::string const threadName;
void* volatile threadHandle; void* volatile threadHandle;
ThreadID threadId;
RecursiveMutex startStopLock; RecursiveMutex startStopLock;
WaitableEvent startSuspensionEvent, defaultEvent; WaitableEvent startSuspensionEvent, defaultEvent;
int threadPriority;
std::uint32_t affinityMask;
bool volatile shouldExit; bool volatile shouldExit;
#ifndef DOXYGEN #ifndef DOXYGEN
@@ -284,9 +179,7 @@ private:
void launchThread(); void launchThread();
void closeThreadHandle(); void closeThreadHandle();
void killThread();
void threadEntryPoint(); void threadEntryPoint();
static bool setThreadPriority (void*, int);
}; };
} }

View File

@@ -1,199 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
Please visit http://www.juce.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_THREADS_THREADLOCALVALUE_H_INCLUDED
#define BEAST_THREADS_THREADLOCALVALUE_H_INCLUDED
#include <beast/Config.h>
#include <beast/threads/SpinLock.h>
#include <beast/threads/Thread.h>
namespace beast {
// (NB: on win32, native thread-locals aren't possible in a dynamically loaded DLL in XP).
#if ! ((BEAST_MSVC && (BEAST_64BIT || ! defined (BeastPlugin_PluginCode))) \
|| (BEAST_MAC && BEAST_CLANG && defined (MAC_OS_X_VERSION_10_7) \
&& MAC_OS_X_VERSION_MIN_REQUIRED >= MAC_OS_X_VERSION_10_7))
#define BEAST_NO_COMPILER_THREAD_LOCAL 1
#endif
//==============================================================================
/**
Provides cross-platform support for thread-local objects.
This class holds an internal list of objects of the templated type, keeping
an instance for each thread that requests one. The first time a thread attempts
to access its value, an object is created and added to the list for that thread.
Typically, you'll probably want to create a static instance of a ThreadLocalValue
object, or hold one within a singleton.
The templated class for your value could be a primitive type, or any class that
has a default constructor and copy operator.
When a thread no longer needs to use its value, it can call releaseCurrentThreadStorage()
to allow the storage to be re-used by another thread. If a thread exits without calling
this method, the object storage will be left allocated until the ThreadLocalValue object
is deleted.
*/
template <typename Type>
class ThreadLocalValue
{
public:
/** */
ThreadLocalValue() = default;
ThreadLocalValue (ThreadLocalValue const&) = delete;
ThreadLocalValue& operator= (ThreadLocalValue const&) = delete;
/** Destructor.
When this object is deleted, all the value objects for all threads will be deleted.
*/
~ThreadLocalValue()
{
#if BEAST_NO_COMPILER_THREAD_LOCAL
for (ObjectHolder* o = first.value; o != nullptr;)
{
ObjectHolder* const next = o->next;
delete o;
o = next;
}
#endif
}
/** Returns a reference to this thread's instance of the value.
Note that the first time a thread tries to access the value, an instance of the
value object will be created - so if your value's class has a non-trivial
constructor, be aware that this method could invoke it.
*/
Type& operator*() const noexcept { return get(); }
/** Returns a pointer to this thread's instance of the value.
Note that the first time a thread tries to access the value, an instance of the
value object will be created - so if your value's class has a non-trivial
constructor, be aware that this method could invoke it.
*/
operator Type*() const noexcept { return &get(); }
/** Accesses a method or field of the value object.
Note that the first time a thread tries to access the value, an instance of the
value object will be created - so if your value's class has a non-trivial
constructor, be aware that this method could invoke it.
*/
Type* operator->() const noexcept { return &get(); }
/** Assigns a new value to the thread-local object. */
ThreadLocalValue& operator= (const Type& newValue) { get() = newValue; return *this; }
/** Returns a reference to this thread's instance of the value.
Note that the first time a thread tries to access the value, an instance of the
value object will be created - so if your value's class has a non-trivial
constructor, be aware that this method could invoke it.
*/
Type& get() const noexcept
{
#if BEAST_NO_COMPILER_THREAD_LOCAL
const Thread::ThreadID threadId = Thread::getCurrentThreadId();
for (ObjectHolder* o = first.get(); o != nullptr; o = o->next)
if (o->threadId == threadId)
return o->object;
for (ObjectHolder* o = first.get(); o != nullptr; o = o->next)
{
if (o->threadId == nullptr)
{
{
SpinLock::ScopedLockType sl (lock);
if (o->threadId != nullptr)
continue;
o->threadId = threadId;
}
o->object = Type();
return o->object;
}
}
ObjectHolder* const newObject = new ObjectHolder (threadId);
do
{
newObject->next = first.get();
}
while (! first.compareAndSetBool (newObject, newObject->next));
return newObject->object;
#elif BEAST_MAC
static __thread Type object;
return object;
#elif BEAST_MSVC
static __declspec(thread) Type object;
return object;
#endif
}
/** Called by a thread before it terminates, to allow this class to release
any storage associated with the thread.
*/
void releaseCurrentThreadStorage()
{
#if BEAST_NO_COMPILER_THREAD_LOCAL
const Thread::ThreadID threadId = Thread::getCurrentThreadId();
for (ObjectHolder* o = first.get(); o != nullptr; o = o->next)
{
if (o->threadId == threadId)
{
SpinLock::ScopedLockType sl (lock);
o->threadId = nullptr;
}
}
#endif
}
private:
//==============================================================================
#if BEAST_NO_COMPILER_THREAD_LOCAL
struct ObjectHolder
{
ObjectHolder (const Thread::ThreadID& tid)
: threadId (tid), object()
{}
ObjectHolder (ObjectHolder const&) = delete;
ObjectHolder& operator= (ObjectHolder const&) = delete;
Thread::ThreadID threadId;
ObjectHolder* next;
Type object;
};
Atomic<ObjectHolder*> mutable first;
SpinLock mutable lock;
#endif
};
}
#endif

View File

@@ -22,10 +22,7 @@
#endif #endif
#include <beast/threads/impl/RecursiveMutex.cpp> #include <beast/threads/impl/RecursiveMutex.cpp>
#include <beast/threads/impl/ServiceQueue.cpp>
#include <beast/threads/impl/Stoppable.cpp> #include <beast/threads/impl/Stoppable.cpp>
#include <beast/threads/impl/Stoppable.test.cpp> #include <beast/threads/impl/Stoppable.test.cpp>
#include <beast/threads/impl/Thread.cpp> #include <beast/threads/impl/Thread.cpp>
#include <beast/threads/impl/WaitableEvent.cpp> #include <beast/threads/impl/WaitableEvent.cpp>
#include <beast/threads/tests/ServiceQueue.cpp>

View File

@@ -1,186 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <beast/threads/ServiceQueue.h>
namespace beast {
class ServiceQueueBase::ScopedServiceThread : public List <ScopedServiceThread>::Node
{
public:
explicit ScopedServiceThread (ServiceQueueBase* queue)
: m_saved (ServiceQueueBase::s_service.get())
{
ServiceQueueBase::s_service.get() = queue;
}
~ScopedServiceThread()
{
ServiceQueueBase::s_service.get() = m_saved;
}
private:
ServiceQueueBase* m_saved;
};
//------------------------------------------------------------------------------
ServiceQueueBase::ServiceQueueBase()
{
}
ServiceQueueBase::~ServiceQueueBase()
{
}
std::size_t ServiceQueueBase::poll ()
{
std::size_t total (0);
ScopedServiceThread thread (this);
for (;;)
{
std::size_t const n (dequeue());
if (! n)
break;
total += n;
}
return total;
}
std::size_t ServiceQueueBase::poll_one ()
{
ScopedServiceThread thread (this);
return dequeue();
}
std::size_t ServiceQueueBase::run ()
{
std::size_t total (0);
ScopedServiceThread thread (this);
while (! stopped())
{
total += poll ();
wait ();
}
return total;
}
std::size_t ServiceQueueBase::run_one ()
{
std::size_t n;
ScopedServiceThread (this);
for (;;)
{
n = poll_one();
if (n != 0)
break;
wait();
}
return n;
}
void ServiceQueueBase::stop ()
{
SharedState::Access state (m_state);
m_stopped.set (1);
while (! state->waiting.empty ())
{
Waiter& waiting (state->waiting.front());
state->waiting.pop_front ();
waiting.signal ();
}
}
void ServiceQueueBase::reset()
{
// Must be stopped
bassert (m_stopped.get () != 0);
m_stopped.set (0);
}
// Block on the event if there are no items
// in the queue and we are not stopped.
//
void ServiceQueueBase::wait ()
{
Waiter* waiter (nullptr);
{
SharedState::Access state (m_state);
if (stopped ())
return;
if (! state->handlers.empty())
return;
if (state->unused.empty ())
{
waiter = new_waiter();
}
else
{
waiter = &state->unused.front ();
state->unused.pop_front ();
}
state->waiting.push_front (*waiter);
}
waiter->wait();
// Waiter got taken off the waiting list
{
SharedState::Access state (m_state);
state->unused.push_front (*waiter);
}
}
void ServiceQueueBase::enqueue (Item* item)
{
Waiter* waiter (nullptr);
{
SharedState::Access state (m_state);
state->handlers.push_back (*item);
if (! state->waiting.empty ())
{
waiter = &state->waiting.front ();
state->waiting.pop_front ();
}
}
if (waiter != nullptr)
waiter->signal();
}
bool ServiceQueueBase::empty()
{
SharedState::Access state (m_state);
return state->handlers.empty();
}
// A thread can only be blocked on one ServiceQueue so we store the pointer
// to which ServiceQueue it is blocked on to determine if the thread belongs
// to that queue.
//
ThreadLocalValue <ServiceQueueBase*> ServiceQueueBase::s_service;
}

View File

@@ -27,15 +27,13 @@
#include <beast/module/core/time/Time.h> #include <beast/module/core/time/Time.h>
#include <cassert> #include <cassert>
#include <thread>
namespace beast { namespace beast {
Thread::Thread (const String& threadName_) Thread::Thread (std::string const& threadName_)
: threadName (threadName_), : threadName (threadName_),
threadHandle (nullptr), threadHandle (nullptr),
threadId (0),
threadPriority (5),
affinityMask (0),
shouldExit (false) shouldExit (false)
{ {
} }
@@ -55,53 +53,14 @@ Thread::~Thread()
} }
//============================================================================== //==============================================================================
// Use a ref-counted object to hold this shared data, so that it can outlive its static
// shared pointer when threads are still running during static shutdown.
struct CurrentThreadHolder : public SharedObject
{
CurrentThreadHolder() noexcept {}
typedef SharedPtr <CurrentThreadHolder> Ptr;
ThreadLocalValue<Thread*> value;
};
static char currentThreadHolderLock [sizeof (SpinLock)]; // (statically initialised to zeros).
static SpinLock* castToSpinLockWithoutAliasingWarning (void* s)
{
return static_cast<SpinLock*> (s);
}
static CurrentThreadHolder::Ptr getCurrentThreadHolder()
{
static CurrentThreadHolder::Ptr currentThreadHolder;
SpinLock::ScopedLockType lock (*castToSpinLockWithoutAliasingWarning (currentThreadHolderLock));
if (currentThreadHolder == nullptr)
currentThreadHolder = new CurrentThreadHolder();
return currentThreadHolder;
}
void Thread::threadEntryPoint() void Thread::threadEntryPoint()
{ {
const CurrentThreadHolder::Ptr currentThreadHolder (getCurrentThreadHolder()); if (!threadName.empty ())
currentThreadHolder->value = this;
if (threadName.isNotEmpty())
setCurrentThreadName (threadName); setCurrentThreadName (threadName);
if (startSuspensionEvent.wait (10000)) if (startSuspensionEvent.wait (10000))
{
bassert (getCurrentThreadId() == threadId);
if (affinityMask != 0)
setCurrentThreadAffinityMask (affinityMask);
run(); run();
}
currentThreadHolder->value.releaseCurrentThreadStorage();
closeThreadHandle(); closeThreadHandle();
} }
@@ -121,100 +80,37 @@ void Thread::startThread()
if (threadHandle == nullptr) if (threadHandle == nullptr)
{ {
launchThread(); launchThread();
setThreadPriority (threadHandle, threadPriority);
startSuspensionEvent.signal(); startSuspensionEvent.signal();
} }
} }
void Thread::startThread (const int priority)
{
const RecursiveMutex::ScopedLockType sl (startStopLock);
if (threadHandle == nullptr)
{
threadPriority = priority;
startThread();
}
else
{
setPriority (priority);
}
}
bool Thread::isThreadRunning() const bool Thread::isThreadRunning() const
{ {
return threadHandle != nullptr; return threadHandle != nullptr;
} }
Thread* Thread::getCurrentThread()
{
return getCurrentThreadHolder()->value.get();
}
//============================================================================== //==============================================================================
void Thread::signalThreadShouldExit() void Thread::signalThreadShouldExit()
{ {
shouldExit = true; shouldExit = true;
} }
bool Thread::waitForThreadToExit (const int timeOutMilliseconds) const void Thread::waitForThreadToExit () const
{ {
// Doh! So how exactly do you expect this thread to wait for itself to stop??
bassert (getThreadId() != getCurrentThreadId() || getCurrentThreadId() == 0);
const std::uint32_t timeoutEnd = Time::getMillisecondCounter() + (std::uint32_t) timeOutMilliseconds;
while (isThreadRunning()) while (isThreadRunning())
{ std::this_thread::sleep_for (std::chrono::milliseconds (10));
if (timeOutMilliseconds >= 0 && Time::getMillisecondCounter() > timeoutEnd)
return false;
sleep (2);
}
return true;
} }
bool Thread::stopThread (const int timeOutMilliseconds) void Thread::stopThread ()
{ {
bool cleanExit = true;
// agh! You can't stop the thread that's calling this method! How on earth
// would that work??
bassert (getCurrentThreadId() != getThreadId());
const RecursiveMutex::ScopedLockType sl (startStopLock); const RecursiveMutex::ScopedLockType sl (startStopLock);
if (isThreadRunning()) if (isThreadRunning())
{ {
signalThreadShouldExit(); signalThreadShouldExit();
notify(); notify();
waitForThreadToExit ();
if (timeOutMilliseconds != 0)
{
cleanExit = waitForThreadToExit (timeOutMilliseconds);
}
if (isThreadRunning())
{
bassert (! cleanExit);
// very bad karma if this point is reached, as there are bound to be
// locks and events left in silly states when a thread is killed by force..
killThread();
threadHandle = nullptr;
threadId = 0;
cleanExit = false;
}
else
{
cleanExit = true;
}
} }
return cleanExit;
} }
void Thread::stopThreadAsync () void Thread::stopThreadAsync ()
@@ -228,35 +124,6 @@ void Thread::stopThreadAsync ()
} }
} }
//==============================================================================
bool Thread::setPriority (const int newPriority)
{
// NB: deadlock possible if you try to set the thread prio from the thread itself,
// so using setCurrentThreadPriority instead in that case.
if (getCurrentThreadId() == getThreadId())
return setCurrentThreadPriority (newPriority);
const RecursiveMutex::ScopedLockType sl (startStopLock);
if (setThreadPriority (threadHandle, newPriority))
{
threadPriority = newPriority;
return true;
}
return false;
}
bool Thread::setCurrentThreadPriority (const int newPriority)
{
return setThreadPriority (0, newPriority);
}
void Thread::setAffinityMask (const std::uint32_t newAffinityMask)
{
affinityMask = newAffinityMask;
}
//============================================================================== //==============================================================================
bool Thread::wait (const int timeOutMilliseconds) const bool Thread::wait (const int timeOutMilliseconds) const
{ {
@@ -300,28 +167,15 @@ void Thread::launchThread()
{ {
unsigned int newThreadId; unsigned int newThreadId;
threadHandle = (void*) _beginthreadex (0, 0, &threadEntryProc, this, 0, &newThreadId); threadHandle = (void*) _beginthreadex (0, 0, &threadEntryProc, this, 0, &newThreadId);
threadId = (ThreadID) newThreadId;
} }
void Thread::closeThreadHandle() void Thread::closeThreadHandle()
{ {
CloseHandle ((HANDLE) threadHandle); CloseHandle ((HANDLE) threadHandle);
threadId = 0;
threadHandle = 0; threadHandle = 0;
} }
void Thread::killThread() void Thread::setCurrentThreadName (std::string const& name)
{
if (threadHandle != 0)
{
#if BEAST_DEBUG
OutputDebugStringA ("** Warning - Forced thread termination **\n");
#endif
TerminateThread (threadHandle, 0);
}
}
void Thread::setCurrentThreadName (const String& name)
{ {
#if BEAST_DEBUG && BEAST_MSVC #if BEAST_DEBUG && BEAST_MSVC
struct struct
@@ -333,7 +187,7 @@ void Thread::setCurrentThreadName (const String& name)
} info; } info;
info.dwType = 0x1000; info.dwType = 0x1000;
info.szName = name.toUTF8(); info.szName = name.c_str ();
info.dwThreadID = GetCurrentThreadId(); info.dwThreadID = GetCurrentThreadId();
info.dwFlags = 0; info.dwFlags = 0;
@@ -348,70 +202,6 @@ void Thread::setCurrentThreadName (const String& name)
#endif #endif
} }
Thread::ThreadID Thread::getCurrentThreadId()
{
return (ThreadID) (std::intptr_t) GetCurrentThreadId();
}
bool Thread::setThreadPriority (void* handle, int priority)
{
int pri = THREAD_PRIORITY_TIME_CRITICAL;
if (priority < 1) pri = THREAD_PRIORITY_IDLE;
else if (priority < 2) pri = THREAD_PRIORITY_LOWEST;
else if (priority < 5) pri = THREAD_PRIORITY_BELOW_NORMAL;
else if (priority < 7) pri = THREAD_PRIORITY_NORMAL;
else if (priority < 9) pri = THREAD_PRIORITY_ABOVE_NORMAL;
else if (priority < 10) pri = THREAD_PRIORITY_HIGHEST;
if (handle == 0)
handle = GetCurrentThread();
return SetThreadPriority (handle, pri) != FALSE;
}
void Thread::setCurrentThreadAffinityMask (const std::uint32_t affinityMask)
{
SetThreadAffinityMask (GetCurrentThread(), affinityMask);
}
struct SleepEvent
{
SleepEvent() noexcept
: handle (CreateEvent (nullptr, FALSE, FALSE,
#if BEAST_DEBUG
_T("BEAST Sleep Event")))
#else
nullptr))
#endif
{}
~SleepEvent() noexcept
{
CloseHandle (handle);
handle = 0;
}
HANDLE handle;
};
static SleepEvent sleepEvent;
void Thread::sleep (const int millisecs)
{
if (millisecs >= 10 || sleepEvent.handle == 0)
{
Sleep ((DWORD) millisecs);
}
else
{
// unlike Sleep() this is guaranteed to return to the current thread after
// the time expires, so we'll use this for short waits, which are more likely
// to need to be accurate
WaitForSingleObject (sleepEvent.handle, (DWORD) millisecs);
}
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -440,14 +230,6 @@ namespace beast{
namespace beast { namespace beast {
void Thread::sleep (int millisecs)
{
struct timespec time;
time.tv_sec = millisecs / 1000;
time.tv_nsec = (millisecs % 1000) * 1000000;
nanosleep (&time, nullptr);
}
void beast_threadEntryPoint (void*); void beast_threadEntryPoint (void*);
extern "C" void* threadEntryProcBeast (void*); extern "C" void* threadEntryProcBeast (void*);
@@ -480,108 +262,30 @@ void Thread::launchThread()
{ {
pthread_detach (handle); pthread_detach (handle);
threadHandle = (void*) handle; threadHandle = (void*) handle;
threadId = (ThreadID) threadHandle;
} }
} }
void Thread::closeThreadHandle() void Thread::closeThreadHandle()
{ {
threadId = 0;
threadHandle = 0; threadHandle = 0;
} }
void Thread::killThread() void Thread::setCurrentThreadName (std::string const& name)
{
if (threadHandle != 0)
{
#if BEAST_ANDROID
bassertfalse; // pthread_cancel not available!
#else
pthread_cancel ((pthread_t) threadHandle);
#endif
}
}
void Thread::setCurrentThreadName (const String& name)
{ {
#if BEAST_IOS || (BEAST_MAC && defined (MAC_OS_X_VERSION_10_5) && MAC_OS_X_VERSION_MIN_REQUIRED >= MAC_OS_X_VERSION_10_5) #if BEAST_IOS || (BEAST_MAC && defined (MAC_OS_X_VERSION_10_5) && MAC_OS_X_VERSION_MIN_REQUIRED >= MAC_OS_X_VERSION_10_5)
BEAST_AUTORELEASEPOOL BEAST_AUTORELEASEPOOL
{ {
[[NSThread currentThread] setName: beastStringToNS (name)]; [[NSThread currentThread] setName: beastStringToNS (beast::String (name))];
} }
#elif BEAST_LINUX #elif BEAST_LINUX
#if (__GLIBC__ * 1000 + __GLIBC_MINOR__) >= 2012 #if (__GLIBC__ * 1000 + __GLIBC_MINOR__) >= 2012
pthread_setname_np (pthread_self(), name.toRawUTF8()); pthread_setname_np (pthread_self(), name.c_str ());
#else #else
prctl (PR_SET_NAME, name.toRawUTF8(), 0, 0, 0); prctl (PR_SET_NAME, name.c_str (), 0, 0, 0);
#endif #endif
#endif #endif
} }
bool Thread::setThreadPriority (void* handle, int priority)
{
struct sched_param param;
int policy;
priority = blimit (0, 10, priority);
if (handle == nullptr)
handle = (void*) pthread_self();
if (pthread_getschedparam ((pthread_t) handle, &policy, &param) != 0)
return false;
policy = priority == 0 ? SCHED_OTHER : SCHED_RR;
const int minPriority = sched_get_priority_min (policy);
const int maxPriority = sched_get_priority_max (policy);
param.sched_priority = ((maxPriority - minPriority) * priority) / 10 + minPriority;
return pthread_setschedparam ((pthread_t) handle, policy, &param) == 0;
}
Thread::ThreadID Thread::getCurrentThreadId()
{
return (ThreadID) pthread_self();
}
//==============================================================================
/* Remove this macro if you're having problems compiling the cpu affinity
calls (the API for these has changed about quite a bit in various Linux
versions, and a lot of distros seem to ship with obsolete versions)
*/
#if defined (CPU_ISSET) && ! defined (SUPPORT_AFFINITIES)
#define SUPPORT_AFFINITIES 1
#endif
void Thread::setCurrentThreadAffinityMask (const std::uint32_t affinityMask)
{
#if SUPPORT_AFFINITIES
cpu_set_t affinity;
CPU_ZERO (&affinity);
for (int i = 0; i < 32; ++i)
if ((affinityMask & (1 << i)) != 0)
CPU_SET (i, &affinity);
/*
N.B. If this line causes a compile error, then you've probably not got the latest
version of glibc installed.
If you don't want to update your copy of glibc and don't care about cpu affinities,
then you can just disable all this stuff by setting the SUPPORT_AFFINITIES macro to 0.
*/
sched_setaffinity (getpid(), sizeof (cpu_set_t), &affinity);
sched_yield();
#else
/* affinities aren't supported because either the appropriate header files weren't found,
or the SUPPORT_AFFINITIES macro was turned off
*/
bassertfalse;
(void) affinityMask;
#endif
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -1,295 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <beast/threads/ServiceQueue.h>
#include <beast/unit_test/suite.h>
#include <beast/module/core/time/Time.h>
#include <beast/module/core/maths/Random.h>
#include <functional>
#include <sstream>
namespace beast {
class ServiceQueue_timing_test : public unit_test::suite
{
public:
class Stopwatch
{
public:
Stopwatch () { start(); }
void start () { m_startTime = Time::getHighResolutionTicks (); }
double getElapsed ()
{
std::int64_t const now = Time::getHighResolutionTicks();
return Time::highResolutionTicksToSeconds (now - m_startTime);
}
private:
std::int64_t m_startTime;
};
static int const callsPerThread = 50000;
//--------------------------------------------------------------------------
template <typename ServiceType>
struct Consumer : Thread
{
ServiceType& m_service;
Random m_random;
String m_string;
Consumer (int id, std::int64_t seedValue, ServiceType& service)
: Thread ("C#" + String::fromNumber (id))
, m_service (service)
, m_random (seedValue)
{ startThread(); }
~Consumer ()
{ stopThread(); }
static Consumer*& thread()
{
static ThreadLocalValue <Consumer*> local;
return local.get();
}
static void stop_one ()
{ thread()->signalThreadShouldExit(); }
static void handler ()
{ thread()->do_handler(); }
void do_handler()
{
String const s (String::fromNumber (m_random.nextInt()));
m_string += s;
if (m_string.length() > 100)
m_string = String::empty;
}
void run ()
{
thread() = this;
while (! threadShouldExit())
m_service.run_one();
}
};
//--------------------------------------------------------------------------
template <typename ServiceType>
struct Producer : Thread
{
ServiceType& m_service;
Random m_random;
String m_string;
Producer (int id, std::int64_t seedValue, ServiceType& service)
: Thread ("P#" + String::fromNumber (id))
, m_service (service)
, m_random (seedValue)
{ }
~Producer ()
{ stopThread(); }
void run ()
{
for (std::size_t i = 0; i < callsPerThread; ++i)
{
String const s (String::fromNumber (m_random.nextInt()));
m_string += s;
if (m_string.length() > 100)
m_string = String::empty;
m_service.dispatch (std::bind (&Consumer<ServiceType>::handler));
}
}
};
//--------------------------------------------------------------------------
template <typename Allocator>
void testThreads (int nConsumers, int nProducers)
{
std::stringstream ss;
ss <<
nConsumers << " consumers, " <<
nProducers << " producers, Allocator = " <<
typeid(Allocator).name();
testcase (ss.str());
typedef ServiceQueueType <Allocator> ServiceType;
ServiceType service (nConsumers);
std::vector <std::unique_ptr <Consumer <ServiceType> > > consumers;
std::vector <std::unique_ptr <Producer <ServiceType> > > producers;
consumers.reserve (nConsumers);
producers.reserve (nProducers);
Random r;
for (int i = 0; i < nConsumers; ++i)
consumers.emplace_back (new Consumer <ServiceType> (i + 1,
r.nextInt64(), service));
for (int i = 0; i < nProducers; ++i)
producers.emplace_back (new Producer <ServiceType> (i + 1,
r.nextInt64(), service));
Stopwatch t;
for (std::size_t i = 0; i < producers.size(); ++i)
producers[i]->startThread();
for (std::size_t i = 0; i < producers.size(); ++i)
producers[i]->waitForThreadToExit();
for (std::size_t i = 0; i < consumers.size(); ++i)
service.dispatch (std::bind (&Consumer <ServiceType>::stop_one));
for (std::size_t i = 0; i < consumers.size(); ++i)
consumers[i]->waitForThreadToExit();
double const seconds (t.getElapsed());
log << seconds << " seconds";
pass();
}
void run()
{
#if 1
testThreads <std::allocator<char> > (1, 1);
testThreads <std::allocator<char> > (1, 4);
testThreads <std::allocator<char> > (1, 16);
testThreads <std::allocator<char> > (4, 1);
testThreads <std::allocator<char> > (8, 16);
#endif
#if 0
testThreads <detail::ServiceQueueAllocator<char> > (1, 1);
testThreads <detail::ServiceQueueAllocator<char> > (1, 4);
testThreads <detail::ServiceQueueAllocator<char> > (1, 16);
testThreads <detail::ServiceQueueAllocator<char> > (4, 1);
testThreads <detail::ServiceQueueAllocator<char> > (8, 16);
#endif
}
};
BEAST_DEFINE_TESTSUITE_MANUAL(ServiceQueue_timing,threads,beast);
//------------------------------------------------------------------------------
class ServiceQueue_test : public unit_test::suite
{
public:
struct ServiceThread : Thread
{
Random m_random;
ServiceQueue& m_service;
String m_string;
ServiceThread (int id, std::int64_t seedValue,
ServiceQueue& service)
: Thread ("#" + String::fromNumber (id))
, m_random (seedValue)
, m_service (service)
{
startThread();
}
~ServiceThread ()
{
stopThread();
}
static ServiceThread*& thread()
{
static ThreadLocalValue <ServiceThread*> local;
return local.get();
}
static void stop_one ()
{
thread()->signalThreadShouldExit();
}
static void handler ()
{
thread()->do_handler();
}
void do_handler()
{
#if 1
String const s (String::fromNumber (m_random.nextInt()));
m_string += s;
if (m_string.length() > 100)
m_string = String::empty;
#endif
}
void run ()
{
thread() = this;
while (! threadShouldExit())
m_service.run_one();
}
};
static std::size_t const totalCalls = 10000;
void testThreads (int n)
{
std::stringstream ss;
ss << n << " threads";
testcase (ss.str());
Random r;
std::size_t const callsPerThread (totalCalls / n);
ServiceQueue service (n);
std::vector <std::unique_ptr <ServiceThread> > threads;
threads.reserve (n);
for (int i = 0; i < n; ++i)
threads.emplace_back (new ServiceThread (i + 1,
r.nextInt64(), service));
for (std::size_t i = n * callsPerThread; i; --i)
service.dispatch (std::bind (&ServiceThread::handler));
for (std::size_t i = 0; i < threads.size(); ++i)
service.dispatch (std::bind (&ServiceThread::stop_one));
for (std::size_t i = 0; i < threads.size(); ++i)
threads[i]->waitForThreadToExit();
pass();
}
void run()
{
testThreads (1);
testThreads (4);
testThreads (16);
}
};
BEAST_DEFINE_TESTSUITE(ServiceQueue,threads,beast);
}