diff --git a/Builds/VisualStudio2012/beast.vcxproj b/Builds/VisualStudio2012/beast.vcxproj index f9fcc44d06..f39a231529 100644 --- a/Builds/VisualStudio2012/beast.vcxproj +++ b/Builds/VisualStudio2012/beast.vcxproj @@ -329,6 +329,7 @@ + @@ -1212,6 +1213,12 @@ true true + + true + true + true + true + true true diff --git a/Builds/VisualStudio2012/beast.vcxproj.filters b/Builds/VisualStudio2012/beast.vcxproj.filters index 77c34968df..da38087a55 100644 --- a/Builds/VisualStudio2012/beast.vcxproj.filters +++ b/Builds/VisualStudio2012/beast.vcxproj.filters @@ -38,40 +38,40 @@ _meta - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser beast_core - beast_asio\async\beast\crypto\impl\sha2 + beast\crypto\impl\sha2 - beast_asio\async\beast\crypto\impl\sha2 + beast\crypto\impl\sha2 @@ -207,61 +207,61 @@ {27052a76-e315-4725-9d9a-1233c7d71aba} - + {92d1bb42-289a-4444-85c7-cb87540f2fff} - + {8832eb52-53f9-4850-8dc9-1d579a386a0e} - + {5904368f-a0f2-4d26-a031-8cbe4448dc3f} - + {5faa76ea-5691-4e63-8833-577f92991356} - + {93670bc9-a748-42bd-8118-8de30c468b16} - + {85158eb2-9340-4b3d-a136-f7631c7f1b7c} - + {56d34c67-7027-44ba-9f09-4591ce4afb36} - + {775ab0d6-aa5f-43d7-ab3b-3c01652a9ef1} - + {da8084c0-491b-4eb0-b750-97182a9deed4} - + {56ef157f-ad92-4da7-8fbf-00723f769732} - + {565f012b-42b7-42c9-81b7-9e93aa378000} - + {7eead15d-f9dc-4b4d-a653-57d9c090e697} - + {233e3c4d-e398-4c11-a42c-3483107eb8e9} - + {8d80e304-a42d-411a-9528-811eddff3191} - + {eabf472c-e198-409a-a65b-7c087ae911d0} - + {1fff3bd8-44ae-41df-8dd4-8bb6f07b2908} - + {9c1ef4c4-5623-4500-859f-12d6ce5ae362} - + {fc3d3f14-9ba1-43e4-b086-cbbd2f63b944} - + {44489531-f44a-439a-a6ea-d32c252b1e8b} @@ -825,28 +825,28 @@ beast_core\containers - beast_asio\async\beast\intrusive + beast\intrusive - beast_asio\async\beast\intrusive + beast\intrusive - beast_asio\async\beast\mpl + beast\mpl - beast_asio\async\beast\mpl + beast\mpl - beast_asio\async\beast\mpl + beast\mpl - beast_asio\async\beast\mpl + beast\mpl - beast_asio\async\beast\mpl + beast\mpl - beast_asio\async\beast\mpl + beast\mpl beast_core\memory @@ -972,7 +972,7 @@ beast_asio\http - beast_asio\async\beast\mpl + beast\mpl beast_asio\basics @@ -1011,64 +1011,64 @@ beast_core\memory - beast_asio\async\beast\net + beast\net - beast_asio\async\beast + beast - beast_asio\async\beast + beast - beast_asio\async\beast + beast - beast_asio\async\beast + beast - beast_asio\async\beast + beast - beast_asio\async\beast\type_traits + beast\type_traits - beast_asio\async\beast\type_traits + beast\type_traits - beast_asio\async\beast\type_traits + beast\type_traits - beast_asio\async\beast\type_traits + beast\type_traits - beast_asio\async\beast\utility + beast\utility - beast_asio\async\beast\utility + beast\utility - beast_asio\async\beast + beast - beast_asio\async\beast\mpl + beast\mpl - beast_asio\async\beast + beast - beast_asio\async\beast\thread + beast\thread - beast_asio\async\beast\thread + beast\thread - beast_asio\async\beast\thread + beast\thread - beast_asio\async\beast\thread + beast\thread - beast_asio\async\beast + beast beast_asio\http @@ -1080,116 +1080,116 @@ beast_asio\basics - beast_asio\async\beast + beast - beast_asio\async\beast + beast - beast_asio\async\beast\intrusive + beast\intrusive - beast_asio\async\beast\intrusive + beast\intrusive - beast_asio\async\beast\mpl + beast\mpl beast_core\thread - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast + beast - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast + beast - beast_asio\async\beast\config + beast\config - beast_asio\async\beast\config + beast\config - beast_asio\async\beast\config + beast\config - beast_asio\async\beast + beast - beast_asio\async\beast\config + beast\config - beast_asio\async\beast\config + beast\config - beast_asio\async\beast + beast - beast_asio\async\beast + beast - beast_asio\async\beast + beast - beast_asio\async\beast + beast - beast_asio\async\beast + beast beast_core\system - beast_asio\async\beast\http + beast\http - beast_asio\async\beast\http + beast\http - beast_asio\async\beast + beast - beast_asio\async\beast\intrusive + beast\intrusive - beast_asio\async\beast + beast - beast_asio\async\beast\crypto\impl\sha2 + beast\crypto\impl\sha2 - beast_asio\async\beast\crypto + beast\crypto beast_asio\async @@ -1197,6 +1197,9 @@ beast_asio\async + + beast_core\thread + @@ -1698,16 +1701,16 @@ beast_extras - beast_asio\async\beast\net\impl + beast\net\impl - beast_asio\async\beast\net + beast\net - beast_asio\async\beast\utility + beast\utility - beast_asio\async\beast\utility\impl + beast\utility\impl beast_asio\http @@ -1716,52 +1719,55 @@ beast_core\thread - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser + beast\http\impl\http-parser - beast_asio\async\beast\http\impl\http-parser\contrib + beast\http\impl\http-parser\contrib - beast_asio\async\beast\http\impl\http-parser\contrib + beast\http\impl\http-parser\contrib - beast_asio\async\beast\http + beast\http - beast_asio\async\beast\http\impl + beast\http\impl - beast_asio\async\beast\strings\impl + beast\strings\impl - beast_asio\async\beast\strings\impl + beast\strings\impl - beast_asio\async\beast\strings + beast\strings - beast_asio\async\beast\http\impl + beast\http\impl - beast_asio\async\beast\http\impl + beast\http\impl - beast_asio\async\beast\crypto\impl\sha2 + beast\crypto\impl\sha2 - beast_asio\async\beast\crypto\impl\sha2 + beast\crypto\impl\sha2 - beast_asio\async\beast\crypto\impl\sha2 + beast\crypto\impl\sha2 - beast_asio\async\beast\crypto + beast\crypto - beast_asio\async\beast\crypto\impl + beast\crypto\impl + + + beast_core\thread diff --git a/modules/beast_core/beast_core.cpp b/modules/beast_core/beast_core.cpp index 8766591f71..0fa15dc8d8 100644 --- a/modules/beast_core/beast_core.cpp +++ b/modules/beast_core/beast_core.cpp @@ -290,6 +290,8 @@ namespace beast } +#include "thread/ServiceQueue.cpp" + // Has to be outside the beast namespace extern "C" { void beast_reportFatalError (char const* message, char const* fileName, int lineNumber) diff --git a/modules/beast_core/beast_core.h b/modules/beast_core/beast_core.h index 16f9df5463..82f93005b5 100644 --- a/modules/beast_core/beast_core.h +++ b/modules/beast_core/beast_core.h @@ -268,6 +268,8 @@ class FileOutputStream; } +#include "thread/ServiceQueue.h" + #if BEAST_MSVC #pragma warning (pop) #endif diff --git a/modules/beast_core/thread/ServiceQueue.cpp b/modules/beast_core/thread/ServiceQueue.cpp new file mode 100644 index 0000000000..e03b59071c --- /dev/null +++ b/modules/beast_core/thread/ServiceQueue.cpp @@ -0,0 +1,443 @@ +//------------------------------------------------------------------------------ +/* + This file is part of Beast: https://github.com/vinniefalco/Beast + Copyright 2013, Vinnie Falco + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +namespace beast { + +namespace detail { + +//------------------------------------------------------------------------------ + +class ServiceQueueBase::ScopedServiceThread : public List ::Node +{ +public: + explicit ScopedServiceThread (ServiceQueueBase* queue) + : m_saved (ServiceQueueBase::s_service.get()) + { + ServiceQueueBase::s_service.get() = queue; + } + + ~ScopedServiceThread() + { + ServiceQueueBase::s_service.get() = m_saved; + } + +private: + ServiceQueueBase* m_saved; +}; + +//------------------------------------------------------------------------------ + +ServiceQueueBase::ServiceQueueBase() +{ +} + +ServiceQueueBase::~ServiceQueueBase() +{ +} + +std::size_t ServiceQueueBase::poll () +{ + std::size_t total (0); + ScopedServiceThread thread (this); + for (;;) + { + std::size_t const n (dequeue()); + if (! n) + break; + total += n; + } + return total; +} + +std::size_t ServiceQueueBase::poll_one () +{ + ScopedServiceThread thread (this); + return dequeue(); +} + +std::size_t ServiceQueueBase::run () +{ + std::size_t total (0); + ScopedServiceThread thread (this); + while (! stopped()) + { + total += poll (); + wait (); + } + return total; +} + +std::size_t ServiceQueueBase::run_one () +{ + std::size_t n; + ScopedServiceThread (this); + for (;;) + { + n = poll_one(); + if (n != 0) + break; + wait(); + } + return n; +} + +void ServiceQueueBase::stop () +{ + SharedState::Access state (m_state); + m_stopped.set (1); + for(;;) + { + Waiter* waiting (state->waiting.pop_front()); + if (waiting == nullptr) + break; + waiting->signal(); + } +} + +void ServiceQueueBase::reset() +{ + // Must be stopped + bassert (m_stopped.get () != 0); + m_stopped.set (0); +} + +// Block on the event if there are no items +// in the queue and we are not stopped. +// +void ServiceQueueBase::wait () +{ + Waiter* waiter (nullptr); + + { + SharedState::Access state (m_state); + if (stopped ()) + return; + if (! state->handlers.empty()) + return; + waiter = state->unused.pop_front(); + if (! waiter) + waiter = new_waiter(); + state->waiting.push_front (waiter); + } + + waiter->wait(); + + // Waiter got taken off the waiting list + + { + SharedState::Access state (m_state); + state->unused.push_front (waiter); + } +} + +void ServiceQueueBase::enqueue (Item* item) +{ + Waiter* waiter; + + { + SharedState::Access state (m_state); + state->handlers.push_back (*item); + // Signal a Waiter if one exists + waiter = state->waiting.pop_front(); + } + + if (waiter != nullptr) + waiter->signal(); +} + +// A thread can only be blocked on one ServiceQueue so we store the pointer +// to which ServiceQueue it is blocked on to determine if the thread belongs +// to that queue. +// +ThreadLocalValue ServiceQueueBase::s_service; + +} + +//------------------------------------------------------------------------------ + +namespace detail +{ + +//------------------------------------------------------------------------------ + +class ServiceQueueTimingTests + : public UnitTest +{ +public: + class Stopwatch + { + public: + Stopwatch () { start(); } + void start () { m_startTime = Time::getHighResolutionTicks (); } + double getElapsed () + { + int64 const now = Time::getHighResolutionTicks(); + return Time::highResolutionTicksToSeconds (now - m_startTime); + } + private: + int64 m_startTime; + }; + + static int const callsPerThread = 50000; + + //-------------------------------------------------------------------------- + + template + struct Consumer : Thread + { + ServiceType& m_service; + Random m_random; + String m_string; + + Consumer (int id, int64 seedValue, ServiceType& service) + : Thread ("C#" + String::fromNumber (id)) + , m_service (service) + , m_random (seedValue) + { startThread(); } + + ~Consumer () + { stopThread(); } + + static Consumer*& thread() + { + static ThreadLocalValue local; + return local.get(); + } + + static void stop_one () + { thread()->signalThreadShouldExit(); } + + static void handler () + { thread()->do_handler(); } + + void do_handler() + { + String const s (String::fromNumber (m_random.nextInt())); + m_string += s; + if (m_string.length() > 100) + m_string = String::empty; + } + + void run () + { + thread() = this; + while (! threadShouldExit()) + m_service.run_one(); + } + }; + + //-------------------------------------------------------------------------- + + template + struct Producer : Thread + { + ServiceType& m_service; + Random m_random; + String m_string; + + Producer (int id, int64 seedValue, ServiceType& service) + : Thread ("P#" + String::fromNumber (id)) + , m_service (service) + , m_random (seedValue) + { } + + ~Producer () + { stopThread(); } + + void run () + { + for (std::size_t i = 0; i < callsPerThread; ++i) + { + String const s (String::fromNumber (m_random.nextInt())); + m_string += s; + if (m_string.length() > 100) + m_string = String::empty; + m_service.dispatch (bind (&Consumer::handler)); + } + } + }; + + //-------------------------------------------------------------------------- + + template + void testThreads (std::size_t nConsumers, std::size_t nProducers) + { + beginTestCase (String::fromNumber (nConsumers) + " consumers, " + + String::fromNumber (nProducers) + " producers, " + + "Allocator = " + std::string(typeid(Allocator).name())); + + typedef ServiceQueueType ServiceType; + + ServiceType service (nConsumers); + std::vector > > consumers; + std::vector > > producers; + consumers.reserve (nConsumers); + producers.reserve (nProducers); + + for (std::size_t i = 0; i < nConsumers; ++i) + consumers.push_back (new Consumer (i + 1, + random().nextInt64(), service)); + + for (std::size_t i = 0; i < nProducers; ++i) + producers.push_back (new Producer (i + 1, + random().nextInt64(), service)); + + Stopwatch t; + + for (std::size_t i = 0; i < producers.size(); ++i) + producers[i]->startThread(); + + for (std::size_t i = 0; i < producers.size(); ++i) + producers[i]->waitForThreadToExit(); + + for (std::size_t i = 0; i < consumers.size(); ++i) + service.dispatch (bind (&Consumer ::stop_one)); + + for (std::size_t i = 0; i < consumers.size(); ++i) + consumers[i]->waitForThreadToExit(); + + double const seconds (t.getElapsed()); + logMessage (String (seconds, 2) + " seconds"); + + pass(); + } + + void runTest() + { +#if 1 + testThreads > (1, 1); + testThreads > (1, 4); + testThreads > (1, 16); + testThreads > (4, 1); + testThreads > (8, 16); +#endif + +#if 0 + testThreads > (1, 1); + testThreads > (1, 4); + testThreads > (1, 16); + testThreads > (4, 1); + testThreads > (8, 16); +#endif + } + + ServiceQueueTimingTests () : UnitTest ("ServiceQueueTiming", "beast", runManual) + { + } +}; + +static ServiceQueueTimingTests serviceQueueTimingTests; + +//------------------------------------------------------------------------------ + +class ServiceQueueTests + : public UnitTest +{ +public: + struct ServiceThread : Thread + { + Random m_random; + ServiceQueue& m_service; + String m_string; + + ServiceThread (int id, int64 seedValue, + ServiceQueue& service) + : Thread ("#" + String::fromNumber (id)) + , m_random (seedValue) + , m_service (service) + { + startThread(); + } + + ~ServiceThread () + { + stopThread(); + } + + static ServiceThread*& thread() + { + static ThreadLocalValue local; + return local.get(); + } + + static void stop_one () + { + thread()->signalThreadShouldExit(); + } + + static void handler () + { + thread()->do_handler(); + } + + void do_handler() + { +#if 1 + String const s (String::fromNumber (m_random.nextInt())); + m_string += s; + if (m_string.length() > 100) + m_string = String::empty; +#endif + } + + void run () + { + thread() = this; + while (! threadShouldExit()) + m_service.run_one(); + } + }; + + static int const callsPerThread = 10000; + + void testThreads (std::size_t n) + { + beginTestCase (String::fromNumber (n) + " threads"); + ServiceQueue service (n); + std::vector > threads; + threads.reserve (n); + for (std::size_t i = 0; i < n; ++i) + threads.push_back (new ServiceThread (i + 1, + random().nextInt64(), service)); + for (std::size_t i = n * callsPerThread; i; --i) + service.dispatch (bind (&ServiceThread::handler)); + for (std::size_t i = 0; i < threads.size(); ++i) + service.dispatch (bind (&ServiceThread::stop_one)); + for (std::size_t i = 0; i < threads.size(); ++i) + threads[i]->waitForThreadToExit(); + pass(); + } + + void runTest() + { + testThreads (1); + testThreads (4); + testThreads (16); + } + + ServiceQueueTests () : UnitTest ("ServiceQueue", "beast") + { + } +}; + +static ServiceQueueTests serviceQueueTests; + +} + +} diff --git a/modules/beast_core/thread/ServiceQueue.h b/modules/beast_core/thread/ServiceQueue.h new file mode 100644 index 0000000000..de921f0e41 --- /dev/null +++ b/modules/beast_core/thread/ServiceQueue.h @@ -0,0 +1,606 @@ +//------------------------------------------------------------------------------ +/* + This file is part of Beast: https://github.com/vinniefalco/Beast + Copyright 2013, Vinnie Falco + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef BEAST_SERVICEQUEUE_H_INCLUDED +#define BEAST_SERVICEQUEUE_H_INCLUDED + +namespace beast { + +namespace detail { + +//------------------------------------------------------------------------------ + +// VFALCO NOTE This allocator is a work in progress + +#if 0 + +class ServiceQueueAllocatorArena : public SharedObject +{ +public: + typedef std::size_t size_type; + + class Page : public LockFreeStack ::Node + { + public: + size_type const m_pageBytes; + Atomic m_refs; + char* m_pos; + bool m_full; + size_type m_count; + + static std::size_t overhead() + { return sizeof (Page); } + + static std::size_t pointer_overhead() + { return sizeof (Page*); } + + // pageBytes doesn't include the Page structure + explicit Page (size_type pageBytes) + : m_pageBytes (pageBytes) + , m_pos (begin()) + , m_full (false) + , m_count (0) + { + } + + ~Page () + { + // This means someone forgot to deallocate something + bassert (!m_full && m_refs.get() == 0); + } + + static Page* create (size_type pageBytes) + { + return new (new uint8[pageBytes + overhead()]) Page (pageBytes); + } + + static void destroy (Page* page) + { + page->~Page(); + delete[] ((uint8*)page); + } + + void reset () + { + m_refs.set (0); + m_pos = begin(); + m_full = false; + m_count = 0; + } + + bool full() const + { + return m_full; + } + + void* allocate (size_type n) + { + size_type const needed (n + pointer_overhead()); + char* pos = m_pos + needed; + if (pos > end()) + { + m_full = true; + return nullptr; + } + ++m_refs; + void* p (m_pos + pointer_overhead()); + get_page(p) = this; + m_pos = pos; + ++m_count; + return p; + } + + char* begin() const + { + return const_cast ( + reinterpret_cast (this) + overhead()); + } + + char const* end() const + { + return begin() + m_pageBytes; + } + + // Returns true if the page can be recycled + bool deallocate (void* p, size_type) + { + bool const unused ((--m_refs) == 0); + return unused && m_full; + } + + // Returns a reference to the per-allocation overhead area + static Page*& get_page (void* p) + { + return *reinterpret_cast ( + static_cast (p) - pointer_overhead()); + } + }; + + struct State + { + State() + { + } + + ~State() + { + // If this goes off, someone forgot to call deallocate! + bassert (full.get() == 0); + + destroy (active); + destroy (recycle); + } + + void destroy (LockFreeStack & stack) + { + for(;;) + { + Page* const page (stack.pop_front()); + if (page == nullptr) + break; + Page::destroy (page); + } + } + + Atomic full; + Atomic page_count; + LockFreeStack active; + LockFreeStack recycle; + }; + + typedef SharedData SharedState; + + size_type const m_maxBytes; + SharedState m_state; + + explicit ServiceQueueAllocatorArena (size_type maxBytes = 16 * 1024) + : m_maxBytes (maxBytes) + { + } + + ~ServiceQueueAllocatorArena() + { + } + + void* allocate (size_type n) + { + SharedState::UnlockedAccess state (m_state); + + // Loop until we satisfy the allocation from an + // active page, or we run out of active pages. + // + for (;;) + { + // Acquire ownership of an active page + // This prevents other threads from seeing it. + Page* page (state->active.pop_front()); + if (page == nullptr) + break; + + void* p = page->allocate (n); + if (p != nullptr) + { + // Put the page back so other threads can use it + state->active.push_front (page); + return p; + } + + // Page is full, count it for diagnostics + ++state->full; + } + + // No active page, get a recycled page or create a new page. + // + Page* page (state->recycle.pop_front()); + if (page == nullptr) + { + page = Page::create (std::max (m_maxBytes, n)); + ++state->page_count; + } + + void* p = page->allocate (n); + bassert (p != nullptr); + // Throw page into the active list so other threads can use it + state->active.push_front (page); + return p; + } + + void deallocate (void* p, size_type n) + { + SharedState::UnlockedAccess state (m_state); + Page* const page (Page::get_page(p)); + if (page->deallocate (p, n)) + { + --state->full; + page->reset(); + Page::destroy (page); + //state->recycle.push_front (page); + } + } +}; + +//------------------------------------------------------------------------------ + +template +struct ServiceQueueAllocator +{ + typedef T value_type; + typedef T* pointer; + typedef T& reference; + typedef T const* const_pointer; + typedef T const& const_reference; + typedef std::size_t size_type; + typedef std::ptrdiff_t difference_type; + + ServiceQueueAllocator () + : m_arena (new ServiceQueueAllocatorArena) + { + } + + ServiceQueueAllocator (ServiceQueueAllocator const& other) + : m_arena (other.m_arena) + { + } + + template + ServiceQueueAllocator (ServiceQueueAllocator const& other) + : m_arena (other.m_arena) + { + } + + template + struct rebind + { + typedef ServiceQueueAllocator other; + }; + + pointer address (reference x) const + { + return &x; + } + + const_pointer address (const_reference x) const + { + return &x; + } + + pointer allocate (size_type n, + std::allocator::const_pointer = nullptr) const + { + size_type const bytes (n * sizeof (value_type)); + pointer const p (static_cast ( + m_arena->allocate (bytes))); + return p; + } + + void deallocate (pointer p, size_type n) const + { + size_type const bytes = (n * sizeof (value_type)); + m_arena->deallocate (p, bytes); + } + + size_type max_size () const + { + return std::numeric_limits ::max () / sizeof (value_type); + } + + void construct (pointer p, const_reference val) const + { + new ((void *)p) value_type (val); + } + + void destroy (pointer p) const + { + p->~value_type (); + } + +private: + template + friend struct ServiceQueueAllocator; + + SharedPtr m_arena; +}; + +#endif + +//------------------------------------------------------------------------------ + +class ServiceQueueBase +{ +public: + ServiceQueueBase(); + ~ServiceQueueBase(); + + std::size_t poll(); + std::size_t poll_one(); + std::size_t run(); + std::size_t run_one(); + void stop(); + bool stopped() const + { return m_stopped.get() != 0; } + void reset(); + +protected: + class Item; + class Waiter; + class ScopedServiceThread; + + void wait(); + void enqueue (Item* item); + + virtual std::size_t dequeue() = 0; + virtual Waiter* new_waiter() = 0; + + //-------------------------------------------------------------------------- + + class Item : public List ::Node + { + public: + virtual ~Item() { } + virtual void operator()() = 0; + virtual std::size_t size() const = 0; + }; + + //-------------------------------------------------------------------------- + + class Waiter : public LockFreeStack ::Node + { + public: + Waiter() + { } + void wait() + { m_event.wait(); } + void signal() + { m_event.signal(); } + private: + WaitableEvent m_event; + }; + + //-------------------------------------------------------------------------- + + struct State + { + // handlers + List handlers; + LockFreeStack waiting; + LockFreeStack unused; + }; + + typedef SharedData SharedState; + SharedState m_state; + Atomic m_stopped; + + static ThreadLocalValue s_service; +}; + +} + +//------------------------------------------------------------------------------ + +/** A queue for disatching function calls on other threads. + Handlers are guaranteed to be called only from threads that are currently + calling run, run_one, poll, or poll_one. +*/ +template > +class ServiceQueueType : public detail::ServiceQueueBase +{ +private: + using ServiceQueueBase::Item; + using ServiceQueueBase::Waiter; + + template + class ItemType : public Item + { + public: + explicit ItemType (BEAST_MOVE_ARG(Handler) handler) + : m_handler (BEAST_MOVE_CAST(Handler)(handler)) + { } + void operator() () + { m_handler(); } + std::size_t size() const + { return sizeof (*this); } + private: + Handler m_handler; + }; + +public: + typedef Allocator allocator_type; // for std::uses_allocator<> + + explicit ServiceQueueType (int expectedConcurrency = 1, + Allocator alloc = Allocator()) + : m_alloc (alloc) + { + typename Allocator::template rebind ::other a (m_alloc); + SharedState::Access state (m_state); + while (expectedConcurrency--) + { + state->unused.push_front (new (a.allocate (1)) Waiter); + } + } + + ~ServiceQueueType() + { + SharedState::Access state (m_state); + + // Must be empty + bassert (state->handlers.empty()); + bassert (state->waiting.empty()); + + typename Allocator::template rebind ::other a (m_alloc); + for(;;) + { + Waiter* const waiter (state->unused.pop_front()); + if (waiter == nullptr) + break; + a.destroy (waiter); + a.deallocate (waiter, 1); + } + } + + /** Returns the allocator associated with the container. */ + allocator_type get_allocator() const + { + return m_alloc; + } + + /** Returns `true` if the current thread is processing events. + If the current thread of execution is inside a call to run, + run_one, poll, or poll_one, this function returns `true`. + */ + bool is_service_thread() const + { return s_service.get() == this; } + + /** Run the handler on a service thread. + If the current thread of execution is a service thread then this + function wil dispatch the handler on the caller's thread before + returning. + The function signature of the handler must be: + @code + void handler(); + @endcode + */ + template + void dispatch (BEAST_MOVE_ARG(Handler) handler) + { + if (is_service_thread()) + { + handler(); + } + else + { + typename Allocator::template rebind >::other a (m_alloc); + enqueue (new (a.allocate (1)) + ItemType (BEAST_MOVE_CAST(Handler)(handler))); + } + } + + /** Request the handler to run on a service thread. + This returns immediately, even if the current thread of execution is + a service thread. + The function signature of the handler must be: + @code + void handler(); + @endcode + */ + template + void post (BEAST_MOVE_ARG(Handler) handler) + { + typename Allocator::template rebind >::other a (m_alloc); + enqueue (new (a.allocate (1)) + ItemType (BEAST_MOVE_CAST(Handler)(handler))); + } + + /** Run the event loop to execute ready handlers. + This runs handlers that are ready to run, without blocking, until + there are no more handlers ready or the service queue has been stopped. + @return The number of handlers that were executed. + */ + std::size_t poll () + { return ServiceQueueBase::poll(); } + + /** Run the event loop to execute at most one ready handler. + This will run zero or one handlers, without blocking, depending on + whether or not there is handler immediately ready to run. + @return The number of handlers that were executed. + */ + std::size_t poll_one () + { return ServiceQueueBase::poll_one(); } + + /** Runs the queue's processing loop. + The current thread of execution becomes a service thread. This call + blocks until there is no more work remaining. + @return The number of handlers that were executed. + */ + std::size_t run () + { return ServiceQueueBase::run(); } + + /** Runs the queue's processing loop to execute at most one handler. + @return The number of handlers that were executed. + */ + std::size_t run_one () + { return ServiceQueueBase::run_one(); } + + /** Stop the queue's processing loop. + All threads executing run or run_one will return as soon as possible. + Future calls to run, run_one, poll, or poll_one will return immediately + until reset is called. + @see reset + */ + void stop() + { return ServiceQueueBase::stop(); } + + /** Returns `true` if the queue has been stopped. + When a queue is stopped, calls to run, run_one, poll, or poll_one will + return immediately without invoking any handlers. + */ + bool stopped() const + { return ServiceQueueBase::stopped(); } + + /** Reset the queue after a stop. + This allows the event loop to be restarted. This may not be called while + there are any threads currently executing the run, run_one, poll, or + poll_one functions, or undefined behavior will result. + */ + void reset() + { return ServiceQueueBase::reset(); } + +private: + // Dispatch a single queued handler if possible. + // Returns the number of handlers dispatched (0 or 1) + // + std::size_t dequeue () + { + if (stopped()) + return 0; + + Item* item (nullptr); + + { + SharedState::Access state (m_state); + if (state->handlers.empty()) + return 0; + item = &state->handlers.front(); + state->handlers.erase ( + state->handlers.iterator_to (*item)); + } + + (*item)(); + + typename Allocator::template rebind ::other a (m_alloc); + std::size_t const size (item->size()); + item->~Item(); + a.deallocate (reinterpret_cast(item), size); + return 1; + } + + // Create a new Waiter + Waiter* new_waiter() + { + typename Allocator::template rebind ::other a (m_alloc); + return new (a.allocate (1)) Waiter; + } + + Allocator m_alloc; +}; + +typedef ServiceQueueType > ServiceQueue; + +} + +#endif