From 78245a072c0e58b30ff482e41c375494505d9b32 Mon Sep 17 00:00:00 2001 From: John Freeman Date: Fri, 4 Dec 2020 14:19:54 -0600 Subject: [PATCH] Clean-up the `Stoppable` architecture --- src/ripple/app/ledger/impl/LedgerCleaner.cpp | 5 -- src/ripple/app/main/Application.cpp | 6 -- src/ripple/app/main/LoadManager.cpp | 5 -- src/ripple/app/main/LoadManager.h | 3 - src/ripple/app/misc/NetworkOPs.cpp | 57 +++++++------- src/ripple/app/misc/SHAMapStore.h | 8 +- src/ripple/app/misc/SHAMapStoreImp.cpp | 23 ++---- src/ripple/app/misc/SHAMapStoreImp.h | 13 +--- src/ripple/basics/impl/PerfLogImp.cpp | 4 +- src/ripple/basics/impl/PerfLogImp.h | 6 -- src/ripple/core/Stoppable.h | 45 ++++------- src/ripple/core/impl/Stoppable.cpp | 41 ++++------ src/ripple/core/impl/Workers.cpp | 74 +++++++++---------- .../nodestore/impl/DatabaseShardImp.cpp | 7 +- .../peerfinder/impl/PeerfinderManager.cpp | 5 -- src/test/basics/PerfLog_test.cpp | 1 - src/test/core/Stoppable_test.cpp | 1 - 17 files changed, 106 insertions(+), 198 deletions(-) diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.cpp b/src/ripple/app/ledger/impl/LedgerCleaner.cpp index fad97590e4..d80913ad7e 100644 --- a/src/ripple/app/ledger/impl/LedgerCleaner.cpp +++ b/src/ripple/app/ledger/impl/LedgerCleaner.cpp @@ -92,11 +92,6 @@ public: // //-------------------------------------------------------------------------- - void - onPrepare() override - { - } - void onStart() override { diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 29af2f8655..89cfdf5b69 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -977,11 +977,6 @@ public: // Stoppable // - void - onPrepare() override - { - } - void onStart() override { @@ -1634,7 +1629,6 @@ void ApplicationImp::doStart(bool withTimers) { startTimers_ = withTimers; - prepare(); start(); } diff --git a/src/ripple/app/main/LoadManager.cpp b/src/ripple/app/main/LoadManager.cpp index 5c481c5c3e..b63972f34e 100644 --- a/src/ripple/app/main/LoadManager.cpp +++ b/src/ripple/app/main/LoadManager.cpp @@ -77,11 +77,6 @@ LoadManager::resetDeadlockDetector() //------------------------------------------------------------------------------ -void -LoadManager::onPrepare() -{ -} - void LoadManager::onStart() { diff --git a/src/ripple/app/main/LoadManager.h b/src/ripple/app/main/LoadManager.h index 4ff3d8b8a8..c5d344bdb4 100644 --- a/src/ripple/app/main/LoadManager.h +++ b/src/ripple/app/main/LoadManager.h @@ -82,9 +82,6 @@ public: //-------------------------------------------------------------------------- // Stoppable members - void - onPrepare() override; - void onStart() override; diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 67e9f29241..acc8c31d59 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -607,7 +607,33 @@ public: // Stoppable. void - onStop() override; + onStop() override + { + { + boost::system::error_code ec; + heartbeatTimer_.cancel(ec); + if (ec) + { + JLOG(m_journal.error()) + << "NetworkOPs: heartbeatTimer cancel error: " + << ec.message(); + } + + ec.clear(); + clusterTimer_.cancel(ec); + if (ec) + { + JLOG(m_journal.error()) + << "NetworkOPs: clusterTimer cancel error: " + << ec.message(); + } + } + // Make sure that any waitHandlers pending in our timers are done + // before we declare ourselves stopped. + using namespace std::chrono_literals; + waitHandlerCounter_.join("NetworkOPs", 1s, m_journal); + stopped(); + } private: void @@ -673,7 +699,6 @@ private: ConsensusPhase mLastConsensusPhase; LedgerMaster& m_ledgerMaster; - std::shared_ptr mAcquiringLedger; SubInfoMapType mSubAccount; SubInfoMapType mSubRTAccount; @@ -3516,34 +3541,6 @@ NetworkOPsImp::tryRemoveRpcSub(std::string const& strUrl) return true; } -void -NetworkOPsImp::onStop() -{ - mAcquiringLedger.reset(); - { - boost::system::error_code ec; - heartbeatTimer_.cancel(ec); - if (ec) - { - JLOG(m_journal.error()) - << "NetworkOPs: heartbeatTimer cancel error: " << ec.message(); - } - - ec.clear(); - clusterTimer_.cancel(ec); - if (ec) - { - JLOG(m_journal.error()) - << "NetworkOPs: clusterTimer cancel error: " << ec.message(); - } - } - // Make sure that any waitHandlers pending in our timers are done - // before we declare ourselves stopped. - using namespace std::chrono_literals; - waitHandlerCounter_.join("NetworkOPs", 1s, m_journal); - stopped(); -} - #ifndef USE_NEW_BOOK_PAGE // NIKB FIXME this should be looked at. There's no reason why this shouldn't diff --git a/src/ripple/app/misc/SHAMapStore.h b/src/ripple/app/misc/SHAMapStore.h index 8952453cba..e45cb30b20 100644 --- a/src/ripple/app/misc/SHAMapStore.h +++ b/src/ripple/app/misc/SHAMapStore.h @@ -21,7 +21,6 @@ #define RIPPLE_APP_MISC_SHAMAPSTORE_H_INCLUDED #include -#include #include #include #include @@ -29,17 +28,16 @@ namespace ripple { class TransactionMaster; +class Stoppable; /** * class to create database, launch online delete thread, and * related SQLite database */ -class SHAMapStore : public Stoppable +class SHAMapStore { public: - SHAMapStore(Stoppable& parent) : Stoppable("SHAMapStore", parent) - { - } + virtual ~SHAMapStore() = default; /** Called by LedgerMaster every time a ledger validates. */ virtual void diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 86e96587bf..8d4bca6b48 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -150,7 +150,7 @@ SHAMapStoreImp::SHAMapStoreImp( Stoppable& parent, NodeStore::Scheduler& scheduler, beast::Journal journal) - : SHAMapStore(parent) + : Stoppable("SHAMapStore", parent) , app_(app) , scheduler_(scheduler) , journal_(journal) @@ -731,6 +731,7 @@ SHAMapStoreImp::health() void SHAMapStoreImp::onStop() { + // This is really a check for `if (thread_)`. if (deleteInterval_) { { @@ -738,26 +739,12 @@ SHAMapStoreImp::onStop() stop_ = true; } cond_.notify_one(); + // stopped() will be called by the thread_ running run(), + // when it reaches the check for stop_. } else { - stopped(); - } -} - -void -SHAMapStoreImp::onChildrenStopped() -{ - if (deleteInterval_) - { - { - std::lock_guard lock(mutex_); - stop_ = true; - } - cond_.notify_one(); - } - else - { + // There is no thread running run(), so we must call stopped(). stopped(); } } diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h index 541d74a38e..062b90dde6 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.h +++ b/src/ripple/app/misc/SHAMapStoreImp.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -33,7 +34,7 @@ namespace ripple { class NetworkOPs; -class SHAMapStoreImp : public SHAMapStore +class SHAMapStoreImp : public Stoppable, public SHAMapStore { private: struct SavedState @@ -117,7 +118,7 @@ private: boost::optional recoveryWaitTime_; // these do not exist upon SHAMapStore creation, but do exist - // as of onPrepare() or before + // as of run() or before NetworkOPs* netOPs_ = nullptr; LedgerMaster* ledgerMaster_ = nullptr; FullBelowCache* fullBelowCache_ = nullptr; @@ -246,14 +247,10 @@ private: // the main "run()". Health health(); + // // Stoppable // - void - onPrepare() override - { - } - void onStart() override { @@ -265,8 +262,6 @@ private: void onStop() override; // Called when all child Stoppable objects have stoped - void - onChildrenStopped() override; }; } // namespace ripple diff --git a/src/ripple/basics/impl/PerfLogImp.cpp b/src/ripple/basics/impl/PerfLogImp.cpp index 8143865784..f3bb986d25 100644 --- a/src/ripple/basics/impl/PerfLogImp.cpp +++ b/src/ripple/basics/impl/PerfLogImp.cpp @@ -477,14 +477,12 @@ PerfLogImp::onStop() } thread_.join(); } - if (areChildrenStopped()) - stopped(); } void PerfLogImp::onChildrenStopped() { - onStop(); + stopped(); } //----------------------------------------------------------------------------- diff --git a/src/ripple/basics/impl/PerfLogImp.h b/src/ripple/basics/impl/PerfLogImp.h index 943829b6e9..8fa430ec16 100644 --- a/src/ripple/basics/impl/PerfLogImp.h +++ b/src/ripple/basics/impl/PerfLogImp.h @@ -210,12 +210,6 @@ public: void rotate() override; - // Stoppable - void - onPrepare() override - { - } - // Called when application is ready to start threads. void onStart() override; diff --git a/src/ripple/core/Stoppable.h b/src/ripple/core/Stoppable.h index cf2395653a..e192b83bea 100644 --- a/src/ripple/core/Stoppable.h +++ b/src/ripple/core/Stoppable.h @@ -339,6 +339,7 @@ private: std::string m_name; RootStoppable& m_root; Child m_child; + // TODO [C++20]: Use std::atomic_flag instead. std::atomic m_stopped{false}; std::atomic m_childrenStopped{false}; Children m_children; @@ -360,17 +361,9 @@ public: bool isStopping() const; - /** Prepare all contained Stoppable objects. - This calls onPrepare for all Stoppable objects in the tree. - Calls made after the first have no effect. - Thread safety: - May be called from any thread. - */ - void - prepare(); - - /** Start all contained Stoppable objects. - The default implementation does nothing. + /** Prepare and start all contained Stoppable objects. + This calls onPrepare for all Stoppable objects in the tree, bottom-up, + then calls onStart for the same, top-down. Calls made after the first have no effect. Thread safety: May be called from any thread. @@ -381,8 +374,8 @@ public: /** Notify a root stoppable and children to stop, and block until stopped. Has no effect if the stoppable was already notified. This blocks until the stoppable and all of its children have stopped. - Undefined behavior results if stop() is called without a previous call - to start(). + Undefined behavior results if stop() is called without finishing + a previous call to start(). Thread safety: Safe to call from any thread not associated with a Stoppable. */ @@ -393,7 +386,7 @@ public: bool started() const { - return m_started; + return startExited_; } /* JobQueue uses this method for Job counting. */ @@ -411,20 +404,10 @@ public: alertable_sleep_until(std::chrono::system_clock::time_point const& t); private: - /* Notify a root stoppable and children to stop, without waiting. - Has no effect if the stoppable was already notified. - - Returns true on the first call to this method, false otherwise. - - Thread safety: - Safe to call from any thread at any time. - */ - bool - stopAsync(beast::Journal j); - - std::atomic m_prepared{false}; - std::atomic m_started{false}; - std::atomic m_calledStop{false}; + // TODO [C++20]: Use std::atomic_flag instead. + std::atomic startEntered_{false}; + std::atomic startExited_{false}; + std::atomic stopEntered_{false}; std::mutex m_; std::condition_variable c_; JobCounter jobCounter_; @@ -446,9 +429,11 @@ RootStoppable::alertable_sleep_until( std::chrono::system_clock::time_point const& t) { std::unique_lock lock(m_); - if (m_calledStop) + if (stopEntered_) return true; - return c_.wait_until(lock, t, [this] { return m_calledStop.load(); }); + // TODO [C++20]: When `stopEntered_` is changed to a `std::atomic_flag`, + // this call to `load` needs to change to a call to `test`. + return c_.wait_until(lock, t, [this] { return stopEntered_.load(); }); } inline bool diff --git a/src/ripple/core/impl/Stoppable.cpp b/src/ripple/core/impl/Stoppable.cpp index f76317351c..5d54c15660 100644 --- a/src/ripple/core/impl/Stoppable.cpp +++ b/src/ripple/core/impl/Stoppable.cpp @@ -174,54 +174,39 @@ RootStoppable::~RootStoppable() bool RootStoppable::isStopping() const { - return m_calledStop; -} - -void -RootStoppable::prepare() -{ - if (m_prepared.exchange(true) == false) - prepareRecursive(); + return stopEntered_; } void RootStoppable::start() { - // Courtesy call to prepare. - if (m_prepared.exchange(true) == false) - prepareRecursive(); - - if (m_started.exchange(true) == false) - startRecursive(); + if (startEntered_.exchange(true)) + return; + prepareRecursive(); + startRecursive(); + startExited_ = true; } void RootStoppable::stop(beast::Journal j) { // Must have a prior call to start() - assert(m_started); + assert(startExited_); - if (stopAsync(j)) - stopRecursive(j); -} - -bool -RootStoppable::stopAsync(beast::Journal j) -{ bool alreadyCalled; { - // Even though m_calledStop is atomic, we change its value under a + // Even though stopEntered_ is atomic, we change its value under a // lock. This removes a small timing window that occurs if the - // waiting thread is handling a spurious wakeup while m_calledStop + // waiting thread is handling a spurious wakeup while stopEntered_ // changes state. std::unique_lock lock(m_); - alreadyCalled = m_calledStop.exchange(true); + alreadyCalled = stopEntered_.exchange(true); } if (alreadyCalled) { if (auto stream = j.warn()) - stream << "Stoppable::stop called again"; - return false; + stream << "RootStoppable::stop called again"; + return; } // Wait until all in-flight JobQueue Jobs are completed. @@ -230,7 +215,7 @@ RootStoppable::stopAsync(beast::Journal j) c_.notify_all(); stopAsyncRecursive(j); - return true; + stopRecursive(j); } } // namespace ripple diff --git a/src/ripple/core/impl/Workers.cpp b/src/ripple/core/impl/Workers.cpp index c08a449ce7..321ca36856 100644 --- a/src/ripple/core/impl/Workers.cpp +++ b/src/ripple/core/impl/Workers.cpp @@ -63,51 +63,51 @@ void Workers::setNumberOfThreads(int numberOfThreads) { static int instance{0}; - if (m_numberOfThreads != numberOfThreads) + if (m_numberOfThreads == numberOfThreads) + return; + + if (perfLog_) + perfLog_->resizeJobs(numberOfThreads); + + if (numberOfThreads > m_numberOfThreads) { - if (perfLog_) - perfLog_->resizeJobs(numberOfThreads); + // Increasing the number of working threads + int const amount = numberOfThreads - m_numberOfThreads; - if (numberOfThreads > m_numberOfThreads) + for (int i = 0; i < amount; ++i) { - // Increasing the number of working threads - int const amount = numberOfThreads - m_numberOfThreads; + // See if we can reuse a paused worker + Worker* worker = m_paused.pop_front(); - for (int i = 0; i < amount; ++i) + if (worker != nullptr) { - // See if we can reuse a paused worker - Worker* worker = m_paused.pop_front(); - - if (worker != nullptr) - { - // If we got here then the worker thread is at [1] - // This will unblock their call to wait() - // - worker->notify(); - } - else - { - worker = new Worker(*this, m_threadNames, instance++); - m_everyone.push_front(worker); - } + // If we got here then the worker thread is at [1] + // This will unblock their call to wait() + // + worker->notify(); + } + else + { + worker = new Worker(*this, m_threadNames, instance++); + m_everyone.push_front(worker); } } - else - { - // Decreasing the number of working threads - int const amount = m_numberOfThreads - numberOfThreads; - - for (int i = 0; i < amount; ++i) - { - ++m_pauseCount; - - // Pausing a thread counts as one "internal task" - m_semaphore.notify(); - } - } - - m_numberOfThreads = numberOfThreads; } + else + { + // Decreasing the number of working threads + int const amount = m_numberOfThreads - numberOfThreads; + + for (int i = 0; i < amount; ++i) + { + ++m_pauseCount; + + // Pausing a thread counts as one "internal task" + m_semaphore.notify(); + } + } + + m_numberOfThreads = numberOfThreads; } void diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 4dcaa4d658..76ac42551f 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -722,12 +722,7 @@ DatabaseShardImp::onChildrenStopped() } } - if (areChildrenStopped()) - stopped(); - else - { - JLOG(j_.warn()) << " Children failed to stop"; - } + stopped(); } void diff --git a/src/ripple/peerfinder/impl/PeerfinderManager.cpp b/src/ripple/peerfinder/impl/PeerfinderManager.cpp index f5310e8c7d..73383befd9 100644 --- a/src/ripple/peerfinder/impl/PeerfinderManager.cpp +++ b/src/ripple/peerfinder/impl/PeerfinderManager.cpp @@ -227,11 +227,6 @@ public: m_logic.load(); } - void - onStart() override - { - } - void onStop() override { diff --git a/src/test/basics/PerfLog_test.cpp b/src/test/basics/PerfLog_test.cpp index e223047a79..452057ef15 100644 --- a/src/test/basics/PerfLog_test.cpp +++ b/src/test/basics/PerfLog_test.cpp @@ -101,7 +101,6 @@ class PerfLog_test : public beast::unit_test::suite void doStart() { - prepare(); start(); } diff --git a/src/test/core/Stoppable_test.cpp b/src/test/core/Stoppable_test.cpp index b5d8327a73..c538af2a9c 100644 --- a/src/test/core/Stoppable_test.cpp +++ b/src/test/core/Stoppable_test.cpp @@ -476,7 +476,6 @@ class Stoppable_test : public beast::unit_test::suite void run() { - prepare(); start(); stop(journal_); }