Fix signal logic in JobQueue

This commit is contained in:
Vinnie Falco
2013-08-12 17:38:26 -07:00
committed by JoelKatz
parent aef92835f7
commit 824b1dab20
2 changed files with 106 additions and 80 deletions

View File

@@ -4,14 +4,37 @@
*/ */
//============================================================================== //==============================================================================
JobQueue::Count::Count () noexcept
: type (jtINVALID)
, waiting (0)
, running (0)
, deferred (0)
{
}
JobQueue::Count::Count (JobType type_) noexcept
: type (type_)
, waiting (0)
, running (0)
, deferred (0)
{
}
//------------------------------------------------------------------------------
SETUP_LOG (JobQueue) SETUP_LOG (JobQueue)
JobQueue::JobQueue () JobQueue::JobQueue ()
: m_workers (*this, "JobQueue", 0) : m_workers (*this, "JobQueue", 0)
, mLastJob (0) , mLastJob (0)
{ {
// Initialize the job counts.
// The 'limit' field in particular will be set based on the limit
for (int i = 0; i < NUM_JOB_TYPES; ++i) for (int i = 0; i < NUM_JOB_TYPES; ++i)
mJobCounts[static_cast<JobType>(i)] = std::make_pair<int, int>(0, 0); {
JobType const type (static_cast <JobType> (i));
mJobCounts [type] = Count (type);
}
mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000); mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000); mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000);
@@ -50,7 +73,7 @@ void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYP
it.first->peekEvent().start(); // start timing how long it stays in the queue it.first->peekEvent().start(); // start timing how long it stays in the queue
queueJobForRunning (*it.first, lock); queueJob (*it.first, lock);
} }
int JobQueue::getJobCount (JobType t) int JobQueue::getJobCount (JobType t)
@@ -59,7 +82,7 @@ int JobQueue::getJobCount (JobType t)
JobCounts::const_iterator c = mJobCounts.find (t); JobCounts::const_iterator c = mJobCounts.find (t);
return (c == mJobCounts.end ()) ? 0 : c->second.first; return (c == mJobCounts.end ()) ? 0 : c->second.waiting;
} }
int JobQueue::getJobCountTotal (JobType t) int JobQueue::getJobCountTotal (JobType t)
@@ -68,7 +91,7 @@ int JobQueue::getJobCountTotal (JobType t)
JobCounts::const_iterator c = mJobCounts.find (t); JobCounts::const_iterator c = mJobCounts.find (t);
return (c == mJobCounts.end ()) ? 0 : (c->second.first + c->second.second); return (c == mJobCounts.end ()) ? 0 : (c->second.waiting + c->second.running);
} }
int JobQueue::getJobCountGE (JobType t) int JobQueue::getJobCountGE (JobType t)
@@ -83,7 +106,7 @@ int JobQueue::getJobCountGE (JobType t)
BOOST_FOREACH (jt_int_pair const& it, mJobCounts) BOOST_FOREACH (jt_int_pair const& it, mJobCounts)
{ {
if (it.first >= t) if (it.first >= t)
ret += it.second.first; ret += it.second.waiting;
} }
return ret; return ret;
@@ -102,7 +125,8 @@ std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts (
BOOST_FOREACH (const jt_int_pair & it, mJobCounts) BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
{ {
ret.push_back (it); ret.push_back (std::make_pair (it.second.type,
std::make_pair (it.second.waiting, it.second.running)));
} }
return ret; return ret;
@@ -136,8 +160,8 @@ Json::Value JobQueue::getJson (int)
} }
else else
{ {
jobCount = it->second.first; jobCount = it->second.waiting;
threadCount = it->second.second; threadCount = it->second.running;
} }
if ((count != 0) || (jobCount != 0) || (latencyPeak != 0) || (threadCount != 0)) if ((count != 0) || (jobCount != 0) || (latencyPeak != 0) || (threadCount != 0))
@@ -222,37 +246,6 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
m_workers.setNumberOfThreads (c); m_workers.setNumberOfThreads (c);
} }
//------------------------------------------------------------------------------
//
// Determines the number of free task slots for the given JobType.
//
// This can return a negative number.
//
// Pre-conditions:
// <none>
//
// Post-conditions:
// <none>
//
// Invariants:
// The calling thread owns the JobLock
//
int JobQueue::freeTaskSlots (JobType type, ScopedLockType const&)
{
int const limit = getJobLimit (type);
if (limit != 0)
{
int waiting = mJobCounts [type].first;
int running = mJobCounts [type].second;
return (limit - running) - waiting;
}
// The actual number doesn't matter as long as its positive
return 1;
}
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// //
// Signals an added Job for processing. // Signals an added Job for processing.
@@ -269,26 +262,28 @@ int JobQueue::freeTaskSlots (JobType type, ScopedLockType const&)
// Invariants: // Invariants:
// The calling thread owns the JobLock // The calling thread owns the JobLock
// //
void JobQueue::queueJobForRunning (Job const& job, ScopedLockType const& lock) void JobQueue::queueJob (Job const& job, ScopedLockType& lock)
{ {
JobType const type (job.getType ()); JobType const type (job.getType ());
check_precondition (type != jtINVALID); check_precondition (type != jtINVALID);
check_precondition (mJobSet.find (job) != mJobSet.end ()); check_precondition (mJobSet.find (job) != mJobSet.end ());
if (freeTaskSlots (type, lock) > 0) Count& count (mJobCounts [type]);
++count.waiting;
if (count.waiting + count.running < getJobLimit (type))
{ {
m_workers.addTask (); m_workers.addTask ();
} }
else else
{ {
// Do nothing. // defer the task until we go below the limit
// When the next job of this type finishes, it will //
// call addTask if there are more waiting than the limit ++count.deferred;
} }
// This has to happen after the call to freeTaskSlots
++mJobCounts [type].first;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -296,8 +291,7 @@ void JobQueue::queueJobForRunning (Job const& job, ScopedLockType const& lock)
// Returns the next Job we should run now. // Returns the next Job we should run now.
// //
// RunnableJob: // RunnableJob:
// A Job with no limit for its JobType, or // A Job in the JobSet whose slots count for its type is greater than zero.
// The number of running Jobs for its JobType is below the limit.
// //
// Pre-conditions: // Pre-conditions:
// mJobSet must not be empty. // mJobSet must not be empty.
@@ -312,17 +306,21 @@ void JobQueue::queueJobForRunning (Job const& job, ScopedLockType const& lock)
// Invariants: // Invariants:
// The calling thread owns the JobLock // The calling thread owns the JobLock
// //
void JobQueue::getNextJobToRun (Job& job, ScopedLockType const&) void JobQueue::getNextJob (Job& job, ScopedLockType&)
{ {
check_precondition (! mJobSet.empty ()); check_precondition (! mJobSet.empty ());
JobSet::const_iterator iter; JobSet::const_iterator iter;
for (iter = mJobSet.begin (); iter != mJobSet.end (); ++iter) for (iter = mJobSet.begin (); iter != mJobSet.end (); ++iter)
{ {
// Check the requirements for RunnableJob Count& count (mJobCounts [iter->getType ()]);
if (getJobLimit (iter->getType ()) <= 0 ||
(mJobCounts [iter->getType ()].second < getJobLimit (iter->getType ()))) bassert (count.running <= getJobLimit (count.type));
// Run this job if we're running below the limit.
if (count.running < getJobLimit (count.type))
{ {
bassert (count.waiting > 0);
break; break;
} }
} }
@@ -331,14 +329,16 @@ void JobQueue::getNextJobToRun (Job& job, ScopedLockType const&)
JobType const type = iter->getType (); JobType const type = iter->getType ();
Count& count (mJobCounts [type]);
check_postcondition (type != JobType::jtINVALID); check_postcondition (type != JobType::jtINVALID);
job = *iter; job = *iter;
mJobSet.erase (iter); mJobSet.erase (iter);
--(mJobCounts [type].first); --count.waiting;
++(mJobCounts [type].second); ++count.running;
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -356,25 +356,29 @@ void JobQueue::getNextJobToRun (Job& job, ScopedLockType const&)
// Invariants: // Invariants:
// <none> // <none>
// //
void JobQueue::setRunningJobFinished (Job const& job) void JobQueue::finishJob (Job const& job)
{ {
JobType const type = job.getType (); JobType const type = job.getType ();
ScopedLockType lock (mJobLock);
check_precondition (mJobSet.find (job) == mJobSet.end ());
check_precondition (type != JobType::jtINVALID);
// If there were previously no free slots and we would
// free one up, and there are any other jobs of this type
// waiting, then go ahead and signal a task
//
if (freeTaskSlots (type, lock) == 0 && mJobCounts [type].first > 0)
{ {
m_workers.addTask (); ScopedLockType lock (mJobLock);
}
--(mJobCounts [type].second); check_precondition (mJobSet.find (job) == mJobSet.end ());
check_precondition (type != JobType::jtINVALID);
Count& count (mJobCounts [type]);
// Queue a deferred task
if (count.deferred > 0)
{
bassert (count.running + count.waiting >= getJobLimit (type));
--count.deferred;
m_workers.addTask ();
}
--count.running;
}
} }
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
@@ -394,7 +398,11 @@ void JobQueue::processTask ()
{ {
Job job; Job job;
getNextJobToRun (job, ScopedLockType (mJobLock)); {
ScopedLockType lock (mJobLock);
getNextJob (job, lock);
}
JobType const type (job.getType ()); JobType const type (job.getType ());
@@ -406,7 +414,7 @@ void JobQueue::processTask ()
job.doJob (); job.doJob ();
setRunningJobFinished (job); finishJob (job);
// Note that when Job::~Job is called, the last reference // Note that when Job::~Job is called, the last reference
// to the associated LoadEvent object (in the Job) may be destroyed. // to the associated LoadEvent object (in the Job) may be destroyed.
@@ -415,11 +423,12 @@ void JobQueue::processTask ()
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
// Returns the limit of running jobs for the given job type. // Returns the limit of running jobs for the given job type.
// A value of zero means no limit. // For jobs with no limit, we return the largest int. Hopefully that
// will be enough.
// //
int JobQueue::getJobLimit (JobType type) int JobQueue::getJobLimit (JobType type)
{ {
int limit = 0; int limit = std::numeric_limits <int>::max ();
switch (type) switch (type)
{ {
@@ -434,7 +443,6 @@ int JobQueue::getJobLimit (JobType type)
case jtHO_WRITE: case jtHO_WRITE:
case jtGENERIC: case jtGENERIC:
default: default:
bassertfalse;
limit = 0; limit = 0;
break; break;
@@ -452,7 +460,7 @@ int JobQueue::getJobLimit (JobType type)
case jtPROPOSAL_t: case jtPROPOSAL_t:
case jtSWEEP: case jtSWEEP:
case jtADMIN: case jtADMIN:
limit = 0; limit = std::numeric_limits <int>::max ();
break; break;
case jtLEDGER_DATA: limit = 2; break; case jtLEDGER_DATA: limit = 2; break;

View File

@@ -10,7 +10,21 @@
class JobQueue : private Workers::Callback class JobQueue : private Workers::Callback
{ {
public: public:
typedef std::map<JobType, std::pair<int, int > > JobCounts; // Statistics on a particular JobType
struct Count
{
Count () noexcept;
explicit Count (JobType type) noexcept;
JobType type; // The type of Job these counts reflect
int waiting; // The number waiting
int running; // How many are running
int deferred; // Number of jobs we didn't signal due to limits
};
typedef std::map <JobType, Count> JobCounts;
//--------------------------------------------------------------------------
JobQueue (); JobQueue ();
@@ -22,11 +36,15 @@ public:
void addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& job); void addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& job);
int getJobCount (JobType t); // Jobs waiting at this priority int getJobCount (JobType t); // Jobs waiting at this priority
int getJobCountTotal (JobType t); // Jobs waiting plus running at this priority int getJobCountTotal (JobType t); // Jobs waiting plus running at this priority
int getJobCountGE (JobType t); // All waiting jobs at or greater than this priority int getJobCountGE (JobType t); // All waiting jobs at or greater than this priority
std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts (); // jobs waiting, threads doing std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts (); // jobs waiting, threads doing
void shutdown (); void shutdown ();
void setThreadCount (int c, bool const standaloneMode); void setThreadCount (int c, bool const standaloneMode);
// VFALCO TODO Rename these to newLoadEventMeasurement or something similar // VFALCO TODO Rename these to newLoadEventMeasurement or something similar
@@ -46,6 +64,7 @@ public:
} }
bool isOverloaded (); bool isOverloaded ();
Json::Value getJson (int c = 0); Json::Value getJson (int c = 0);
private: private:
@@ -53,10 +72,9 @@ private:
typedef JobLockType::scoped_lock ScopedLockType; typedef JobLockType::scoped_lock ScopedLockType;
typedef std::set <Job> JobSet; typedef std::set <Job> JobSet;
int freeTaskSlots (JobType type, ScopedLockType const&); void queueJob (Job const& job, ScopedLockType&);
void queueJobForRunning (Job const& job, ScopedLockType const&); void getNextJob (Job& job, ScopedLockType&);
void getNextJobToRun (Job& job, ScopedLockType const&); void finishJob (Job const& job);
void setRunningJobFinished (Job const& job);
void processTask (); void processTask ();