A job queue that will allow us to have a configurable number of threads servicing

arbitrary jobs in priority order, with an easy way to get counts of how many jobs are pending.
This commit is contained in:
JoelKatz
2012-10-29 16:06:59 -07:00
parent 7cd8be5b2b
commit 4430798506
2 changed files with 236 additions and 0 deletions

156
src/JobQueue.cpp Normal file
View File

@@ -0,0 +1,156 @@
#include "JobQueue.h"
#include <boost/make_shared.hpp>
#include <boost/foreach.hpp>
#include <boost/bind.hpp>
#include <boost/thread.hpp>
const char* Job::toString(JobType t)
{
switch(t)
{
case jtINVALID: return "invalid";
case jtVALIDATION_ut: return "untrustedValidation";
case jtTRANSACTION: return "transaction";
case jtPROPOSAL_ut: return "untrustedProposal";
case jtVALIDATION_t: return "trustedValidation";
case jtPROPOSAL_t: return "trustedProposal";
case jtADMIN: return "administration";
case jtDEATH: return "jobOfDeath";
default: assert(false); return "unknown";
}
}
bool Job::operator<(const Job& j) const
{
if (mType < j.mType)
return true;
if (mType > j.mType)
return false;
return mJobIndex < j.mJobIndex;
}
bool Job::operator<=(const Job& j) const
{
if (mType < j.mType)
return true;
if (mType > j.mType)
return false;
return mJobIndex <= j.mJobIndex;
}
bool Job::operator>(const Job& j) const
{
if (mType < j.mType)
return false;
if (mType > j.mType)
return true;
return mJobIndex > j.mJobIndex;
}
bool Job::operator>=(const Job& j) const
{
if (mType < j.mType)
return false;
if (mType > j.mType)
return true;
return mJobIndex >= j.mJobIndex;
}
void JobQueue::addJob(JobType type, const boost::function<void(void)>& jobFunc)
{
assert(type != jtINVALID);
boost::mutex::scoped_lock sl(mJobLock);
mJobSet.insert(Job(type, ++mLastJob, jobFunc));
++mJobCounts[type];
mJobCond.notify_one();
}
int JobQueue::getJobCount(JobType t)
{
int ret = 0;
boost::mutex::scoped_lock sl(mJobLock);
typedef std::pair<JobType, int> jt_int_pair;
BOOST_FOREACH(const jt_int_pair& it, mJobCounts)
if (it.first >= t)
ret += it.second;
return ret;
}
std::vector< std::pair<JobType, int> > JobQueue::getJobCounts()
{
std::vector< std::pair<JobType, int> > ret;
boost::mutex::scoped_lock sl(mJobLock);
ret.reserve(mJobCounts.size());
typedef std::pair<JobType, int> jt_int_pair;
BOOST_FOREACH(const jt_int_pair& it, mJobCounts)
ret.push_back(it);
return ret;
}
void JobQueue::shutdown()
{
boost::mutex::scoped_lock sl(mJobLock);
mShuttingDown = true;
mJobCond.notify_all();
while (mThreadCount != 0)
mJobCond.wait(sl);
}
void JobQueue::setThreadCount(int c)
{
boost::mutex::scoped_lock sl(mJobLock);
while (mThreadCount != c)
{
if (mThreadCount < c)
{
++mThreadCount;
boost::thread t(boost::bind(&JobQueue::threadEntry, this));
t.detach();
}
if (mThreadCount > c)
{
if (mJobCounts[jtDEATH] != 0)
mJobCond.wait(sl);
else
{
mJobSet.insert(Job(jtDEATH, 0));
++mJobCounts[jtDEATH];
}
}
}
}
void JobQueue::threadEntry()
{
boost::mutex::scoped_lock sl(mJobLock);
while (!mShuttingDown)
{
while (mJobSet.empty() && !mShuttingDown)
mJobCond.wait(sl);
if (mShuttingDown)
break;
std::set<Job>::iterator it = mJobSet.begin();
Job job(*it);
mJobSet.erase(it);
--mJobCounts[job.getType()];
if (job.getType() == jtDEATH)
break;
sl.unlock();
job.doJob();
sl.lock();
}
--mThreadCount;
mJobCond.notify_all();
}

80
src/JobQueue.h Normal file
View File

@@ -0,0 +1,80 @@
#ifndef JOB_QUEUE__H
#define JOB_QUEUE__H
#include <map>
#include <set>
#include <vector>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/function.hpp>
#include "types.h"
enum JobType
{ // must be in priority order, low to high
jtINVALID,
jtVALIDATION_ut,
jtTRANSACTION,
jtPROPOSAL_ut,
jtVALIDATION_t,
jtPROPOSAL_t,
jtADMIN,
jtDEATH, // job of death, used internally
};
class Job
{
protected:
JobType mType;
uint64 mJobIndex;
boost::function<void(void)> mJob;
public:
Job() : mType(jtINVALID), mJobIndex(0) { ; }
Job(JobType type, uint64 index) : mType(type), mJobIndex(index) { ; }
Job(JobType type, uint64 index, const boost::function<void(void)>& job)
: mType(type), mJobIndex(index), mJob(job) { ; }
JobType getType() const { return mType; }
void setIndex(uint64 i) { mJobIndex = i; }
void doJob(void) { mJob(); }
bool operator<(const Job& j) const;
bool operator>(const Job& j) const;
bool operator<=(const Job& j) const;
bool operator>=(const Job& j) const;
static const char* toString(JobType);
};
class JobQueue
{
protected:
boost::mutex mJobLock;
boost::condition_variable mJobCond;
uint64 mLastJob;
std::set<Job> mJobSet;
std::map<JobType, int> mJobCounts;
int mThreadCount;
bool mShuttingDown;
void threadEntry(void);
public:
JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) { ; }
void addJob(JobType type, const boost::function<void(void)>& job);
int getJobCount(JobType t); // All jobs at or greater than this priority
std::vector< std::pair<JobType, int> > getJobCounts();
void shutdown();
void setThreadCount(int c);
};
#endif