Refactor JobQueue to fix a bad state and clean up job limits

This commit is contained in:
Vinnie Falco
2013-08-12 10:00:54 -07:00
committed by JoelKatz
parent 72315bffe3
commit 4a949a9cbd
7 changed files with 253 additions and 107 deletions

View File

@@ -36,44 +36,37 @@ JobQueue::~JobQueue ()
}
void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
{
addLimitJob (type, name, 0, jobFunc);
}
void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE<void (Job&)>& jobFunc)
{
assert (type != jtINVALID);
boost::mutex::scoped_lock sl (mJobLock);
ScopedLockType lock (mJobLock);
// FIXME: Workaround incorrect client shutdown ordering
// do not add jobs to a queue with no threads
bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
std::pair< std::set <Job>::iterator, bool > it =
mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc));
mJobSet.insert (Job (type, name, ++mLastJob, mJobLoads[type], jobFunc));
it.first->peekEvent().start(); // start timing how long it stays in the queue
++mJobCounts[type].first;
m_workers.addTask ();
queueJobForRunning (*it.first, lock);
}
int JobQueue::getJobCount (JobType t)
{
boost::mutex::scoped_lock sl (mJobLock);
ScopedLockType lock (mJobLock);
JobCounts::iterator c = mJobCounts.find (t);
JobCounts::const_iterator c = mJobCounts.find (t);
return (c == mJobCounts.end ()) ? 0 : c->second.first;
}
int JobQueue::getJobCountTotal (JobType t)
{
boost::mutex::scoped_lock sl (mJobLock);
ScopedLockType lock (mJobLock);
JobCounts::iterator c = mJobCounts.find (t);
JobCounts::const_iterator c = mJobCounts.find (t);
return (c == mJobCounts.end ()) ? 0 : (c->second.first + c->second.second);
}
@@ -83,7 +76,7 @@ int JobQueue::getJobCountGE (JobType t)
// return the number of jobs at this priority level or greater
int ret = 0;
boost::mutex::scoped_lock sl (mJobLock);
ScopedLockType lock (mJobLock);
typedef JobCounts::value_type jt_int_pair;
@@ -101,7 +94,7 @@ std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts (
// return all jobs at all priority levels
std::vector< std::pair<JobType, std::pair<int, int> > > ret;
boost::mutex::scoped_lock sl (mJobLock);
ScopedLockType lock (mJobLock);
ret.reserve (mJobCounts.size ());
@@ -119,7 +112,7 @@ Json::Value JobQueue::getJson (int)
{
Json::Value ret (Json::objectValue);
boost::mutex::scoped_lock sl (mJobLock);
ScopedLockType lock (mJobLock);
ret["threads"] = m_workers.getNumberOfThreads ();
@@ -134,7 +127,7 @@ Json::Value JobQueue::getJson (int)
int jobCount, threadCount;
bool isOver;
mJobLoads[i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
JobCounts::iterator it = mJobCounts.find (static_cast<JobType> (i));
JobCounts::const_iterator it = mJobCounts.find (static_cast<JobType> (i));
if (it == mJobCounts.end ())
{
@@ -184,7 +177,7 @@ bool JobQueue::isOverloaded ()
{
int count = 0;
boost::mutex::scoped_lock sl (mJobLock);
ScopedLockType lock (mJobLock);
for (int i = 0; i < NUM_JOB_TYPES; ++i)
if (mJobLoads[i].isOver ())
@@ -229,82 +222,244 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
m_workers.setNumberOfThreads (c);
}
bool JobQueue::getJob(Job& job)
//------------------------------------------------------------------------------
//
// Determines the number of free task slots for the given JobType.
//
// This can return a negative number.
//
// Pre-conditions:
// <none>
//
// Post-conditions:
// <none>
//
// Invariants:
// The calling thread owns the JobLock
//
int JobQueue::freeTaskSlots (JobType type, ScopedLockType const&)
{
bool gotJob = false;
if (! mJobSet.empty ())
int const limit = getJobLimit (type);
if (limit != 0)
{
std::set<Job>::iterator it = mJobSet.begin ();
int waiting = mJobCounts [type].first;
int running = mJobCounts [type].second;
for (;;)
{
// Are we out of jobs?
if (it == mJobSet.end())
return false; // VFALCO TODO get rid of this return from the middle
// Does this job have no limit?
if (it->getLimit() == 0)
break;
// Is this job category below the limit?
if (mJobCounts[it->getType()].second < it->getLimit())
break;
// Try the next job, if any
++it;
}
job = *it;
mJobSet.erase (it);
gotJob = true;
return (limit - running) - waiting;
}
return gotJob;
// The actual number doesn't matter as long as its positive
return 1;
}
//------------------------------------------------------------------------------
//
// Signals an added Job for processing.
//
// Pre-conditions:
// The JobType must be valid.
// The Job must exist in mJobSet.
// The Job must not have previously been queued.
//
// Post-conditions:
// Count of waiting jobs of that type will be incremented.
// If JobQueue exists, and has at least one thread, Job will eventually run.
//
// Invariants:
// The calling thread owns the JobLock
//
void JobQueue::queueJobForRunning (Job const& job, ScopedLockType const& lock)
{
JobType const type (job.getType ());
check_precondition (type != jtINVALID);
check_precondition (mJobSet.find (job) != mJobSet.end ());
if (freeTaskSlots (type, lock) > 0)
{
m_workers.addTask ();
}
else
{
// Do nothing.
// When the next job of this type finishes, it will
// call addTask if there are more waiting than the limit
}
// This has to happen after the call to freeTaskSlots
++mJobCounts [type].first;
}
//------------------------------------------------------------------------------
//
// Returns the next Job we should run now.
//
// RunnableJob:
// A Job with no limit for its JobType, or
// The number of running Jobs for its JobType is below the limit.
//
// Pre-conditions:
// mJobSet must not be empty.
// mJobSet holds at least one RunnableJob
//
// Post-conditions:
// job is a valid Job object.
// job is removed from mJobQueue.
// Waiting job count of it's type is decremented
// Running job count of it's type is incremented
//
// Invariants:
// The calling thread owns the JobLock
//
void JobQueue::getNextJobToRun (Job& job, ScopedLockType const&)
{
check_precondition (! mJobSet.empty ());
JobSet::const_iterator iter;
for (iter = mJobSet.begin (); iter != mJobSet.end (); ++iter)
{
// Check the requirements for RunnableJob
if (getJobLimit (iter->getType ()) <= 0 ||
(mJobCounts [iter->getType ()].second < getJobLimit (iter->getType ())))
{
break;
}
}
check_precondition (iter != mJobSet.end ());
JobType const type = iter->getType ();
check_postcondition (type != JobType::jtINVALID);
job = *iter;
mJobSet.erase (iter);
--(mJobCounts [type].first);
++(mJobCounts [type].second);
}
//------------------------------------------------------------------------------
//
// Indicates that a running Job has completed its task.
//
// Pre-conditions:
// Job must not exist in mJobSet.
// The JobType must not be invalid.
//
// Post-conditions:
// The running count of that JobType is decremented
// A new task is signaled if there are more waiting Jobs than the limit, if any.
//
// Invariants:
// <none>
//
void JobQueue::setRunningJobFinished (Job const& job)
{
JobType const type = job.getType ();
ScopedLockType lock (mJobLock);
check_precondition (mJobSet.find (job) == mJobSet.end ());
check_precondition (type != JobType::jtINVALID);
// If there were previously no free slots and we would
// free one up, and there are any other jobs of this type
// waiting, then go ahead and signal a task
//
if (freeTaskSlots (type, lock) == 0 && mJobCounts [type].first > 0)
{
m_workers.addTask ();
}
--(mJobCounts [type].second);
}
//------------------------------------------------------------------------------
//
// Runs the next appropriate waiting Job.
//
// Pre-conditions:
// A RunnableJob must exist in the JobSet
//
// Post-conditions:
// The chosen RunnableJob will have Job::doJob() called.
//
// Invariants:
// <none>
//
void JobQueue::processTask ()
{
boost::mutex::scoped_lock lock (mJobLock);
Job job;
while (1)
{
// This lock shouldn't be needed
getNextJobToRun (job, ScopedLockType (mJobLock));
JobType type (jtINVALID);
JobType const type (job.getType ());
{
Job job;
String const name (Job::toString (type));
bool const haveJob = getJob (job);
Thread::setCurrentThreadName (name);
if (haveJob)
{
type = job.getType ();
WriteLog (lsTRACE, JobQueue) << "Doing " << name << " job";
// VFALCO TODO Replace with Atomic <>
--(mJobCounts[type].first);
++(mJobCounts[type].second);
job.doJob ();
lock.unlock ();
setRunningJobFinished (job);
Thread::setCurrentThreadName (Job::toString (type));
WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
job.doJob ();
}
else
return;
// must destroy job, here, without holding lock
}
if (type != jtINVALID)
{
lock.lock ();
-- (mJobCounts[type].second);
}
}
// Note that when Job::~Job is called, the last reference
// to the associated LoadEvent object (in the Job) may be destroyed.
}
//------------------------------------------------------------------------------
// Returns the limit of running jobs for the given job type.
// A value of zero means no limit.
//
int JobQueue::getJobLimit (JobType type)
{
int limit = 0;
switch (type)
{
// These are not dispatched by JobQueue
case jtPEER:
case jtDISK:
case jtACCEPTLEDGER:
case jtTXN_PROC:
case jtOB_SETUP:
case jtPATH_FIND:
case jtHO_READ:
case jtHO_WRITE:
case jtGENERIC:
default:
bassertfalse;
limit = 0;
break;
case jtVALIDATION_ut:
case jtPROOFWORK:
case jtTRANSACTION_l:
case jtPROPOSAL_ut:
case jtUPDATE_PF:
case jtCLIENT:
case jtTRANSACTION:
case jtPUBLEDGER:
case jtWAL:
case jtVALIDATION_t:
case jtWRITE:
case jtPROPOSAL_t:
case jtSWEEP:
case jtADMIN:
limit = 0;
break;
case jtLEDGER_DATA: limit = 2; break;
case jtPACK: limit = 1; break;
case jtPUBOLDLEDGER: limit = 2; break;
case jtTXN_DATA: limit = 1; break;
};
return limit;
}