diff --git a/beast/Atomic.h b/beast/Atomic.h deleted file mode 100644 index c2ec2d6a5f..0000000000 --- a/beast/Atomic.h +++ /dev/null @@ -1,429 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of Beast: https://github.com/vinniefalco/Beast - Copyright 2013, Vinnie Falco - - Portions of this file are from JUCE. - Copyright (c) 2013 - Raw Material Software Ltd. - Please visit http://www.juce.com - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef BEAST_ATOMIC_H_INCLUDED -#define BEAST_ATOMIC_H_INCLUDED - -#include - -#include - -#include - -namespace beast { - -//============================================================================== -/** - Simple class to hold a primitive value and perform atomic operations on it. - - The type used must be a 32 or 64 bit primitive, like an int, pointer, etc. - There are methods to perform most of the basic atomic operations. -*/ -template -class Atomic -{ - // This class can only be used for types which are 32 or 64 bits in size. - static_assert (sizeof (Type) == 4 || sizeof (Type) == 8, - "Atomic arguments must be 32- or 64-bit long primitive types."); - -public: - /** Creates a new value, initialised to zero. */ - inline Atomic() noexcept - : value (0) - { - } - - /** Creates a new value, with a given initial value. */ - inline Atomic (const Type initialValue) noexcept - : value (initialValue) - { - } - - /** Copies another value (atomically). */ - inline Atomic (const Atomic& other) noexcept - : value (other.get()) - { - } - - /** Destructor. */ - inline ~Atomic() noexcept - { - } - - /** Atomically reads and returns the current value. */ - Type get() const noexcept; - - /** Copies another value onto this one (atomically). */ - Atomic& operator= (const Atomic& other) noexcept - { exchange (other.get()); return *this; } - - /** Copies another value onto this one (atomically). */ - Atomic& operator= (const Type newValue) noexcept - { exchange (newValue); return *this; } - - /** Atomically sets the current value. */ - void set (Type newValue) noexcept - { exchange (newValue); } - - /** Atomically sets the current value, returning the value that was replaced. */ - Type exchange (Type value) noexcept; - - /** Atomically adds a number to this value, returning the new value. */ - Type operator+= (Type amountToAdd) noexcept; - - /** Atomically subtracts a number from this value, returning the new value. */ - Type operator-= (Type amountToSubtract) noexcept; - - /** Atomically increments this value, returning the new value. */ - Type operator++() noexcept; - - /** Atomically decrements this value, returning the new value. */ - Type operator--() noexcept; - - /** Atomically compares this value with a target value, and if it is equal, sets - this to be equal to a new value. - - This operation is the atomic equivalent of doing this: - @code - bool compareAndSetBool (Type newValue, Type valueToCompare) - { - if (get() == valueToCompare) - { - set (newValue); - return true; - } - - return false; - } - @endcode - - @returns true if the comparison was true and the value was replaced; false if - the comparison failed and the value was left unchanged. - @see compareAndSetValue - */ - bool compareAndSetBool (Type newValue, Type valueToCompare) noexcept; - - /** Atomically compares this value with a target value, and if it is equal, sets - this to be equal to a new value. - - This operation is the atomic equivalent of doing this: - @code - Type compareAndSetValue (Type newValue, Type valueToCompare) - { - Type oldValue = get(); - if (oldValue == valueToCompare) - set (newValue); - - return oldValue; - } - @endcode - - @returns the old value before it was changed. - @see compareAndSetBool - */ - Type compareAndSetValue (Type newValue, Type valueToCompare) noexcept; - - //============================================================================== - #if BEAST_64BIT - BEAST_ALIGN (8) - #else - BEAST_ALIGN (4) - #endif - - /** The raw value that this class operates on. - This is exposed publically in case you need to manipulate it directly - for performance reasons. - */ - volatile Type value; - -private: - template - static inline Dest castTo (Source value) noexcept { union { Dest d; Source s; } u; u.s = value; return u.d; } - - static inline Type castFrom32Bit (std::int32_t value) noexcept { return castTo (value); } - static inline Type castFrom64Bit (std::int64_t value) noexcept { return castTo (value); } - static inline std::int32_t castTo32Bit (Type value) noexcept { return castTo (value); } - static inline std::int64_t castTo64Bit (Type value) noexcept { return castTo (value); } - - - Type operator++ (int); // better to just use pre-increment with atomics.. - Type operator-- (int); - - /** This templated negate function will negate pointers as well as integers */ - template - inline ValueType negateValue (ValueType n) noexcept - { - return sizeof (ValueType) == 1 ? (ValueType) -(signed char) n - : (sizeof (ValueType) == 2 ? (ValueType) -(short) n - : (sizeof (ValueType) == 4 ? (ValueType) -(int) n - : ((ValueType) -(std::int64_t) n))); - } - - /** This templated negate function will negate pointers as well as integers */ - template - inline PointerType* negateValue (PointerType* n) noexcept - { - return reinterpret_cast (-reinterpret_cast (n)); - } -}; - - -//============================================================================== -/* - The following code is in the header so that the atomics can be inlined where possible... -*/ -#if BEAST_IOS || (BEAST_MAC && (BEAST_PPC || BEAST_CLANG || __GNUC__ < 4 || (__GNUC__ == 4 && __GNUC_MINOR__ < 2))) - #define BEAST_ATOMICS_MAC 1 // Older OSX builds using gcc4.1 or earlier - - #if MAC_OS_X_VERSION_MIN_REQUIRED < MAC_OS_X_VERSION_10_5 - #define BEAST_MAC_ATOMICS_VOLATILE - #else - #define BEAST_MAC_ATOMICS_VOLATILE volatile - #endif - - #if BEAST_PPC || BEAST_IOS - // None of these atomics are available for PPC or for iOS 3.1 or earlier!! - template static Type OSAtomicAdd64Barrier (Type b, BEAST_MAC_ATOMICS_VOLATILE Type* a) noexcept { bassertfalse; return *a += b; } - template static Type OSAtomicIncrement64Barrier (BEAST_MAC_ATOMICS_VOLATILE Type* a) noexcept { bassertfalse; return ++*a; } - template static Type OSAtomicDecrement64Barrier (BEAST_MAC_ATOMICS_VOLATILE Type* a) noexcept { bassertfalse; return --*a; } - template static bool OSAtomicCompareAndSwap64Barrier (Type old, Type newValue, BEAST_MAC_ATOMICS_VOLATILE Type* value) noexcept - { bassertfalse; if (old == *value) { *value = newValue; return true; } return false; } - #define BEAST_64BIT_ATOMICS_UNAVAILABLE 1 - #endif - -#elif BEAST_CLANG && BEAST_LINUX - #define BEAST_ATOMICS_GCC 1 - -//============================================================================== -#elif BEAST_GCC - #define BEAST_ATOMICS_GCC 1 // GCC with intrinsics - - #if BEAST_IOS || BEAST_ANDROID // (64-bit ops will compile but not link on these mobile OSes) - #define BEAST_64BIT_ATOMICS_UNAVAILABLE 1 - #endif - -//============================================================================== -#else - #define BEAST_ATOMICS_WINDOWS 1 // Windows with intrinsics - - #if BEAST_USE_INTRINSICS - #ifndef __INTEL_COMPILER - #pragma intrinsic (_InterlockedExchange, _InterlockedIncrement, _InterlockedDecrement, _InterlockedCompareExchange, \ - _InterlockedCompareExchange64, _InterlockedExchangeAdd, _ReadWriteBarrier) - #endif - #define beast_InterlockedExchange(a, b) _InterlockedExchange(a, b) - #define beast_InterlockedIncrement(a) _InterlockedIncrement(a) - #define beast_InterlockedDecrement(a) _InterlockedDecrement(a) - #define beast_InterlockedExchangeAdd(a, b) _InterlockedExchangeAdd(a, b) - #define beast_InterlockedCompareExchange(a, b, c) _InterlockedCompareExchange(a, b, c) - #define beast_InterlockedCompareExchange64(a, b, c) _InterlockedCompareExchange64(a, b, c) - #define beast_MemoryBarrier _ReadWriteBarrier - #else - long beast_InterlockedExchange (volatile long* a, long b) noexcept; - long beast_InterlockedIncrement (volatile long* a) noexcept; - long beast_InterlockedDecrement (volatile long* a) noexcept; - long beast_InterlockedExchangeAdd (volatile long* a, long b) noexcept; - long beast_InterlockedCompareExchange (volatile long* a, long b, long c) noexcept; - __int64 beast_InterlockedCompareExchange64 (volatile __int64* a, __int64 b, __int64 c) noexcept; - inline void beast_MemoryBarrier() noexcept { long x = 0; beast_InterlockedIncrement (&x); } - #endif - - #if BEAST_64BIT - #ifndef __INTEL_COMPILER - #pragma intrinsic (_InterlockedExchangeAdd64, _InterlockedExchange64, _InterlockedIncrement64, _InterlockedDecrement64) - #endif - #define beast_InterlockedExchangeAdd64(a, b) _InterlockedExchangeAdd64(a, b) - #define beast_InterlockedExchange64(a, b) _InterlockedExchange64(a, b) - #define beast_InterlockedIncrement64(a) _InterlockedIncrement64(a) - #define beast_InterlockedDecrement64(a) _InterlockedDecrement64(a) - #else - // None of these atomics are available in a 32-bit Windows build!! - template static Type beast_InterlockedExchangeAdd64 (volatile Type* a, Type b) noexcept { bassertfalse; Type old = *a; *a += b; return old; } - template static Type beast_InterlockedExchange64 (volatile Type* a, Type b) noexcept { bassertfalse; Type old = *a; *a = b; return old; } - template static Type beast_InterlockedIncrement64 (volatile Type* a) noexcept { bassertfalse; return ++*a; } - template static Type beast_InterlockedDecrement64 (volatile Type* a) noexcept { bassertfalse; return --*a; } - #define BEAST_64BIT_ATOMICS_UNAVAILABLE 1 - #endif -#endif - -#if BEAST_MSVC - #pragma warning (push) - #pragma warning (disable: 4311) // (truncation warning) -#endif - -//============================================================================== -template -inline Type Atomic::get() const noexcept -{ - #if BEAST_ATOMICS_MAC - return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) OSAtomicAdd32Barrier ((int32_t) 0, (BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value)) - : castFrom64Bit ((std::int64_t) OSAtomicAdd64Barrier ((int64_t) 0, (BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value)); - #elif BEAST_ATOMICS_WINDOWS - return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) beast_InterlockedExchangeAdd ((volatile long*) &value, (long) 0)) - : castFrom64Bit ((std::int64_t) beast_InterlockedExchangeAdd64 ((volatile __int64*) &value, (__int64) 0)); - #elif BEAST_ATOMICS_GCC - return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) __sync_add_and_fetch ((volatile std::int32_t*) &value, 0)) - : castFrom64Bit ((std::int64_t) __sync_add_and_fetch ((volatile std::int64_t*) &value, 0)); - #endif -} - -template -inline Type Atomic::exchange (const Type newValue) noexcept -{ - #if BEAST_ATOMICS_MAC || BEAST_ATOMICS_GCC - Type currentVal = value; - while (! compareAndSetBool (newValue, currentVal)) { currentVal = value; } - return currentVal; - #elif BEAST_ATOMICS_WINDOWS - return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) beast_InterlockedExchange ((volatile long*) &value, (long) castTo32Bit (newValue))) - : castFrom64Bit ((std::int64_t) beast_InterlockedExchange64 ((volatile __int64*) &value, (__int64) castTo64Bit (newValue))); - #endif -} - -template -inline Type Atomic::operator+= (const Type amountToAdd) noexcept -{ - #if BEAST_ATOMICS_MAC -# ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wint-to-void-pointer-cast" -# pragma clang diagnostic ignored "-Wint-to-pointer-cast" -# endif - return sizeof (Type) == 4 ? (Type) OSAtomicAdd32Barrier ((int32_t) castTo32Bit (amountToAdd), (BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value) - : (Type) OSAtomicAdd64Barrier ((int64_t) amountToAdd, (BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value); -# ifdef __clang__ -# pragma clang diagnostic pop -# endif - #elif BEAST_ATOMICS_WINDOWS - return sizeof (Type) == 4 ? (Type) (beast_InterlockedExchangeAdd ((volatile long*) &value, (long) amountToAdd) + (long) amountToAdd) - : (Type) (beast_InterlockedExchangeAdd64 ((volatile __int64*) &value, (__int64) amountToAdd) + (__int64) amountToAdd); - #elif BEAST_ATOMICS_GCC - return (Type) __sync_add_and_fetch (&value, amountToAdd); - #endif -} - -template -inline Type Atomic::operator-= (const Type amountToSubtract) noexcept -{ - return operator+= (negateValue (amountToSubtract)); -} - -template -inline Type Atomic::operator++() noexcept -{ - #if BEAST_ATOMICS_MAC -# ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wint-to-void-pointer-cast" -# pragma clang diagnostic ignored "-Wint-to-pointer-cast" -# endif - return sizeof (Type) == 4 ? (Type) OSAtomicIncrement32Barrier ((BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value) - : (Type) OSAtomicIncrement64Barrier ((BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value); -# ifdef __clang__ -# pragma clang diagnostic pop -# endif - #elif BEAST_ATOMICS_WINDOWS - return sizeof (Type) == 4 ? (Type) beast_InterlockedIncrement ((volatile long*) &value) - : (Type) beast_InterlockedIncrement64 ((volatile __int64*) &value); - #elif BEAST_ATOMICS_GCC - return (Type) __sync_add_and_fetch (&value, (Type) 1); - #endif -} - -template -inline Type Atomic::operator--() noexcept -{ - #if BEAST_ATOMICS_MAC -# ifdef __clang__ -# pragma clang diagnostic push -# pragma clang diagnostic ignored "-Wint-to-void-pointer-cast" -# pragma clang diagnostic ignored "-Wint-to-pointer-cast" -# endif - return sizeof (Type) == 4 ? (Type) OSAtomicDecrement32Barrier ((BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value) - : (Type) OSAtomicDecrement64Barrier ((BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value); -# ifdef __clang__ -# pragma clang diagnostic pop -# endif - #elif BEAST_ATOMICS_WINDOWS - return sizeof (Type) == 4 ? (Type) beast_InterlockedDecrement ((volatile long*) &value) - : (Type) beast_InterlockedDecrement64 ((volatile __int64*) &value); - #elif BEAST_ATOMICS_GCC - return (Type) __sync_add_and_fetch (&value, (Type) -1); - #endif -} - -template -inline bool Atomic::compareAndSetBool (const Type newValue, const Type valueToCompare) noexcept -{ - #if BEAST_ATOMICS_MAC - return sizeof (Type) == 4 ? OSAtomicCompareAndSwap32Barrier ((int32_t) castTo32Bit (valueToCompare), (int32_t) castTo32Bit (newValue), (BEAST_MAC_ATOMICS_VOLATILE int32_t*) &value) - : OSAtomicCompareAndSwap64Barrier ((int64_t) castTo64Bit (valueToCompare), (int64_t) castTo64Bit (newValue), (BEAST_MAC_ATOMICS_VOLATILE int64_t*) &value); - #elif BEAST_ATOMICS_WINDOWS - return compareAndSetValue (newValue, valueToCompare) == valueToCompare; - #elif BEAST_ATOMICS_GCC - return sizeof (Type) == 4 ? __sync_bool_compare_and_swap ((volatile std::int32_t*) &value, castTo32Bit (valueToCompare), castTo32Bit (newValue)) - : __sync_bool_compare_and_swap ((volatile std::int64_t*) &value, castTo64Bit (valueToCompare), castTo64Bit (newValue)); - #endif -} - -template -inline Type Atomic::compareAndSetValue (const Type newValue, const Type valueToCompare) noexcept -{ - #if BEAST_ATOMICS_MAC - for (;;) // Annoying workaround for only having a bool CAS operation.. - { - if (compareAndSetBool (newValue, valueToCompare)) - return valueToCompare; - - const Type result = value; - if (result != valueToCompare) - return result; - } - - #elif BEAST_ATOMICS_WINDOWS - return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) beast_InterlockedCompareExchange ((volatile long*) &value, (long) castTo32Bit (newValue), (long) castTo32Bit (valueToCompare))) - : castFrom64Bit ((std::int64_t) beast_InterlockedCompareExchange64 ((volatile __int64*) &value, (__int64) castTo64Bit (newValue), (__int64) castTo64Bit (valueToCompare))); - #elif BEAST_ATOMICS_GCC - return sizeof (Type) == 4 ? castFrom32Bit ((std::int32_t) __sync_val_compare_and_swap ((volatile std::int32_t*) &value, castTo32Bit (valueToCompare), castTo32Bit (newValue))) - : castFrom64Bit ((std::int64_t) __sync_val_compare_and_swap ((volatile std::int64_t*) &value, castTo64Bit (valueToCompare), castTo64Bit (newValue))); - #endif -} - -inline void memoryBarrier() noexcept -{ - #if BEAST_ATOMICS_MAC - OSMemoryBarrier(); - #elif BEAST_ATOMICS_GCC - __sync_synchronize(); - #elif BEAST_ATOMICS_WINDOWS - beast_MemoryBarrier(); - #endif -} - -#if BEAST_MSVC - #pragma warning (pop) -#endif - -} - -#endif diff --git a/beast/Threads.h b/beast/Threads.h index 9f349151ae..73a7ec77b6 100644 --- a/beast/Threads.h +++ b/beast/Threads.h @@ -25,11 +25,9 @@ #include #include #include -#include #include #include #include -#include #include #include diff --git a/beast/module/core/core.h b/beast/module/core/core.h index c38a27f0c1..01fd5fa1ba 100644 --- a/beast/module/core/core.h +++ b/beast/module/core/core.h @@ -41,7 +41,6 @@ // New header-only library modeled more closely according to boost #include -#include #include #include #include diff --git a/beast/module/core/memory/SharedSingleton.h b/beast/module/core/memory/SharedSingleton.h index c7bc82ec3f..ff1b5a2500 100644 --- a/beast/module/core/memory/SharedSingleton.h +++ b/beast/module/core/memory/SharedSingleton.h @@ -24,6 +24,8 @@ #include #include +#include + namespace beast { @@ -99,7 +101,7 @@ public: bassert (lifetime == SingletonLifetime::createOnDemand || ! staticData.destructorCalled); staticData.instance = &staticData.object; new (staticData.instance) SharedSingleton (lifetime); - memoryBarrier(); + std::atomic_thread_fence (std::memory_order_seq_cst); instance = staticData.instance; } } diff --git a/beast/module/core/thread/Workers.cpp b/beast/module/core/thread/Workers.cpp index 4e41fd02ce..da4cf9f984 100644 --- a/beast/module/core/thread/Workers.cpp +++ b/beast/module/core/thread/Workers.cpp @@ -141,7 +141,7 @@ void Workers::deleteWorkers (LockFreeStack & stack) //------------------------------------------------------------------------------ -Workers::Worker::Worker (Workers& workers, String const& threadName) +Workers::Worker::Worker (Workers& workers, std::string const& threadName) : Thread (threadName) , m_workers (workers) { @@ -232,7 +232,6 @@ void Workers::Worker::run () class Workers_test : public unit_test::suite { public: - struct TestCallback : Workers::Callback { explicit TestCallback (int count_) @@ -251,13 +250,6 @@ public: std::atomic count; }; - template - bool - expectEquals (T1 const& t1, T2 const& t2) - { - return expect (t1 == t2); - } - void testThreads (int const threadCount) { testcase ("threadCount = " + std::to_string (threadCount)); @@ -276,14 +268,12 @@ public: // 10 seconds should be enough to finish on any system // bool signaled = cb.finished.wait (10 * 1000); - expect (signaled, "timed out"); w.pauseAllThreadsAndWait (); - int const count (cb.count.load ()); - - expectEquals (count, 0); + // We had better finished all our work! + expect (cb.count.load () == 0, "Did not complete task!"); } void run () diff --git a/beast/module/core/thread/Workers.h b/beast/module/core/thread/Workers.h index 56d5297d9a..07665f2393 100644 --- a/beast/module/core/thread/Workers.h +++ b/beast/module/core/thread/Workers.h @@ -122,7 +122,7 @@ private: , public Thread { public: - Worker (Workers& workers, String const& threadName); + Worker (Workers& workers, std::string const& threadName); ~Worker (); @@ -138,7 +138,7 @@ private: private: Callback& m_callback; - String m_threadNames; // The name to give each thread + std::string m_threadNames; // The name to give each thread WaitableEvent m_allPaused; // signaled when all threads paused semaphore m_semaphore; // each pending task is 1 resource int m_numberOfThreads; // how many we want active now diff --git a/beast/module/core/time/Time.cpp b/beast/module/core/time/Time.cpp index bbdf78c8bf..bbf9e38d24 100644 --- a/beast/module/core/time/Time.cpp +++ b/beast/module/core/time/Time.cpp @@ -247,31 +247,6 @@ std::uint32_t Time::getApproximateMillisecondCounter() noexcept return TimeHelpers::lastMSCounterValue; } -void Time::waitForMillisecondCounter (const std::uint32_t targetTime) noexcept -{ - for (;;) - { - const std::uint32_t now = getMillisecondCounter(); - - if (now >= targetTime) - break; - - const int toWait = (int) (targetTime - now); - - if (toWait > 2) - { - Thread::sleep (std::min (20, toWait >> 1)); - } - else - { - // xxx should consider using mutex_pause on the mac as it apparently - // makes it seem less like a spinlock and avoids lowering the thread pri. - for (int i = 10; --i >= 0;) - std::this_thread::yield(); - } - } -} - //============================================================================== double Time::highResolutionTicksToSeconds (const std::int64_t ticks) noexcept { diff --git a/beast/module/core/time/Time.h b/beast/module/core/time/Time.h index 13b0428546..ba367ad769 100644 --- a/beast/module/core/time/Time.h +++ b/beast/module/core/time/Time.h @@ -324,12 +324,6 @@ public: */ static double getMillisecondCounterHiRes() noexcept; - /** Waits until the getMillisecondCounter() reaches a given value. - - This will make the thread sleep as efficiently as it can while it's waiting. - */ - static void waitForMillisecondCounter (std::uint32_t targetTime) noexcept; - /** Less-accurate but faster version of getMillisecondCounter(). This will return the last value that getMillisecondCounter() returned, so doesn't diff --git a/beast/smart_ptr/SharedObject.h b/beast/smart_ptr/SharedObject.h index 5c0e5efbe1..bd1b6d0d9e 100644 --- a/beast/smart_ptr/SharedObject.h +++ b/beast/smart_ptr/SharedObject.h @@ -56,11 +56,10 @@ namespace beast { Once a new SharedObject has been assigned to a pointer, be careful not to delete the object manually. - This class uses an Atomic value to hold the reference count, so that it - the pointers can be passed between threads safely. For a faster but non-thread-safe - version, use SingleThreadedSharedObject instead. + This class uses an std::atomic value to hold the reference count, so + that the pointers can be passed between threads safely. - @see SharedPtr, SharedObjectArray, SingleThreadedSharedObject + @see SharedPtr, SharedObjectArray */ class SharedObject { diff --git a/beast/strings/CharPointer_UTF16.h b/beast/strings/CharPointer_UTF16.h index c0afdfbe78..caf2a8b7a9 100644 --- a/beast/strings/CharPointer_UTF16.h +++ b/beast/strings/CharPointer_UTF16.h @@ -25,10 +25,9 @@ #define BEAST_CHARPOINTER_UTF16_H_INCLUDED #include -#include - #include +#include #include namespace beast { @@ -474,7 +473,7 @@ public: /** Atomically swaps this pointer for a new value, returning the previous value. */ CharPointer_UTF16 atomicSwap (const CharPointer_UTF16 newValue) { - return CharPointer_UTF16 (reinterpret_cast &> (data).exchange (newValue.data)); + return CharPointer_UTF16 (reinterpret_cast &> (data).exchange (newValue.data)); } /** These values are the byte-order-mark (BOM) values for a UTF-16 stream. */ diff --git a/beast/strings/CharPointer_UTF32.h b/beast/strings/CharPointer_UTF32.h index 3bf0b440f7..326617d442 100644 --- a/beast/strings/CharPointer_UTF32.h +++ b/beast/strings/CharPointer_UTF32.h @@ -25,10 +25,9 @@ #define BEAST_CHARPOINTER_UTF32_H_INCLUDED #include -#include - #include +#include #include namespace beast { @@ -370,7 +369,7 @@ public: /** Atomically swaps this pointer for a new value, returning the previous value. */ CharPointer_UTF32 atomicSwap (const CharPointer_UTF32 newValue) { - return CharPointer_UTF32 (reinterpret_cast &> (data).exchange (newValue.data)); + return CharPointer_UTF32 (reinterpret_cast &> (data).exchange (newValue.data)); } private: diff --git a/beast/strings/CharPointer_UTF8.h b/beast/strings/CharPointer_UTF8.h index d5e83fc03f..2e24ea00a7 100644 --- a/beast/strings/CharPointer_UTF8.h +++ b/beast/strings/CharPointer_UTF8.h @@ -25,10 +25,9 @@ #define BEAST_CHARPOINTER_UTF8_H_INCLUDED #include -#include - #include +#include #include #include @@ -552,7 +551,7 @@ public: /** Atomically swaps this pointer for a new value, returning the previous value. */ CharPointer_UTF8 atomicSwap (const CharPointer_UTF8 newValue) { - return CharPointer_UTF8 (reinterpret_cast &> (data).exchange (newValue.data)); + return CharPointer_UTF8 (reinterpret_cast &> (data).exchange (newValue.data)); } /** These values are the byte-order mark (BOM) values for a UTF-8 stream. */ diff --git a/beast/strings/impl/String.cpp b/beast/strings/impl/String.cpp index 2d3ffc2831..c2c0fd1757 100644 --- a/beast/strings/impl/String.cpp +++ b/beast/strings/impl/String.cpp @@ -36,6 +36,7 @@ #include #include +#include namespace beast { @@ -86,8 +87,9 @@ public: //============================================================================== static CharPointerType createUninitialisedBytes (const size_t numBytes) { - StringHolder* const s = reinterpret_cast (new char [sizeof (StringHolder) - sizeof (CharType) + numBytes]); - s->refCount.value = 0; + StringHolder* const s = reinterpret_cast ( + new char [sizeof (StringHolder) - sizeof (CharType) + numBytes]); + s->refCount.store (0); s->allocatedNumBytes = numBytes; return CharPointerType (s->text); } @@ -198,7 +200,7 @@ public: { StringHolder* const b = bufferFromText (text); - if (b->refCount.get() <= 0) + if (b->refCount.load() <= 0) return text; CharPointerType newText (createUninitialisedBytes (b->allocatedNumBytes)); @@ -212,7 +214,7 @@ public: { StringHolder* const b = bufferFromText (text); - if (b->refCount.get() <= 0 && b->allocatedNumBytes >= numBytes) + if (b->refCount.load() <= 0 && b->allocatedNumBytes >= numBytes) return text; CharPointerType newText (createUninitialisedBytes (std::max (b->allocatedNumBytes, numBytes))); @@ -228,7 +230,7 @@ public: } //============================================================================== - Atomic refCount; + std::atomic refCount; size_t allocatedNumBytes; CharType text[1]; diff --git a/beast/threads/ServiceQueue.h b/beast/threads/ServiceQueue.h deleted file mode 100644 index 76a933db6c..0000000000 --- a/beast/threads/ServiceQueue.h +++ /dev/null @@ -1,623 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of Beast: https://github.com/vinniefalco/Beast - Copyright 2013, Vinnie Falco - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef BEAST_THREADS_SERVICEQUEUE_H_INCLUDED -#define BEAST_THREADS_SERVICEQUEUE_H_INCLUDED - -#include -#include -#include -#include - -#include - -namespace beast { - -namespace detail { - -//------------------------------------------------------------------------------ - -// VFALCO NOTE This allocator is a work in progress - -#if 0 - -class ServiceQueueAllocatorArena : public SharedObject -{ -public: - typedef std::size_t size_type; - - class Page : public LockFreeStack ::Node - { - public: - size_type const m_pageBytes; - Atomic m_refs; - char* m_pos; - bool m_full; - size_type m_count; - - static std::size_t overhead() - { return sizeof (Page); } - - static std::size_t pointer_overhead() - { return sizeof (Page*); } - - // pageBytes doesn't include the Page structure - explicit Page (size_type pageBytes) - : m_pageBytes (pageBytes) - , m_pos (begin()) - , m_full (false) - , m_count (0) - { - } - - ~Page () - { - // This means someone forgot to deallocate something - bassert (!m_full && m_refs.get() == 0); - } - - static Page* create (size_type pageBytes) - { - return new (new std::uint8_t[pageBytes + overhead()]) Page (pageBytes); - } - - static void destroy (Page* page) - { - page->~Page(); - delete[] ((std::uint8_t*)page); - } - - void reset () - { - m_refs.set (0); - m_pos = begin(); - m_full = false; - m_count = 0; - } - - bool full() const - { - return m_full; - } - - void* allocate (size_type n) - { - size_type const needed (n + pointer_overhead()); - char* pos = m_pos + needed; - if (pos > end()) - { - m_full = true; - return nullptr; - } - ++m_refs; - void* p (m_pos + pointer_overhead()); - get_page(p) = this; - m_pos = pos; - ++m_count; - return p; - } - - char* begin() const - { - return const_cast ( - reinterpret_cast (this) + overhead()); - } - - char const* end() const - { - return begin() + m_pageBytes; - } - - // Returns true if the page can be recycled - bool deallocate (void* p, size_type) - { - bool const unused ((--m_refs) == 0); - return unused && m_full; - } - - // Returns a reference to the per-allocation overhead area - static Page*& get_page (void* p) - { - return *reinterpret_cast ( - static_cast (p) - pointer_overhead()); - } - }; - - struct State - { - State() - { - } - - ~State() - { - // If this goes off, someone forgot to call deallocate! - bassert (full.get() == 0); - - destroy (active); - destroy (recycle); - } - - void destroy (LockFreeStack & stack) - { - for(;;) - { - Page* const page (stack.pop_front()); - if (page == nullptr) - break; - Page::destroy (page); - } - } - - Atomic full; - Atomic page_count; - LockFreeStack active; - LockFreeStack recycle; - }; - - typedef SharedData SharedState; - - size_type const m_maxBytes; - SharedState m_state; - - explicit ServiceQueueAllocatorArena (size_type maxBytes = 16 * 1024) - : m_maxBytes (maxBytes) - { - } - - ~ServiceQueueAllocatorArena() - { - } - - void* allocate (size_type n) - { - SharedState::UnlockedAccess state (m_state); - - // Loop until we satisfy the allocation from an - // active page, or we run out of active pages. - // - for (;;) - { - // Acquire ownership of an active page - // This prevents other threads from seeing it. - Page* page (state->active.pop_front()); - if (page == nullptr) - break; - - void* p = page->allocate (n); - if (p != nullptr) - { - // Put the page back so other threads can use it - state->active.push_front (page); - return p; - } - - // Page is full, count it for diagnostics - ++state->full; - } - - // No active page, get a recycled page or create a new page. - // - Page* page (state->recycle.pop_front()); - if (page == nullptr) - { - page = Page::create (std::max (m_maxBytes, n)); - ++state->page_count; - } - - void* p = page->allocate (n); - bassert (p != nullptr); - // Throw page into the active list so other threads can use it - state->active.push_front (page); - return p; - } - - void deallocate (void* p, size_type n) - { - SharedState::UnlockedAccess state (m_state); - Page* const page (Page::get_page(p)); - if (page->deallocate (p, n)) - { - --state->full; - page->reset(); - Page::destroy (page); - //state->recycle.push_front (page); - } - } -}; - -//------------------------------------------------------------------------------ - -template -struct ServiceQueueAllocator -{ - typedef T value_type; - typedef T* pointer; - typedef T& reference; - typedef T const* const_pointer; - typedef T const& const_reference; - typedef std::size_t size_type; - typedef std::ptrdiff_t difference_type; - - ServiceQueueAllocator () - : m_arena (new ServiceQueueAllocatorArena) - { - } - - ServiceQueueAllocator (ServiceQueueAllocator const& other) - : m_arena (other.m_arena) - { - } - - template - ServiceQueueAllocator (ServiceQueueAllocator const& other) - : m_arena (other.m_arena) - { - } - - template - struct rebind - { - typedef ServiceQueueAllocator other; - }; - - pointer address (reference x) const - { - return &x; - } - - const_pointer address (const_reference x) const - { - return &x; - } - - pointer allocate (size_type n, - std::allocator::const_pointer = nullptr) const - { - size_type const bytes (n * sizeof (value_type)); - pointer const p (static_cast ( - m_arena->allocate (bytes))); - return p; - } - - void deallocate (pointer p, size_type n) const - { - size_type const bytes = (n * sizeof (value_type)); - m_arena->deallocate (p, bytes); - } - - size_type max_size () const - { - return std::numeric_limits ::max () / sizeof (value_type); - } - - void construct (pointer p, const_reference val) const - { - new ((void *)p) value_type (val); - } - - void destroy (pointer p) const - { - p->~value_type (); - } - -private: - template - friend struct ServiceQueueAllocator; - - SharedPtr m_arena; -}; - -#endif - -} - -//------------------------------------------------------------------------------ - -class ServiceQueueBase -{ -public: - ServiceQueueBase(); - ~ServiceQueueBase(); - - std::size_t poll(); - std::size_t poll_one(); - std::size_t run(); - std::size_t run_one(); - void stop(); - bool stopped() const - { return m_stopped.get() != 0; } - void reset(); - -protected: - class Item; - class Waiter; - class ScopedServiceThread; - - void wait(); - virtual void enqueue (Item* item); - bool empty(); - - virtual std::size_t dequeue() = 0; - virtual Waiter* new_waiter() = 0; - - //-------------------------------------------------------------------------- - - class Item : public List ::Node - { - public: - virtual ~Item() { } - virtual void operator()() = 0; - virtual std::size_t size() const = 0; - }; - - //-------------------------------------------------------------------------- - - class Waiter : public List ::Node - { - public: - Waiter() - { } - void wait() - { m_event.wait(); } - void signal() - { m_event.signal(); } - private: - WaitableEvent m_event; - }; - - //-------------------------------------------------------------------------- - - struct State - { - // handlers - List handlers; - List waiting; - List unused; - }; - - typedef SharedData SharedState; - SharedState m_state; - Atomic m_stopped; - - static ThreadLocalValue s_service; -}; - -//------------------------------------------------------------------------------ - -/** A queue for disatching function calls on other threads. - Handlers are guaranteed to be called only from threads that are currently - calling run, run_one, poll, or poll_one. -*/ -template > -class ServiceQueueType : public ServiceQueueBase -{ -private: - using ServiceQueueBase::Item; - using ServiceQueueBase::Waiter; - - template - class ItemType : public Item - { - public: - explicit ItemType (BEAST_MOVE_ARG(Handler) handler) - : m_handler (BEAST_MOVE_CAST(Handler)(handler)) - { } - void operator() () - { m_handler(); } - std::size_t size() const - { return sizeof (*this); } - private: - Handler m_handler; - }; - -public: - typedef Allocator allocator_type; // for std::uses_allocator<> - - explicit ServiceQueueType (std::size_t expectedConcurrency = 1, - Allocator alloc = Allocator()) - : m_alloc (alloc) - { - typename Allocator::template rebind ::other a (m_alloc); - SharedState::Access state (m_state); - while (expectedConcurrency--) - state->unused.push_front ( - *new (a.allocate (1)) Waiter); - } - - ~ServiceQueueType() - { - SharedState::Access state (m_state); - - // Must be empty - //bassert (state->handlers.empty()); - - // Cannot destroy while threads are waiting - bassert (state->waiting.empty()); - - typename Allocator::template rebind ::other a (m_alloc); - while (! state->unused.empty ()) - { - Waiter* const waiter (&state->unused.front ()); - state->unused.pop_front (); - a.destroy (waiter); - a.deallocate (waiter, 1); - } - } - - /** Returns the allocator associated with the container. */ - allocator_type get_allocator() const - { - return m_alloc; - } - - /** Returns `true` if the current thread is processing events. - If the current thread of execution is inside a call to run, - run_one, poll, or poll_one, this function returns `true`. - */ - bool is_service_thread() const - { return s_service.get() == this; } - - /** Run the handler on a service thread. - If the current thread of execution is a service thread then this - function wil dispatch the handler on the caller's thread before - returning. - The function signature of the handler must be: - @code - void handler(); - @endcode - */ - template - void dispatch (BEAST_MOVE_ARG(Handler) handler) - { - if (is_service_thread()) - { - handler(); - } - else - { - typename Allocator::template rebind >::other a (m_alloc); - enqueue (new (a.allocate (1)) - ItemType (BEAST_MOVE_CAST(Handler)(handler))); - } - } - - /** Request the handler to run on a service thread. - This returns immediately, even if the current thread of execution is - a service thread. - The function signature of the handler must be: - @code - void handler(); - @endcode - */ - template - void post (BEAST_MOVE_ARG(Handler) handler) - { - typename Allocator::template rebind >::other a (m_alloc); - enqueue (new (a.allocate (1)) - ItemType (BEAST_MOVE_CAST(Handler)(handler))); - } - - /** Return a new handler that dispatches the wrapped handler on the queue. */ - template - detail::DispatchedHandler wrap ( - BEAST_MOVE_ARG(Handler) handler) - { - return detail::DispatchedHandler ( - *this, BEAST_MOVE_CAST(Handler)(handler)); - } - - /** Run the event loop to execute ready handlers. - This runs handlers that are ready to run, without blocking, until - there are no more handlers ready or the service queue has been stopped. - @return The number of handlers that were executed. - */ - std::size_t poll () - { return ServiceQueueBase::poll(); } - - /** Run the event loop to execute at most one ready handler. - This will run zero or one handlers, without blocking, depending on - whether or not there is handler immediately ready to run. - @return The number of handlers that were executed. - */ - std::size_t poll_one () - { return ServiceQueueBase::poll_one(); } - - /** Runs the queue's processing loop. - The current thread of execution becomes a service thread. This call - blocks until there is no more work remaining. - @return The number of handlers that were executed. - */ - std::size_t run () - { return ServiceQueueBase::run(); } - - /** Runs the queue's processing loop to execute at most one handler. - @return The number of handlers that were executed. - */ - std::size_t run_one () - { return ServiceQueueBase::run_one(); } - - /** Stop the queue's processing loop. - All threads executing run or run_one will return as soon as possible. - Future calls to run, run_one, poll, or poll_one will return immediately - until reset is called. - @see reset - */ - void stop() - { return ServiceQueueBase::stop(); } - - /** Returns `true` if the queue has been stopped. - When a queue is stopped, calls to run, run_one, poll, or poll_one will - return immediately without invoking any handlers. - */ - bool stopped() const - { return ServiceQueueBase::stopped(); } - - /** Reset the queue after a stop. - This allows the event loop to be restarted. This may not be called while - there are any threads currently executing the run, run_one, poll, or - poll_one functions, or undefined behavior will result. - */ - void reset() - { return ServiceQueueBase::reset(); } - -private: - // Dispatch a single queued handler if possible. - // Returns the number of handlers dispatched (0 or 1) - // - std::size_t dequeue () - { - if (stopped()) - return 0; - - Item* item (nullptr); - - { - SharedState::Access state (m_state); - if (state->handlers.empty()) - return 0; - item = &state->handlers.front(); - state->handlers.erase ( - state->handlers.iterator_to (*item)); - } - - (*item)(); - - typename Allocator::template rebind ::other a (m_alloc); - std::size_t const size (item->size()); - item->~Item(); - a.deallocate (reinterpret_cast(item), size); - return 1; - } - - // Create a new Waiter - Waiter* new_waiter() - { - typename Allocator::template rebind ::other a (m_alloc); - return new (a.allocate (1)) Waiter; - } - - Allocator m_alloc; -}; - -typedef ServiceQueueType > ServiceQueue; - -} - -#endif diff --git a/beast/threads/Thread.h b/beast/threads/Thread.h index 9bd8bf72fc..8c4dcbc69c 100644 --- a/beast/threads/Thread.h +++ b/beast/threads/Thread.h @@ -24,11 +24,12 @@ #ifndef BEAST_THREADS_THREAD_H_INCLUDED #define BEAST_THREADS_THREAD_H_INCLUDED -#include #include #include #include +#include + namespace beast { //============================================================================== @@ -39,9 +40,6 @@ namespace beast { do their business. The thread can then be started with the startThread() method and controlled with various other methods. - This class also contains some thread-related static methods, such - as sleep(), yield(), getCurrentThreadId() etc. - @see CriticalSection, WaitableEvent, Process, ThreadWithProgressWindow, MessageManagerLock */ @@ -55,7 +53,7 @@ public: When first created, the thread is not running. Use the startThread() method to start it. */ - explicit Thread (const String& threadName); + explicit Thread (std::string const& threadName); Thread (Thread const&) = delete; Thread& operator= (Thread const&) = delete; @@ -89,36 +87,12 @@ public: */ void startThread(); - /** Starts the thread with a given priority. - - Launches the thread with a given priority, where 0 = lowest, 10 = highest. - If the thread is already running, its priority will be changed. - - @see startThread, setPriority - */ - void startThread (int priority); - /** Attempts to stop the thread running. This method will cause the threadShouldExit() method to return true and call notify() in case the thread is currently waiting. - - Hopefully the thread will then respond to this by exiting cleanly, and - the stopThread method will wait for a given time-period for this to - happen. - - If the thread is stuck and fails to respond after the time-out, it gets - forcibly killed, which is a very bad thing to happen, as it could still - be holding locks, etc. which are needed by other parts of your program. - - @param timeOutMilliseconds The number of milliseconds to wait for the - thread to finish before killing it by force. A negative - value in here will wait forever. - @see signalThreadShouldExit, threadShouldExit, waitForThreadToExit, isThreadRunning - - @returns true if the thread exits, or false if the timeout expires first. */ - bool stopThread (int timeOutMilliseconds = -1); + void stopThread (); /** Stop the thread without blocking. This calls signalThreadShouldExit followed by notify. @@ -150,57 +124,13 @@ public: @see signalThreadShouldExit */ - inline bool threadShouldExit() const { return shouldExit; } + inline bool threadShouldExit() const { return shouldExit; } /** Waits for the thread to stop. - This will waits until isThreadRunning() is false or until a timeout expires. - - @param timeOutMilliseconds the time to wait, in milliseconds. If this value - is less than zero, it will wait forever. - @returns true if the thread exits, or false if the timeout expires first. + This will waits until isThreadRunning() is false. */ - bool waitForThreadToExit (int timeOutMilliseconds = -1) const; - - //============================================================================== - /** Changes the thread's priority. - May return false if for some reason the priority can't be changed. - - @param priority the new priority, in the range 0 (lowest) to 10 (highest). A priority - of 5 is normal. - */ - bool setPriority (int priority); - - /** Changes the priority of the caller thread. - - Similar to setPriority(), but this static method acts on the caller thread. - May return false if for some reason the priority can't be changed. - - @see setPriority - */ - static bool setCurrentThreadPriority (int priority); - - //============================================================================== - /** Sets the affinity mask for the thread. - - This will only have an effect next time the thread is started - i.e. if the - thread is already running when called, it'll have no effect. - - @see setCurrentThreadAffinityMask - */ - void setAffinityMask (std::uint32_t affinityMask); - - /** Changes the affinity mask for the caller thread. - - This will change the affinity mask for the thread that calls this static method. - - @see setAffinityMask - */ - static void setCurrentThreadAffinityMask (std::uint32_t affinityMask); - - //============================================================================== - // this can be called from any thread that needs to pause.. - static void sleep (int milliseconds); + void waitForThreadToExit () const; //============================================================================== /** Makes the thread wait for a notification. @@ -223,59 +153,24 @@ public: void notify() const; //============================================================================== - /** A value type used for thread IDs. - @see getCurrentThreadId(), getThreadId() - */ - typedef void* ThreadID; - - /** Returns an id that identifies the caller thread. - - To find the ID of a particular thread object, use getThreadId(). - - @returns a unique identifier that identifies the calling thread. - @see getThreadId - */ - static ThreadID getCurrentThreadId(); - - /** Finds the thread object that is currently running. - - Note that the main UI thread (or other non-Beast threads) don't have a Thread - object associated with them, so this will return 0. - */ - static Thread* getCurrentThread(); - - /** Returns the ID of this thread. - - That means the ID of this thread object - not of the thread that's calling the method. - - This can change when the thread is started and stopped, and will be invalid if the - thread's not actually running. - - @see getCurrentThreadId - */ - ThreadID getThreadId() const noexcept { return threadId; } - /** Returns the name of the thread. This is the name that gets set in the constructor. */ - const String& getThreadName() const { return threadName; } + std::string const& getThreadName() const { return threadName; } /** Changes the name of the caller thread. Different OSes may place different length or content limits on this name. */ - static void setCurrentThreadName (const String& newThreadName); + static void setCurrentThreadName (std::string const& newThreadName); private: //============================================================================== - const String threadName; + std::string const threadName; void* volatile threadHandle; - ThreadID threadId; RecursiveMutex startStopLock; WaitableEvent startSuspensionEvent, defaultEvent; - int threadPriority; - std::uint32_t affinityMask; bool volatile shouldExit; #ifndef DOXYGEN @@ -284,9 +179,7 @@ private: void launchThread(); void closeThreadHandle(); - void killThread(); void threadEntryPoint(); - static bool setThreadPriority (void*, int); }; } diff --git a/beast/threads/ThreadLocalValue.h b/beast/threads/ThreadLocalValue.h deleted file mode 100644 index 58b213431d..0000000000 --- a/beast/threads/ThreadLocalValue.h +++ /dev/null @@ -1,199 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of Beast: https://github.com/vinniefalco/Beast - Copyright 2013, Vinnie Falco - - Portions of this file are from JUCE. - Copyright (c) 2013 - Raw Material Software Ltd. - Please visit http://www.juce.com - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef BEAST_THREADS_THREADLOCALVALUE_H_INCLUDED -#define BEAST_THREADS_THREADLOCALVALUE_H_INCLUDED - -#include -#include -#include - -namespace beast { - -// (NB: on win32, native thread-locals aren't possible in a dynamically loaded DLL in XP). -#if ! ((BEAST_MSVC && (BEAST_64BIT || ! defined (BeastPlugin_PluginCode))) \ - || (BEAST_MAC && BEAST_CLANG && defined (MAC_OS_X_VERSION_10_7) \ - && MAC_OS_X_VERSION_MIN_REQUIRED >= MAC_OS_X_VERSION_10_7)) - #define BEAST_NO_COMPILER_THREAD_LOCAL 1 -#endif - -//============================================================================== -/** - Provides cross-platform support for thread-local objects. - - This class holds an internal list of objects of the templated type, keeping - an instance for each thread that requests one. The first time a thread attempts - to access its value, an object is created and added to the list for that thread. - - Typically, you'll probably want to create a static instance of a ThreadLocalValue - object, or hold one within a singleton. - - The templated class for your value could be a primitive type, or any class that - has a default constructor and copy operator. - - When a thread no longer needs to use its value, it can call releaseCurrentThreadStorage() - to allow the storage to be re-used by another thread. If a thread exits without calling - this method, the object storage will be left allocated until the ThreadLocalValue object - is deleted. -*/ -template -class ThreadLocalValue -{ -public: - /** */ - ThreadLocalValue() = default; - ThreadLocalValue (ThreadLocalValue const&) = delete; - ThreadLocalValue& operator= (ThreadLocalValue const&) = delete; - - /** Destructor. - When this object is deleted, all the value objects for all threads will be deleted. - */ - ~ThreadLocalValue() - { - #if BEAST_NO_COMPILER_THREAD_LOCAL - for (ObjectHolder* o = first.value; o != nullptr;) - { - ObjectHolder* const next = o->next; - delete o; - o = next; - } - #endif - } - - /** Returns a reference to this thread's instance of the value. - Note that the first time a thread tries to access the value, an instance of the - value object will be created - so if your value's class has a non-trivial - constructor, be aware that this method could invoke it. - */ - Type& operator*() const noexcept { return get(); } - - /** Returns a pointer to this thread's instance of the value. - Note that the first time a thread tries to access the value, an instance of the - value object will be created - so if your value's class has a non-trivial - constructor, be aware that this method could invoke it. - */ - operator Type*() const noexcept { return &get(); } - - /** Accesses a method or field of the value object. - Note that the first time a thread tries to access the value, an instance of the - value object will be created - so if your value's class has a non-trivial - constructor, be aware that this method could invoke it. - */ - Type* operator->() const noexcept { return &get(); } - - /** Assigns a new value to the thread-local object. */ - ThreadLocalValue& operator= (const Type& newValue) { get() = newValue; return *this; } - - /** Returns a reference to this thread's instance of the value. - Note that the first time a thread tries to access the value, an instance of the - value object will be created - so if your value's class has a non-trivial - constructor, be aware that this method could invoke it. - */ - Type& get() const noexcept - { - #if BEAST_NO_COMPILER_THREAD_LOCAL - const Thread::ThreadID threadId = Thread::getCurrentThreadId(); - - for (ObjectHolder* o = first.get(); o != nullptr; o = o->next) - if (o->threadId == threadId) - return o->object; - - for (ObjectHolder* o = first.get(); o != nullptr; o = o->next) - { - if (o->threadId == nullptr) - { - { - SpinLock::ScopedLockType sl (lock); - - if (o->threadId != nullptr) - continue; - - o->threadId = threadId; - } - - o->object = Type(); - return o->object; - } - } - - ObjectHolder* const newObject = new ObjectHolder (threadId); - - do - { - newObject->next = first.get(); - } - while (! first.compareAndSetBool (newObject, newObject->next)); - - return newObject->object; - #elif BEAST_MAC - static __thread Type object; - return object; - #elif BEAST_MSVC - static __declspec(thread) Type object; - return object; - #endif - } - - /** Called by a thread before it terminates, to allow this class to release - any storage associated with the thread. - */ - void releaseCurrentThreadStorage() - { - #if BEAST_NO_COMPILER_THREAD_LOCAL - const Thread::ThreadID threadId = Thread::getCurrentThreadId(); - - for (ObjectHolder* o = first.get(); o != nullptr; o = o->next) - { - if (o->threadId == threadId) - { - SpinLock::ScopedLockType sl (lock); - o->threadId = nullptr; - } - } - #endif - } - -private: - //============================================================================== - #if BEAST_NO_COMPILER_THREAD_LOCAL - struct ObjectHolder - { - ObjectHolder (const Thread::ThreadID& tid) - : threadId (tid), object() - {} - ObjectHolder (ObjectHolder const&) = delete; - ObjectHolder& operator= (ObjectHolder const&) = delete; - - Thread::ThreadID threadId; - ObjectHolder* next; - Type object; - }; - - Atomic mutable first; - SpinLock mutable lock; - #endif -}; - -} - -#endif diff --git a/beast/threads/Threads.unity.cpp b/beast/threads/Threads.unity.cpp index 4348d1baa9..ddb7343798 100644 --- a/beast/threads/Threads.unity.cpp +++ b/beast/threads/Threads.unity.cpp @@ -22,10 +22,7 @@ #endif #include -#include #include #include #include #include - -#include diff --git a/beast/threads/impl/ServiceQueue.cpp b/beast/threads/impl/ServiceQueue.cpp deleted file mode 100644 index 44a662daf2..0000000000 --- a/beast/threads/impl/ServiceQueue.cpp +++ /dev/null @@ -1,186 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of Beast: https://github.com/vinniefalco/Beast - Copyright 2013, Vinnie Falco - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include - -namespace beast { - -class ServiceQueueBase::ScopedServiceThread : public List ::Node -{ -public: - explicit ScopedServiceThread (ServiceQueueBase* queue) - : m_saved (ServiceQueueBase::s_service.get()) - { - ServiceQueueBase::s_service.get() = queue; - } - - ~ScopedServiceThread() - { - ServiceQueueBase::s_service.get() = m_saved; - } - -private: - ServiceQueueBase* m_saved; -}; - -//------------------------------------------------------------------------------ - -ServiceQueueBase::ServiceQueueBase() -{ -} - -ServiceQueueBase::~ServiceQueueBase() -{ -} - -std::size_t ServiceQueueBase::poll () -{ - std::size_t total (0); - ScopedServiceThread thread (this); - for (;;) - { - std::size_t const n (dequeue()); - if (! n) - break; - total += n; - } - return total; -} - -std::size_t ServiceQueueBase::poll_one () -{ - ScopedServiceThread thread (this); - return dequeue(); -} - -std::size_t ServiceQueueBase::run () -{ - std::size_t total (0); - ScopedServiceThread thread (this); - while (! stopped()) - { - total += poll (); - wait (); - } - return total; -} - -std::size_t ServiceQueueBase::run_one () -{ - std::size_t n; - ScopedServiceThread (this); - for (;;) - { - n = poll_one(); - if (n != 0) - break; - wait(); - } - return n; -} - -void ServiceQueueBase::stop () -{ - SharedState::Access state (m_state); - m_stopped.set (1); - while (! state->waiting.empty ()) - { - Waiter& waiting (state->waiting.front()); - state->waiting.pop_front (); - waiting.signal (); - } -} - -void ServiceQueueBase::reset() -{ - // Must be stopped - bassert (m_stopped.get () != 0); - m_stopped.set (0); -} - -// Block on the event if there are no items -// in the queue and we are not stopped. -// -void ServiceQueueBase::wait () -{ - Waiter* waiter (nullptr); - - { - SharedState::Access state (m_state); - - if (stopped ()) - return; - - if (! state->handlers.empty()) - return; - - if (state->unused.empty ()) - { - waiter = new_waiter(); - } - else - { - waiter = &state->unused.front (); - state->unused.pop_front (); - } - - state->waiting.push_front (*waiter); - } - - waiter->wait(); - - // Waiter got taken off the waiting list - - { - SharedState::Access state (m_state); - state->unused.push_front (*waiter); - } -} - -void ServiceQueueBase::enqueue (Item* item) -{ - Waiter* waiter (nullptr); - - { - SharedState::Access state (m_state); - state->handlers.push_back (*item); - if (! state->waiting.empty ()) - { - waiter = &state->waiting.front (); - state->waiting.pop_front (); - } - } - - if (waiter != nullptr) - waiter->signal(); -} - -bool ServiceQueueBase::empty() -{ - SharedState::Access state (m_state); - return state->handlers.empty(); -} - -// A thread can only be blocked on one ServiceQueue so we store the pointer -// to which ServiceQueue it is blocked on to determine if the thread belongs -// to that queue. -// -ThreadLocalValue ServiceQueueBase::s_service; - -} diff --git a/beast/threads/impl/Thread.cpp b/beast/threads/impl/Thread.cpp index 0fa367eb3f..83696e3215 100644 --- a/beast/threads/impl/Thread.cpp +++ b/beast/threads/impl/Thread.cpp @@ -27,15 +27,13 @@ #include #include +#include namespace beast { -Thread::Thread (const String& threadName_) +Thread::Thread (std::string const& threadName_) : threadName (threadName_), threadHandle (nullptr), - threadId (0), - threadPriority (5), - affinityMask (0), shouldExit (false) { } @@ -55,53 +53,14 @@ Thread::~Thread() } //============================================================================== -// Use a ref-counted object to hold this shared data, so that it can outlive its static -// shared pointer when threads are still running during static shutdown. -struct CurrentThreadHolder : public SharedObject -{ - CurrentThreadHolder() noexcept {} - - typedef SharedPtr Ptr; - ThreadLocalValue value; -}; - -static char currentThreadHolderLock [sizeof (SpinLock)]; // (statically initialised to zeros). - -static SpinLock* castToSpinLockWithoutAliasingWarning (void* s) -{ - return static_cast (s); -} - -static CurrentThreadHolder::Ptr getCurrentThreadHolder() -{ - static CurrentThreadHolder::Ptr currentThreadHolder; - SpinLock::ScopedLockType lock (*castToSpinLockWithoutAliasingWarning (currentThreadHolderLock)); - - if (currentThreadHolder == nullptr) - currentThreadHolder = new CurrentThreadHolder(); - - return currentThreadHolder; -} - void Thread::threadEntryPoint() { - const CurrentThreadHolder::Ptr currentThreadHolder (getCurrentThreadHolder()); - currentThreadHolder->value = this; - - if (threadName.isNotEmpty()) + if (!threadName.empty ()) setCurrentThreadName (threadName); if (startSuspensionEvent.wait (10000)) - { - bassert (getCurrentThreadId() == threadId); - - if (affinityMask != 0) - setCurrentThreadAffinityMask (affinityMask); - run(); - } - currentThreadHolder->value.releaseCurrentThreadStorage(); closeThreadHandle(); } @@ -121,100 +80,37 @@ void Thread::startThread() if (threadHandle == nullptr) { launchThread(); - setThreadPriority (threadHandle, threadPriority); startSuspensionEvent.signal(); } } -void Thread::startThread (const int priority) -{ - const RecursiveMutex::ScopedLockType sl (startStopLock); - - if (threadHandle == nullptr) - { - threadPriority = priority; - startThread(); - } - else - { - setPriority (priority); - } -} - bool Thread::isThreadRunning() const { return threadHandle != nullptr; } -Thread* Thread::getCurrentThread() -{ - return getCurrentThreadHolder()->value.get(); -} - //============================================================================== void Thread::signalThreadShouldExit() { shouldExit = true; } -bool Thread::waitForThreadToExit (const int timeOutMilliseconds) const +void Thread::waitForThreadToExit () const { - // Doh! So how exactly do you expect this thread to wait for itself to stop?? - bassert (getThreadId() != getCurrentThreadId() || getCurrentThreadId() == 0); - - const std::uint32_t timeoutEnd = Time::getMillisecondCounter() + (std::uint32_t) timeOutMilliseconds; - while (isThreadRunning()) - { - if (timeOutMilliseconds >= 0 && Time::getMillisecondCounter() > timeoutEnd) - return false; - - sleep (2); - } - - return true; + std::this_thread::sleep_for (std::chrono::milliseconds (10)); } -bool Thread::stopThread (const int timeOutMilliseconds) +void Thread::stopThread () { - bool cleanExit = true; - - // agh! You can't stop the thread that's calling this method! How on earth - // would that work?? - bassert (getCurrentThreadId() != getThreadId()); - const RecursiveMutex::ScopedLockType sl (startStopLock); if (isThreadRunning()) { signalThreadShouldExit(); notify(); - - if (timeOutMilliseconds != 0) - { - cleanExit = waitForThreadToExit (timeOutMilliseconds); - } - - if (isThreadRunning()) - { - bassert (! cleanExit); - - // very bad karma if this point is reached, as there are bound to be - // locks and events left in silly states when a thread is killed by force.. - killThread(); - - threadHandle = nullptr; - threadId = 0; - - cleanExit = false; - } - else - { - cleanExit = true; - } + waitForThreadToExit (); } - - return cleanExit; } void Thread::stopThreadAsync () @@ -228,35 +124,6 @@ void Thread::stopThreadAsync () } } -//============================================================================== -bool Thread::setPriority (const int newPriority) -{ - // NB: deadlock possible if you try to set the thread prio from the thread itself, - // so using setCurrentThreadPriority instead in that case. - if (getCurrentThreadId() == getThreadId()) - return setCurrentThreadPriority (newPriority); - - const RecursiveMutex::ScopedLockType sl (startStopLock); - - if (setThreadPriority (threadHandle, newPriority)) - { - threadPriority = newPriority; - return true; - } - - return false; -} - -bool Thread::setCurrentThreadPriority (const int newPriority) -{ - return setThreadPriority (0, newPriority); -} - -void Thread::setAffinityMask (const std::uint32_t newAffinityMask) -{ - affinityMask = newAffinityMask; -} - //============================================================================== bool Thread::wait (const int timeOutMilliseconds) const { @@ -300,28 +167,15 @@ void Thread::launchThread() { unsigned int newThreadId; threadHandle = (void*) _beginthreadex (0, 0, &threadEntryProc, this, 0, &newThreadId); - threadId = (ThreadID) newThreadId; } void Thread::closeThreadHandle() { CloseHandle ((HANDLE) threadHandle); - threadId = 0; threadHandle = 0; } -void Thread::killThread() -{ - if (threadHandle != 0) - { - #if BEAST_DEBUG - OutputDebugStringA ("** Warning - Forced thread termination **\n"); - #endif - TerminateThread (threadHandle, 0); - } -} - -void Thread::setCurrentThreadName (const String& name) +void Thread::setCurrentThreadName (std::string const& name) { #if BEAST_DEBUG && BEAST_MSVC struct @@ -333,7 +187,7 @@ void Thread::setCurrentThreadName (const String& name) } info; info.dwType = 0x1000; - info.szName = name.toUTF8(); + info.szName = name.c_str (); info.dwThreadID = GetCurrentThreadId(); info.dwFlags = 0; @@ -348,70 +202,6 @@ void Thread::setCurrentThreadName (const String& name) #endif } -Thread::ThreadID Thread::getCurrentThreadId() -{ - return (ThreadID) (std::intptr_t) GetCurrentThreadId(); -} - -bool Thread::setThreadPriority (void* handle, int priority) -{ - int pri = THREAD_PRIORITY_TIME_CRITICAL; - - if (priority < 1) pri = THREAD_PRIORITY_IDLE; - else if (priority < 2) pri = THREAD_PRIORITY_LOWEST; - else if (priority < 5) pri = THREAD_PRIORITY_BELOW_NORMAL; - else if (priority < 7) pri = THREAD_PRIORITY_NORMAL; - else if (priority < 9) pri = THREAD_PRIORITY_ABOVE_NORMAL; - else if (priority < 10) pri = THREAD_PRIORITY_HIGHEST; - - if (handle == 0) - handle = GetCurrentThread(); - - return SetThreadPriority (handle, pri) != FALSE; -} - -void Thread::setCurrentThreadAffinityMask (const std::uint32_t affinityMask) -{ - SetThreadAffinityMask (GetCurrentThread(), affinityMask); -} - -struct SleepEvent -{ - SleepEvent() noexcept - : handle (CreateEvent (nullptr, FALSE, FALSE, - #if BEAST_DEBUG - _T("BEAST Sleep Event"))) - #else - nullptr)) - #endif - {} - - ~SleepEvent() noexcept - { - CloseHandle (handle); - handle = 0; - } - - HANDLE handle; -}; - -static SleepEvent sleepEvent; - -void Thread::sleep (const int millisecs) -{ - if (millisecs >= 10 || sleepEvent.handle == 0) - { - Sleep ((DWORD) millisecs); - } - else - { - // unlike Sleep() this is guaranteed to return to the current thread after - // the time expires, so we'll use this for short waits, which are more likely - // to need to be accurate - WaitForSingleObject (sleepEvent.handle, (DWORD) millisecs); - } -} - } //------------------------------------------------------------------------------ @@ -440,14 +230,6 @@ namespace beast{ namespace beast { -void Thread::sleep (int millisecs) -{ - struct timespec time; - time.tv_sec = millisecs / 1000; - time.tv_nsec = (millisecs % 1000) * 1000000; - nanosleep (&time, nullptr); -} - void beast_threadEntryPoint (void*); extern "C" void* threadEntryProcBeast (void*); @@ -480,108 +262,30 @@ void Thread::launchThread() { pthread_detach (handle); threadHandle = (void*) handle; - threadId = (ThreadID) threadHandle; } } void Thread::closeThreadHandle() { - threadId = 0; threadHandle = 0; } -void Thread::killThread() -{ - if (threadHandle != 0) - { - #if BEAST_ANDROID - bassertfalse; // pthread_cancel not available! - #else - pthread_cancel ((pthread_t) threadHandle); - #endif - } -} - -void Thread::setCurrentThreadName (const String& name) +void Thread::setCurrentThreadName (std::string const& name) { #if BEAST_IOS || (BEAST_MAC && defined (MAC_OS_X_VERSION_10_5) && MAC_OS_X_VERSION_MIN_REQUIRED >= MAC_OS_X_VERSION_10_5) BEAST_AUTORELEASEPOOL { - [[NSThread currentThread] setName: beastStringToNS (name)]; + [[NSThread currentThread] setName: beastStringToNS (beast::String (name))]; } #elif BEAST_LINUX #if (__GLIBC__ * 1000 + __GLIBC_MINOR__) >= 2012 - pthread_setname_np (pthread_self(), name.toRawUTF8()); + pthread_setname_np (pthread_self(), name.c_str ()); #else - prctl (PR_SET_NAME, name.toRawUTF8(), 0, 0, 0); + prctl (PR_SET_NAME, name.c_str (), 0, 0, 0); #endif #endif } -bool Thread::setThreadPriority (void* handle, int priority) -{ - struct sched_param param; - int policy; - priority = blimit (0, 10, priority); - - if (handle == nullptr) - handle = (void*) pthread_self(); - - if (pthread_getschedparam ((pthread_t) handle, &policy, ¶m) != 0) - return false; - - policy = priority == 0 ? SCHED_OTHER : SCHED_RR; - - const int minPriority = sched_get_priority_min (policy); - const int maxPriority = sched_get_priority_max (policy); - - param.sched_priority = ((maxPriority - minPriority) * priority) / 10 + minPriority; - return pthread_setschedparam ((pthread_t) handle, policy, ¶m) == 0; -} - -Thread::ThreadID Thread::getCurrentThreadId() -{ - return (ThreadID) pthread_self(); -} - -//============================================================================== -/* Remove this macro if you're having problems compiling the cpu affinity - calls (the API for these has changed about quite a bit in various Linux - versions, and a lot of distros seem to ship with obsolete versions) -*/ -#if defined (CPU_ISSET) && ! defined (SUPPORT_AFFINITIES) - #define SUPPORT_AFFINITIES 1 -#endif - -void Thread::setCurrentThreadAffinityMask (const std::uint32_t affinityMask) -{ - #if SUPPORT_AFFINITIES - cpu_set_t affinity; - CPU_ZERO (&affinity); - - for (int i = 0; i < 32; ++i) - if ((affinityMask & (1 << i)) != 0) - CPU_SET (i, &affinity); - - /* - N.B. If this line causes a compile error, then you've probably not got the latest - version of glibc installed. - - If you don't want to update your copy of glibc and don't care about cpu affinities, - then you can just disable all this stuff by setting the SUPPORT_AFFINITIES macro to 0. - */ - sched_setaffinity (getpid(), sizeof (cpu_set_t), &affinity); - sched_yield(); - - #else - /* affinities aren't supported because either the appropriate header files weren't found, - or the SUPPORT_AFFINITIES macro was turned off - */ - bassertfalse; - (void) affinityMask; - #endif -} - } //------------------------------------------------------------------------------ diff --git a/beast/threads/tests/ServiceQueue.cpp b/beast/threads/tests/ServiceQueue.cpp deleted file mode 100644 index 708bb80432..0000000000 --- a/beast/threads/tests/ServiceQueue.cpp +++ /dev/null @@ -1,295 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of Beast: https://github.com/vinniefalco/Beast - Copyright 2013, Vinnie Falco - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include - -#include - -#include -#include - -#include -#include - -namespace beast { - -class ServiceQueue_timing_test : public unit_test::suite -{ -public: - class Stopwatch - { - public: - Stopwatch () { start(); } - void start () { m_startTime = Time::getHighResolutionTicks (); } - double getElapsed () - { - std::int64_t const now = Time::getHighResolutionTicks(); - return Time::highResolutionTicksToSeconds (now - m_startTime); - } - private: - std::int64_t m_startTime; - }; - - static int const callsPerThread = 50000; - - //-------------------------------------------------------------------------- - - template - struct Consumer : Thread - { - ServiceType& m_service; - Random m_random; - String m_string; - - Consumer (int id, std::int64_t seedValue, ServiceType& service) - : Thread ("C#" + String::fromNumber (id)) - , m_service (service) - , m_random (seedValue) - { startThread(); } - - ~Consumer () - { stopThread(); } - - static Consumer*& thread() - { - static ThreadLocalValue local; - return local.get(); - } - - static void stop_one () - { thread()->signalThreadShouldExit(); } - - static void handler () - { thread()->do_handler(); } - - void do_handler() - { - String const s (String::fromNumber (m_random.nextInt())); - m_string += s; - if (m_string.length() > 100) - m_string = String::empty; - } - - void run () - { - thread() = this; - while (! threadShouldExit()) - m_service.run_one(); - } - }; - - //-------------------------------------------------------------------------- - - template - struct Producer : Thread - { - ServiceType& m_service; - Random m_random; - String m_string; - - Producer (int id, std::int64_t seedValue, ServiceType& service) - : Thread ("P#" + String::fromNumber (id)) - , m_service (service) - , m_random (seedValue) - { } - - ~Producer () - { stopThread(); } - - void run () - { - for (std::size_t i = 0; i < callsPerThread; ++i) - { - String const s (String::fromNumber (m_random.nextInt())); - m_string += s; - if (m_string.length() > 100) - m_string = String::empty; - m_service.dispatch (std::bind (&Consumer::handler)); - } - } - }; - - //-------------------------------------------------------------------------- - - template - void testThreads (int nConsumers, int nProducers) - { - std::stringstream ss; - ss << - nConsumers << " consumers, " << - nProducers << " producers, Allocator = " << - typeid(Allocator).name(); - testcase (ss.str()); - - typedef ServiceQueueType ServiceType; - - ServiceType service (nConsumers); - std::vector > > consumers; - std::vector > > producers; - consumers.reserve (nConsumers); - producers.reserve (nProducers); - - Random r; - - for (int i = 0; i < nConsumers; ++i) - consumers.emplace_back (new Consumer (i + 1, - r.nextInt64(), service)); - - for (int i = 0; i < nProducers; ++i) - producers.emplace_back (new Producer (i + 1, - r.nextInt64(), service)); - - Stopwatch t; - - for (std::size_t i = 0; i < producers.size(); ++i) - producers[i]->startThread(); - - for (std::size_t i = 0; i < producers.size(); ++i) - producers[i]->waitForThreadToExit(); - - for (std::size_t i = 0; i < consumers.size(); ++i) - service.dispatch (std::bind (&Consumer ::stop_one)); - - for (std::size_t i = 0; i < consumers.size(); ++i) - consumers[i]->waitForThreadToExit(); - - double const seconds (t.getElapsed()); - log << seconds << " seconds"; - - pass(); - } - - void run() - { -#if 1 - testThreads > (1, 1); - testThreads > (1, 4); - testThreads > (1, 16); - testThreads > (4, 1); - testThreads > (8, 16); -#endif - -#if 0 - testThreads > (1, 1); - testThreads > (1, 4); - testThreads > (1, 16); - testThreads > (4, 1); - testThreads > (8, 16); -#endif - } -}; - -BEAST_DEFINE_TESTSUITE_MANUAL(ServiceQueue_timing,threads,beast); - -//------------------------------------------------------------------------------ - -class ServiceQueue_test : public unit_test::suite -{ -public: - struct ServiceThread : Thread - { - Random m_random; - ServiceQueue& m_service; - String m_string; - - ServiceThread (int id, std::int64_t seedValue, - ServiceQueue& service) - : Thread ("#" + String::fromNumber (id)) - , m_random (seedValue) - , m_service (service) - { - startThread(); - } - - ~ServiceThread () - { - stopThread(); - } - - static ServiceThread*& thread() - { - static ThreadLocalValue local; - return local.get(); - } - - static void stop_one () - { - thread()->signalThreadShouldExit(); - } - - static void handler () - { - thread()->do_handler(); - } - - void do_handler() - { -#if 1 - String const s (String::fromNumber (m_random.nextInt())); - m_string += s; - if (m_string.length() > 100) - m_string = String::empty; -#endif - } - - void run () - { - thread() = this; - while (! threadShouldExit()) - m_service.run_one(); - } - }; - - static std::size_t const totalCalls = 10000; - - void testThreads (int n) - { - std::stringstream ss; - ss << n << " threads"; - testcase (ss.str()); - - Random r; - std::size_t const callsPerThread (totalCalls / n); - - ServiceQueue service (n); - std::vector > threads; - threads.reserve (n); - for (int i = 0; i < n; ++i) - threads.emplace_back (new ServiceThread (i + 1, - r.nextInt64(), service)); - for (std::size_t i = n * callsPerThread; i; --i) - service.dispatch (std::bind (&ServiceThread::handler)); - for (std::size_t i = 0; i < threads.size(); ++i) - service.dispatch (std::bind (&ServiceThread::stop_one)); - for (std::size_t i = 0; i < threads.size(); ++i) - threads[i]->waitForThreadToExit(); - pass(); - } - - void run() - { - testThreads (1); - testThreads (4); - testThreads (16); - } -}; - -BEAST_DEFINE_TESTSUITE(ServiceQueue,threads,beast); - -}