diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp index b4fa54ec9..81b956121 100644 --- a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp +++ b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.cpp @@ -96,6 +96,8 @@ void Workers::pauseAllThreadsAndWait () setNumberOfThreads (0); m_allPaused.wait (); + + bassert (numberOfCurrentlyRunningTasks () == 0); } void Workers::addTask () @@ -103,6 +105,11 @@ void Workers::addTask () m_semaphore.signal (); } +int Workers::numberOfCurrentlyRunningTasks () const noexcept +{ + return m_runningTaskCount.get (); +} + void Workers::deleteWorkers (LockFreeStack & stack) { for (;;) @@ -176,7 +183,9 @@ void Workers::Worker::run () // We couldn't pause so we must have gotten // unblocked in order to process a task. // + ++m_workers.m_runningTaskCount; m_workers.m_callback.processTask (); + --m_workers.m_runningTaskCount; } // Any worker that goes into the paused list must diff --git a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h index 4d2d9e9a9..f7be8f67d 100644 --- a/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h +++ b/Subtrees/beast/modules/beast_basics/threads/beast_Workers.h @@ -29,7 +29,12 @@ public: struct Callback { /** Perform a task. - The call is made on a thread owned by Workers. + + The call is made on a thread owned by Workers. It is important + that you only process one task from inside your callback. Each + call to addTask will result in exactly one call to processTask. + + @see Workers::addTask */ virtual void processTask () = 0; }; @@ -68,12 +73,22 @@ public: */ void pauseAllThreadsAndWait (); - /** Increment the number of tasks. - The callback will be called for each task. + /** Add a task to be performed. + + Every call to addTask will eventually result in a call to + Callback::processTask unless the Workers object is destroyed or + the number of threads is never set above zero. + @note This function is thread-safe. */ void addTask (); + /** Get the number of currently executing calls of Callback::processTask. + While this function is thread-safe, the value may not stay + accurate for very long. It's mainly for diagnostic purposes. + */ + int numberOfCurrentlyRunningTasks () const noexcept; + //-------------------------------------------------------------------------- private: @@ -87,7 +102,6 @@ private: Idle: Active, but blocked on waiting for a task. Pausd: Blocked waiting to exit or become active. */ - class Worker : public LockFreeStack ::Node , public LockFreeStack ::Node @@ -115,6 +129,7 @@ private: int m_numberOfThreads; // how many we want active now Atomic m_activeCount; // to know when all are paused Atomic m_pauseCount; // how many threads need to pause now + Atomic m_runningTaskCount; // how many calls to processTask() active LockFreeStack m_everyone; // holds all created workers LockFreeStack m_paused; // holds just paused workers };