Convert LoadManager to use std::thread (RIPD-236)

Author:    Scott Schurr
Date:      2015-09-09 12:19:54 -07:00
Committer: Nik Bougalis
Parent:    570bb2e139
Commit:    caa4ed31de
2 changed files with 204 additions and 181 deletions


@@ -22,192 +22,176 @@
 #include <ripple/app/main/Application.h>
 #include <ripple/app/misc/NetworkOPs.h>
 #include <ripple/basics/UptimeTimer.h>
 #include <ripple/core/JobQueue.h>
 #include <ripple/core/LoadFeeTrack.h>
 #include <ripple/json/to_string.h>
 #include <beast/threads/Thread.h>
 #include <beast/cxx14/memory.h> // <memory>
 #include <mutex>
 #include <thread>
 namespace ripple {
-class LoadManagerImp
-    : public LoadManager
-    , public beast::Thread
+LoadManager::LoadManager (
+    Application& app, Stoppable& parent, beast::Journal journal)
+    : Stoppable ("LoadManager", parent)
+    , app_ (app)
+    , journal_ (journal)
+    , deadLock_ (0)
+    , armed_ (false)
+    , stop_ (false)
 {
-public:
-    using LockType = std::mutex;
-    using ScopedLockType = std::lock_guard <LockType>;
+    UptimeTimer::getInstance ().beginManualUpdates ();
+}
-    Application& app_;
-    beast::Journal m_journal;
-    LockType mLock;
-    bool mArmed;
-    int mDeadLock; // Detect server deadlocks
-    //--------------------------------------------------------------------------
-    LoadManagerImp (Application& app,
-        Stoppable& parent, beast::Journal journal)
-        : LoadManager (parent)
-        , Thread ("loadmgr")
-        , app_ (app)
-        , m_journal (journal)
-        , mArmed (false)
-        , mDeadLock (0)
-    {
-        UptimeTimer::getInstance ().beginManualUpdates ();
-    }
-    ~LoadManagerImp ()
+LoadManager::~LoadManager ()
 {
+    try
+    {
         UptimeTimer::getInstance ().endManualUpdates ();
-        stopThread ();
+        onStop ();
     }
-    //--------------------------------------------------------------------------
-    //
-    // Stoppable
-    //
-    //--------------------------------------------------------------------------
-    void onPrepare ()
+    catch (std::exception const& ex)
     {
+        // Swallow the exception in a destructor.
+        JLOG(journal_.warning) << "std::exception in ~LoadManager. "
+            << ex.what();
     }
-    void onStart ()
-    {
-        m_journal.debug << "Starting";
-        startThread ();
-    }
-    void onStop ()
-    {
-        if (isThreadRunning ())
-        {
-            m_journal.debug << "Stopping";
-            stopThreadAsync ();
-        }
-        else
-        {
-            stopped ();
-        }
-    }
-    //--------------------------------------------------------------------------
-    void resetDeadlockDetector ()
-    {
-        ScopedLockType sl (mLock);
-        mDeadLock = UptimeTimer::getInstance ().getElapsedSeconds ();
-    }
-    void activateDeadlockDetector ()
-    {
-        mArmed = true;
-    }
-    void logDeadlock (int dlTime)
-    {
-        m_journal.warning << "Server stalled for " << dlTime << " seconds.";
-    }
-    // VFALCO NOTE Where's the thread object? It's not a data member...
-    //
-    void run ()
-    {
-        using clock_type = std::chrono::steady_clock;
-        // Initialize the clock to the current time.
-        auto t = clock_type::now();
-        while (! threadShouldExit ())
-        {
-            {
-                // VFALCO NOTE What is this lock protecting?
-                ScopedLockType sl (mLock);
-                // VFALCO NOTE I think this is to reduce calls to the operating system
-                // for retrieving the current time.
-                //
-                // TODO Instead of incrementing can't we just retrieve the current
-                //      time again?
-                //
-                // Manually update the timer.
-                UptimeTimer::getInstance ().incrementElapsedTime ();
-                // Measure the amount of time we have been deadlocked, in seconds.
-                //
-                // VFALCO NOTE mDeadLock is a canary for detecting the condition.
-                int const timeSpentDeadlocked = UptimeTimer::getInstance ().getElapsedSeconds () - mDeadLock;
-                // VFALCO NOTE I think that "armed" refers to the deadlock detector
-                //
-                int const reportingIntervalSeconds = 10;
-                if (mArmed && (timeSpentDeadlocked >= reportingIntervalSeconds))
-                {
-                    // Report the deadlocked condition every 10 seconds
-                    if ((timeSpentDeadlocked % reportingIntervalSeconds) == 0)
-                    {
-                        logDeadlock (timeSpentDeadlocked);
-                    }
-                    // If we go over 500 seconds spent deadlocked, it means that the
-                    // deadlock resolution code has failed, which qualifies as undefined
-                    // behavior.
-                    //
-                    assert (timeSpentDeadlocked < 500);
-                }
-            }
-            bool change;
-            // VFALCO TODO Eliminate the dependence on the Application object.
-            //             Choices include constructing with the job queue / feetracker.
-            //             Another option is using an observer pattern to invert the dependency.
-            if (app_.getJobQueue ().isOverloaded ())
-            {
-                if (m_journal.info)
-                    m_journal.info << app_.getJobQueue ().getJson (0);
-                change = app_.getFeeTrack ().raiseLocalFee ();
-            }
-            else
-            {
-                change = app_.getFeeTrack ().lowerLocalFee ();
-            }
-            if (change)
-            {
-                // VFALCO TODO replace this with a Listener / observer and subscribe in NetworkOPs or Application
-                app_.getOPs ().reportFeeChange ();
-            }
-            t += std::chrono::seconds (1);
-            auto const duration = t - clock_type::now();
-            if ((duration < std::chrono::seconds (0)) || (duration > std::chrono::seconds (1)))
-            {
-                m_journal.warning << "time jump";
-                t = clock_type::now();
-            }
-            else
-            {
-                std::this_thread::sleep_for (duration);
-            }
-        }
-        stopped ();
-    }
-};
+}
 //------------------------------------------------------------------------------
-LoadManager::LoadManager (Stoppable& parent)
-    : Stoppable ("LoadManager", parent)
+void LoadManager::activateDeadlockDetector ()
 {
+    std::lock_guard<std::mutex> sl (mutex_);
+    armed_ = true;
 }
+void LoadManager::resetDeadlockDetector ()
+{
+    auto const elapsedSeconds =
+        UptimeTimer::getInstance ().getElapsedSeconds ();
+    std::lock_guard<std::mutex> sl (mutex_);
+    deadLock_ = elapsedSeconds;
+}
+//------------------------------------------------------------------------------
+void LoadManager::onPrepare ()
+{
+}
+void LoadManager::onStart ()
+{
+    JLOG(journal_.debug) << "Starting";
+    assert (! thread_.joinable());
+    thread_ = std::thread {&LoadManager::run, this};
+}
+void LoadManager::onStop ()
+{
+    if (thread_.joinable())
+    {
+        JLOG(journal_.debug) << "Stopping";
+        {
+            std::lock_guard<std::mutex> sl (mutex_);
+            stop_ = true;
+        }
+        thread_.join();
+    }
+    stopped ();
+}
+//------------------------------------------------------------------------------
+void LoadManager::run ()
+{
+    beast::Thread::setCurrentThreadName ("LoadManager");
+    using clock_type = std::chrono::steady_clock;
+    // Initialize the clock to the current time.
+    auto t = clock_type::now();
+    bool stop = false;
+    while (! (stop || isStopping ()))
+    {
+        {
+            // VFALCO NOTE I think this is to reduce calls to the operating
+            // system for retrieving the current time.
+            //
+            // TODO Instead of incrementing can't we just retrieve the
+            // current time again?
+            //
+            // Manually update the timer.
+            UptimeTimer::getInstance ().incrementElapsedTime ();
+            // Copy out shared data under a lock. Use copies outside lock.
+            std::unique_lock<std::mutex> sl (mutex_);
+            auto const deadLock = deadLock_;
+            auto const armed = armed_;
+            stop = stop_;
+            sl.unlock();
+            // Measure the amount of time we have been deadlocked, in seconds.
+            //
+            // VFALCO NOTE deadLock_ is a canary for detecting the condition.
+            int const timeSpentDeadlocked =
+                UptimeTimer::getInstance ().getElapsedSeconds () - deadLock;
+            // VFALCO NOTE I think that "armed" refers to the deadlock detector.
+            //
+            int const reportingIntervalSeconds = 10;
+            if (armed && (timeSpentDeadlocked >= reportingIntervalSeconds))
+            {
+                // Report the deadlocked condition every 10 seconds
+                if ((timeSpentDeadlocked % reportingIntervalSeconds) == 0)
+                {
+                    JLOG(journal_.warning)
+                        << "Server stalled for "
+                        << timeSpentDeadlocked << " seconds.";
+                }
+                // If we go over 500 seconds spent deadlocked, it means that
+                // the deadlock resolution code has failed, which qualifies
+                // as undefined behavior.
+                //
+                assert (timeSpentDeadlocked < 500);
+            }
+        }
+        bool change = false;
+        if (app_.getJobQueue ().isOverloaded ())
+        {
+            JLOG(journal_.info) << app_.getJobQueue ().getJson (0);
+            change = app_.getFeeTrack ().raiseLocalFee ();
+        }
+        else
+        {
+            change = app_.getFeeTrack ().lowerLocalFee ();
+        }
+        if (change)
+        {
+            // VFALCO TODO replace this with a Listener / observer and
+            // subscribe in NetworkOPs or Application.
+            app_.getOPs ().reportFeeChange ();
+        }
+        t += std::chrono::seconds (1);
+        auto const duration = t - clock_type::now();
+        if ((duration < std::chrono::seconds (0)) ||
+            (duration > std::chrono::seconds (1)))
+        {
+            JLOG(journal_.warning) << "time jump";
+            t = clock_type::now();
+        }
+        else
+        {
+            std::this_thread::sleep_for (duration);
+        }
+    }
+    stopped ();
+}
 //------------------------------------------------------------------------------
@@ -216,7 +200,7 @@ std::unique_ptr<LoadManager>
 make_LoadManager (Application& app,
     beast::Stoppable& parent, beast::Journal journal)
 {
-    return std::make_unique<LoadManagerImp>(app, parent, journal);
+    return std::make_unique<LoadManager>(app, parent, journal);
 }
 } // ripple
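
The conversion follows a small, reusable pattern: the object owns a std::thread data member, starts it explicitly, requests shutdown by setting a bool guarded by a std::mutex, and joins the thread before the object goes away. The sketch below illustrates only that pattern in isolation; the Worker class and its member names are invented for the example and are not rippled code (Stoppable, JLOG, UptimeTimer, and the fee logic are deliberately left out).

    #include <chrono>
    #include <iostream>
    #include <mutex>
    #include <thread>

    class Worker
    {
    public:
        ~Worker ()
        {
            // Make sure the thread is joined before the members it uses go away.
            stop ();
        }

        void start ()
        {
            // As in LoadManager::onStart: the object owns the thread directly.
            thread_ = std::thread {&Worker::run, this};
        }

        void stop ()
        {
            // As in LoadManager::onStop: set the flag under the lock, then join.
            if (thread_.joinable ())
            {
                {
                    std::lock_guard<std::mutex> lock (mutex_);
                    stop_ = true;
                }
                thread_.join ();
            }
        }

    private:
        void run ()
        {
            bool stop = false;
            while (! stop)
            {
                {
                    // Copy shared state out under the lock; use the copy afterwards.
                    std::lock_guard<std::mutex> lock (mutex_);
                    stop = stop_;
                }
                std::cout << "tick\n";
                std::this_thread::sleep_for (std::chrono::milliseconds (100));
            }
        }

        std::mutex mutex_;
        bool stop_ = false;
        std::thread thread_;
    };

    int main ()
    {
        Worker w;
        w.start ();
        std::this_thread::sleep_for (std::chrono::milliseconds (350));
        w.stop ();
    }

The ordering in stop() mirrors LoadManager::onStop: the flag is written while holding the lock, the lock is released when the inner scope ends, and only then is join() called, so the worker thread can still acquire the mutex, observe the flag, and return.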
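
The loop in the new LoadManager::run also uses a self-contained timing idiom: keep a steady_clock deadline, advance it by one second each pass, sleep only for the time remaining, and resynchronize when the remainder is negative or larger than one second (the "time jump" case). A minimal standalone illustration, again not rippled code:

    #include <chrono>
    #include <iostream>
    #include <thread>

    int main ()
    {
        using clock_type = std::chrono::steady_clock;
        auto t = clock_type::now();

        for (int i = 0; i < 5; ++i)
        {
            // ... the periodic work would go here ...
            std::cout << "pass " << i << '\n';

            // Advance the deadline by one second and sleep for what is left.
            t += std::chrono::seconds (1);
            auto const duration = t - clock_type::now();
            if ((duration < std::chrono::seconds (0)) ||
                (duration > std::chrono::seconds (1)))
            {
                // The work overran or the clock jumped; resynchronize.
                t = clock_type::now();
            }
            else
            {
                std::this_thread::sleep_for (duration);
            }
        }
    }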