Implement Stoppable for LoadManager

This commit is contained in:
Vinnie Falco
2013-09-28 17:40:46 -07:00
parent 58a8a97177
commit 00a714d14d
16 changed files with 424 additions and 451 deletions

View File

@@ -40,6 +40,8 @@ class RPCServiceManagerLog;
template <> char const* LogPartition::getPartitionName <RPCServiceManagerLog> () { return "RPCServiceManager"; }
class HTTPServerLog;
template <> char const* LogPartition::getPartitionName <HTTPServerLog> () { return "RPCServer"; }
class LoadManagerLog;
template <> char const* LogPartition::getPartitionName <LoadManagerLog> () { return "LoadManager"; }
//
//------------------------------------------------------------------------------
@@ -124,7 +126,7 @@ public:
, mFeeVote (IFeeVote::New (10, 50 * SYSTEM_CURRENCY_PARTS, 12.5 * SYSTEM_CURRENCY_PARTS))
, mFeeTrack (ILoadFeeTrack::New ())
, mFeeTrack (LoadFeeTrack::New (LogJournal::get <LoadManagerLog> ()))
, mHashRouter (IHashRouter::New (IHashRouter::getDefaultHoldTime ()))
@@ -132,7 +134,7 @@ public:
, mProofOfWorkFactory (ProofOfWorkFactory::New ())
, m_loadManager (LoadManager::New ())
, m_loadManager (LoadManager::New (*this, LogJournal::get <LoadManagerLog> ()))
, mPeerFinder (PeerFinder::New (*this))
@@ -145,8 +147,6 @@ public:
// VFALCO TODO remove these once the call is thread safe.
HashMaps::getInstance ().initializeNonce <size_t> ();
initValidatorsConfig ();
}
~ApplicationImp ()
@@ -163,28 +163,6 @@ public:
//--------------------------------------------------------------------------
// Initialize the Validators object with Config information.
void initValidatorsConfig ()
{
{
std::vector <std::string> const& strings (getConfig().validators);
if (! strings.empty ())
m_validators->addStrings ("rippled.cfg", strings);
}
if (! getConfig().getValidatorsURL().empty())
{
m_validators->addURL (getConfig().getValidatorsURL());
}
if (getConfig().getValidatorsFile() != File::nonexistent ())
{
m_validators->addFile (getConfig().getValidatorsFile());
}
}
//--------------------------------------------------------------------------
RPC::Manager& getRPCServiceManager()
{
return *m_rpcServiceManager;
@@ -270,7 +248,7 @@ public:
return *mFeatures;
}
ILoadFeeTrack& getFeeTrack ()
LoadFeeTrack& getFeeTrack ()
{
return *mFeeTrack;
}
@@ -388,10 +366,6 @@ public:
// VFALCO NOTE: 0 means use heuristics to determine the thread count.
m_jobQueue->setThreadCount (0, getConfig ().RUN_STANDALONE);
m_sweepTimer.setExpiration (10);
m_loadManager->start ();
#if ! BEAST_WIN32
#ifdef SIGINT
@@ -642,16 +616,54 @@ public:
m_networkOPs->setStandAlone ();
}
}
//--------------------------------------------------------------------------
// Initialize the Validators object with Config information.
void prepareValidators ()
{
{
std::vector <std::string> const& strings (getConfig().validators);
if (! strings.empty ())
m_validators->addStrings ("rippled.cfg", strings);
}
if (! getConfig().getValidatorsURL().empty())
{
m_validators->addURL (getConfig().getValidatorsURL());
}
if (getConfig().getValidatorsFile() != File::nonexistent ())
{
m_validators->addFile (getConfig().getValidatorsFile());
}
}
//--------------------------------------------------------------------------
//
// Stoppable
//
void onPrepare (Journal)
{
prepareValidators ();
}
void onStart (Journal journal)
{
journal.debug << "Application starting";
m_sweepTimer.setExpiration (10);
}
// Called to indicate shutdown.
void onStop (Journal)
void onStop (Journal journal)
{
journal.debug << "Application stopping";
m_sweepTimer.cancel();
// VFALCO TODO get rid of this flag
mShutdown = true;
mValidations->flush ();
@@ -717,7 +729,7 @@ public:
// eliminate LoadManager's dependency inversions.
//
// This deletes the object and therefore, stops the thread.
m_loadManager = nullptr;
//m_loadManager = nullptr;
m_journal.info << "Done.";
@@ -844,7 +856,7 @@ private:
ScopedPointer <Validators::Manager> m_validators;
ScopedPointer <IFeatures> mFeatures;
ScopedPointer <IFeeVote> mFeeVote;
ScopedPointer <ILoadFeeTrack> mFeeTrack;
ScopedPointer <LoadFeeTrack> mFeeTrack;
ScopedPointer <IHashRouter> mHashRouter;
ScopedPointer <Validations> mValidations;
ScopedPointer <ProofOfWorkFactory> mProofOfWorkFactory;

View File

@@ -29,7 +29,7 @@ namespace RPC { class Manager; }
class IFeatures;
class IFeeVote;
class IHashRouter;
class ILoadFeeTrack;
class LoadFeeTrack;
class Peers;
class UniqueNodeList;
class JobQueue;
@@ -98,7 +98,7 @@ public:
virtual IFeatures& getFeatureTable () = 0;
virtual IFeeVote& getFeeVote () = 0;
virtual IHashRouter& getHashRouter () = 0;
virtual ILoadFeeTrack& getFeeTrack () = 0;
virtual LoadFeeTrack& getFeeTrack () = 0;
virtual LoadManager& getLoadManager () = 0;
virtual Peers& getPeers () = 0;
virtual ProofOfWorkFactory& getProofOfWorkFactory () = 0;

View File

@@ -17,16 +17,11 @@
*/
//==============================================================================
SETUP_LOG (LoadManager)
//------------------------------------------------------------------------------
class LoadManagerImp
: public LoadManager
, public Thread
{
private:
public:
/* Entry mapping utilization to cost.
The cost is expressed as a unitless relative quantity. These
@@ -72,11 +67,33 @@ private:
int m_resourceFlags;
};
public:
LoadManagerImp ()
: Thread ("loadmgr")
//--------------------------------------------------------------------------
Journal m_journal;
typedef RippleMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LockType mLock;
BlackList<UptimeTimerAdapter> mBlackList;
int mCreditRate; // credits gained/lost per second
int mCreditLimit; // the most credits a source can have
int mDebitWarn; // when a source drops below this, we warn
int mDebitLimit; // when a source drops below this, we cut it off (should be negative)
bool mArmed;
int mDeadLock; // Detect server deadlocks
std::vector <Cost> mCosts;
//--------------------------------------------------------------------------
LoadManagerImp (Stoppable& parent, Journal journal)
: LoadManager (parent)
, Thread ("loadmgr")
, m_journal (journal)
, mLock (this, "LoadManagerImp", __FILE__, __LINE__)
, m_logThread ("loadmgr_log")
, mCreditRate (100)
, mCreditLimit (500)
, mDebitWarn (-500)
@@ -85,8 +102,6 @@ public:
, mDeadLock (0)
, mCosts (LT_MAX)
{
m_logThread.start ();
/** Flags indicating the type of load.
Utilization may include any combination of:
@@ -128,7 +143,6 @@ public:
UptimeTimer::getInstance ().beginManualUpdates ();
}
private:
~LoadManagerImp ()
{
UptimeTimer::getInstance ().endManualUpdates ();
@@ -136,11 +150,36 @@ private:
stopThread ();
}
void start ()
//--------------------------------------------------------------------------
//
// Stoppable
//
void onPrepare (Journal)
{
startThread();
}
void onStart (Journal journal)
{
journal.debug << "Starting";
startThread ();
}
void onStop (Journal journal)
{
if (isThreadRunning ())
{
journal.debug << "Stopping";
stopThread (0);
}
else
{
stopped ();
}
}
//--------------------------------------------------------------------------
void canonicalize (LoadSource& source, int now) const
{
if (source.mLastUpdate != now)
@@ -240,17 +279,17 @@ private:
void logWarning (const std::string& source) const
{
if (source.empty ())
WriteLog (lsDEBUG, LoadManager) << "Load warning from empty source";
m_journal.debug << "Load warning from empty source";
else
WriteLog (lsINFO, LoadManager) << "Load warning: " << source;
m_journal.info << "Load warning: " << source;
}
void logDisconnect (const std::string& source) const
{
if (source.empty ())
WriteLog (lsINFO, LoadManager) << "Disconnect for empty source";
m_journal.info << "Disconnect for empty source";
else
WriteLog (lsWARNING, LoadManager) << "Disconnect for: " << source;
m_journal.warning << "Disconnect for: " << source;
}
Json::Value getBlackList (int threshold)
@@ -282,9 +321,9 @@ private:
mArmed = true;
}
static void logDeadlock (int dlTime)
void logDeadlock (int dlTime)
{
WriteLog (lsWARNING, LoadManager) << "Server stalled for " << dlTime << " seconds.";
m_journal.warning << "Server stalled for " << dlTime << " seconds.";
#if RIPPLE_TRACK_MUTEXES
StringArray report;
@@ -298,59 +337,6 @@ private:
#endif
}
private:
// VFALCO TODO These used to be public but are apparently not used. Find out why.
/*
int getCreditRate () const
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
return mCreditRate;
}
int getCreditLimit () const
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
return mCreditLimit;
}
int getDebitWarn () const
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
return mDebitWarn;
}
int getDebitLimit () const
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
return mDebitLimit;
}
void setCreditRate (int r)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mCreditRate = r;
}
void setCreditLimit (int r)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mCreditLimit = r;
}
void setDebitWarn (int r)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mDebitWarn = r;
}
void setDebitLimit (int r)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mDebitLimit = r;
}
*/
private:
void addCost (const Cost& c)
{
mCosts [static_cast <int> (c.getLoadType ())] = c;
@@ -393,9 +379,7 @@ private:
// Report the deadlocked condition every 10 seconds
if ((timeSpentDeadlocked % reportingIntervalSeconds) == 0)
{
// VFALCO TODO Replace this with a dedicated thread with call queue.
//
m_logThread.call (&logDeadlock, timeSpentDeadlocked);
logDeadlock (timeSpentDeadlocked);
}
// If we go over 500 seconds spent deadlocked, it means that the
@@ -413,7 +397,7 @@ private:
// Another option is using an observer pattern to invert the dependency.
if (getApp().getJobQueue ().isOverloaded ())
{
WriteLog (lsINFO, LoadManager) << getApp().getJobQueue ().getJson (0);
m_journal.info << getApp().getJobQueue ().getJson (0);
change = getApp().getFeeTrack ().raiseLocalFee ();
}
else
@@ -432,7 +416,7 @@ private:
if ((when.is_negative ()) || (when.total_seconds () > 1))
{
WriteLog (lsWARNING, LoadManager) << "time jump";
m_journal.warning << "time jump";
t = boost::posix_time::microsec_clock::universal_time ();
}
else
@@ -441,32 +425,18 @@ private:
}
}
}
private:
typedef RippleMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LockType mLock;
beast::ThreadWithCallQueue m_logThread;
BlackList<UptimeTimerAdapter> mBlackList;
int mCreditRate; // credits gained/lost per second
int mCreditLimit; // the most credits a source can have
int mDebitWarn; // when a source drops below this, we warn
int mDebitLimit; // when a source drops below this, we cut it off (should be negative)
bool mArmed;
int mDeadLock; // Detect server deadlocks
std::vector <Cost> mCosts;
};
//------------------------------------------------------------------------------
LoadManager* LoadManager::New ()
LoadManager::LoadManager (Stoppable& parent)
: Stoppable ("LoadManager", parent)
{
ScopedPointer <LoadManager> object (new LoadManagerImp);
return object.release ();
}
//------------------------------------------------------------------------------
LoadManager* LoadManager::New (Stoppable& parent, Journal journal)
{
return new LoadManagerImp (parent, journal);
}

View File

@@ -34,17 +34,18 @@
@see LoadSource, LoadType
*/
class LoadManager
class LoadManager : public Stoppable
{
protected:
explicit LoadManager (Stoppable& parent);
public:
/** Create a new manager.
The manager thread begins running immediately.
@note The thresholds for warnings and punishments are in
the ctor-initializer
*/
static LoadManager* New ();
static LoadManager* New (Stoppable& parent, Journal journal);
/** Destroy the manager.
@@ -52,15 +53,6 @@ public:
*/
virtual ~LoadManager () { }
/** Start the associated thread.
This is here to prevent the deadlock detector from activating during
a lengthy program initialization.
*/
// VFALCO TODO Simplify the two stage initialization to one stage (construction).
// NOTE In stand-alone mode the load manager thread isn't started
virtual void start () = 0;
/** Turn on deadlock detection.
The deadlock detector begins in a disabled state. After this function