Clean-up the Stoppable architecture

This commit is contained in:
John Freeman
2020-12-04 14:19:54 -06:00
committed by Edward Hennis
parent 1fd1c34112
commit 78245a072c
17 changed files with 106 additions and 198 deletions

View File

@@ -92,11 +92,6 @@ public:
//
//--------------------------------------------------------------------------
void
onPrepare() override
{
}
void
onStart() override
{

View File

@@ -977,11 +977,6 @@ public:
// Stoppable
//
void
onPrepare() override
{
}
void
onStart() override
{
@@ -1634,7 +1629,6 @@ void
ApplicationImp::doStart(bool withTimers)
{
startTimers_ = withTimers;
prepare();
start();
}

View File

@@ -77,11 +77,6 @@ LoadManager::resetDeadlockDetector()
//------------------------------------------------------------------------------
void
LoadManager::onPrepare()
{
}
void
LoadManager::onStart()
{

View File

@@ -82,9 +82,6 @@ public:
//--------------------------------------------------------------------------
// Stoppable members
void
onPrepare() override;
void
onStart() override;

View File

@@ -607,7 +607,33 @@ public:
// Stoppable.
void
onStop() override;
onStop() override
{
{
boost::system::error_code ec;
heartbeatTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: heartbeatTimer cancel error: "
<< ec.message();
}
ec.clear();
clusterTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: clusterTimer cancel error: "
<< ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done
// before we declare ourselves stopped.
using namespace std::chrono_literals;
waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
stopped();
}
private:
void
@@ -673,7 +699,6 @@ private:
ConsensusPhase mLastConsensusPhase;
LedgerMaster& m_ledgerMaster;
std::shared_ptr<InboundLedger> mAcquiringLedger;
SubInfoMapType mSubAccount;
SubInfoMapType mSubRTAccount;
@@ -3516,34 +3541,6 @@ NetworkOPsImp::tryRemoveRpcSub(std::string const& strUrl)
return true;
}
void
NetworkOPsImp::onStop()
{
mAcquiringLedger.reset();
{
boost::system::error_code ec;
heartbeatTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: heartbeatTimer cancel error: " << ec.message();
}
ec.clear();
clusterTimer_.cancel(ec);
if (ec)
{
JLOG(m_journal.error())
<< "NetworkOPs: clusterTimer cancel error: " << ec.message();
}
}
// Make sure that any waitHandlers pending in our timers are done
// before we declare ourselves stopped.
using namespace std::chrono_literals;
waitHandlerCounter_.join("NetworkOPs", 1s, m_journal);
stopped();
}
#ifndef USE_NEW_BOOK_PAGE
// NIKB FIXME this should be looked at. There's no reason why this shouldn't

View File

@@ -21,7 +21,6 @@
#define RIPPLE_APP_MISC_SHAMAPSTORE_H_INCLUDED
#include <ripple/app/ledger/Ledger.h>
#include <ripple/core/Stoppable.h>
#include <ripple/nodestore/Manager.h>
#include <ripple/protocol/ErrorCodes.h>
#include <boost/optional.hpp>
@@ -29,17 +28,16 @@
namespace ripple {
class TransactionMaster;
class Stoppable;
/**
* class to create database, launch online delete thread, and
* related SQLite database
*/
class SHAMapStore : public Stoppable
class SHAMapStore
{
public:
SHAMapStore(Stoppable& parent) : Stoppable("SHAMapStore", parent)
{
}
virtual ~SHAMapStore() = default;
/** Called by LedgerMaster every time a ledger validates. */
virtual void

View File

@@ -150,7 +150,7 @@ SHAMapStoreImp::SHAMapStoreImp(
Stoppable& parent,
NodeStore::Scheduler& scheduler,
beast::Journal journal)
: SHAMapStore(parent)
: Stoppable("SHAMapStore", parent)
, app_(app)
, scheduler_(scheduler)
, journal_(journal)
@@ -731,6 +731,7 @@ SHAMapStoreImp::health()
void
SHAMapStoreImp::onStop()
{
// This is really a check for `if (thread_)`.
if (deleteInterval_)
{
{
@@ -738,26 +739,12 @@ SHAMapStoreImp::onStop()
stop_ = true;
}
cond_.notify_one();
// stopped() will be called by the thread_ running run(),
// when it reaches the check for stop_.
}
else
{
stopped();
}
}
void
SHAMapStoreImp::onChildrenStopped()
{
if (deleteInterval_)
{
{
std::lock_guard lock(mutex_);
stop_ = true;
}
cond_.notify_one();
}
else
{
// There is no thread running run(), so we must call stopped().
stopped();
}
}

View File

@@ -23,6 +23,7 @@
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/misc/SHAMapStore.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/core/Stoppable.h>
#include <ripple/nodestore/DatabaseRotating.h>
#include <atomic>
#include <chrono>
@@ -33,7 +34,7 @@ namespace ripple {
class NetworkOPs;
class SHAMapStoreImp : public SHAMapStore
class SHAMapStoreImp : public Stoppable, public SHAMapStore
{
private:
struct SavedState
@@ -117,7 +118,7 @@ private:
boost::optional<std::chrono::seconds> recoveryWaitTime_;
// these do not exist upon SHAMapStore creation, but do exist
// as of onPrepare() or before
// as of run() or before
NetworkOPs* netOPs_ = nullptr;
LedgerMaster* ledgerMaster_ = nullptr;
FullBelowCache* fullBelowCache_ = nullptr;
@@ -246,14 +247,10 @@ private:
// the main "run()".
Health
health();
//
// Stoppable
//
void
onPrepare() override
{
}
void
onStart() override
{
@@ -265,8 +262,6 @@ private:
void
onStop() override;
// Called when all child Stoppable objects have stoped
void
onChildrenStopped() override;
};
} // namespace ripple

View File

@@ -477,14 +477,12 @@ PerfLogImp::onStop()
}
thread_.join();
}
if (areChildrenStopped())
stopped();
}
void
PerfLogImp::onChildrenStopped()
{
onStop();
stopped();
}
//-----------------------------------------------------------------------------

View File

@@ -210,12 +210,6 @@ public:
void
rotate() override;
// Stoppable
void
onPrepare() override
{
}
// Called when application is ready to start threads.
void
onStart() override;

View File

@@ -339,6 +339,7 @@ private:
std::string m_name;
RootStoppable& m_root;
Child m_child;
// TODO [C++20]: Use std::atomic_flag instead.
std::atomic<bool> m_stopped{false};
std::atomic<bool> m_childrenStopped{false};
Children m_children;
@@ -360,17 +361,9 @@ public:
bool
isStopping() const;
/** Prepare all contained Stoppable objects.
This calls onPrepare for all Stoppable objects in the tree.
Calls made after the first have no effect.
Thread safety:
May be called from any thread.
*/
void
prepare();
/** Start all contained Stoppable objects.
The default implementation does nothing.
/** Prepare and start all contained Stoppable objects.
This calls onPrepare for all Stoppable objects in the tree, bottom-up,
then calls onStart for the same, top-down.
Calls made after the first have no effect.
Thread safety:
May be called from any thread.
@@ -381,8 +374,8 @@ public:
/** Notify a root stoppable and children to stop, and block until stopped.
Has no effect if the stoppable was already notified.
This blocks until the stoppable and all of its children have stopped.
Undefined behavior results if stop() is called without a previous call
to start().
Undefined behavior results if stop() is called without finishing
a previous call to start().
Thread safety:
Safe to call from any thread not associated with a Stoppable.
*/
@@ -393,7 +386,7 @@ public:
bool
started() const
{
return m_started;
return startExited_;
}
/* JobQueue uses this method for Job counting. */
@@ -411,20 +404,10 @@ public:
alertable_sleep_until(std::chrono::system_clock::time_point const& t);
private:
/* Notify a root stoppable and children to stop, without waiting.
Has no effect if the stoppable was already notified.
Returns true on the first call to this method, false otherwise.
Thread safety:
Safe to call from any thread at any time.
*/
bool
stopAsync(beast::Journal j);
std::atomic<bool> m_prepared{false};
std::atomic<bool> m_started{false};
std::atomic<bool> m_calledStop{false};
// TODO [C++20]: Use std::atomic_flag instead.
std::atomic<bool> startEntered_{false};
std::atomic<bool> startExited_{false};
std::atomic<bool> stopEntered_{false};
std::mutex m_;
std::condition_variable c_;
JobCounter jobCounter_;
@@ -446,9 +429,11 @@ RootStoppable::alertable_sleep_until(
std::chrono::system_clock::time_point const& t)
{
std::unique_lock<std::mutex> lock(m_);
if (m_calledStop)
if (stopEntered_)
return true;
return c_.wait_until(lock, t, [this] { return m_calledStop.load(); });
// TODO [C++20]: When `stopEntered_` is changed to a `std::atomic_flag`,
// this call to `load` needs to change to a call to `test`.
return c_.wait_until(lock, t, [this] { return stopEntered_.load(); });
}
inline bool

View File

@@ -174,54 +174,39 @@ RootStoppable::~RootStoppable()
bool
RootStoppable::isStopping() const
{
return m_calledStop;
}
void
RootStoppable::prepare()
{
if (m_prepared.exchange(true) == false)
prepareRecursive();
return stopEntered_;
}
void
RootStoppable::start()
{
// Courtesy call to prepare.
if (m_prepared.exchange(true) == false)
prepareRecursive();
if (m_started.exchange(true) == false)
startRecursive();
if (startEntered_.exchange(true))
return;
prepareRecursive();
startRecursive();
startExited_ = true;
}
void
RootStoppable::stop(beast::Journal j)
{
// Must have a prior call to start()
assert(m_started);
assert(startExited_);
if (stopAsync(j))
stopRecursive(j);
}
bool
RootStoppable::stopAsync(beast::Journal j)
{
bool alreadyCalled;
{
// Even though m_calledStop is atomic, we change its value under a
// Even though stopEntered_ is atomic, we change its value under a
// lock. This removes a small timing window that occurs if the
// waiting thread is handling a spurious wakeup while m_calledStop
// waiting thread is handling a spurious wakeup while stopEntered_
// changes state.
std::unique_lock<std::mutex> lock(m_);
alreadyCalled = m_calledStop.exchange(true);
alreadyCalled = stopEntered_.exchange(true);
}
if (alreadyCalled)
{
if (auto stream = j.warn())
stream << "Stoppable::stop called again";
return false;
stream << "RootStoppable::stop called again";
return;
}
// Wait until all in-flight JobQueue Jobs are completed.
@@ -230,7 +215,7 @@ RootStoppable::stopAsync(beast::Journal j)
c_.notify_all();
stopAsyncRecursive(j);
return true;
stopRecursive(j);
}
} // namespace ripple

View File

@@ -63,51 +63,51 @@ void
Workers::setNumberOfThreads(int numberOfThreads)
{
static int instance{0};
if (m_numberOfThreads != numberOfThreads)
if (m_numberOfThreads == numberOfThreads)
return;
if (perfLog_)
perfLog_->resizeJobs(numberOfThreads);
if (numberOfThreads > m_numberOfThreads)
{
if (perfLog_)
perfLog_->resizeJobs(numberOfThreads);
// Increasing the number of working threads
int const amount = numberOfThreads - m_numberOfThreads;
if (numberOfThreads > m_numberOfThreads)
for (int i = 0; i < amount; ++i)
{
// Increasing the number of working threads
int const amount = numberOfThreads - m_numberOfThreads;
// See if we can reuse a paused worker
Worker* worker = m_paused.pop_front();
for (int i = 0; i < amount; ++i)
if (worker != nullptr)
{
// See if we can reuse a paused worker
Worker* worker = m_paused.pop_front();
if (worker != nullptr)
{
// If we got here then the worker thread is at [1]
// This will unblock their call to wait()
//
worker->notify();
}
else
{
worker = new Worker(*this, m_threadNames, instance++);
m_everyone.push_front(worker);
}
// If we got here then the worker thread is at [1]
// This will unblock their call to wait()
//
worker->notify();
}
else
{
worker = new Worker(*this, m_threadNames, instance++);
m_everyone.push_front(worker);
}
}
else
{
// Decreasing the number of working threads
int const amount = m_numberOfThreads - numberOfThreads;
for (int i = 0; i < amount; ++i)
{
++m_pauseCount;
// Pausing a thread counts as one "internal task"
m_semaphore.notify();
}
}
m_numberOfThreads = numberOfThreads;
}
else
{
// Decreasing the number of working threads
int const amount = m_numberOfThreads - numberOfThreads;
for (int i = 0; i < amount; ++i)
{
++m_pauseCount;
// Pausing a thread counts as one "internal task"
m_semaphore.notify();
}
}
m_numberOfThreads = numberOfThreads;
}
void

View File

@@ -722,12 +722,7 @@ DatabaseShardImp::onChildrenStopped()
}
}
if (areChildrenStopped())
stopped();
else
{
JLOG(j_.warn()) << " Children failed to stop";
}
stopped();
}
void

View File

@@ -227,11 +227,6 @@ public:
m_logic.load();
}
void
onStart() override
{
}
void
onStop() override
{

View File

@@ -101,7 +101,6 @@ class PerfLog_test : public beast::unit_test::suite
void
doStart()
{
prepare();
start();
}

View File

@@ -476,7 +476,6 @@ class Stoppable_test : public beast::unit_test::suite
void
run()
{
prepare();
start();
stop(journal_);
}