diff --git a/modules/beast_core/thread/beast_CallQueue.cpp b/modules/beast_core/thread/beast_CallQueue.cpp index 438c6beec..98ddfcd33 100644 --- a/modules/beast_core/thread/beast_CallQueue.cpp +++ b/modules/beast_core/thread/beast_CallQueue.cpp @@ -151,3 +151,155 @@ bool CallQueue::doSynchronize () return did_something; } + +//------------------------------------------------------------------------------ + +class CallQueueTests : public UnitTest +{ +public: + enum + { + callsPerThread = 50000 + }; + + struct TestQueue : CallQueue + { + explicit TestQueue (Thread& thread) + : CallQueue (thread.getThreadName()) + , m_thread (thread) + { + } + + void synchronize () + { + CallQueue::synchronize(); + } + + void signal () + { + m_thread.notify (); + } + + void reset () + { + } + + void close () + { + CallQueue::close(); + } + + Thread& m_thread; + }; + + struct TestThread : Thread + { + explicit TestThread (int id) + : Thread ("#" + String::fromNumber (id)) + , m_queue (*this) + { + startThread (); + } + + void stop () + { + m_queue.call (&TestThread::doStop, this); + m_queue.close (); + } + + void doStop () + { + signalThreadShouldExit (); + notify (); + } + + void func1 () + { + m_string = m_string + String::fromNumber (m_random.nextInt(10)); + if (m_string.length() > 100) + m_string = String::empty; + } + + void func2 () + { + m_string = m_string + String::fromNumber (m_random.nextInt()); + if (m_string.length() > 100) + m_string = String::empty; + } + + void func3 () + { + m_string = m_string + m_string; + if (m_string.length() > 100) + m_string = m_string.substring (m_random.nextInt (30)); + } + + void run () + { + while (! threadShouldExit ()) + { + wait (); + + m_queue.synchronize(); + } + } + + Random m_random; + TestQueue m_queue; + String m_string; + }; + + void testThreads (std::size_t nThreads) + { + beginTestCase (String::fromNumber (nThreads) + " threads"); + + OwnedArray threads; + threads.ensureStorageAllocated (nThreads); + + for (std::size_t i = 0; i < nThreads; ++i) + threads.add (new TestThread (i + 1)); + + Time const startTime (Time::getCurrentTime()); + + for (std::size_t i = 0; i < callsPerThread * nThreads; ++i) + { + int const n (random().nextInt (threads.size())); + int const f (random().nextInt (3)); + switch (f) + { + default: + bassertfalse; + case 0: threads[n]->m_queue.call (&TestThread::func1, threads[n]); break; + case 1: threads[n]->m_queue.call (&TestThread::func2, threads[n]); break; + case 2: threads[n]->m_queue.call (&TestThread::func3, threads[n]); break; + }; + } + + for (std::size_t i = 0; i < threads.size(); ++i) + threads[i]->stop (); + + for (std::size_t i = 0; i < threads.size(); ++i) + threads[i]->waitForThreadToExit(); + + double const secondsElapsed ((Time::getCurrentTime() - startTime).inSeconds ()); + + pass (); + + std::size_t const totalCalls (callsPerThread * nThreads); + double const callsPerSecond = (totalCalls / secondsElapsed); + logMessage (String::fromNumber (callsPerSecond + 0.5, 0) + " calls/second (in " + + String::fromNumber (secondsElapsed, 1) + " seconds)"); + } + + void runTest () + { + testThreads (8); + testThreads (64); + } + + CallQueueTests () : UnitTest ("CallQueue", "beast") + { + } +}; + +static CallQueueTests callQueueTests; diff --git a/modules/beast_core/thread/beast_ThreadWithCallQueue.cpp b/modules/beast_core/thread/beast_ThreadWithCallQueue.cpp index be9ccbe60..20dcc0ce3 100644 --- a/modules/beast_core/thread/beast_ThreadWithCallQueue.cpp +++ b/modules/beast_core/thread/beast_ThreadWithCallQueue.cpp @@ -165,10 +165,13 @@ public: { explicit TestThread (int id) : ThreadWithCallQueue ("#" + String::fromNumber (id)) + , interruptedOnce (false) { start (this); } + bool interruptedOnce; + void threadInit () { } @@ -187,7 +190,10 @@ public: { interrupted = interruptionPoint (); if (interrupted) + { + interruptedOnce = true; break; + } s = s + String::fromNumber (m_random.nextInt ()); @@ -195,6 +201,8 @@ public: s = String::empty; } + bassert (interrupted); + return interrupted; } @@ -243,6 +251,11 @@ public: }; } + for (int i = 0; i < threads.size(); ++i) + { + expect (threads[i]->interruptedOnce); + } + for (int i = 0; i < threads.size(); ++i) threads[i]->stop (false); for (int i = 0; i < threads.size(); ++i)