From fa05ded88ea3d98ff925a16e45d76d646caa0fc9 Mon Sep 17 00:00:00 2001 From: Scott Schurr Date: Mon, 26 Oct 2015 09:46:49 -0700 Subject: [PATCH] Use std::thread in LedgerCleaner (RIPD-236) --- src/ripple/app/ledger/impl/LedgerCleaner.cpp | 143 +++++++++++-------- 1 file changed, 85 insertions(+), 58 deletions(-) diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.cpp b/src/ripple/app/ledger/impl/LedgerCleaner.cpp index 17c7965f1..158c198c4 100644 --- a/src/ripple/app/ledger/impl/LedgerCleaner.cpp +++ b/src/ripple/app/ledger/impl/LedgerCleaner.cpp @@ -18,19 +18,12 @@ //============================================================================== #include -#include -#include -#include #include -#include +#include +#include #include #include -#include -#include #include -#include -#include -#include namespace ripple { @@ -48,14 +41,23 @@ Cleans up the ledger. Specifically, resolves these issues: */ -class LedgerCleanerImp - : public LedgerCleaner - , public beast::Thread +class LedgerCleanerImp : public LedgerCleaner { -public: Application& app_; - beast::Journal m_journal; - std::mutex lock_; + beast::Journal j_; + mutable std::mutex mutex_; + + mutable std::condition_variable wakeup_; + + std::thread thread_; + + enum class State : char { + readyToClean = 0, + startCleaning, + cleaning + }; + State state_ = State::readyToClean; + bool shouldExit_ = false; // The lowest ledger in the range we're checking. LedgerIndex minRange_ = 0; @@ -73,21 +75,21 @@ public: int failures_ = 0; //-------------------------------------------------------------------------- - +public: LedgerCleanerImp ( Application& app, Stoppable& stoppable, beast::Journal journal) : LedgerCleaner (stoppable) - , Thread ("LedgerCleaner") , app_ (app) - , m_journal (journal) + , j_ (journal) { } - ~LedgerCleanerImp () + ~LedgerCleanerImp () override { - stopThread (); + if (thread_.joinable()) + LogicError ("LedgerCleanerImp::onStop not called."); } //-------------------------------------------------------------------------- @@ -96,20 +98,24 @@ public: // //-------------------------------------------------------------------------- - void onPrepare () + void onPrepare () override { } - void onStart () + void onStart () override { - startThread(); + thread_ = std::thread {&LedgerCleanerImp::run, this}; } - void onStop () + void onStop () override { - m_journal.info << "Stopping"; - signalThreadShouldExit(); - notify(); + JLOG (j_.info) << "Stopping"; + { + std::lock_guard lock (mutex_); + shouldExit_ = true; + wakeup_.notify_one(); + } + thread_.join(); } //-------------------------------------------------------------------------- @@ -118,9 +124,9 @@ public: // //-------------------------------------------------------------------------- - void onWrite (beast::PropertyStream::Map& map) + void onWrite (beast::PropertyStream::Map& map) override { - std::lock_guard _(lock_); + std::lock_guard lock (mutex_); if (maxRange_ == 0) map["status"] = "idle"; @@ -142,14 +148,14 @@ public: // //-------------------------------------------------------------------------- - void doClean (Json::Value const& params) + void doClean (Json::Value const& params) override { LedgerIndex minRange; LedgerIndex maxRange; app_.getLedgerMaster().getFullValidatedRange (minRange, maxRange); { - std::lock_guard _(lock_); + std::lock_guard lock (mutex_); maxRange_ = maxRange; minRange_ = minRange; @@ -213,9 +219,13 @@ public: if (params.isMember(jss::stop) && params[jss::stop].asBool()) minRange_ = maxRange_ = 0; - } - notify(); + if (state_ == State::readyToClean) + { + state_ = State::startCleaning; + wakeup_.notify_one(); + } + } } //-------------------------------------------------------------------------- @@ -223,25 +233,35 @@ public: // LedgerCleanerImp // //-------------------------------------------------------------------------- - +private: void init () { - m_journal.debug << "Initializing"; + JLOG (j_.debug) << "Initializing"; } void run () { - m_journal.debug << "Started"; + beast::Thread::setCurrentThreadName ("LedgerCleaner"); + JLOG (j_.debug) << "Started"; - init (); + init(); - while (! this->threadShouldExit()) + while (true) { - this->wait (); - if (! this->threadShouldExit()) { - doLedgerCleaner(); + std::unique_lock lock (mutex_); + wakeup_.wait(lock, [this]() + { + return ( + shouldExit_ || + state_ == State::startCleaning); + }); + if (shouldExit_) + break; + + state_ = State::cleaning; } + doLedgerCleaner(); } stopped(); @@ -253,11 +273,11 @@ public: boost::optional hash; try { - hash = hashOfSeq(*ledger, index, m_journal); + hash = hashOfSeq(*ledger, index, j_); } catch (SHAMapMissingNode &) { - m_journal.warning << + JLOG (j_.warning) << "Node missing from ledger " << ledger->info().seq; app_.getInboundLedgers().acquire ( ledger->getHash(), ledger->info().seq, @@ -284,7 +304,7 @@ public: ledgerHash, ledgerIndex, InboundLedger::fcGENERIC); if (!nodeLedger) { - m_journal.debug << "Ledger " << ledgerIndex << " not available"; + JLOG (j_.debug) << "Ledger " << ledgerIndex << " not available"; app_.getLedgerMaster().clearLedger (ledgerIndex); app_.getInboundLedgers().acquire( ledgerHash, ledgerIndex, InboundLedger::fcGENERIC); @@ -297,21 +317,21 @@ public: (dbLedger->info().parentHash != nodeLedger->info().parentHash)) { // Ideally we'd also check for more than one ledger with that index - m_journal.debug << + JLOG (j_.debug) << "Ledger " << ledgerIndex << " mismatches SQL DB"; doTxns = true; } if(! app_.getLedgerMaster().fixIndex(ledgerIndex, ledgerHash)) { - m_journal.debug << "ledger " << ledgerIndex + JLOG (j_.debug) << "ledger " << ledgerIndex << " had wrong entry in history"; doTxns = true; } if (doNodes && !nodeLedger->walkLedger(app_.journal ("Ledger"))) { - m_journal.debug << "Ledger " << ledgerIndex << " is missing nodes"; + JLOG (j_.debug) << "Ledger " << ledgerIndex << " is missing nodes"; app_.getLedgerMaster().clearLedger (ledgerIndex); app_.getInboundLedgers().acquire( ledgerHash, ledgerIndex, InboundLedger::fcGENERIC); @@ -320,7 +340,7 @@ public: if (doTxns && !pendSaveValidated(app_, nodeLedger, true, false)) { - m_journal.debug << "Failed to save ledger " << ledgerIndex; + JLOG (j_.debug) << "Failed to save ledger " << ledgerIndex; return false; } @@ -343,7 +363,7 @@ public: referenceLedger = app_.getLedgerMaster().getValidatedLedger(); if (!referenceLedger) { - m_journal.warning << "No validated ledger"; + JLOG (j_.warning) << "No validated ledger"; return ledgerHash; // Nothing we can do. No validated ledger. } } @@ -376,7 +396,7 @@ public: } } else - m_journal.warning << "Validated ledger is prior to target ledger"; + JLOG (j_.warning) << "Validated ledger is prior to target ledger"; return ledgerHash; } @@ -384,9 +404,15 @@ public: /** Run the ledger cleaner. */ void doLedgerCleaner() { + auto shouldExit = [this]() + { + std::lock_guard lock(mutex_); + return shouldExit_; + }; + Ledger::pointer goodLedger; - while (! this->threadShouldExit()) + while (! shouldExit()) { LedgerIndex ledgerIndex; LedgerHash ledgerHash; @@ -395,18 +421,19 @@ public: while (app_.getFeeTrack().isLoadedLocal()) { - m_journal.debug << "Waiting for load to subside"; + JLOG (j_.debug) << "Waiting for load to subside"; std::this_thread::sleep_for(std::chrono::seconds(5)); - if (this->threadShouldExit ()) + if (shouldExit()) return; } { - std::lock_guard _(lock_); + std::lock_guard lock (mutex_); if ((minRange_ > maxRange_) || (maxRange_ == 0) || (minRange_ == 0)) { minRange_ = maxRange_ = 0; + state_ = State::readyToClean; return; } ledgerIndex = maxRange_; @@ -419,20 +446,20 @@ public: bool fail = false; if (ledgerHash.isZero()) { - m_journal.info << "Unable to get hash for ledger " + JLOG (j_.info) << "Unable to get hash for ledger " << ledgerIndex; fail = true; } else if (!doLedger(ledgerIndex, ledgerHash, doNodes, doTxns)) { - m_journal.info << "Failed to process ledger " << ledgerIndex; + JLOG (j_.info) << "Failed to process ledger " << ledgerIndex; fail = true; } if (fail) { { - std::lock_guard _(lock_); + std::lock_guard lock (mutex_); ++failures_; } // Wait for acquiring to catch up to us @@ -441,7 +468,7 @@ public: else { { - std::lock_guard _(lock_); + std::lock_guard lock (mutex_); if (ledgerIndex == minRange_) ++minRange_; if (ledgerIndex == maxRange_)