Use Workers in JobQueue

This commit is contained in:
Vinnie Falco
2013-07-29 08:46:09 -07:00
parent bf9806b07f
commit ddef0ae7c7
2 changed files with 64 additions and 6 deletions

View File

@@ -7,7 +7,8 @@
SETUP_LOG (JobQueue) SETUP_LOG (JobQueue)
JobQueue::JobQueue () JobQueue::JobQueue ()
: mLastJob (0) : m_workers (*this, 0)
, mLastJob (0)
, mThreadCount (0) , mThreadCount (0)
, mShuttingDown (false) , mShuttingDown (false)
{ {
@@ -29,6 +30,10 @@ JobQueue::JobQueue ()
mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500); mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500);
} }
JobQueue::~JobQueue ()
{
}
void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc) void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
{ {
addLimitJob(type, name, 0, jobFunc); addLimitJob(type, name, 0, jobFunc);
@@ -47,7 +52,10 @@ void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, co
mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc)); mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc));
it.first->peekEvent().start(); // start timing how long it stays in the queue it.first->peekEvent().start(); // start timing how long it stays in the queue
++mJobCounts[type].first; ++mJobCounts[type].first;
mJobCond.notify_one ();
//mJobCond.notify_one ();
m_workers.addTask ();
} }
int JobQueue::getJobCount (JobType t) int JobQueue::getJobCount (JobType t)
@@ -212,6 +220,10 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
// VFALCO TODO Split the function up. The lower part actually does the "do", // VFALCO TODO Split the function up. The lower part actually does the "do",
// The part above this comment figures out the value for numThreads // The part above this comment figures out the value for numThreads
// //
m_workers.setNumberOfThreads (c);
boost::mutex::scoped_lock sl (mJobLock); boost::mutex::scoped_lock sl (mJobLock);
while (mJobCounts[jtDEATH].first != 0) while (mJobCounts[jtDEATH].first != 0)
@@ -318,4 +330,43 @@ void JobQueue::threadEntry ()
} }
} }
// vim:ts=4 void JobQueue::processTask ()
{
{
// This lock shouldn't be needed
boost::mutex::scoped_lock lock (mJobLock);
JobType type (jtINVALID);
{
Job job;
bool const haveJob = getJob (job);
if (haveJob)
{
type = job.getType ();
// VFALCO TODO Replace with Atomic <>
--(mJobCounts[type].first);
++(mJobCounts[type].second);
lock.unlock ();
Thread::setCurrentThreadName (Job::toString (type));
WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
job.doJob ();
}
// must destroy job, here, without holding lock
}
if (type != jtINVALID)
{
lock.lock ();
-- (mJobCounts[type].second);
}
}
}

View File

@@ -4,14 +4,16 @@
*/ */
//============================================================================== //==============================================================================
#ifndef RIPPLE_JOBQUEUE_H #ifndef RIPPLE_JOBQUEUE_H_INCLUDED
#define RIPPLE_JOBQUEUE_H #define RIPPLE_JOBQUEUE_H_INCLUDED
class JobQueue class JobQueue : private Workers::Callback
{ {
public: public:
JobQueue (); JobQueue ();
~JobQueue ();
// VFALCO TODO make convenience functions that allow the caller to not // VFALCO TODO make convenience functions that allow the caller to not
// have to call bind. // have to call bind.
// //
@@ -48,6 +50,11 @@ public:
private: private:
void threadEntry (); void threadEntry ();
void processTask ();
private:
Workers m_workers;
boost::mutex mJobLock; boost::mutex mJobLock;
boost::condition_variable mJobCond; boost::condition_variable mJobCond;