Use SharedData in JobQueue; set latency targets and limits for the new job types

Vinnie Falco
2013-08-24 10:09:35 -07:00
parent 2bea9a8739
commit 21485ec003
2 changed files with 105 additions and 68 deletions
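
The diff below replaces JobQueue's explicit mutex (mLock, taken through ScopedLockType guards) with a single SharedData <State> that gathers lastJob, jobSet, and jobCounts behind reader/writer access proxies: queries construct a ReadAccess, mutations construct a WriteAccess. As a rough mental model, here is a minimal sketch of such a wrapper, assuming std::shared_mutex semantics; the real Beast SharedData differs in detail:

// Minimal sketch of the SharedData idiom (hypothetical; the real
// Beast implementation differs). Requires C++17 for std::shared_mutex.
#include <mutex>
#include <shared_mutex>

template <class Data>
class SharedData
{
public:
    // Shared (read) access: many readers may hold this at once.
    class ReadAccess
    {
    public:
        explicit ReadAccess (SharedData const& sd)
            : m_lock (sd.m_mutex), m_data (sd.m_data) { }
        Data const* operator-> () const { return &m_data; }
    private:
        std::shared_lock <std::shared_mutex> m_lock;
        Data const& m_data;
    };

    // Exclusive (write) access: blocks readers and other writers.
    class WriteAccess
    {
    public:
        explicit WriteAccess (SharedData& sd)
            : m_lock (sd.m_mutex), m_data (sd.m_data) { }
        Data* operator-> () const { return &m_data; }
    private:
        std::unique_lock <std::shared_mutex> m_lock;
        Data& m_data;
    };

private:
    mutable std::shared_mutex m_mutex;
    Data m_data;
};

The point of the proxies is that the compiler, not convention, forces every touch of the queue state to name the kind of access it needs.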

View File

@@ -4,6 +4,10 @@
 */
 //==============================================================================
+SETUP_LOG (JobQueue)
+
+//------------------------------------------------------------------------------
+
 JobQueue::Count::Count () noexcept
     : type (jtINVALID)
     , waiting (0)
@@ -22,19 +26,26 @@ JobQueue::Count::Count (JobType type_) noexcept
 //------------------------------------------------------------------------------
-SETUP_LOG (JobQueue)
+JobQueue::State::State ()
+    : lastJob (0)
+{
+}
+
+//------------------------------------------------------------------------------
 
 JobQueue::JobQueue ()
-    : mLock (this, "JobQueue", __FILE__, __LINE__)
-    , m_workers (*this, "JobQueue", 0)
-    , mLastJob (0)
+    : m_workers (*this, "JobQueue", 0)
 {
-    // Initialize the job counts.
-    // The 'limit' field in particular will be set based on the limit
-    for (int i = 0; i < NUM_JOB_TYPES; ++i)
-    {
-        JobType const type (static_cast <JobType> (i));
-        mJobCounts [type] = Count (type);
-    }
+    {
+        SharedState::WriteAccess state (m_state);
+
+        // Initialize the job counts.
+        // The 'limit' field in particular will be set based on the limit
+        for (int i = 0; i < NUM_JOB_TYPES; ++i)
+        {
+            JobType const type (static_cast <JobType> (i));
+            state->jobCounts [type] = Count (type);
+        }
+    }
 
     mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
@@ -53,6 +64,9 @@ JobQueue::JobQueue ()
     mJobLoads [ jtPEER ].setTargetLatency (200, 2500);
     mJobLoads [ jtDISK ].setTargetLatency (500, 1000);
     mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500);
+
+    mJobLoads [ jtNETOP_CLUSTER ].setTargetLatency (9999, 9999); // once per 10 seconds
+    mJobLoads [ jtNETOP_TIMER ].setTargetLatency (999, 999);     // once per second
 }
 
 JobQueue::~JobQueue ()
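
The two-argument setTargetLatency calls above read as (average, peak) latency targets in milliseconds, and the two new entries are tuned to their jobs' expected cadence: jtNETOP_TIMER runs about once per second (999 ms) and jtNETOP_CLUSTER about once per ten seconds (9999 ms), so exceeding the target means the job is falling behind its own schedule. A hypothetical monitor with those assumed semantics (a sketch only; the real LoadMonitor tracks counts and decaying averages):

// Sketch (assumed semantics): records how long each job waited and
// reports "over target" when the running average exceeds the average
// target or any single sample exceeds the peak target.
#include <algorithm>
#include <cstdint>

class LatencyMonitorSketch
{
public:
    void setTargetLatency (std::uint64_t avgMs, std::uint64_t peakMs)
    {
        m_targetAvg = avgMs;
        m_targetPeak = peakMs;
    }

    void addSample (std::uint64_t latencyMs)
    {
        // Exponential moving average; the weight is arbitrary here.
        m_avg = (m_avg * 7 + latencyMs) / 8;
        m_peak = std::max (m_peak, latencyMs);
    }

    bool isOver () const
    {
        return m_avg > m_targetAvg || m_peak > m_targetPeak;
    }

private:
    std::uint64_t m_targetAvg = 0;
    std::uint64_t m_targetPeak = 0;
    std::uint64_t m_avg = 0;
    std::uint64_t m_peak = 0;
};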
@@ -61,38 +75,40 @@ JobQueue::~JobQueue ()
 
 void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
 {
-    assert (type != jtINVALID);
-
-    ScopedLockType lock (mLock, __FILE__, __LINE__);
+    bassert (type != jtINVALID);
 
     // FIXME: Workaround incorrect client shutdown ordering
     // do not add jobs to a queue with no threads
     bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
 
+    SharedState::WriteAccess state (m_state);
+
     std::pair< std::set <Job>::iterator, bool > it =
-        mJobSet.insert (Job (type, name, ++mLastJob, mJobLoads[type], jobFunc));
+        state->jobSet.insert (Job (
+            type, name, ++state->lastJob, mJobLoads[type], jobFunc));
 
-    it.first->peekEvent().start(); // start timing how long it stays in the queue
+    // start timing how long it stays in the queue
+    it.first->peekEvent().start();
 
-    queueJob (*it.first, lock);
+    queueJob (*it.first, state);
 }
 
 int JobQueue::getJobCount (JobType t)
 {
-    ScopedLockType lock (mLock, __FILE__, __LINE__);
+    SharedState::ReadAccess state (m_state);
 
-    JobCounts::const_iterator c = mJobCounts.find (t);
+    JobCounts::const_iterator c = state->jobCounts.find (t);
 
-    return (c == mJobCounts.end ()) ? 0 : c->second.waiting;
+    return (c == state->jobCounts.end ()) ? 0 : c->second.waiting;
 }
 
 int JobQueue::getJobCountTotal (JobType t)
 {
-    ScopedLockType lock (mLock, __FILE__, __LINE__);
+    SharedState::ReadAccess state (m_state);
 
-    JobCounts::const_iterator c = mJobCounts.find (t);
+    JobCounts::const_iterator c = state->jobCounts.find (t);
 
-    return (c == mJobCounts.end ()) ? 0 : (c->second.waiting + c->second.running);
+    return (c == state->jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running);
 }
 
 int JobQueue::getJobCountGE (JobType t)
@@ -100,11 +116,11 @@ int JobQueue::getJobCountGE (JobType t)
     // return the number of jobs at this priority level or greater
     int ret = 0;
 
-    ScopedLockType lock (mLock, __FILE__, __LINE__);
+    SharedState::ReadAccess state (m_state);
 
     typedef JobCounts::value_type jt_int_pair;
 
-    BOOST_FOREACH (jt_int_pair const& it, mJobCounts)
+    BOOST_FOREACH (jt_int_pair const& it, state->jobCounts)
     {
         if (it.first >= t)
             ret += it.second.waiting;
@@ -118,13 +134,13 @@ std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts (
     // return all jobs at all priority levels
     std::vector< std::pair<JobType, std::pair<int, int> > > ret;
 
-    ScopedLockType lock (mLock, __FILE__, __LINE__);
+    SharedState::ReadAccess state (m_state);
 
-    ret.reserve (mJobCounts.size ());
+    ret.reserve (state->jobCounts.size ());
 
     typedef JobCounts::value_type jt_int_pair;
 
-    BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
+    BOOST_FOREACH (const jt_int_pair & it, state->jobCounts)
     {
         ret.push_back (std::make_pair (it.second.type,
             std::make_pair (it.second.waiting, it.second.running)));
@@ -137,24 +153,31 @@ Json::Value JobQueue::getJson (int)
 {
     Json::Value ret (Json::objectValue);
 
-    ScopedLockType lock (mLock, __FILE__, __LINE__);
-
     ret["threads"] = m_workers.getNumberOfThreads ();
 
     Json::Value priorities = Json::arrayValue;
 
+    SharedState::ReadAccess state (m_state);
+
     for (int i = 0; i < NUM_JOB_TYPES; ++i)
     {
-        if (static_cast<JobType>(i) == jtGENERIC)
+        JobType const type (static_cast <JobType> (i));
+
+        if (type == jtGENERIC)
             continue;
 
-        uint64 count, latencyAvg, latencyPeak;
-        int jobCount, threadCount;
+        uint64 count;
+        uint64 latencyAvg;
+        uint64 latencyPeak;
+        int jobCount;
+        int threadCount;
         bool isOver;
 
-        mJobLoads[i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
-        JobCounts::const_iterator it = mJobCounts.find (static_cast<JobType> (i));
+        mJobLoads [i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
 
-        if (it == mJobCounts.end ())
+        JobCounts::const_iterator it = state->jobCounts.find (type);
+
+        if (it == state->jobCounts.end ())
         {
             jobCount = 0;
             threadCount = 0;
@@ -172,7 +195,7 @@ Json::Value JobQueue::getJson (int)
         if (isOver)
             pri["over_target"] = true;
 
-        pri["job_type"] = Job::toString (static_cast<JobType> (i));
+        pri["job_type"] = Job::toString (type);
 
         if (jobCount != 0)
             pri["waiting"] = jobCount;
@@ -202,8 +225,6 @@ bool JobQueue::isOverloaded ()
 {
     int count = 0;
 
-    ScopedLockType lock (mLock, __FILE__, __LINE__);
-
     for (int i = 0; i < NUM_JOB_TYPES; ++i)
         if (mJobLoads[i].isOver ())
             ++count;
@@ -263,15 +284,14 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
 // Invariants:
 //  The calling thread owns the JobLock
 //
-void JobQueue::queueJob (Job const& job, ScopedLockType& lock)
+void JobQueue::queueJob (Job const& job, SharedState::WriteAccess& state)
 {
     JobType const type (job.getType ());
 
-    check_precondition (type != jtINVALID);
-    check_precondition (mJobSet.find (job) != mJobSet.end ());
+    bassert (type != jtINVALID);
+    bassert (state->jobSet.find (job) != state->jobSet.end ());
 
-    Count& count (mJobCounts [type]);
+    Count& count (state->jobCounts [type]);
 
     if (count.waiting + count.running < getJobLimit (type))
     {
@@ -306,14 +326,14 @@ void JobQueue::queueJob (Job const& job, ScopedLockType& lock)
 // Invariants:
 //  The calling thread owns the JobLock
 //
-void JobQueue::getNextJob (Job& job, ScopedLockType&)
+void JobQueue::getNextJob (Job& job, SharedState::WriteAccess& state)
 {
-    check_precondition (! mJobSet.empty ());
+    bassert (! state->jobSet.empty ());
 
     JobSet::const_iterator iter;
-    for (iter = mJobSet.begin (); iter != mJobSet.end (); ++iter)
+    for (iter = state->jobSet.begin (); iter != state->jobSet.end (); ++iter)
     {
-        Count& count (mJobCounts [iter->getType ()]);
+        Count& count (state->jobCounts [iter->getType ()]);
 
         bassert (count.running <= getJobLimit (count.type));
@@ -325,17 +345,17 @@ void JobQueue::getNextJob (Job& job, ScopedLockType&)
         }
     }
 
-    check_precondition (iter != mJobSet.end ());
+    bassert (iter != state->jobSet.end ());
 
     JobType const type = iter->getType ();
-    Count& count (mJobCounts [type]);
+    Count& count (state->jobCounts [type]);
 
-    check_postcondition (type != jtINVALID);
+    bassert (type != jtINVALID);
 
     job = *iter;
-    mJobSet.erase (iter);
+    state->jobSet.erase (iter);
 
     --count.waiting;
     ++count.running;
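
The scan in getNextJob works because jobSet is ordered: higher-priority jobs sort first and, within a priority, lower lastJob sequence numbers (older jobs) sort first, so the first job whose type still has a free running slot is the oldest, highest-priority eligible job. A standalone sketch of that selection policy, with hypothetical names and an assumed comparator:

// Sketch of the selection policy (assumed from the scan above; all
// names here are hypothetical, and priority doubles as the job type).
#include <map>
#include <set>

struct JobSketch
{
    int priority;   // larger value runs first (assumption)
    int sequence;   // insertion order, like State::lastJob

    bool operator< (JobSketch const& rhs) const
    {
        if (priority != rhs.priority)
            return priority > rhs.priority; // higher priority sorts first
        return sequence < rhs.sequence;     // then oldest first
    }
};

std::set <JobSketch>::const_iterator pickNext (
    std::set <JobSketch> const& jobs,
    std::map <int, int> const& running,   // priority -> jobs running now
    std::map <int, int> const& limit)     // priority -> max concurrent
{
    for (auto iter = jobs.begin (); iter != jobs.end (); ++iter)
    {
        auto const r = running.find (iter->priority);
        auto const l = limit.find (iter->priority);
        int const runningNow = (r != running.end ()) ? r->second : 0;
        int const maxNow = (l != limit.end ()) ? l->second : 1;

        if (runningNow < maxNow)    // this type has a free slot
            return iter;            // take the first eligible job
    }
    return jobs.end ();             // everything is at its limit
}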
@@ -361,14 +381,14 @@ void JobQueue::finishJob (Job const& job)
     JobType const type = job.getType ();
 
     {
-        ScopedLockType lock (mLock, __FILE__, __LINE__);
+        SharedState::WriteAccess state (m_state);
 
-        check_precondition (mJobSet.find (job) == mJobSet.end ());
-        check_precondition (type != jtINVALID);
+        bassert (state->jobSet.find (job) == state->jobSet.end ());
+        bassert (type != jtINVALID);
 
-        Count& count (mJobCounts [type]);
+        Count& count (state->jobCounts [type]);
 
-        // Queue a deferred task
+        // Queue a deferred task if possible
         if (count.deferred > 0)
         {
             bassert (count.running + count.waiting >= getJobLimit (type));
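
The deferred counter above is the other half of a per-type admission scheme: queueJob signals a worker only while waiting + running is under the type's limit and otherwise bumps deferred, and finishJob promotes one deferred job whenever a job of that type completes. The counter arithmetic in isolation (a sketch with hypothetical names; the handling of uncapped types is assumed):

// Sketch of the per-type admission counters (hypothetical names;
// assumes 'limit' is a positive cap for types that are capped).
struct CountersSketch
{
    int waiting = 0;    // queued and eligible to run
    int running = 0;    // currently executing
    int deferred = 0;   // queued but held back by the cap
};

// On enqueue: true means "hand a task to the thread pool now".
bool onQueue (CountersSketch& c, int limit)
{
    bool const signalNow = (c.waiting + c.running < limit);
    if (! signalNow)
        ++c.deferred;   // over the cap; run it later
    ++c.waiting;
    return signalNow;
}

// On completion: true means "signal a task for a held-back job".
bool onFinish (CountersSketch& c)
{
    --c.running;
    if (c.deferred == 0)
        return false;
    --c.deferred;       // promote one deferred job
    return true;
}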
@@ -399,9 +419,9 @@ void JobQueue::processTask ()
     Job job;
 
     {
-        ScopedLockType lock (mLock, __FILE__, __LINE__);
+        SharedState::WriteAccess state (m_state);
 
-        getNextJob (job, lock);
+        getNextJob (job, state);
     }
 
     JobType const type (job.getType ());
@@ -442,10 +462,14 @@ int JobQueue::getJobLimit (JobType type)
     case jtHO_READ:
     case jtHO_WRITE:
     case jtGENERIC:
-    default:
         limit = 0;
         break;
 
+    default:
+        // Someone added a JobType but forgot to set a limit.
+        // Did they also forget to add it to ripple_Job.cpp?
+        bassertfalse;
+
     case jtVALIDATION_ut:
     case jtPROOFWORK:
     case jtTRANSACTION_l:
@@ -468,6 +492,15 @@ int JobQueue::getJobLimit (JobType type)
     case jtPACK:          limit = 1; break;
     case jtPUBOLDLEDGER:  limit = 2; break;
     case jtTXN_DATA:      limit = 1; break;
+
+    // If either of the next two are processing so slowly
+    // or we are so busy we have two of them at once, it
+    // indicates a serious problem!
+    //
+    case jtNETOP_TIMER:
+    case jtNETOP_CLUSTER:
+        limit = 1;
+        break;
     };
 
     return limit;
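
Splitting default: away from the limit = 0 group changes what happens when someone adds a JobType and forgets this switch: instead of silently inheriting "no limit", the new type trips an assertion in debug builds and then falls through to a conservative cap in release builds. The shape of that pattern in isolation (hypothetical enum; plain assert standing in for bassertfalse):

#include <cassert>

enum SketchType { stKnownUnlimited, stKnownSerial, stBrandNew };

int jobLimitSketch (SketchType type)
{
    int limit = 0;

    switch (type)
    {
    case stKnownUnlimited:
        limit = 0;          // explicitly chosen: no cap
        break;

    default:
        // A new enumerator was added without choosing a limit:
        // fail loudly in debug builds...
        assert (false);
        // ...and fall through to a conservative cap in release builds.

    case stKnownSerial:
        limit = 1;          // at most one at a time
        break;
    };

    return limit;
}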

View File

@@ -68,26 +68,30 @@ public:
     Json::Value getJson (int c = 0);
 
 private:
-    typedef RippleMutex LockType;
-    typedef LockType::ScopedLockType ScopedLockType;
-    LockType mLock;
-
     typedef std::set <Job> JobSet;
 
-    void queueJob (Job const& job, ScopedLockType&);
-    void getNextJob (Job& job, ScopedLockType&);
-    void finishJob (Job const& job);
+    struct State
+    {
+        State ();
+
+        uint64 lastJob;
+        JobSet jobSet;
+        JobCounts jobCounts;
+    };
+
+    typedef SharedData <State> SharedState;
+
+    void queueJob (Job const& job, SharedState::WriteAccess& state);
+    void getNextJob (Job& job, SharedState::WriteAccess& state);
+    void finishJob (Job const& job);
     void processTask ();
+
     static int getJobLimit (JobType type);
 
 private:
+    SharedState m_state;
     Workers m_workers;
-    uint64 mLastJob;
-    JobSet mJobSet;
     LoadMonitor mJobLoads [NUM_JOB_TYPES];
-    JobCounts mJobCounts;
 };
#endif