Replace WaitableEvent with portable std primitives:

The WaitableEvent class was a leftover from the pre-Boost
version of Beast and used Windows- and pthread-specific
APIs.

This refactor replaces that functionality by using only
interfaces provided by the C++ standard, making the code
more portable.

Closes #2402.
This commit is contained in:
Howard Hinnant
2018-10-22 13:51:30 -04:00
committed by Nik Bougalis
parent 5e96da51f9
commit 156e8dae83
11 changed files with 72 additions and 340 deletions

View File

@@ -1312,7 +1312,6 @@ else ()
target_sources (xrpl_core PRIVATE
src/ripple/beast/core/CurrentThreadName.cpp
src/ripple/beast/core/SemanticVersion.cpp
src/ripple/beast/core/WaitableEvent.cpp
src/ripple/beast/hash/impl/xxhash.cpp
src/ripple/beast/insight/impl/Collector.cpp
src/ripple/beast/insight/impl/Groups.cpp

View File

@@ -60,10 +60,12 @@
#include <ripple/beast/core/LexicalCast.h>
#include <boost/asio/steady_timer.hpp>
#include <boost/system/error_code.hpp>
#include <fstream>
#include <sstream>
#include <iostream>
#include <condition_variable>
#include <cstring>
#include <fstream>
#include <iostream>
#include <mutex>
#include <sstream>
namespace ripple {
@@ -368,7 +370,10 @@ public:
std::vector <std::unique_ptr<Stoppable>> websocketServers_;
boost::asio::signal_set m_signals;
beast::WaitableEvent m_stop;
std::condition_variable cv_;
std::mutex mut_;
bool isTimeToStop = false;
std::atomic<bool> checkSigs_;
@@ -1474,7 +1479,10 @@ ApplicationImp::run()
getLoadManager ().activateDeadlockDetector ();
}
m_stop.wait ();
{
std::unique_lock<std::mutex> lk{mut_};
cv_.wait(lk, [this]{return isTimeToStop;});
}
// Stop the server. When this returns, all
// Stoppable objects should be stopped.
@@ -1489,7 +1497,9 @@ ApplicationImp::signalStop()
{
// Unblock the main thread (which is sitting in run()).
//
m_stop.signal();
std::lock_guard<std::mutex> lk{mut_};
isTimeToStop = true;
cv_.notify_all();
}
bool

View File

@@ -21,13 +21,14 @@
#include <ripple/basics/Log.h>
#include <ripple/beast/net/IPAddressConversion.h>
#include <ripple/beast/net/IPEndpoint.h>
#include <ripple/beast/core/WaitableEvent.h>
#include <boost/asio.hpp>
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <deque>
#include <locale>
#include <memory>
#include <mutex>
namespace ripple {
@@ -110,7 +111,10 @@ public:
boost::asio::io_service::strand m_strand;
boost::asio::ip::tcp::resolver m_resolver;
beast::WaitableEvent m_stop_complete;
std::condition_variable m_cv;
std::mutex m_mut;
bool m_asyncHandlersCompleted;
std::atomic <bool> m_stop_called;
std::atomic <bool> m_stopped;
@@ -139,7 +143,7 @@ public:
, m_io_service (io_service)
, m_strand (io_service)
, m_resolver (io_service)
, m_stop_complete (true, true)
, m_asyncHandlersCompleted (true)
, m_stop_called (false)
, m_stopped (true)
{
@@ -156,7 +160,9 @@ public:
// AsyncObject
void asyncHandlersComplete()
{
m_stop_complete.signal ();
std::unique_lock<std::mutex> lk{m_mut};
m_asyncHandlersCompleted = true;
m_cv.notify_all();
}
//--------------------------------------------------------------------------
@@ -172,7 +178,10 @@ public:
if (m_stopped.exchange (false) == true)
{
m_stop_complete.reset ();
{
std::lock_guard<std::mutex> lk{m_mut};
m_asyncHandlersCompleted = false;
}
addReference ();
}
}
@@ -194,7 +203,9 @@ public:
stop_async ();
JLOG(m_journal.debug()) << "Waiting to stop";
m_stop_complete.wait();
std::unique_lock<std::mutex> lk{m_mut};
m_cv.wait(lk, [this]{return m_asyncHandlersCompleted;});
lk.unlock();
JLOG(m_journal.debug()) << "Stopped";
}

View File

@@ -1,171 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
Please visit http://www.juce.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <ripple/beast/core/WaitableEvent.h>
#include <cerrno>
#if BEAST_WINDOWS
#include <Windows.h>
#undef check
#undef direct
#undef max
#undef min
#undef TYPE_BOOL
namespace beast {
WaitableEvent::WaitableEvent (const bool manualReset, bool initiallySignaled)
: handle (CreateEvent (0, manualReset ? TRUE : FALSE, initiallySignaled ? TRUE : FALSE, 0))
{
}
WaitableEvent::~WaitableEvent()
{
CloseHandle (handle);
}
void WaitableEvent::signal() const
{
SetEvent (handle);
}
void WaitableEvent::reset() const
{
ResetEvent (handle);
}
bool WaitableEvent::wait () const
{
return WaitForSingleObject (handle, INFINITE) == WAIT_OBJECT_0;
}
bool WaitableEvent::wait (const int timeOutMs) const
{
if (timeOutMs >= 0)
return WaitForSingleObject (handle,
(DWORD) timeOutMs) == WAIT_OBJECT_0;
return wait ();
}
}
#else
#include <sys/time.h>
namespace beast {
WaitableEvent::WaitableEvent (const bool useManualReset, bool initiallySignaled)
: triggered (false), manualReset (useManualReset)
{
pthread_cond_init (&condition, 0);
pthread_mutexattr_t atts;
pthread_mutexattr_init (&atts);
#if ! BEAST_ANDROID
pthread_mutexattr_setprotocol (&atts, PTHREAD_PRIO_INHERIT);
#endif
pthread_mutex_init (&mutex, &atts);
if (initiallySignaled)
signal ();
}
WaitableEvent::~WaitableEvent()
{
pthread_cond_destroy (&condition);
pthread_mutex_destroy (&mutex);
}
bool WaitableEvent::wait () const
{
return wait (-1);
}
bool WaitableEvent::wait (const int timeOutMillisecs) const
{
pthread_mutex_lock (&mutex);
if (! triggered)
{
if (timeOutMillisecs < 0)
{
do
{
pthread_cond_wait (&condition, &mutex);
}
while (! triggered);
}
else
{
struct timeval now;
gettimeofday (&now, 0);
struct timespec time;
time.tv_sec = now.tv_sec + (timeOutMillisecs / 1000);
time.tv_nsec = (now.tv_usec + ((timeOutMillisecs % 1000) * 1000)) * 1000;
if (time.tv_nsec >= 1000000000)
{
time.tv_nsec -= 1000000000;
time.tv_sec++;
}
do
{
if (pthread_cond_timedwait (&condition, &mutex, &time) == ETIMEDOUT)
{
pthread_mutex_unlock (&mutex);
return false;
}
}
while (! triggered);
}
}
if (! manualReset)
triggered = false;
pthread_mutex_unlock (&mutex);
return true;
}
void WaitableEvent::signal() const
{
pthread_mutex_lock (&mutex);
triggered = true;
pthread_cond_broadcast (&condition);
pthread_mutex_unlock (&mutex);
}
void WaitableEvent::reset() const
{
pthread_mutex_lock (&mutex);
triggered = false;
pthread_mutex_unlock (&mutex);
}
}
#endif

View File

@@ -1,124 +0,0 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Portions of this file are from JUCE.
Copyright (c) 2013 - Raw Material Software Ltd.
Please visit http://www.juce.com
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_THREADS_WAITABLEEVENT_H_INCLUDED
#define BEAST_THREADS_WAITABLEEVENT_H_INCLUDED
#include <ripple/beast/core/Config.h>
#if ! BEAST_WINDOWS
#include <pthread.h>
#endif
namespace beast {
/** Allows threads to wait for events triggered by other threads.
A thread can call wait() on a WaitableEvent, and this will suspend the
calling thread until another thread wakes it up by calling the signal()
method.
*/
class WaitableEvent
{
public:
//==============================================================================
/** Creates a WaitableEvent object.
@param manualReset If this is false, the event will be reset automatically when the wait()
method is called. If manualReset is true, then once the event is signalled,
the only way to reset it will be by calling the reset() method.
@param initiallySignaled If this is true then the event will be signaled when
the constructor returns.
*/
explicit WaitableEvent (bool manualReset = false, bool initiallySignaled = false);
/** Destructor.
If other threads are waiting on this object when it gets deleted, this
can cause nasty errors, so be careful!
*/
~WaitableEvent();
WaitableEvent (WaitableEvent const&) = delete;
WaitableEvent& operator= (WaitableEvent const&) = delete;
//==============================================================================
/** Suspends the calling thread until the event has been signalled.
This will wait until the object's signal() method is called by another thread,
or until the timeout expires.
After the event has been signalled, this method will return true and if manualReset
was set to false in the WaitableEvent's constructor, then the event will be reset.
@param timeOutMilliseconds the maximum time to wait, in milliseconds. A negative
value will cause it to wait forever.
@returns true if the object has been signalled, false if the timeout expires first.
@see signal, reset
*/
/** @{ */
bool wait () const; // wait forever
// VFALCO TODO Change wait() to seconds instead of millis
bool wait (int timeOutMilliseconds) const; // DEPRECATED
/** @} */
//==============================================================================
/** Wakes up any threads that are currently waiting on this object.
If signal() is called when nothing is waiting, the next thread to call wait()
will return immediately and reset the signal.
If the WaitableEvent is manual reset, all current and future threads that wait upon this
object will be woken, until reset() is explicitly called.
If the WaitableEvent is automatic reset, and one or more threads is waiting upon the object,
then one of them will be woken up. If no threads are currently waiting, then the next thread
to call wait() will be woken up. As soon as a thread is woken, the signal is automatically
reset.
@see wait, reset
*/
void signal() const;
//==============================================================================
/** Resets the event to an unsignalled state.
If it's not already signalled, this does nothing.
*/
void reset() const;
private:
#if BEAST_WINDOWS
void* handle;
#else
mutable pthread_cond_t condition;
mutable pthread_mutex_t mutex;
mutable bool triggered;
mutable bool manualReset;
#endif
};
}
#endif

View File

@@ -60,7 +60,6 @@
#include <ripple/beast/core/CurrentThreadName.cpp>
#include <ripple/beast/core/SemanticVersion.cpp>
#include <ripple/beast/core/WaitableEvent.cpp>
#ifdef _CRTDBG_MAP_ALLOC
#pragma pop_macro("calloc")

View File

@@ -22,7 +22,6 @@
#include <ripple/beast/core/LockFreeStack.h>
#include <ripple/beast/utility/Journal.h>
#include <ripple/beast/core/WaitableEvent.h>
#include <ripple/core/Job.h>
#include <ripple/core/ClosureCounter.h>
#include <atomic>
@@ -316,7 +315,9 @@ private:
std::atomic<bool> m_stopped {false};
std::atomic<bool> m_childrenStopped {false};
Children m_children;
beast::WaitableEvent m_stoppedEvent;
std::condition_variable m_cv;
std::mutex m_mut;
bool m_is_stopping = false;
};
//------------------------------------------------------------------------------

View File

@@ -63,7 +63,9 @@ bool Stoppable::areChildrenStopped () const
void Stoppable::stopped ()
{
m_stoppedEvent.signal();
std::lock_guard<std::mutex> lk{m_mut};
m_is_stopping = true;
m_cv.notify_all();
}
void Stoppable::onPrepare ()
@@ -123,17 +125,16 @@ void Stoppable::stopRecursive (beast::Journal j)
m_childrenStopped = true;
onChildrenStopped ();
// Now block on this Stoppable.
// Now block on this Stoppable until m_is_stopping is set by stopped().
//
bool const timedOut (! m_stoppedEvent.wait (1 * 1000)); // milliseconds
if (timedOut)
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk{m_mut};
if (!m_cv.wait_for(lk, 1s, [this]{return m_is_stopping;}))
{
if (auto stream = j.error())
stream << "Waiting for '" << m_name << "' to stop";
m_stoppedEvent.wait ();
m_cv.wait(lk, [this]{return m_is_stopping;});
}
// once we get here, we know the stoppable has stopped.
m_stopped = true;
}

View File

@@ -32,7 +32,7 @@ Workers::Workers (
: m_callback (callback)
, perfLog_ (perfLog)
, m_threadNames (threadNames)
, m_allPaused (true, true)
, m_allPaused (true)
, m_semaphore (0)
, m_numberOfThreads (0)
, m_activeCount (0)
@@ -111,7 +111,9 @@ void Workers::pauseAllThreadsAndWait ()
{
setNumberOfThreads (0);
m_allPaused.wait ();
std::unique_lock<std::mutex> lk{m_mut};
m_cv.wait(lk, [this]{return m_allPaused;});
lk.unlock();
assert (numberOfCurrentlyRunningTasks () == 0);
}
@@ -185,7 +187,10 @@ void Workers::Worker::run ()
// we are the first one then reset the "all paused" event
//
if (++m_workers.m_activeCount == 1)
m_workers.m_allPaused.reset ();
{
std::lock_guard<std::mutex> lk{m_workers.m_mut};
m_workers.m_allPaused = false;
}
for (;;)
{
@@ -236,7 +241,11 @@ void Workers::Worker::run ()
// are the last one then signal the "all paused" event.
//
if (--m_workers.m_activeCount == 0)
m_workers.m_allPaused.signal ();
{
std::lock_guard<std::mutex> lk{m_workers.m_mut};
m_workers.m_allPaused = true;
m_workers.m_cv.notify_all();
}
// Set inactive thread name.
beast::setCurrentThreadName ("(" + threadName_ + ")");

View File

@@ -22,7 +22,6 @@
#include <ripple/core/impl/semaphore.h>
#include <ripple/beast/core/LockFreeStack.h>
#include <ripple/beast/core/WaitableEvent.h>
#include <atomic>
#include <condition_variable>
#include <mutex>
@@ -169,7 +168,9 @@ private:
Callback& m_callback;
perf::PerfLog& perfLog_;
std::string m_threadNames; // The name to give each thread
beast::WaitableEvent m_allPaused; // signaled when all threads paused
std::condition_variable m_cv; // signaled when all threads paused
std::mutex m_mut;
bool m_allPaused;
semaphore m_semaphore; // each pending task is 1 resource
int m_numberOfThreads; // how many we want active now
std::atomic <int> m_activeCount; // to know when all are paused

View File

@@ -23,8 +23,10 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
#include <ripple/core/JobTypes.h>
#include <ripple/json/json_value.h>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <memory>
#include <mutex>
#include <string>
namespace ripple {
@@ -86,20 +88,16 @@ class Workers_test : public beast::unit_test::suite
public:
struct TestCallback : Workers::Callback
{
TestCallback()
: finished(false, false)
, count(0)
{
}
void processTask(int instance) override
{
std::lock_guard<std::mutex> lk{mut};
if (--count == 0)
finished.signal();
cv.notify_all();
}
beast::WaitableEvent finished;
std::atomic <int> count;
std::condition_variable cv;
std::mutex mut;
int count = 0;
};
void testThreads(int const tc1, int const tc2, int const tc3)
@@ -118,10 +116,6 @@ public:
{
// Prepare the callback.
cb.count = threadCount;
if (threadCount == 0)
cb.finished.signal();
else
cb.finished.reset();
// Execute the test.
w.setNumberOfThreads(threadCount);
@@ -132,9 +126,11 @@ public:
// 10 seconds should be enough to finish on any system
//
bool const signaled = cb.finished.wait(10 * 1000);
using namespace std::chrono_literals;
std::unique_lock<std::mutex> lk{cb.mut};
bool const signaled = cb.cv.wait_for(lk, 10s, [&cb]{return cb.count == 0;});
BEAST_EXPECT(signaled);
BEAST_EXPECT(cb.count.load() == 0);
BEAST_EXPECT(cb.count == 0);
};
testForThreadCount (tc1);
testForThreadCount (tc2);
@@ -142,7 +138,7 @@ public:
w.pauseAllThreadsAndWait();
// We had better finished all our work!
BEAST_EXPECT(cb.count.load() == 0);
BEAST_EXPECT(cb.count == 0);
}
void run() override