Switch back to mutex for JobQueue

This commit is contained in:
Vinnie Falco
2013-09-05 19:46:16 -07:00
parent 5db25ede25
commit abcb1dce57
2 changed files with 40 additions and 40 deletions

View File

@@ -37,14 +37,14 @@ JobQueue::JobQueue ()
: m_workers (*this, "JobQueue", 0)
{
{
SharedState::WriteAccess state (m_state);
ScopedLock lock (m_mutex);
// Initialize the job counts.
// The 'limit' field in particular will be set based on the limit
for (int i = 0; i < NUM_JOB_TYPES; ++i)
{
JobType const type (static_cast <JobType> (i));
state->jobCounts [type] = Count (type);
m_state.jobCounts [type] = Count (type);
}
}
@@ -81,34 +81,34 @@ void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYP
// do not add jobs to a queue with no threads
bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
SharedState::WriteAccess state (m_state);
ScopedLock lock (m_mutex);
std::pair< std::set <Job>::iterator, bool > it =
state->jobSet.insert (Job (
type, name, ++state->lastJob, mJobLoads[type], jobFunc));
m_state.jobSet.insert (Job (
type, name, ++m_state.lastJob, mJobLoads[type], jobFunc));
// start timing how long it stays in the queue
it.first->peekEvent().start();
queueJob (*it.first, state);
queueJob (*it.first, lock);
}
int JobQueue::getJobCount (JobType t)
{
SharedState::ReadAccess state (m_state);
ScopedLock lock (m_mutex);
JobCounts::const_iterator c = state->jobCounts.find (t);
JobCounts::const_iterator c = m_state.jobCounts.find (t);
return (c == state->jobCounts.end ()) ? 0 : c->second.waiting;
return (c == m_state.jobCounts.end ()) ? 0 : c->second.waiting;
}
int JobQueue::getJobCountTotal (JobType t)
{
SharedState::ReadAccess state (m_state);
ScopedLock lock (m_mutex);
JobCounts::const_iterator c = state->jobCounts.find (t);
JobCounts::const_iterator c = m_state.jobCounts.find (t);
return (c == state->jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running);
return (c == m_state.jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running);
}
int JobQueue::getJobCountGE (JobType t)
@@ -116,11 +116,11 @@ int JobQueue::getJobCountGE (JobType t)
// return the number of jobs at this priority level or greater
int ret = 0;
SharedState::ReadAccess state (m_state);
ScopedLock lock (m_mutex);
typedef JobCounts::value_type jt_int_pair;
BOOST_FOREACH (jt_int_pair const& it, state->jobCounts)
BOOST_FOREACH (jt_int_pair const& it, m_state.jobCounts)
{
if (it.first >= t)
ret += it.second.waiting;
@@ -134,13 +134,13 @@ std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts (
// return all jobs at all priority levels
std::vector< std::pair<JobType, std::pair<int, int> > > ret;
SharedState::ReadAccess state (m_state);
ScopedLock lock (m_mutex);
ret.reserve (state->jobCounts.size ());
ret.reserve (m_state.jobCounts.size ());
typedef JobCounts::value_type jt_int_pair;
BOOST_FOREACH (const jt_int_pair & it, state->jobCounts)
BOOST_FOREACH (const jt_int_pair & it, m_state.jobCounts)
{
ret.push_back (std::make_pair (it.second.type,
std::make_pair (it.second.waiting, it.second.running)));
@@ -157,7 +157,7 @@ Json::Value JobQueue::getJson (int)
Json::Value priorities = Json::arrayValue;
SharedState::ReadAccess state (m_state);
ScopedLock lock (m_mutex);
for (int i = 0; i < NUM_JOB_TYPES; ++i)
{
@@ -175,9 +175,9 @@ Json::Value JobQueue::getJson (int)
mJobLoads [i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
JobCounts::const_iterator it = state->jobCounts.find (type);
JobCounts::const_iterator it = m_state.jobCounts.find (type);
if (it == state->jobCounts.end ())
if (it == m_state.jobCounts.end ())
{
jobCount = 0;
threadCount = 0;
@@ -284,14 +284,14 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
// Invariants:
// The calling thread owns the JobLock
//
void JobQueue::queueJob (Job const& job, SharedState::WriteAccess& state)
void JobQueue::queueJob (Job const& job, ScopedLock const& lock)
{
JobType const type (job.getType ());
bassert (type != jtINVALID);
bassert (state->jobSet.find (job) != state->jobSet.end ());
bassert (m_state.jobSet.find (job) != m_state.jobSet.end ());
Count& count (state->jobCounts [type]);
Count& count (m_state.jobCounts [type]);
if (count.waiting + count.running < getJobLimit (type))
{
@@ -326,14 +326,14 @@ void JobQueue::queueJob (Job const& job, SharedState::WriteAccess& state)
// Invariants:
// The calling thread owns the JobLock
//
void JobQueue::getNextJob (Job& job, SharedState::WriteAccess& state)
void JobQueue::getNextJob (Job& job, ScopedLock const& lock)
{
bassert (! state->jobSet.empty ());
bassert (! m_state.jobSet.empty ());
JobSet::const_iterator iter;
for (iter = state->jobSet.begin (); iter != state->jobSet.end (); ++iter)
for (iter = m_state.jobSet.begin (); iter != m_state.jobSet.end (); ++iter)
{
Count& count (state->jobCounts [iter->getType ()]);
Count& count (m_state.jobCounts [iter->getType ()]);
bassert (count.running <= getJobLimit (count.type));
@@ -345,17 +345,17 @@ void JobQueue::getNextJob (Job& job, SharedState::WriteAccess& state)
}
}
bassert (iter != state->jobSet.end ());
bassert (iter != m_state.jobSet.end ());
JobType const type = iter->getType ();
Count& count (state->jobCounts [type]);
Count& count (m_state.jobCounts [type]);
bassert (type != jtINVALID);
job = *iter;
state->jobSet.erase (iter);
m_state.jobSet.erase (iter);
--count.waiting;
++count.running;
@@ -381,12 +381,12 @@ void JobQueue::finishJob (Job const& job)
JobType const type = job.getType ();
{
SharedState::WriteAccess state (m_state);
ScopedLock lock (m_mutex);
bassert (state->jobSet.find (job) == state->jobSet.end ());
bassert (m_state.jobSet.find (job) == m_state.jobSet.end ());
bassert (type != jtINVALID);
Count& count (state->jobCounts [type]);
Count& count (m_state.jobCounts [type]);
// Queue a deferred task if possible
if (count.deferred > 0)
@@ -419,9 +419,9 @@ void JobQueue::processTask ()
Job job;
{
SharedState::WriteAccess state (m_state);
ScopedLock lock (m_mutex);
getNextJob (job, state);
getNextJob (job, lock);
}
JobType const type (job.getType ());