diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.cpp b/src/ripple/app/ledger/impl/LedgerCleaner.cpp index fad97590e..d80913ad7 100644 --- a/src/ripple/app/ledger/impl/LedgerCleaner.cpp +++ b/src/ripple/app/ledger/impl/LedgerCleaner.cpp @@ -92,11 +92,6 @@ public: // //-------------------------------------------------------------------------- - void - onPrepare() override - { - } - void onStart() override { diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index 29af2f865..89cfdf5b6 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -977,11 +977,6 @@ public: // Stoppable // - void - onPrepare() override - { - } - void onStart() override { @@ -1634,7 +1629,6 @@ void ApplicationImp::doStart(bool withTimers) { startTimers_ = withTimers; - prepare(); start(); } diff --git a/src/ripple/app/main/LoadManager.cpp b/src/ripple/app/main/LoadManager.cpp index 5c481c5c3..b63972f34 100644 --- a/src/ripple/app/main/LoadManager.cpp +++ b/src/ripple/app/main/LoadManager.cpp @@ -77,11 +77,6 @@ LoadManager::resetDeadlockDetector() //------------------------------------------------------------------------------ -void -LoadManager::onPrepare() -{ -} - void LoadManager::onStart() { diff --git a/src/ripple/app/main/LoadManager.h b/src/ripple/app/main/LoadManager.h index 4ff3d8b8a..c5d344bdb 100644 --- a/src/ripple/app/main/LoadManager.h +++ b/src/ripple/app/main/LoadManager.h @@ -82,9 +82,6 @@ public: //-------------------------------------------------------------------------- // Stoppable members - void - onPrepare() override; - void onStart() override; diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index 67e9f2924..acc8c31d5 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -607,7 +607,33 @@ public: // Stoppable. void - onStop() override; + onStop() override + { + { + boost::system::error_code ec; + heartbeatTimer_.cancel(ec); + if (ec) + { + JLOG(m_journal.error()) + << "NetworkOPs: heartbeatTimer cancel error: " + << ec.message(); + } + + ec.clear(); + clusterTimer_.cancel(ec); + if (ec) + { + JLOG(m_journal.error()) + << "NetworkOPs: clusterTimer cancel error: " + << ec.message(); + } + } + // Make sure that any waitHandlers pending in our timers are done + // before we declare ourselves stopped. + using namespace std::chrono_literals; + waitHandlerCounter_.join("NetworkOPs", 1s, m_journal); + stopped(); + } private: void @@ -673,7 +699,6 @@ private: ConsensusPhase mLastConsensusPhase; LedgerMaster& m_ledgerMaster; - std::shared_ptr mAcquiringLedger; SubInfoMapType mSubAccount; SubInfoMapType mSubRTAccount; @@ -3516,34 +3541,6 @@ NetworkOPsImp::tryRemoveRpcSub(std::string const& strUrl) return true; } -void -NetworkOPsImp::onStop() -{ - mAcquiringLedger.reset(); - { - boost::system::error_code ec; - heartbeatTimer_.cancel(ec); - if (ec) - { - JLOG(m_journal.error()) - << "NetworkOPs: heartbeatTimer cancel error: " << ec.message(); - } - - ec.clear(); - clusterTimer_.cancel(ec); - if (ec) - { - JLOG(m_journal.error()) - << "NetworkOPs: clusterTimer cancel error: " << ec.message(); - } - } - // Make sure that any waitHandlers pending in our timers are done - // before we declare ourselves stopped. - using namespace std::chrono_literals; - waitHandlerCounter_.join("NetworkOPs", 1s, m_journal); - stopped(); -} - #ifndef USE_NEW_BOOK_PAGE // NIKB FIXME this should be looked at. There's no reason why this shouldn't diff --git a/src/ripple/app/misc/SHAMapStore.h b/src/ripple/app/misc/SHAMapStore.h index 8952453cb..e45cb30b2 100644 --- a/src/ripple/app/misc/SHAMapStore.h +++ b/src/ripple/app/misc/SHAMapStore.h @@ -21,7 +21,6 @@ #define RIPPLE_APP_MISC_SHAMAPSTORE_H_INCLUDED #include -#include #include #include #include @@ -29,17 +28,16 @@ namespace ripple { class TransactionMaster; +class Stoppable; /** * class to create database, launch online delete thread, and * related SQLite database */ -class SHAMapStore : public Stoppable +class SHAMapStore { public: - SHAMapStore(Stoppable& parent) : Stoppable("SHAMapStore", parent) - { - } + virtual ~SHAMapStore() = default; /** Called by LedgerMaster every time a ledger validates. */ virtual void diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 86e96587b..8d4bca6b4 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -150,7 +150,7 @@ SHAMapStoreImp::SHAMapStoreImp( Stoppable& parent, NodeStore::Scheduler& scheduler, beast::Journal journal) - : SHAMapStore(parent) + : Stoppable("SHAMapStore", parent) , app_(app) , scheduler_(scheduler) , journal_(journal) @@ -731,6 +731,7 @@ SHAMapStoreImp::health() void SHAMapStoreImp::onStop() { + // This is really a check for `if (thread_)`. if (deleteInterval_) { { @@ -738,26 +739,12 @@ SHAMapStoreImp::onStop() stop_ = true; } cond_.notify_one(); + // stopped() will be called by the thread_ running run(), + // when it reaches the check for stop_. } else { - stopped(); - } -} - -void -SHAMapStoreImp::onChildrenStopped() -{ - if (deleteInterval_) - { - { - std::lock_guard lock(mutex_); - stop_ = true; - } - cond_.notify_one(); - } - else - { + // There is no thread running run(), so we must call stopped(). stopped(); } } diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h index 541d74a38..062b90dde 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.h +++ b/src/ripple/app/misc/SHAMapStoreImp.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include #include @@ -33,7 +34,7 @@ namespace ripple { class NetworkOPs; -class SHAMapStoreImp : public SHAMapStore +class SHAMapStoreImp : public Stoppable, public SHAMapStore { private: struct SavedState @@ -117,7 +118,7 @@ private: boost::optional recoveryWaitTime_; // these do not exist upon SHAMapStore creation, but do exist - // as of onPrepare() or before + // as of run() or before NetworkOPs* netOPs_ = nullptr; LedgerMaster* ledgerMaster_ = nullptr; FullBelowCache* fullBelowCache_ = nullptr; @@ -246,14 +247,10 @@ private: // the main "run()". Health health(); + // // Stoppable // - void - onPrepare() override - { - } - void onStart() override { @@ -265,8 +262,6 @@ private: void onStop() override; // Called when all child Stoppable objects have stoped - void - onChildrenStopped() override; }; } // namespace ripple diff --git a/src/ripple/basics/impl/PerfLogImp.cpp b/src/ripple/basics/impl/PerfLogImp.cpp index 814386578..f3bb986d2 100644 --- a/src/ripple/basics/impl/PerfLogImp.cpp +++ b/src/ripple/basics/impl/PerfLogImp.cpp @@ -477,14 +477,12 @@ PerfLogImp::onStop() } thread_.join(); } - if (areChildrenStopped()) - stopped(); } void PerfLogImp::onChildrenStopped() { - onStop(); + stopped(); } //----------------------------------------------------------------------------- diff --git a/src/ripple/basics/impl/PerfLogImp.h b/src/ripple/basics/impl/PerfLogImp.h index 943829b6e..8fa430ec1 100644 --- a/src/ripple/basics/impl/PerfLogImp.h +++ b/src/ripple/basics/impl/PerfLogImp.h @@ -210,12 +210,6 @@ public: void rotate() override; - // Stoppable - void - onPrepare() override - { - } - // Called when application is ready to start threads. void onStart() override; diff --git a/src/ripple/core/Stoppable.h b/src/ripple/core/Stoppable.h index cf2395653..e192b83be 100644 --- a/src/ripple/core/Stoppable.h +++ b/src/ripple/core/Stoppable.h @@ -339,6 +339,7 @@ private: std::string m_name; RootStoppable& m_root; Child m_child; + // TODO [C++20]: Use std::atomic_flag instead. std::atomic m_stopped{false}; std::atomic m_childrenStopped{false}; Children m_children; @@ -360,17 +361,9 @@ public: bool isStopping() const; - /** Prepare all contained Stoppable objects. - This calls onPrepare for all Stoppable objects in the tree. - Calls made after the first have no effect. - Thread safety: - May be called from any thread. - */ - void - prepare(); - - /** Start all contained Stoppable objects. - The default implementation does nothing. + /** Prepare and start all contained Stoppable objects. + This calls onPrepare for all Stoppable objects in the tree, bottom-up, + then calls onStart for the same, top-down. Calls made after the first have no effect. Thread safety: May be called from any thread. @@ -381,8 +374,8 @@ public: /** Notify a root stoppable and children to stop, and block until stopped. Has no effect if the stoppable was already notified. This blocks until the stoppable and all of its children have stopped. - Undefined behavior results if stop() is called without a previous call - to start(). + Undefined behavior results if stop() is called without finishing + a previous call to start(). Thread safety: Safe to call from any thread not associated with a Stoppable. */ @@ -393,7 +386,7 @@ public: bool started() const { - return m_started; + return startExited_; } /* JobQueue uses this method for Job counting. */ @@ -411,20 +404,10 @@ public: alertable_sleep_until(std::chrono::system_clock::time_point const& t); private: - /* Notify a root stoppable and children to stop, without waiting. - Has no effect if the stoppable was already notified. - - Returns true on the first call to this method, false otherwise. - - Thread safety: - Safe to call from any thread at any time. - */ - bool - stopAsync(beast::Journal j); - - std::atomic m_prepared{false}; - std::atomic m_started{false}; - std::atomic m_calledStop{false}; + // TODO [C++20]: Use std::atomic_flag instead. + std::atomic startEntered_{false}; + std::atomic startExited_{false}; + std::atomic stopEntered_{false}; std::mutex m_; std::condition_variable c_; JobCounter jobCounter_; @@ -446,9 +429,11 @@ RootStoppable::alertable_sleep_until( std::chrono::system_clock::time_point const& t) { std::unique_lock lock(m_); - if (m_calledStop) + if (stopEntered_) return true; - return c_.wait_until(lock, t, [this] { return m_calledStop.load(); }); + // TODO [C++20]: When `stopEntered_` is changed to a `std::atomic_flag`, + // this call to `load` needs to change to a call to `test`. + return c_.wait_until(lock, t, [this] { return stopEntered_.load(); }); } inline bool diff --git a/src/ripple/core/impl/Stoppable.cpp b/src/ripple/core/impl/Stoppable.cpp index f76317351..5d54c1566 100644 --- a/src/ripple/core/impl/Stoppable.cpp +++ b/src/ripple/core/impl/Stoppable.cpp @@ -174,54 +174,39 @@ RootStoppable::~RootStoppable() bool RootStoppable::isStopping() const { - return m_calledStop; -} - -void -RootStoppable::prepare() -{ - if (m_prepared.exchange(true) == false) - prepareRecursive(); + return stopEntered_; } void RootStoppable::start() { - // Courtesy call to prepare. - if (m_prepared.exchange(true) == false) - prepareRecursive(); - - if (m_started.exchange(true) == false) - startRecursive(); + if (startEntered_.exchange(true)) + return; + prepareRecursive(); + startRecursive(); + startExited_ = true; } void RootStoppable::stop(beast::Journal j) { // Must have a prior call to start() - assert(m_started); + assert(startExited_); - if (stopAsync(j)) - stopRecursive(j); -} - -bool -RootStoppable::stopAsync(beast::Journal j) -{ bool alreadyCalled; { - // Even though m_calledStop is atomic, we change its value under a + // Even though stopEntered_ is atomic, we change its value under a // lock. This removes a small timing window that occurs if the - // waiting thread is handling a spurious wakeup while m_calledStop + // waiting thread is handling a spurious wakeup while stopEntered_ // changes state. std::unique_lock lock(m_); - alreadyCalled = m_calledStop.exchange(true); + alreadyCalled = stopEntered_.exchange(true); } if (alreadyCalled) { if (auto stream = j.warn()) - stream << "Stoppable::stop called again"; - return false; + stream << "RootStoppable::stop called again"; + return; } // Wait until all in-flight JobQueue Jobs are completed. @@ -230,7 +215,7 @@ RootStoppable::stopAsync(beast::Journal j) c_.notify_all(); stopAsyncRecursive(j); - return true; + stopRecursive(j); } } // namespace ripple diff --git a/src/ripple/core/impl/Workers.cpp b/src/ripple/core/impl/Workers.cpp index c08a449ce..321ca3685 100644 --- a/src/ripple/core/impl/Workers.cpp +++ b/src/ripple/core/impl/Workers.cpp @@ -63,51 +63,51 @@ void Workers::setNumberOfThreads(int numberOfThreads) { static int instance{0}; - if (m_numberOfThreads != numberOfThreads) + if (m_numberOfThreads == numberOfThreads) + return; + + if (perfLog_) + perfLog_->resizeJobs(numberOfThreads); + + if (numberOfThreads > m_numberOfThreads) { - if (perfLog_) - perfLog_->resizeJobs(numberOfThreads); + // Increasing the number of working threads + int const amount = numberOfThreads - m_numberOfThreads; - if (numberOfThreads > m_numberOfThreads) + for (int i = 0; i < amount; ++i) { - // Increasing the number of working threads - int const amount = numberOfThreads - m_numberOfThreads; + // See if we can reuse a paused worker + Worker* worker = m_paused.pop_front(); - for (int i = 0; i < amount; ++i) + if (worker != nullptr) { - // See if we can reuse a paused worker - Worker* worker = m_paused.pop_front(); - - if (worker != nullptr) - { - // If we got here then the worker thread is at [1] - // This will unblock their call to wait() - // - worker->notify(); - } - else - { - worker = new Worker(*this, m_threadNames, instance++); - m_everyone.push_front(worker); - } + // If we got here then the worker thread is at [1] + // This will unblock their call to wait() + // + worker->notify(); + } + else + { + worker = new Worker(*this, m_threadNames, instance++); + m_everyone.push_front(worker); } } - else - { - // Decreasing the number of working threads - int const amount = m_numberOfThreads - numberOfThreads; - - for (int i = 0; i < amount; ++i) - { - ++m_pauseCount; - - // Pausing a thread counts as one "internal task" - m_semaphore.notify(); - } - } - - m_numberOfThreads = numberOfThreads; } + else + { + // Decreasing the number of working threads + int const amount = m_numberOfThreads - numberOfThreads; + + for (int i = 0; i < amount; ++i) + { + ++m_pauseCount; + + // Pausing a thread counts as one "internal task" + m_semaphore.notify(); + } + } + + m_numberOfThreads = numberOfThreads; } void diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 4dcaa4d65..76ac42551 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -722,12 +722,7 @@ DatabaseShardImp::onChildrenStopped() } } - if (areChildrenStopped()) - stopped(); - else - { - JLOG(j_.warn()) << " Children failed to stop"; - } + stopped(); } void diff --git a/src/ripple/peerfinder/impl/PeerfinderManager.cpp b/src/ripple/peerfinder/impl/PeerfinderManager.cpp index f5310e8c7..73383befd 100644 --- a/src/ripple/peerfinder/impl/PeerfinderManager.cpp +++ b/src/ripple/peerfinder/impl/PeerfinderManager.cpp @@ -227,11 +227,6 @@ public: m_logic.load(); } - void - onStart() override - { - } - void onStop() override { diff --git a/src/test/basics/PerfLog_test.cpp b/src/test/basics/PerfLog_test.cpp index e223047a7..452057ef1 100644 --- a/src/test/basics/PerfLog_test.cpp +++ b/src/test/basics/PerfLog_test.cpp @@ -101,7 +101,6 @@ class PerfLog_test : public beast::unit_test::suite void doStart() { - prepare(); start(); } diff --git a/src/test/core/Stoppable_test.cpp b/src/test/core/Stoppable_test.cpp index b5d8327a7..c538af2a9 100644 --- a/src/test/core/Stoppable_test.cpp +++ b/src/test/core/Stoppable_test.cpp @@ -476,7 +476,6 @@ class Stoppable_test : public beast::unit_test::suite void run() { - prepare(); start(); stop(journal_); }