Move refactored files into ripple_core

This commit is contained in:
Vinnie Falco
2013-06-23 10:55:34 -07:00
parent 0a358ded7a
commit 3b20bd1b0c
15 changed files with 103 additions and 21 deletions

View File

@@ -351,7 +351,9 @@ static void runIO (boost::asio::io_service& io)
void Application::setup ()
{
mJobQueue.setThreadCount ();
// VFALCO NOTE: 0 means use heuristics to determine the thread count.
mJobQueue.setThreadCount (0, theConfig.RUN_STANDALONE);
mSweepTimer.expires_from_now (boost::posix_time::seconds (10));
mSweepTimer.async_wait (boost::bind (&Application::sweep, this));
mLoadMgr.init ();

View File

@@ -1,185 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
// Default constructor: produces an invalid Job (jtINVALID, index 0).
// Exists only so Job can be used as a value type in containers
// (e.g. map operator[] requires default construction).
Job::Job ()
: mType (jtINVALID)
, mJobIndex (0)
{
}
// Construct a Job with a type and sequence index but no work function.
// Used internally for sentinel jobs (e.g. the jtDEATH jobs that tell
// worker threads to exit; see JobQueue::setThreadCount).
Job::Job (JobType type, uint64 index)
: mType (type)
, mJobIndex (index)
{
}
// Full constructor: a runnable job.
//   type  - priority class (see JobType)
//   name  - human-readable label used for load tracking and logging
//   index - monotonically increasing sequence number; breaks ties
//           between jobs of equal type so ordering is FIFO
//   lm    - per-type LoadMonitor this job reports its latency to
//   job   - the function executed by doJob()
Job::Job (JobType type,
std::string const& name,
uint64 index,
LoadMonitor& lm,
FUNCTION_TYPE <void (Job&)> const& job)
: mType (type)
, mJobIndex (index)
, mJob (job)
, mName (name)
{
// The LoadEvent is created stopped (shouldStart = false); timing
// begins when doJob() calls start().
m_loadEvent = boost::make_shared <LoadEvent> (boost::ref (lm), name, false);
}
// Returns the job's priority classification.
JobType Job::getType () const
{
return mType;
}
// Execute the job's work function, timing it via the LoadEvent.
void Job::doJob ()
{
// Start the latency clock; the event was constructed stopped.
m_loadEvent->start ();
// Run the actual work. The job passes itself so the function can
// call rename() to refine the label used in load reports.
mJob (*this);
// VFALCO TODO Isn't there a way to construct the load event with
// the proper name? This way the load event object
// can have the invariant "name is always set"
//
// Push the (possibly updated) name into the load event so the
// latency sample recorded when it stops carries the right label.
m_loadEvent->reName (mName);
}
// Change the job's display name, e.g. to something more specific
// once the work function knows what it is actually doing.
void Job::rename (std::string const& newName)
{
mName = newName;
}
// Map a JobType to its human-readable name (used in logs and JSON
// reports). Cases are listed in enumerator order; see Job.h.
const char* Job::toString (JobType t)
{
    switch (t)
    {
    case jtINVALID:
        return "invalid";

    case jtPACK:
        return "makeFetchPack";

    case jtPUBOLDLEDGER:
        return "publishAcqLedger";

    case jtVALIDATION_ut:
        return "untrustedValidation";

    case jtPROOFWORK:
        return "proofOfWork";

    case jtPROPOSAL_ut:
        return "untrustedProposal";

    case jtLEDGER_DATA:
        return "ledgerData";

    case jtUPDATE_PF:
        return "updatePaths";

    case jtCLIENT:
        return "clientCommand";

    case jtTRANSACTION:
        return "transaction";

    case jtPUBLEDGER:
        return "publishNewLedger";

    case jtWAL:
        return "writeAhead";

    case jtVALIDATION_t:
        return "trustedValidation";

    case jtWRITE:
        return "writeObjects";

    case jtTRANSACTION_l:
        return "localTransaction";

    case jtPROPOSAL_t:
        return "trustedProposal";

    case jtADMIN:
        return "administration";

    case jtDEATH:
        return "jobOfDeath";

    case jtPEER:
        return "peerCommand";

    case jtDISK:
        return "diskAccess";

    case jtACCEPTLEDGER:
        return "acceptLedger";

    case jtTXN_PROC:
        return "processTransaction";

    case jtOB_SETUP:
        return "orderBookSetup";

    case jtPATH_FIND:
        return "pathFind";

    case jtHO_READ:
        return "nodeRead";

    case jtHO_WRITE:
        return "nodeWrite";

    default:
        // An unlisted enumerator is a programming error.
        assert (false);
        return "unknown";
    }
}
// Priority ordering used by the JobQueue's std::set. A job with a
// LOWER JobType value compares as "greater", so higher-typed jobs
// sort first in an ascending container. Ties between equal types are
// broken by the insertion index, giving FIFO order.
bool Job::operator> (const Job& j) const
{
    if (mType != j.mType)
        return mType < j.mType;

    return mJobIndex > j.mJobIndex;
}

bool Job::operator>= (const Job& j) const
{
    if (mType != j.mType)
        return mType < j.mType;

    return mJobIndex >= j.mJobIndex;
}

bool Job::operator< (const Job& j) const
{
    if (mType != j.mType)
        return mType > j.mType;

    return mJobIndex < j.mJobIndex;
}

bool Job::operator<= (const Job& j) const
{
    if (mType != j.mType)
        return mType > j.mType;

    return mJobIndex <= j.mJobIndex;
}

View File

@@ -1,95 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_JOB_H
#define RIPPLE_JOB_H
// Note that this queue should only be used for CPU-bound jobs
// It is primarily intended for signature checking
enum JobType
{
// must be in priority order, low to high
jtINVALID = -1,
jtPACK = 1, // Make a fetch pack for a peer
jtPUBOLDLEDGER = 2, // An old ledger has been accepted
jtVALIDATION_ut = 3, // A validation from an untrusted source
jtPROOFWORK = 4, // A proof of work demand from another server
jtPROPOSAL_ut = 5, // A proposal from an untrusted source
jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring
jtUPDATE_PF = 7, // Update pathfinding requests
jtCLIENT = 8, // A websocket command from the client
jtTRANSACTION = 9, // A transaction received from the network
jtPUBLEDGER = 10, // Publish a fully-accepted ledger
jtWAL = 11, // Write-ahead logging
jtVALIDATION_t = 12, // A validation from a trusted source
jtWRITE = 13, // Write out hashed objects
jtTRANSACTION_l = 14, // A local transaction
jtPROPOSAL_t = 15, // A proposal from a trusted source
jtADMIN = 16, // An administrative operation
jtDEATH = 17, // job of death, used internally
// special types not dispatched by the job pool
jtPEER = 24,
jtDISK = 25,
jtACCEPTLEDGER = 26,
jtTXN_PROC = 27,
jtOB_SETUP = 28,
jtPATH_FIND = 29,
jtHO_READ = 30,
jtHO_WRITE = 31,
}; // CAUTION: If you add new types, add them to Job::toString too
// VFALCO TODO move this into the enum so it calculates itself?
// NOTE(review): 48 leaves headroom above the highest enumerator (31);
// JobQueue sizes its mJobLoads array with this. Why 48 specifically
// is not documented anywhere visible -- confirm before changing.
#define NUM_JOB_TYPES 48 // why 48 and not 32?
class Job
{
public:
/** Default constructor.
Allows Job to be used as a container type.
This is used to allow things like jobMap [key] = value.
*/
// VFALCO NOTE I'd prefer not to have a default constructed object.
// What is the semantic meaning of a Job with no associated
// function? Having the invariant "all Job objects refer to
// a job" would reduce the number of states.
//
Job ();
// Construct a job with no work function (used for sentinel jobs
// such as jtDEATH; see JobQueue::setThreadCount).
Job (JobType type, uint64 index);
// VFALCO TODO try to remove the dependency on LoadMonitor.
// Construct a runnable job; `lm` receives the latency measurement
// taken around doJob(), `index` provides FIFO tie-breaking.
Job (JobType type,
std::string const& name,
uint64 index,
LoadMonitor& lm,
FUNCTION_TYPE <void (Job&)> const& job);
JobType getType () const;
// Run the work function, timing it via the load event.
void doJob ();
// Change the label used for load tracking / logging.
void rename (const std::string& n);
// These comparison operators make the jobs sort in priority order in the job set
// (lower JobType value compares "greater", ties broken FIFO by index;
// see the definitions in Job.cpp).
bool operator< (const Job& j) const;
bool operator> (const Job& j) const;
bool operator<= (const Job& j) const;
bool operator>= (const Job& j) const;
static const char* toString (JobType);
private:
JobType mType;
uint64 mJobIndex; // insertion sequence number, breaks priority ties
FUNCTION_TYPE <void (Job&)> mJob;
LoadEvent::pointer m_loadEvent; // reports this job's latency on stop
std::string mName;
};
#endif

View File

@@ -1,278 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
SETUP_LOG (JobQueue)
// Construct the job queue. Per-type target latencies are given as
// (average ms, peak ms); the LoadMonitor for a type reports "over
// target" when its decayed latency exceeds these. Types not listed
// here have no target (0 disables the comparison; see LoadMonitor).
JobQueue::JobQueue (boost::asio::io_service& svc)
: mLastJob (0)
, mThreadCount (0)
, mShuttingDown (false)
, mIOService (svc)
{
mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000);
mJobLoads [ jtPROOFWORK ].setTargetLatency (2000, 5000);
mJobLoads [ jtTRANSACTION ].setTargetLatency (250, 1000);
mJobLoads [ jtPROPOSAL_ut ].setTargetLatency (500, 1250);
mJobLoads [ jtPUBLEDGER ].setTargetLatency (3000, 4500);
mJobLoads [ jtWAL ].setTargetLatency (1000, 2500);
mJobLoads [ jtVALIDATION_t ].setTargetLatency (500, 1500);
mJobLoads [ jtWRITE ].setTargetLatency (750, 1500);
mJobLoads [ jtTRANSACTION_l ].setTargetLatency (100, 500);
mJobLoads [ jtPROPOSAL_t ].setTargetLatency (100, 500);
mJobLoads [ jtCLIENT ].setTargetLatency (2000, 5000);
mJobLoads [ jtPEER ].setTargetLatency (200, 1250);
mJobLoads [ jtDISK ].setTargetLatency (500, 1000);
mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500);
}
// Queue a job for execution by the worker threads.
// The job is stamped with ++mLastJob so jobs of equal priority run
// in FIFO order (see Job's comparison operators).
void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
{
assert (type != jtINVALID);
boost::mutex::scoped_lock sl (mJobLock);
// A job added to a queue with no threads would never run.
if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering
assert (mThreadCount != 0); // do not add jobs to a queue with no threads
mJobSet.insert (Job (type, name, ++mLastJob, mJobLoads[type], jobFunc));
// mJobCounts: first = waiting, second = currently executing.
++mJobCounts[type].first;
mJobCond.notify_one ();
}
// Number of jobs of type t currently waiting (not yet running).
int JobQueue::getJobCount (JobType t)
{
    boost::mutex::scoped_lock sl (mJobLock);

    std::map< JobType, std::pair<int, int> >::iterator iter = mJobCounts.find (t);

    if (iter == mJobCounts.end ())
        return 0;

    return iter->second.first;
}

// Number of jobs of type t waiting plus currently running.
int JobQueue::getJobCountTotal (JobType t)
{
    boost::mutex::scoped_lock sl (mJobLock);

    std::map< JobType, std::pair<int, int> >::iterator iter = mJobCounts.find (t);

    if (iter == mJobCounts.end ())
        return 0;

    return iter->second.first + iter->second.second;
}
int JobQueue::getJobCountGE (JobType t)
{
// return the number of jobs at this priority level or greater
int ret = 0;
boost::mutex::scoped_lock sl (mJobLock);
typedef std::map< JobType, std::pair<int, int> >::value_type jt_int_pair;
BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
if (it.first >= t)
ret += it.second.first;
return ret;
}
std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts ()
{
// return all jobs at all priority levels
std::vector< std::pair<JobType, std::pair<int, int> > > ret;
boost::mutex::scoped_lock sl (mJobLock);
ret.reserve (mJobCounts.size ());
typedef std::map< JobType, std::pair<int, int> >::value_type jt_int_pair;
BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
ret.push_back (it);
return ret;
}
// Produce a JSON report of queue state: the worker thread count plus,
// for each job type showing any activity, its waiting/running counts
// and latency statistics from the per-type load monitor.
Json::Value JobQueue::getJson (int)
{
Json::Value ret (Json::objectValue);
boost::mutex::scoped_lock sl (mJobLock);
ret["threads"] = mThreadCount;
Json::Value priorities = Json::arrayValue;
for (int i = 0; i < NUM_JOB_TYPES; ++i)
{
uint64 count, latencyAvg, latencyPeak;
int jobCount, threadCount;
bool isOver;
// Decayed event rate and latency figures for this type.
mJobLoads[i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
std::map< JobType, std::pair<int, int> >::iterator it = mJobCounts.find (static_cast<JobType> (i));
if (it == mJobCounts.end ())
{
jobCount = 0;
threadCount = 0;
}
else
{
// first = jobs waiting, second = jobs currently executing.
jobCount = it->second.first;
threadCount = it->second.second;
}
// Only report types with some activity, to keep the output small.
if ((count != 0) || (jobCount != 0) || (latencyPeak != 0) || (threadCount != 0))
{
Json::Value pri (Json::objectValue);
if (isOver)
pri["over_target"] = true;
pri["job_type"] = Job::toString (static_cast<JobType> (i));
if (jobCount != 0)
pri["waiting"] = jobCount;
if (count != 0)
pri["per_second"] = static_cast<int> (count);
if (latencyPeak != 0)
pri["peak_time"] = static_cast<int> (latencyPeak);
if (latencyAvg != 0)
pri["avg_time"] = static_cast<int> (latencyAvg);
if (threadCount != 0)
pri["in_progress"] = threadCount;
priorities.append (pri);
}
}
ret["job_types"] = priorities;
return ret;
}
// Count how many job types currently report latency over target.
// A non-zero result indicates the server is overloaded.
int JobQueue::isOverloaded ()
{
    boost::mutex::scoped_lock sl (mJobLock);

    int overCount = 0;

    for (int i = 0; i < NUM_JOB_TYPES; ++i)
    {
        if (mJobLoads[i].isOver ())
            ++overCount;
    }

    return overCount;
}
// Stop all worker threads without running the remaining queued jobs.
void JobQueue::shutdown ()
{
// shut down the job queue without completing pending jobs
WriteLog (lsINFO, JobQueue) << "Job queue shutting down";
boost::mutex::scoped_lock sl (mJobLock);
mShuttingDown = true;
// Wake every worker; threadEntry exits when mShuttingDown is set
// and the job set is empty, decrementing mThreadCount and
// signalling mJobCond on the way out.
mJobCond.notify_all ();
while (mThreadCount != 0)
mJobCond.wait (sl);
}
// Set the number of threads serving the job queue to precisely this
// number. A count of 0 means auto-tune based on hardware concurrency;
// standalone mode always uses a single thread.
void JobQueue::setThreadCount (int c)
{
    if (theConfig.RUN_STANDALONE)
    {
        c = 1;
    }
    else if (c == 0)
    {
        c = boost::thread::hardware_concurrency ();

        // VFALCO NOTE According to boost, hardware_concurrency cannot return
        //             negative numbers. It returns 0 when the count is not
        //             computable, so the old (c < 0) test was dead code;
        //             test for <= 0 so the fallback actually applies.
        //
        if (c <= 0)
            c = 2; // VFALCO NOTE Why 2?

        if (c > 4) // I/O will bottleneck
            c = 4;

        c += 2;
        WriteLog (lsINFO, JobQueue) << "Auto-tuning to " << c << " validation/transaction/proposal threads";
    }

    // VFALCO TODO Split the function up. The lower part actually does the "do",
    //             The part above this comment figures out the value for numThreads
    //
    boost::mutex::scoped_lock sl (mJobLock);

    // Wait for any earlier shrink operation (pending jtDEATH jobs) to drain.
    while (mJobCounts[jtDEATH].first != 0)
    {
        mJobCond.wait (sl);
    }

    // Grow: spawn detached worker threads until the target is reached.
    while (mThreadCount < c)
    {
        ++mThreadCount;
        boost::thread (BIND_TYPE (&JobQueue::threadEntry, this)).detach ();
    }

    // Shrink: each jtDEATH job kills exactly one worker (see threadEntry).
    while (mThreadCount > c)
    {
        if (mJobCounts[jtDEATH].first != 0)
        {
            mJobCond.wait (sl);
        }
        else
        {
            mJobSet.insert (Job (jtDEATH, 0));
            ++ (mJobCounts[jtDEATH].first);

            // Wake a sleeping worker to consume the death job; without
            // this, idle workers blocked in mJobCond.wait are never
            // notified and the loop above can wait forever.
            mJobCond.notify_one ();
        }
    }

    mJobCond.notify_one (); // in case we sucked up someone else's signal
}
// Worker thread body: run jobs until told to stop. A thread exits
// when it dequeues a jtDEATH job (pool shrink) or when shutdown has
// been requested and no jobs remain.
void JobQueue::threadEntry ()
{
boost::mutex::scoped_lock sl (mJobLock);
while (1)
{
setCallingThreadName ("waiting");
// Sleep until there is work or we are shutting down.
while (mJobSet.empty () && !mShuttingDown)
{
mJobCond.wait (sl);
}
// Shutting down with nothing left to do: exit.
if (mJobSet.empty ())
break;
JobType type;
// The set's ordering puts the highest-priority waiting job first
// (see Job's comparison operators).
std::set<Job>::iterator it = mJobSet.begin ();
{
Job job (*it);
mJobSet.erase (it);
type = job.getType ();
-- (mJobCounts[type].first);
// A death job terminates exactly one worker; it carries no
// work function, so it is safe to break out here.
if (type == jtDEATH)
break;
// Mark as in-progress, then run WITHOUT holding the lock so
// other workers can dequeue concurrently.
++ (mJobCounts[type].second);
sl.unlock ();
setCallingThreadName (Job::toString (type));
WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
job.doJob ();
} // must destroy job without holding lock
sl.lock ();
-- (mJobCounts[type].second);
}
// Announce this thread's exit; shutdown() and setThreadCount() wait
// on mJobCond for mThreadCount to reach their target.
--mThreadCount;
mJobCond.notify_all ();
}
// vim:ts=4
// vim:ts=4

View File

@@ -1,61 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_JOBQUEUE_H
#define RIPPLE_JOBQUEUE_H
// A prioritized queue of CPU-bound jobs executed by a pool of worker
// threads. Ordering comes from Job's comparison operators: higher
// JobType first, FIFO within a type.
class JobQueue
{
public:
explicit JobQueue (boost::asio::io_service&);
// Queue a job; jobFunc runs on a worker thread at the given priority.
void addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& job);
int getJobCount (JobType t); // Jobs waiting at this priority
int getJobCountTotal (JobType t); // Jobs waiting plus running at this priority
int getJobCountGE (JobType t); // All waiting jobs at or greater than this priority
std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts (); // jobs waiting, threads doing
// Stop all workers without running remaining jobs; blocks until done.
void shutdown ();
// Resize the worker pool; 0 auto-tunes from hardware concurrency.
void setThreadCount (int c = 0);
// VFALCO TODO Rename these to newLoadEventMeasurement or something similar
// since they create the object.
//
// Create an already-started latency measurement charged to type t.
LoadEvent::pointer getLoadEvent (JobType t, const std::string& name)
{
return boost::make_shared<LoadEvent> (boost::ref (mJobLoads[t]), name, true);
}
// VFALCO TODO Why do we need two versions, one which returns a shared
// pointer and the other which returns an autoptr?
//
LoadEvent::autoptr getLoadEventAP (JobType t, const std::string& name)
{
return LoadEvent::autoptr (new LoadEvent (mJobLoads[t], name, true));
}
// Number of job types whose load monitor reports "over target".
int isOverloaded ();
// JSON snapshot of thread count and per-type queue/latency stats.
Json::Value getJson (int c = 0);
private:
// Worker thread body; exits on jtDEATH or during shutdown.
void threadEntry ();
boost::mutex mJobLock; // guards all mutable state below
boost::condition_variable mJobCond;
uint64 mLastJob; // sequence number of the most recently added job
std::set <Job> mJobSet; // pending jobs, priority ordered
LoadMonitor mJobLoads [NUM_JOB_TYPES]; // per-type latency tracking
int mThreadCount;
bool mShuttingDown;
boost::asio::io_service& mIOService;
// Per type: (jobs waiting, jobs currently executing).
std::map<JobType, std::pair<int, int > > mJobCounts;
};
#endif

View File

@@ -1,42 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
LoadEvent::LoadEvent (LoadMonitor& monitor, const std::string& name, bool shouldStart)
: mMonitor (monitor)
, mRunning (false)
, mName (name)
{
mStartTime = boost::posix_time::microsec_clock::universal_time ();
if (shouldStart)
start ();
}
LoadEvent::~LoadEvent ()
{
if (mRunning)
stop ();
}
void LoadEvent::reName (const std::string& name)
{
mName = name;
}
void LoadEvent::start ()
{
mRunning = true;
mStartTime = boost::posix_time::microsec_clock::universal_time ();
}
void LoadEvent::stop ()
{
assert (mRunning);
mRunning = false;
mMonitor.addCountAndLatency (mName,
static_cast<int> ((boost::posix_time::microsec_clock::universal_time () - mStartTime).total_milliseconds ()));
}

View File

@@ -1,52 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_LOADEVENT_H
#define RIPPLE_LOADEVENT_H
class LoadMonitor;
// VFALCO NOTE What is the difference between a LoadEvent and a LoadMonitor?
// VFALCO TODO Rename LoadEvent to LoadMonitor::Event
//
// This looks like a scoped elapsed time measuring class
//
// Measures the elapsed wall-clock time of a piece of work and reports
// it (name + milliseconds) to its LoadMonitor when stopped, or from
// the destructor if still running.
class LoadEvent
{
public:
// VFALCO NOTE Why are these shared pointers? Wouldn't there be a
// piece of lifetime-managed calling code that can simply own
// the object?
//
// Why both kinds of containers?
//
typedef boost::shared_ptr <LoadEvent> pointer;
typedef UPTR_T <LoadEvent> autoptr;
public:
// VFALCO TODO remove the dependency on LoadMonitor. Is that possible?
// monitor     - receives addCountAndLatency() when this event stops
// name        - label the measurement is reported under
// shouldStart - if true, timing begins immediately
LoadEvent (LoadMonitor& monitor,
const std::string& name,
bool shouldStart);
// Calls stop() if the event is still running.
~LoadEvent ();
// VFALCO TODO rename this to setName () or setLabel ()
void reName (const std::string& name);
// okay to call if already started
void start ();
// Stop timing and report the elapsed time; requires a prior start().
void stop ();
private:
LoadMonitor& mMonitor;
bool mRunning; // true between start() and stop()
std::string mName;
boost::posix_time::ptime mStartTime;
};
#endif

View File

@@ -1,156 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
SETUP_LOG (LoadMonitor)
// Construct an idle monitor: all counters zero and no latency targets
// (a target of 0 disables the over-target comparison; see isOverTarget).
LoadMonitor::LoadMonitor ()
: mCounts (0)
, mLatencyEvents (0)
, mLatencyMSAvg (0)
, mLatencyMSPeak (0)
, mTargetLatencyAvg (0)
, mTargetLatencyPk (0)
, mLastUpdate (UptimeTimer::getInstance ().getElapsedSeconds ())
{
}
// VFALCO NOTE WHY do we need "the mutex?" This dependence on
// a hidden global, especially a synchronization primitive,
// is a flawed design.
// It's not clear exactly which data needs to be protected.
//
// call with the mutex
// Apply exponential decay to the counters, one step per elapsed
// second since the last update. Readers divide the decayed values by
// 4 (see getCountAndLatency), so these behave as a moving average
// with roughly 25% falloff per second.
void LoadMonitor::update ()
{
int now = UptimeTimer::getInstance ().getElapsedSeconds ();
// VFALCO TODO stop returning from the middle of functions.
if (now == mLastUpdate) // current
return;
// Clock went backwards, or we were idle long enough that the decay
// would bring everything to (near) zero anyway: just reset.
if ((now < mLastUpdate) || (now > (mLastUpdate + 8)))
{
// way out of date
mCounts = 0;
mLatencyEvents = 0;
mLatencyMSAvg = 0;
mLatencyMSPeak = 0;
mLastUpdate = now;
// VFALCO TODO don't return from the middle...
return;
}
// do exponential decay
// Each step removes one quarter; the "+ 3" rounds the removed share
// up so the integer counters reach exactly zero instead of sticking.
do
{
++mLastUpdate;
mCounts -= ((mCounts + 3) / 4);
mLatencyEvents -= ((mLatencyEvents + 3) / 4);
mLatencyMSAvg -= (mLatencyMSAvg / 4);
mLatencyMSPeak -= (mLatencyMSPeak / 4);
}
while (mLastUpdate < now);
}
// Record one event with no latency information.
void LoadMonitor::addCount ()
{
boost::mutex::scoped_lock sl (mLock);
update ();
++mCounts;
}
// Record a latency sample (milliseconds) without counting an event.
void LoadMonitor::addLatency (int latency)
{
// NOTE(review): 1ms samples are zeroed -- presumably because they
// are indistinguishable from timer granularity; confirm intent.
if (latency == 1)
latency = 0;
boost::mutex::scoped_lock sl (mLock);
update ();
++mLatencyEvents;
mLatencyMSAvg += latency;
mLatencyMSPeak += latency;
// Keep the peak accumulator at least (events * latency * 4) so a
// single large sample dominates; readers divide by events * 4.
int lp = mLatencyEvents * latency * 4;
if (mLatencyMSPeak < lp)
mLatencyMSPeak = lp;
}
// Record an event together with its latency; slow jobs are logged.
void LoadMonitor::addCountAndLatency (const std::string& name, int latency)
{
if (latency > 500)
{
WriteLog ((latency > 1000) ? lsWARNING : lsINFO, LoadMonitor) << "Job: " << name << " ExecutionTime: " << latency;
}
if (latency == 1)
latency = 0;
boost::mutex::scoped_lock sl (mLock);
update ();
++mCounts;
++mLatencyEvents;
mLatencyMSAvg += latency;
mLatencyMSPeak += latency;
// Same peak-tracking rule as addLatency above.
int lp = mLatencyEvents * latency * 4;
if (mLatencyMSPeak < lp)
mLatencyMSPeak = lp;
}
// Set the average / peak latency targets (milliseconds). A value of
// 0 disables that particular comparison.
void LoadMonitor::setTargetLatency (uint64 avg, uint64 pk)
{
    mTargetLatencyAvg = avg;
    mTargetLatencyPk = pk;
}

// True when either latency figure exceeds its (non-zero) target.
bool LoadMonitor::isOverTarget (uint64 avg, uint64 peak)
{
    if ((mTargetLatencyPk != 0) && (peak > mTargetLatencyPk))
        return true;

    return (mTargetLatencyAvg != 0) && (avg > mTargetLatencyAvg);
}
bool LoadMonitor::isOver ()
{
boost::mutex::scoped_lock sl (mLock);
update ();
if (mLatencyEvents == 0)
return 0;
return isOverTarget (mLatencyMSAvg / (mLatencyEvents * 4), mLatencyMSPeak / (mLatencyEvents * 4));
}
// Report the decayed event rate and latency figures.
//   count       - events per second (decayed count / 4)
//   latencyAvg  - average latency in ms (0 when no samples)
//   latencyPeak - peak latency in ms (0 when no samples)
//   isOver      - whether either figure exceeds its target
void LoadMonitor::getCountAndLatency (uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver)
{
    boost::mutex::scoped_lock sl (mLock);

    update ();

    count = mCounts / 4;

    if (mLatencyEvents == 0)
    {
        latencyAvg = 0;
        latencyPeak = 0;
    }
    else
    {
        // Accumulators are scaled by events * 4; normalize once.
        uint64 const divisor = mLatencyEvents * 4;
        latencyAvg = mLatencyMSAvg / divisor;
        latencyPeak = mLatencyMSPeak / divisor;
    }

    isOver = isOverTarget (latencyAvg, latencyPeak);
}
// vim:ts=4

View File

@@ -1,47 +0,0 @@
//------------------------------------------------------------------------------
/*
Copyright (c) 2011-2013, OpenCoin, Inc.
*/
//==============================================================================
#ifndef RIPPLE_LOADMONITOR_H
#define RIPPLE_LOADMONITOR_H
// Monitors load levels and response times
// VFALCO TODO Rename this. Having both LoadManager and LoadMonitor is confusing.
//
// Accumulates event counts and latency samples with an exponential
// decay of ~25% per second, and compares them against optional
// average/peak latency targets. Thread-safe: every public method
// locks mLock.
class LoadMonitor
{
public:
LoadMonitor ();
// Record one event with no latency information.
void addCount ();
// Record a latency sample (milliseconds) without counting an event.
void addLatency (int latency);
// Record an event and its latency; slow jobs are logged by name.
void addCountAndLatency (const std::string& name, int latency);
// Targets in milliseconds; 0 disables that comparison.
void setTargetLatency (uint64 avg, uint64 pk);
bool isOverTarget (uint64 avg, uint64 peak);
// VFALCO TODO make this return the values in a struct.
void getCountAndLatency (uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver);
// True when current decayed latency exceeds the targets.
bool isOver ();
private:
// Applies the per-second decay; callers must hold mLock.
void update ();
boost::mutex mLock;
uint64 mCounts; // decayed event count (scaled x4)
uint64 mLatencyEvents; // decayed sample count (scaled x4)
uint64 mLatencyMSAvg; // decayed latency sum for averaging
uint64 mLatencyMSPeak; // decayed peak-latency accumulator
uint64 mTargetLatencyAvg;
uint64 mTargetLatencyPk;
int mLastUpdate; // uptime seconds at last decay step
};
#endif