Use std::thread in LedgerCleaner (RIPD-236)

This commit is contained in:
Scott Schurr
2015-10-26 09:46:49 -07:00
committed by Nik Bougalis
parent 7bb4ff901e
commit fa05ded88e

View File

@@ -18,19 +18,12 @@
//============================================================================== //==============================================================================
#include <BeastConfig.h> #include <BeastConfig.h>
#include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/impl/LedgerCleaner.h> #include <ripple/app/ledger/impl/LedgerCleaner.h>
#include <ripple/app/main/Application.h> #include <ripple/app/ledger/InboundLedgers.h>
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/core/LoadFeeTrack.h> #include <ripple/core/LoadFeeTrack.h>
#include <ripple/protocol/JsonFields.h> #include <ripple/protocol/JsonFields.h>
#include <ripple/protocol/Protocol.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <beast/threads/Thread.h> #include <beast/threads/Thread.h>
#include <memory>
#include <mutex>
#include <thread>
namespace ripple { namespace ripple {
@@ -48,14 +41,23 @@ Cleans up the ledger. Specifically, resolves these issues:
*/ */
class LedgerCleanerImp class LedgerCleanerImp : public LedgerCleaner
: public LedgerCleaner
, public beast::Thread
{ {
public:
Application& app_; Application& app_;
beast::Journal m_journal; beast::Journal j_;
std::mutex lock_; mutable std::mutex mutex_;
mutable std::condition_variable wakeup_;
std::thread thread_;
enum class State : char {
readyToClean = 0,
startCleaning,
cleaning
};
State state_ = State::readyToClean;
bool shouldExit_ = false;
// The lowest ledger in the range we're checking. // The lowest ledger in the range we're checking.
LedgerIndex minRange_ = 0; LedgerIndex minRange_ = 0;
@@ -73,21 +75,21 @@ public:
int failures_ = 0; int failures_ = 0;
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
public:
LedgerCleanerImp ( LedgerCleanerImp (
Application& app, Application& app,
Stoppable& stoppable, Stoppable& stoppable,
beast::Journal journal) beast::Journal journal)
: LedgerCleaner (stoppable) : LedgerCleaner (stoppable)
, Thread ("LedgerCleaner")
, app_ (app) , app_ (app)
, m_journal (journal) , j_ (journal)
{ {
} }
~LedgerCleanerImp () ~LedgerCleanerImp () override
{ {
stopThread (); if (thread_.joinable())
LogicError ("LedgerCleanerImp::onStop not called.");
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
@@ -96,20 +98,24 @@ public:
// //
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
void onPrepare () void onPrepare () override
{ {
} }
void onStart () void onStart () override
{ {
startThread(); thread_ = std::thread {&LedgerCleanerImp::run, this};
} }
void onStop () void onStop () override
{ {
m_journal.info << "Stopping"; JLOG (j_.info) << "Stopping";
signalThreadShouldExit(); {
notify(); std::lock_guard<std::mutex> lock (mutex_);
shouldExit_ = true;
wakeup_.notify_one();
}
thread_.join();
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
@@ -118,9 +124,9 @@ public:
// //
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
void onWrite (beast::PropertyStream::Map& map) void onWrite (beast::PropertyStream::Map& map) override
{ {
std::lock_guard<std::mutex> _(lock_); std::lock_guard<std::mutex> lock (mutex_);
if (maxRange_ == 0) if (maxRange_ == 0)
map["status"] = "idle"; map["status"] = "idle";
@@ -142,14 +148,14 @@ public:
// //
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
void doClean (Json::Value const& params) void doClean (Json::Value const& params) override
{ {
LedgerIndex minRange; LedgerIndex minRange;
LedgerIndex maxRange; LedgerIndex maxRange;
app_.getLedgerMaster().getFullValidatedRange (minRange, maxRange); app_.getLedgerMaster().getFullValidatedRange (minRange, maxRange);
{ {
std::lock_guard<std::mutex> _(lock_); std::lock_guard<std::mutex> lock (mutex_);
maxRange_ = maxRange; maxRange_ = maxRange;
minRange_ = minRange; minRange_ = minRange;
@@ -213,9 +219,13 @@ public:
if (params.isMember(jss::stop) && params[jss::stop].asBool()) if (params.isMember(jss::stop) && params[jss::stop].asBool())
minRange_ = maxRange_ = 0; minRange_ = maxRange_ = 0;
}
notify(); if (state_ == State::readyToClean)
{
state_ = State::startCleaning;
wakeup_.notify_one();
}
}
} }
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
@@ -223,25 +233,35 @@ public:
// LedgerCleanerImp // LedgerCleanerImp
// //
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
private:
void init () void init ()
{ {
m_journal.debug << "Initializing"; JLOG (j_.debug) << "Initializing";
} }
void run () void run ()
{ {
m_journal.debug << "Started"; beast::Thread::setCurrentThreadName ("LedgerCleaner");
JLOG (j_.debug) << "Started";
init (); init();
while (! this->threadShouldExit()) while (true)
{ {
this->wait ();
if (! this->threadShouldExit())
{ {
doLedgerCleaner(); std::unique_lock<std::mutex> lock (mutex_);
wakeup_.wait(lock, [this]()
{
return (
shouldExit_ ||
state_ == State::startCleaning);
});
if (shouldExit_)
break;
state_ = State::cleaning;
} }
doLedgerCleaner();
} }
stopped(); stopped();
@@ -253,11 +273,11 @@ public:
boost::optional<LedgerHash> hash; boost::optional<LedgerHash> hash;
try try
{ {
hash = hashOfSeq(*ledger, index, m_journal); hash = hashOfSeq(*ledger, index, j_);
} }
catch (SHAMapMissingNode &) catch (SHAMapMissingNode &)
{ {
m_journal.warning << JLOG (j_.warning) <<
"Node missing from ledger " << ledger->info().seq; "Node missing from ledger " << ledger->info().seq;
app_.getInboundLedgers().acquire ( app_.getInboundLedgers().acquire (
ledger->getHash(), ledger->info().seq, ledger->getHash(), ledger->info().seq,
@@ -284,7 +304,7 @@ public:
ledgerHash, ledgerIndex, InboundLedger::fcGENERIC); ledgerHash, ledgerIndex, InboundLedger::fcGENERIC);
if (!nodeLedger) if (!nodeLedger)
{ {
m_journal.debug << "Ledger " << ledgerIndex << " not available"; JLOG (j_.debug) << "Ledger " << ledgerIndex << " not available";
app_.getLedgerMaster().clearLedger (ledgerIndex); app_.getLedgerMaster().clearLedger (ledgerIndex);
app_.getInboundLedgers().acquire( app_.getInboundLedgers().acquire(
ledgerHash, ledgerIndex, InboundLedger::fcGENERIC); ledgerHash, ledgerIndex, InboundLedger::fcGENERIC);
@@ -297,21 +317,21 @@ public:
(dbLedger->info().parentHash != nodeLedger->info().parentHash)) (dbLedger->info().parentHash != nodeLedger->info().parentHash))
{ {
// Ideally we'd also check for more than one ledger with that index // Ideally we'd also check for more than one ledger with that index
m_journal.debug << JLOG (j_.debug) <<
"Ledger " << ledgerIndex << " mismatches SQL DB"; "Ledger " << ledgerIndex << " mismatches SQL DB";
doTxns = true; doTxns = true;
} }
if(! app_.getLedgerMaster().fixIndex(ledgerIndex, ledgerHash)) if(! app_.getLedgerMaster().fixIndex(ledgerIndex, ledgerHash))
{ {
m_journal.debug << "ledger " << ledgerIndex JLOG (j_.debug) << "ledger " << ledgerIndex
<< " had wrong entry in history"; << " had wrong entry in history";
doTxns = true; doTxns = true;
} }
if (doNodes && !nodeLedger->walkLedger(app_.journal ("Ledger"))) if (doNodes && !nodeLedger->walkLedger(app_.journal ("Ledger")))
{ {
m_journal.debug << "Ledger " << ledgerIndex << " is missing nodes"; JLOG (j_.debug) << "Ledger " << ledgerIndex << " is missing nodes";
app_.getLedgerMaster().clearLedger (ledgerIndex); app_.getLedgerMaster().clearLedger (ledgerIndex);
app_.getInboundLedgers().acquire( app_.getInboundLedgers().acquire(
ledgerHash, ledgerIndex, InboundLedger::fcGENERIC); ledgerHash, ledgerIndex, InboundLedger::fcGENERIC);
@@ -320,7 +340,7 @@ public:
if (doTxns && !pendSaveValidated(app_, nodeLedger, true, false)) if (doTxns && !pendSaveValidated(app_, nodeLedger, true, false))
{ {
m_journal.debug << "Failed to save ledger " << ledgerIndex; JLOG (j_.debug) << "Failed to save ledger " << ledgerIndex;
return false; return false;
} }
@@ -343,7 +363,7 @@ public:
referenceLedger = app_.getLedgerMaster().getValidatedLedger(); referenceLedger = app_.getLedgerMaster().getValidatedLedger();
if (!referenceLedger) if (!referenceLedger)
{ {
m_journal.warning << "No validated ledger"; JLOG (j_.warning) << "No validated ledger";
return ledgerHash; // Nothing we can do. No validated ledger. return ledgerHash; // Nothing we can do. No validated ledger.
} }
} }
@@ -376,7 +396,7 @@ public:
} }
} }
else else
m_journal.warning << "Validated ledger is prior to target ledger"; JLOG (j_.warning) << "Validated ledger is prior to target ledger";
return ledgerHash; return ledgerHash;
} }
@@ -384,9 +404,15 @@ public:
/** Run the ledger cleaner. */ /** Run the ledger cleaner. */
void doLedgerCleaner() void doLedgerCleaner()
{ {
auto shouldExit = [this]()
{
std::lock_guard<std::mutex> lock(mutex_);
return shouldExit_;
};
Ledger::pointer goodLedger; Ledger::pointer goodLedger;
while (! this->threadShouldExit()) while (! shouldExit())
{ {
LedgerIndex ledgerIndex; LedgerIndex ledgerIndex;
LedgerHash ledgerHash; LedgerHash ledgerHash;
@@ -395,18 +421,19 @@ public:
while (app_.getFeeTrack().isLoadedLocal()) while (app_.getFeeTrack().isLoadedLocal())
{ {
m_journal.debug << "Waiting for load to subside"; JLOG (j_.debug) << "Waiting for load to subside";
std::this_thread::sleep_for(std::chrono::seconds(5)); std::this_thread::sleep_for(std::chrono::seconds(5));
if (this->threadShouldExit ()) if (shouldExit())
return; return;
} }
{ {
std::lock_guard<std::mutex> _(lock_); std::lock_guard<std::mutex> lock (mutex_);
if ((minRange_ > maxRange_) || if ((minRange_ > maxRange_) ||
(maxRange_ == 0) || (minRange_ == 0)) (maxRange_ == 0) || (minRange_ == 0))
{ {
minRange_ = maxRange_ = 0; minRange_ = maxRange_ = 0;
state_ = State::readyToClean;
return; return;
} }
ledgerIndex = maxRange_; ledgerIndex = maxRange_;
@@ -419,20 +446,20 @@ public:
bool fail = false; bool fail = false;
if (ledgerHash.isZero()) if (ledgerHash.isZero())
{ {
m_journal.info << "Unable to get hash for ledger " JLOG (j_.info) << "Unable to get hash for ledger "
<< ledgerIndex; << ledgerIndex;
fail = true; fail = true;
} }
else if (!doLedger(ledgerIndex, ledgerHash, doNodes, doTxns)) else if (!doLedger(ledgerIndex, ledgerHash, doNodes, doTxns))
{ {
m_journal.info << "Failed to process ledger " << ledgerIndex; JLOG (j_.info) << "Failed to process ledger " << ledgerIndex;
fail = true; fail = true;
} }
if (fail) if (fail)
{ {
{ {
std::lock_guard<std::mutex> _(lock_); std::lock_guard<std::mutex> lock (mutex_);
++failures_; ++failures_;
} }
// Wait for acquiring to catch up to us // Wait for acquiring to catch up to us
@@ -441,7 +468,7 @@ public:
else else
{ {
{ {
std::lock_guard<std::mutex> _(lock_); std::lock_guard<std::mutex> lock (mutex_);
if (ledgerIndex == minRange_) if (ledgerIndex == minRange_)
++minRange_; ++minRange_;
if (ledgerIndex == maxRange_) if (ledgerIndex == maxRange_)