diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp index cda9f17fde..e7d0026a8d 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.cpp +++ b/modules/ripple_core/functional/ripple_JobQueue.cpp @@ -7,7 +7,8 @@ SETUP_LOG (JobQueue) JobQueue::JobQueue () - : mLastJob (0) + : m_workers (*this, 0) + , mLastJob (0) , mThreadCount (0) , mShuttingDown (false) { @@ -29,6 +30,10 @@ JobQueue::JobQueue () mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500); } +JobQueue::~JobQueue () +{ +} + void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE& jobFunc) { addLimitJob(type, name, 0, jobFunc); @@ -47,7 +52,10 @@ void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, co mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc)); it.first->peekEvent().start(); // start timing how long it stays in the queue ++mJobCounts[type].first; - mJobCond.notify_one (); + + //mJobCond.notify_one (); + + m_workers.addTask (); } int JobQueue::getJobCount (JobType t) @@ -212,6 +220,10 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode) // VFALCO TODO Split the function up. The lower part actually does the "do", // The part above this comment figures out the value for numThreads // + + m_workers.setNumberOfThreads (c); + + boost::mutex::scoped_lock sl (mJobLock); while (mJobCounts[jtDEATH].first != 0) @@ -318,4 +330,43 @@ void JobQueue::threadEntry () } } -// vim:ts=4 +void JobQueue::processTask () +{ + { + // This lock shouldn't be needed + boost::mutex::scoped_lock lock (mJobLock); + + JobType type (jtINVALID); + + { + Job job; + + bool const haveJob = getJob (job); + + if (haveJob) + { + type = job.getType (); + + // VFALCO TODO Replace with Atomic <> + --(mJobCounts[type].first); + ++(mJobCounts[type].second); + + lock.unlock (); + + Thread::setCurrentThreadName (Job::toString (type)); + + WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job"; + + job.doJob (); + } + + // must destroy job, here, without holding lock + } + + if (type != jtINVALID) + { + lock.lock (); + -- (mJobCounts[type].second); + } + } +} diff --git a/modules/ripple_core/functional/ripple_JobQueue.h b/modules/ripple_core/functional/ripple_JobQueue.h index 1cc1aac28c..0663604c89 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.h +++ b/modules/ripple_core/functional/ripple_JobQueue.h @@ -4,14 +4,16 @@ */ //============================================================================== -#ifndef RIPPLE_JOBQUEUE_H -#define RIPPLE_JOBQUEUE_H +#ifndef RIPPLE_JOBQUEUE_H_INCLUDED +#define RIPPLE_JOBQUEUE_H_INCLUDED -class JobQueue +class JobQueue : private Workers::Callback { public: JobQueue (); + ~JobQueue (); + // VFALCO TODO make convenience functions that allow the caller to not // have to call bind. // @@ -48,6 +50,11 @@ public: private: void threadEntry (); + void processTask (); + +private: + Workers m_workers; + boost::mutex mJobLock; boost::condition_variable mJobCond;