diff --git a/CMakeLists.txt b/CMakeLists.txt index c4f52368bb..460d55c914 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1312,7 +1312,6 @@ else () target_sources (xrpl_core PRIVATE src/ripple/beast/core/CurrentThreadName.cpp src/ripple/beast/core/SemanticVersion.cpp - src/ripple/beast/core/WaitableEvent.cpp src/ripple/beast/hash/impl/xxhash.cpp src/ripple/beast/insight/impl/Collector.cpp src/ripple/beast/insight/impl/Groups.cpp diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index dd6f93857b..ef00d57a39 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -60,10 +60,12 @@ #include #include #include -#include -#include -#include +#include #include +#include +#include +#include +#include namespace ripple { @@ -368,7 +370,10 @@ public: std::vector > websocketServers_; boost::asio::signal_set m_signals; - beast::WaitableEvent m_stop; + + std::condition_variable cv_; + std::mutex mut_; + bool isTimeToStop = false; std::atomic checkSigs_; @@ -1474,7 +1479,10 @@ ApplicationImp::run() getLoadManager ().activateDeadlockDetector (); } - m_stop.wait (); + { + std::unique_lock lk{mut_}; + cv_.wait(lk, [this]{return isTimeToStop;}); + } // Stop the server. When this returns, all // Stoppable objects should be stopped. @@ -1489,7 +1497,9 @@ ApplicationImp::signalStop() { // Unblock the main thread (which is sitting in run()). // - m_stop.signal(); + std::lock_guard lk{mut_}; + isTimeToStop = true; + cv_.notify_all(); } bool diff --git a/src/ripple/basics/impl/ResolverAsio.cpp b/src/ripple/basics/impl/ResolverAsio.cpp index 6dfc2e5500..e60b125045 100644 --- a/src/ripple/basics/impl/ResolverAsio.cpp +++ b/src/ripple/basics/impl/ResolverAsio.cpp @@ -21,13 +21,14 @@ #include #include #include -#include #include #include #include +#include #include #include #include +#include namespace ripple { @@ -110,7 +111,10 @@ public: boost::asio::io_service::strand m_strand; boost::asio::ip::tcp::resolver m_resolver; - beast::WaitableEvent m_stop_complete; + std::condition_variable m_cv; + std::mutex m_mut; + bool m_asyncHandlersCompleted; + std::atomic m_stop_called; std::atomic m_stopped; @@ -139,7 +143,7 @@ public: , m_io_service (io_service) , m_strand (io_service) , m_resolver (io_service) - , m_stop_complete (true, true) + , m_asyncHandlersCompleted (true) , m_stop_called (false) , m_stopped (true) { @@ -156,7 +160,9 @@ public: // AsyncObject void asyncHandlersComplete() { - m_stop_complete.signal (); + std::unique_lock lk{m_mut}; + m_asyncHandlersCompleted = true; + m_cv.notify_all(); } //-------------------------------------------------------------------------- @@ -172,7 +178,10 @@ public: if (m_stopped.exchange (false) == true) { - m_stop_complete.reset (); + { + std::lock_guard lk{m_mut}; + m_asyncHandlersCompleted = false; + } addReference (); } } @@ -194,7 +203,9 @@ public: stop_async (); JLOG(m_journal.debug()) << "Waiting to stop"; - m_stop_complete.wait(); + std::unique_lock lk{m_mut}; + m_cv.wait(lk, [this]{return m_asyncHandlersCompleted;}); + lk.unlock(); JLOG(m_journal.debug()) << "Stopped"; } diff --git a/src/ripple/beast/core/WaitableEvent.cpp b/src/ripple/beast/core/WaitableEvent.cpp deleted file mode 100644 index f09648179c..0000000000 --- a/src/ripple/beast/core/WaitableEvent.cpp +++ /dev/null @@ -1,171 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of Beast: https://github.com/vinniefalco/Beast - Copyright 2013, Vinnie Falco - - Portions of this file are from JUCE. - Copyright (c) 2013 - Raw Material Software Ltd. - Please visit http://www.juce.com - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include -#include - -#if BEAST_WINDOWS - -#include -#undef check -#undef direct -#undef max -#undef min -#undef TYPE_BOOL - -namespace beast { - -WaitableEvent::WaitableEvent (const bool manualReset, bool initiallySignaled) - : handle (CreateEvent (0, manualReset ? TRUE : FALSE, initiallySignaled ? TRUE : FALSE, 0)) -{ -} - -WaitableEvent::~WaitableEvent() -{ - CloseHandle (handle); -} - -void WaitableEvent::signal() const -{ - SetEvent (handle); -} - -void WaitableEvent::reset() const -{ - ResetEvent (handle); -} - -bool WaitableEvent::wait () const -{ - return WaitForSingleObject (handle, INFINITE) == WAIT_OBJECT_0; -} - -bool WaitableEvent::wait (const int timeOutMs) const -{ - if (timeOutMs >= 0) - return WaitForSingleObject (handle, - (DWORD) timeOutMs) == WAIT_OBJECT_0; - return wait (); -} - -} - -#else - -#include - -namespace beast { - -WaitableEvent::WaitableEvent (const bool useManualReset, bool initiallySignaled) - : triggered (false), manualReset (useManualReset) -{ - pthread_cond_init (&condition, 0); - - pthread_mutexattr_t atts; - pthread_mutexattr_init (&atts); - #if ! BEAST_ANDROID - pthread_mutexattr_setprotocol (&atts, PTHREAD_PRIO_INHERIT); - #endif - pthread_mutex_init (&mutex, &atts); - - if (initiallySignaled) - signal (); -} - -WaitableEvent::~WaitableEvent() -{ - pthread_cond_destroy (&condition); - pthread_mutex_destroy (&mutex); -} - -bool WaitableEvent::wait () const -{ - return wait (-1); -} - -bool WaitableEvent::wait (const int timeOutMillisecs) const -{ - pthread_mutex_lock (&mutex); - - if (! triggered) - { - if (timeOutMillisecs < 0) - { - do - { - pthread_cond_wait (&condition, &mutex); - } - while (! triggered); - } - else - { - struct timeval now; - gettimeofday (&now, 0); - - struct timespec time; - time.tv_sec = now.tv_sec + (timeOutMillisecs / 1000); - time.tv_nsec = (now.tv_usec + ((timeOutMillisecs % 1000) * 1000)) * 1000; - - if (time.tv_nsec >= 1000000000) - { - time.tv_nsec -= 1000000000; - time.tv_sec++; - } - - do - { - if (pthread_cond_timedwait (&condition, &mutex, &time) == ETIMEDOUT) - { - pthread_mutex_unlock (&mutex); - return false; - } - } - while (! triggered); - } - } - - if (! manualReset) - triggered = false; - - pthread_mutex_unlock (&mutex); - return true; -} - -void WaitableEvent::signal() const -{ - pthread_mutex_lock (&mutex); - triggered = true; - pthread_cond_broadcast (&condition); - pthread_mutex_unlock (&mutex); -} - -void WaitableEvent::reset() const -{ - pthread_mutex_lock (&mutex); - triggered = false; - pthread_mutex_unlock (&mutex); -} - -} - -#endif diff --git a/src/ripple/beast/core/WaitableEvent.h b/src/ripple/beast/core/WaitableEvent.h deleted file mode 100644 index 604c2f8ef6..0000000000 --- a/src/ripple/beast/core/WaitableEvent.h +++ /dev/null @@ -1,124 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of Beast: https://github.com/vinniefalco/Beast - Copyright 2013, Vinnie Falco - - Portions of this file are from JUCE. - Copyright (c) 2013 - Raw Material Software Ltd. - Please visit http://www.juce.com - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef BEAST_THREADS_WAITABLEEVENT_H_INCLUDED -#define BEAST_THREADS_WAITABLEEVENT_H_INCLUDED - -#include - -#if ! BEAST_WINDOWS -#include -#endif - -namespace beast { - -/** Allows threads to wait for events triggered by other threads. - A thread can call wait() on a WaitableEvent, and this will suspend the - calling thread until another thread wakes it up by calling the signal() - method. -*/ -class WaitableEvent -{ -public: - //============================================================================== - /** Creates a WaitableEvent object. - - @param manualReset If this is false, the event will be reset automatically when the wait() - method is called. If manualReset is true, then once the event is signalled, - the only way to reset it will be by calling the reset() method. - - @param initiallySignaled If this is true then the event will be signaled when - the constructor returns. - */ - explicit WaitableEvent (bool manualReset = false, bool initiallySignaled = false); - - /** Destructor. - - If other threads are waiting on this object when it gets deleted, this - can cause nasty errors, so be careful! - */ - ~WaitableEvent(); - - WaitableEvent (WaitableEvent const&) = delete; - WaitableEvent& operator= (WaitableEvent const&) = delete; - - //============================================================================== - /** Suspends the calling thread until the event has been signalled. - - This will wait until the object's signal() method is called by another thread, - or until the timeout expires. - - After the event has been signalled, this method will return true and if manualReset - was set to false in the WaitableEvent's constructor, then the event will be reset. - - @param timeOutMilliseconds the maximum time to wait, in milliseconds. A negative - value will cause it to wait forever. - - @returns true if the object has been signalled, false if the timeout expires first. - @see signal, reset - */ - /** @{ */ - bool wait () const; // wait forever - // VFALCO TODO Change wait() to seconds instead of millis - bool wait (int timeOutMilliseconds) const; // DEPRECATED - /** @} */ - - //============================================================================== - /** Wakes up any threads that are currently waiting on this object. - - If signal() is called when nothing is waiting, the next thread to call wait() - will return immediately and reset the signal. - - If the WaitableEvent is manual reset, all current and future threads that wait upon this - object will be woken, until reset() is explicitly called. - - If the WaitableEvent is automatic reset, and one or more threads is waiting upon the object, - then one of them will be woken up. If no threads are currently waiting, then the next thread - to call wait() will be woken up. As soon as a thread is woken, the signal is automatically - reset. - - @see wait, reset - */ - void signal() const; - - //============================================================================== - /** Resets the event to an unsignalled state. - - If it's not already signalled, this does nothing. - */ - void reset() const; - -private: -#if BEAST_WINDOWS - void* handle; -#else - mutable pthread_cond_t condition; - mutable pthread_mutex_t mutex; - mutable bool triggered; - mutable bool manualReset; -#endif -}; - -} - -#endif diff --git a/src/ripple/beast/core/core.unity.cpp b/src/ripple/beast/core/core.unity.cpp index b06129e1e4..6acf2c2bcf 100644 --- a/src/ripple/beast/core/core.unity.cpp +++ b/src/ripple/beast/core/core.unity.cpp @@ -60,7 +60,6 @@ #include #include -#include #ifdef _CRTDBG_MAP_ALLOC #pragma pop_macro("calloc") diff --git a/src/ripple/core/Stoppable.h b/src/ripple/core/Stoppable.h index dd59244299..1ad1e6c383 100644 --- a/src/ripple/core/Stoppable.h +++ b/src/ripple/core/Stoppable.h @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -316,7 +315,9 @@ private: std::atomic m_stopped {false}; std::atomic m_childrenStopped {false}; Children m_children; - beast::WaitableEvent m_stoppedEvent; + std::condition_variable m_cv; + std::mutex m_mut; + bool m_is_stopping = false; }; //------------------------------------------------------------------------------ diff --git a/src/ripple/core/impl/Stoppable.cpp b/src/ripple/core/impl/Stoppable.cpp index 1f29b4592b..4b6d8ab345 100644 --- a/src/ripple/core/impl/Stoppable.cpp +++ b/src/ripple/core/impl/Stoppable.cpp @@ -63,7 +63,9 @@ bool Stoppable::areChildrenStopped () const void Stoppable::stopped () { - m_stoppedEvent.signal(); + std::lock_guard lk{m_mut}; + m_is_stopping = true; + m_cv.notify_all(); } void Stoppable::onPrepare () @@ -123,17 +125,16 @@ void Stoppable::stopRecursive (beast::Journal j) m_childrenStopped = true; onChildrenStopped (); - // Now block on this Stoppable. + // Now block on this Stoppable until m_is_stopping is set by stopped(). // - bool const timedOut (! m_stoppedEvent.wait (1 * 1000)); // milliseconds - if (timedOut) + using namespace std::chrono_literals; + std::unique_lock lk{m_mut}; + if (!m_cv.wait_for(lk, 1s, [this]{return m_is_stopping;})) { if (auto stream = j.error()) stream << "Waiting for '" << m_name << "' to stop"; - m_stoppedEvent.wait (); + m_cv.wait(lk, [this]{return m_is_stopping;}); } - - // once we get here, we know the stoppable has stopped. m_stopped = true; } diff --git a/src/ripple/core/impl/Workers.cpp b/src/ripple/core/impl/Workers.cpp index 6d6fbf2000..46bc1d38b4 100644 --- a/src/ripple/core/impl/Workers.cpp +++ b/src/ripple/core/impl/Workers.cpp @@ -32,7 +32,7 @@ Workers::Workers ( : m_callback (callback) , perfLog_ (perfLog) , m_threadNames (threadNames) - , m_allPaused (true, true) + , m_allPaused (true) , m_semaphore (0) , m_numberOfThreads (0) , m_activeCount (0) @@ -111,7 +111,9 @@ void Workers::pauseAllThreadsAndWait () { setNumberOfThreads (0); - m_allPaused.wait (); + std::unique_lock lk{m_mut}; + m_cv.wait(lk, [this]{return m_allPaused;}); + lk.unlock(); assert (numberOfCurrentlyRunningTasks () == 0); } @@ -185,7 +187,10 @@ void Workers::Worker::run () // we are the first one then reset the "all paused" event // if (++m_workers.m_activeCount == 1) - m_workers.m_allPaused.reset (); + { + std::lock_guard lk{m_workers.m_mut}; + m_workers.m_allPaused = false; + } for (;;) { @@ -236,7 +241,11 @@ void Workers::Worker::run () // are the last one then signal the "all paused" event. // if (--m_workers.m_activeCount == 0) - m_workers.m_allPaused.signal (); + { + std::lock_guard lk{m_workers.m_mut}; + m_workers.m_allPaused = true; + m_workers.m_cv.notify_all(); + } // Set inactive thread name. beast::setCurrentThreadName ("(" + threadName_ + ")"); diff --git a/src/ripple/core/impl/Workers.h b/src/ripple/core/impl/Workers.h index 3ea2716797..9721ae9e6e 100644 --- a/src/ripple/core/impl/Workers.h +++ b/src/ripple/core/impl/Workers.h @@ -22,7 +22,6 @@ #include #include -#include #include #include #include @@ -169,7 +168,9 @@ private: Callback& m_callback; perf::PerfLog& perfLog_; std::string m_threadNames; // The name to give each thread - beast::WaitableEvent m_allPaused; // signaled when all threads paused + std::condition_variable m_cv; // signaled when all threads paused + std::mutex m_mut; + bool m_allPaused; semaphore m_semaphore; // each pending task is 1 resource int m_numberOfThreads; // how many we want active now std::atomic m_activeCount; // to know when all are paused diff --git a/src/test/core/Workers_test.cpp b/src/test/core/Workers_test.cpp index 9fe210cbb2..abf6639a97 100644 --- a/src/test/core/Workers_test.cpp +++ b/src/test/core/Workers_test.cpp @@ -23,8 +23,10 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. #include #include #include +#include #include #include +#include #include namespace ripple { @@ -86,20 +88,16 @@ class Workers_test : public beast::unit_test::suite public: struct TestCallback : Workers::Callback { - TestCallback() - : finished(false, false) - , count(0) - { - } - void processTask(int instance) override { + std::lock_guard lk{mut}; if (--count == 0) - finished.signal(); + cv.notify_all(); } - beast::WaitableEvent finished; - std::atomic count; + std::condition_variable cv; + std::mutex mut; + int count = 0; }; void testThreads(int const tc1, int const tc2, int const tc3) @@ -118,10 +116,6 @@ public: { // Prepare the callback. cb.count = threadCount; - if (threadCount == 0) - cb.finished.signal(); - else - cb.finished.reset(); // Execute the test. w.setNumberOfThreads(threadCount); @@ -132,9 +126,11 @@ public: // 10 seconds should be enough to finish on any system // - bool const signaled = cb.finished.wait(10 * 1000); + using namespace std::chrono_literals; + std::unique_lock lk{cb.mut}; + bool const signaled = cb.cv.wait_for(lk, 10s, [&cb]{return cb.count == 0;}); BEAST_EXPECT(signaled); - BEAST_EXPECT(cb.count.load() == 0); + BEAST_EXPECT(cb.count == 0); }; testForThreadCount (tc1); testForThreadCount (tc2); @@ -142,7 +138,7 @@ public: w.pauseAllThreadsAndWait(); // We had better finished all our work! - BEAST_EXPECT(cb.count.load() == 0); + BEAST_EXPECT(cb.count == 0); } void run() override