diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp index 508a8d66c..43d5cd9f5 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.cpp +++ b/modules/ripple_core/functional/ripple_JobQueue.cpp @@ -37,14 +37,14 @@ JobQueue::JobQueue () : m_workers (*this, "JobQueue", 0) { { - SharedState::WriteAccess state (m_state); + ScopedLock lock (m_mutex); // Initialize the job counts. // The 'limit' field in particular will be set based on the limit for (int i = 0; i < NUM_JOB_TYPES; ++i) { JobType const type (static_cast (i)); - state->jobCounts [type] = Count (type); + m_state.jobCounts [type] = Count (type); } } @@ -81,34 +81,34 @@ void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYP // do not add jobs to a queue with no threads bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0); - SharedState::WriteAccess state (m_state); + ScopedLock lock (m_mutex); std::pair< std::set ::iterator, bool > it = - state->jobSet.insert (Job ( - type, name, ++state->lastJob, mJobLoads[type], jobFunc)); + m_state.jobSet.insert (Job ( + type, name, ++m_state.lastJob, mJobLoads[type], jobFunc)); // start timing how long it stays in the queue it.first->peekEvent().start(); - queueJob (*it.first, state); + queueJob (*it.first, lock); } int JobQueue::getJobCount (JobType t) { - SharedState::ReadAccess state (m_state); + ScopedLock lock (m_mutex); - JobCounts::const_iterator c = state->jobCounts.find (t); + JobCounts::const_iterator c = m_state.jobCounts.find (t); - return (c == state->jobCounts.end ()) ? 0 : c->second.waiting; + return (c == m_state.jobCounts.end ()) ? 0 : c->second.waiting; } int JobQueue::getJobCountTotal (JobType t) { - SharedState::ReadAccess state (m_state); + ScopedLock lock (m_mutex); - JobCounts::const_iterator c = state->jobCounts.find (t); + JobCounts::const_iterator c = m_state.jobCounts.find (t); - return (c == state->jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running); + return (c == m_state.jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running); } int JobQueue::getJobCountGE (JobType t) @@ -116,11 +116,11 @@ int JobQueue::getJobCountGE (JobType t) // return the number of jobs at this priority level or greater int ret = 0; - SharedState::ReadAccess state (m_state); + ScopedLock lock (m_mutex); typedef JobCounts::value_type jt_int_pair; - BOOST_FOREACH (jt_int_pair const& it, state->jobCounts) + BOOST_FOREACH (jt_int_pair const& it, m_state.jobCounts) { if (it.first >= t) ret += it.second.waiting; @@ -134,13 +134,13 @@ std::vector< std::pair > > JobQueue::getJobCounts ( // return all jobs at all priority levels std::vector< std::pair > > ret; - SharedState::ReadAccess state (m_state); + ScopedLock lock (m_mutex); - ret.reserve (state->jobCounts.size ()); + ret.reserve (m_state.jobCounts.size ()); typedef JobCounts::value_type jt_int_pair; - BOOST_FOREACH (const jt_int_pair & it, state->jobCounts) + BOOST_FOREACH (const jt_int_pair & it, m_state.jobCounts) { ret.push_back (std::make_pair (it.second.type, std::make_pair (it.second.waiting, it.second.running))); @@ -157,7 +157,7 @@ Json::Value JobQueue::getJson (int) Json::Value priorities = Json::arrayValue; - SharedState::ReadAccess state (m_state); + ScopedLock lock (m_mutex); for (int i = 0; i < NUM_JOB_TYPES; ++i) { @@ -175,9 +175,9 @@ Json::Value JobQueue::getJson (int) mJobLoads [i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver); - JobCounts::const_iterator it = state->jobCounts.find (type); + JobCounts::const_iterator it = m_state.jobCounts.find (type); - if (it == state->jobCounts.end ()) + if (it == m_state.jobCounts.end ()) { jobCount = 0; threadCount = 0; @@ -284,14 +284,14 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode) // Invariants: // The calling thread owns the JobLock // -void JobQueue::queueJob (Job const& job, SharedState::WriteAccess& state) +void JobQueue::queueJob (Job const& job, ScopedLock const& lock) { JobType const type (job.getType ()); bassert (type != jtINVALID); - bassert (state->jobSet.find (job) != state->jobSet.end ()); + bassert (m_state.jobSet.find (job) != m_state.jobSet.end ()); - Count& count (state->jobCounts [type]); + Count& count (m_state.jobCounts [type]); if (count.waiting + count.running < getJobLimit (type)) { @@ -326,14 +326,14 @@ void JobQueue::queueJob (Job const& job, SharedState::WriteAccess& state) // Invariants: // The calling thread owns the JobLock // -void JobQueue::getNextJob (Job& job, SharedState::WriteAccess& state) +void JobQueue::getNextJob (Job& job, ScopedLock const& lock) { - bassert (! state->jobSet.empty ()); + bassert (! m_state.jobSet.empty ()); JobSet::const_iterator iter; - for (iter = state->jobSet.begin (); iter != state->jobSet.end (); ++iter) + for (iter = m_state.jobSet.begin (); iter != m_state.jobSet.end (); ++iter) { - Count& count (state->jobCounts [iter->getType ()]); + Count& count (m_state.jobCounts [iter->getType ()]); bassert (count.running <= getJobLimit (count.type)); @@ -345,17 +345,17 @@ void JobQueue::getNextJob (Job& job, SharedState::WriteAccess& state) } } - bassert (iter != state->jobSet.end ()); + bassert (iter != m_state.jobSet.end ()); JobType const type = iter->getType (); - Count& count (state->jobCounts [type]); + Count& count (m_state.jobCounts [type]); bassert (type != jtINVALID); job = *iter; - state->jobSet.erase (iter); + m_state.jobSet.erase (iter); --count.waiting; ++count.running; @@ -381,12 +381,12 @@ void JobQueue::finishJob (Job const& job) JobType const type = job.getType (); { - SharedState::WriteAccess state (m_state); + ScopedLock lock (m_mutex); - bassert (state->jobSet.find (job) == state->jobSet.end ()); + bassert (m_state.jobSet.find (job) == m_state.jobSet.end ()); bassert (type != jtINVALID); - Count& count (state->jobCounts [type]); + Count& count (m_state.jobCounts [type]); // Queue a deferred task if possible if (count.deferred > 0) @@ -419,9 +419,9 @@ void JobQueue::processTask () Job job; { - SharedState::WriteAccess state (m_state); + ScopedLock lock (m_mutex); - getNextJob (job, state); + getNextJob (job, lock); } JobType const type (job.getType ()); diff --git a/modules/ripple_core/functional/ripple_JobQueue.h b/modules/ripple_core/functional/ripple_JobQueue.h index a80aa66ae..a42f08afa 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.h +++ b/modules/ripple_core/functional/ripple_JobQueue.h @@ -79,17 +79,17 @@ private: JobCounts jobCounts; }; - typedef SharedData SharedState; - - void queueJob (Job const& job, SharedState::WriteAccess& state); - void getNextJob (Job& job, SharedState::WriteAccess& state); + void queueJob (Job const& job, ScopedLock const& lock); + void getNextJob (Job& job, ScopedLock const& lock); void finishJob (Job const& job); void processTask (); static int getJobLimit (JobType type); private: - SharedState m_state; + typedef CriticalSection::ScopedLockType ScopedLock; + CriticalSection m_mutex; + State m_state; Workers m_workers; LoadMonitor mJobLoads [NUM_JOB_TYPES]; };