Track threads working on jobs.

This commit is contained in:
JoelKatz
2013-03-06 19:07:54 -08:00
parent 0ce965948e
commit dc3d82272e
2 changed files with 37 additions and 21 deletions

View File

@@ -108,7 +108,7 @@ void JobQueue::addJob(JobType type, const std::string& name, const boost::functi
assert(mThreadCount != 0); // do not add jobs to a queue with no threads assert(mThreadCount != 0); // do not add jobs to a queue with no threads
mJobSet.insert(Job(type, name, ++mLastJob, mJobLoads[type], jobFunc)); mJobSet.insert(Job(type, name, ++mLastJob, mJobLoads[type], jobFunc));
++mJobCounts[type]; ++mJobCounts[type].first;
mJobCond.notify_one(); mJobCond.notify_one();
} }
@@ -116,8 +116,8 @@ int JobQueue::getJobCount(JobType t)
{ {
boost::mutex::scoped_lock sl(mJobLock); boost::mutex::scoped_lock sl(mJobLock);
std::map<JobType, int>::iterator c = mJobCounts.find(t); std::map< JobType, std::pair<int, int> >::iterator c = mJobCounts.find(t);
return (c == mJobCounts.end()) ? 0 : c->second; return (c == mJobCounts.end()) ? 0 : c->second.first;
} }
int JobQueue::getJobCountGE(JobType t) int JobQueue::getJobCountGE(JobType t)
@@ -126,21 +126,21 @@ int JobQueue::getJobCountGE(JobType t)
boost::mutex::scoped_lock sl(mJobLock); boost::mutex::scoped_lock sl(mJobLock);
typedef std::map<JobType, int>::value_type jt_int_pair; typedef std::map< JobType, std::pair<int, int> >::value_type jt_int_pair;
BOOST_FOREACH(const jt_int_pair& it, mJobCounts) BOOST_FOREACH(const jt_int_pair& it, mJobCounts)
if (it.first >= t) if (it.first >= t)
ret += it.second; ret += it.second.first;
return ret; return ret;
} }
std::vector< std::pair<JobType, int> > JobQueue::getJobCounts() std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts()
{ // return all jobs at all priority levels { // return all jobs at all priority levels
std::vector< std::pair<JobType, int> > ret; std::vector< std::pair<JobType, std::pair<int, int> > > ret;
boost::mutex::scoped_lock sl(mJobLock); boost::mutex::scoped_lock sl(mJobLock);
ret.reserve(mJobCounts.size()); ret.reserve(mJobCounts.size());
typedef std::map<JobType, int>::value_type jt_int_pair; typedef std::map< JobType, std::pair<int, int> >::value_type jt_int_pair;
BOOST_FOREACH(const jt_int_pair& it, mJobCounts) BOOST_FOREACH(const jt_int_pair& it, mJobCounts)
ret.push_back(it); ret.push_back(it);
@@ -157,28 +157,38 @@ Json::Value JobQueue::getJson(int)
Json::Value priorities = Json::arrayValue; Json::Value priorities = Json::arrayValue;
for (int i = 0; i < NUM_JOB_TYPES; ++i) for (int i = 0; i < NUM_JOB_TYPES; ++i)
{ {
uint64 count, latencyAvg, latencyPeak, jobCount; uint64 count, latencyAvg, latencyPeak;
int jobCount, threadCount;
bool isOver; bool isOver;
mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak, isOver); mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak, isOver);
std::map<JobType, int>::iterator it = mJobCounts.find(static_cast<JobType>(i)); std::map< JobType, std::pair<int, int> >::iterator it = mJobCounts.find(static_cast<JobType>(i));
if (it == mJobCounts.end()) if (it == mJobCounts.end())
{
jobCount = 0; jobCount = 0;
threadCount = 0;
}
else else
jobCount = it->second; {
if ((count != 0) || (jobCount != 0) || (latencyPeak != 0)) jobCount = it->second.first;
threadCount = it->second.second;
}
if ((count != 0) || (jobCount != 0) || (latencyPeak != 0) || (threadCount != 0))
{ {
Json::Value pri(Json::objectValue); Json::Value pri(Json::objectValue);
if (isOver) if (isOver)
pri["over_target"] = true; pri["over_target"] = true;
pri["job_type"] = Job::toString(static_cast<JobType>(i)); pri["job_type"] = Job::toString(static_cast<JobType>(i));
if (jobCount != 0) if (jobCount != 0)
pri["waiting"] = static_cast<int>(jobCount); pri["waiting"] = jobCount;
if (count != 0) if (count != 0)
pri["per_second"] = static_cast<int>(count); pri["per_second"] = static_cast<int>(count);
if (latencyPeak != 0) if (latencyPeak != 0)
pri["peak_time"] = static_cast<int>(latencyPeak); pri["peak_time"] = static_cast<int>(latencyPeak);
if (latencyAvg != 0) if (latencyAvg != 0)
pri["avg_time"] = static_cast<int>(latencyAvg); pri["avg_time"] = static_cast<int>(latencyAvg);
if (threadCount != 0)
pri["in_progress"] = threadCount;
priorities.append(pri); priorities.append(pri);
} }
} }
@@ -223,7 +233,7 @@ void JobQueue::setThreadCount(int c)
boost::mutex::scoped_lock sl(mJobLock); boost::mutex::scoped_lock sl(mJobLock);
while (mJobCounts[jtDEATH] != 0) while (mJobCounts[jtDEATH].first != 0)
mJobCond.wait(sl); mJobCond.wait(sl);
while (mThreadCount < c) while (mThreadCount < c)
@@ -233,12 +243,12 @@ void JobQueue::setThreadCount(int c)
} }
while (mThreadCount > c) while (mThreadCount > c)
{ {
if (mJobCounts[jtDEATH] != 0) if (mJobCounts[jtDEATH].first != 0)
mJobCond.wait(sl); mJobCond.wait(sl);
else else
{ {
mJobSet.insert(Job(jtDEATH, 0)); mJobSet.insert(Job(jtDEATH, 0));
++mJobCounts[jtDEATH]; ++(mJobCounts[jtDEATH].first);
} }
} }
mJobCond.notify_one(); // in case we sucked up someone else's signal mJobCond.notify_one(); // in case we sucked up someone else's signal
@@ -255,20 +265,25 @@ void JobQueue::threadEntry()
if (mShuttingDown) if (mShuttingDown)
break; break;
JobType type;
std::set<Job>::iterator it = mJobSet.begin(); std::set<Job>::iterator it = mJobSet.begin();
{ {
Job job(*it); Job job(*it);
mJobSet.erase(it); mJobSet.erase(it);
--mJobCounts[job.getType()];
if (job.getType() == jtDEATH) type = job.getType();
--(mJobCounts[type].first);
if (type == jtDEATH)
break; break;
++(mJobCounts[type].second);
sl.unlock(); sl.unlock();
cLog(lsTRACE) << "Doing " << Job::toString(job.getType()) << " job"; cLog(lsTRACE) << "Doing " << Job::toString(type) << " job";
job.doJob(); job.doJob();
} // must destroy job without holding lock } // must destroy job without holding lock
sl.lock(); sl.lock();
--(mJobCounts[type].second);
} }
--mThreadCount; --mThreadCount;
mJobCond.notify_all(); mJobCond.notify_all();

View File

@@ -89,11 +89,12 @@ protected:
uint64 mLastJob; uint64 mLastJob;
std::set<Job> mJobSet; std::set<Job> mJobSet;
std::map<JobType, int> mJobCounts;
LoadMonitor mJobLoads[NUM_JOB_TYPES]; LoadMonitor mJobLoads[NUM_JOB_TYPES];
int mThreadCount; int mThreadCount;
bool mShuttingDown; bool mShuttingDown;
std::map<JobType, std::pair<int, int > > mJobCounts;
void threadEntry(void); void threadEntry(void);
@@ -105,7 +106,7 @@ public:
int getJobCount(JobType t); // Jobs at this priority int getJobCount(JobType t); // Jobs at this priority
int getJobCountGE(JobType t); // All jobs at or greater than this priority int getJobCountGE(JobType t); // All jobs at or greater than this priority
std::vector< std::pair<JobType, int> > getJobCounts(); std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts(); // jobs waiting, threads doing
void shutdown(); void shutdown();
void setThreadCount(int c = 0); void setThreadCount(int c = 0);