diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 0582ed7eba..513cfcb46e 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -108,7 +108,7 @@ void JobQueue::addJob(JobType type, const std::string& name, const boost::functi assert(mThreadCount != 0); // do not add jobs to a queue with no threads mJobSet.insert(Job(type, name, ++mLastJob, mJobLoads[type], jobFunc)); - ++mJobCounts[type]; + ++mJobCounts[type].first; mJobCond.notify_one(); } @@ -116,8 +116,8 @@ int JobQueue::getJobCount(JobType t) { boost::mutex::scoped_lock sl(mJobLock); - std::map::iterator c = mJobCounts.find(t); - return (c == mJobCounts.end()) ? 0 : c->second; + std::map< JobType, std::pair >::iterator c = mJobCounts.find(t); + return (c == mJobCounts.end()) ? 0 : c->second.first; } int JobQueue::getJobCountGE(JobType t) @@ -126,21 +126,21 @@ int JobQueue::getJobCountGE(JobType t) boost::mutex::scoped_lock sl(mJobLock); - typedef std::map::value_type jt_int_pair; + typedef std::map< JobType, std::pair >::value_type jt_int_pair; BOOST_FOREACH(const jt_int_pair& it, mJobCounts) if (it.first >= t) - ret += it.second; + ret += it.second.first; return ret; } -std::vector< std::pair > JobQueue::getJobCounts() +std::vector< std::pair > > JobQueue::getJobCounts() { // return all jobs at all priority levels - std::vector< std::pair > ret; + std::vector< std::pair > > ret; boost::mutex::scoped_lock sl(mJobLock); ret.reserve(mJobCounts.size()); - typedef std::map::value_type jt_int_pair; + typedef std::map< JobType, std::pair >::value_type jt_int_pair; BOOST_FOREACH(const jt_int_pair& it, mJobCounts) ret.push_back(it); @@ -157,28 +157,38 @@ Json::Value JobQueue::getJson(int) Json::Value priorities = Json::arrayValue; for (int i = 0; i < NUM_JOB_TYPES; ++i) { - uint64 count, latencyAvg, latencyPeak, jobCount; + uint64 count, latencyAvg, latencyPeak; + int jobCount, threadCount; bool isOver; mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak, isOver); - std::map::iterator it = mJobCounts.find(static_cast(i)); + std::map< JobType, std::pair >::iterator it = mJobCounts.find(static_cast(i)); if (it == mJobCounts.end()) + { jobCount = 0; + threadCount = 0; + } else - jobCount = it->second; - if ((count != 0) || (jobCount != 0) || (latencyPeak != 0)) + { + jobCount = it->second.first; + threadCount = it->second.second; + } + + if ((count != 0) || (jobCount != 0) || (latencyPeak != 0) || (threadCount != 0)) { Json::Value pri(Json::objectValue); if (isOver) pri["over_target"] = true; pri["job_type"] = Job::toString(static_cast(i)); if (jobCount != 0) - pri["waiting"] = static_cast(jobCount); + pri["waiting"] = jobCount; if (count != 0) pri["per_second"] = static_cast(count); if (latencyPeak != 0) pri["peak_time"] = static_cast(latencyPeak); if (latencyAvg != 0) pri["avg_time"] = static_cast(latencyAvg); + if (threadCount != 0) + pri["in_progress"] = threadCount; priorities.append(pri); } } @@ -223,7 +233,7 @@ void JobQueue::setThreadCount(int c) boost::mutex::scoped_lock sl(mJobLock); - while (mJobCounts[jtDEATH] != 0) + while (mJobCounts[jtDEATH].first != 0) mJobCond.wait(sl); while (mThreadCount < c) @@ -233,12 +243,12 @@ void JobQueue::setThreadCount(int c) } while (mThreadCount > c) { - if (mJobCounts[jtDEATH] != 0) + if (mJobCounts[jtDEATH].first != 0) mJobCond.wait(sl); else { mJobSet.insert(Job(jtDEATH, 0)); - ++mJobCounts[jtDEATH]; + ++(mJobCounts[jtDEATH].first); } } mJobCond.notify_one(); // in case we sucked up someone else's signal @@ -255,20 +265,25 @@ void JobQueue::threadEntry() if (mShuttingDown) break; + JobType type; std::set::iterator it = mJobSet.begin(); { Job job(*it); mJobSet.erase(it); - --mJobCounts[job.getType()]; - if (job.getType() == jtDEATH) + type = job.getType(); + --(mJobCounts[type].first); + + if (type == jtDEATH) break; + ++(mJobCounts[type].second); sl.unlock(); - cLog(lsTRACE) << "Doing " << Job::toString(job.getType()) << " job"; + cLog(lsTRACE) << "Doing " << Job::toString(type) << " job"; job.doJob(); } // must destroy job without holding lock sl.lock(); + --(mJobCounts[type].second); } --mThreadCount; mJobCond.notify_all(); diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 76602678f4..2567537f15 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -89,11 +89,12 @@ protected: uint64 mLastJob; std::set mJobSet; - std::map mJobCounts; LoadMonitor mJobLoads[NUM_JOB_TYPES]; int mThreadCount; bool mShuttingDown; + std::map > mJobCounts; + void threadEntry(void); @@ -105,7 +106,7 @@ public: int getJobCount(JobType t); // Jobs at this priority int getJobCountGE(JobType t); // All jobs at or greater than this priority - std::vector< std::pair > getJobCounts(); + std::vector< std::pair > > getJobCounts(); // jobs waiting, threads doing void shutdown(); void setThreadCount(int c = 0);