From 4430798506263a545c04c172d19d9fbfc7e06b64 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 29 Oct 2012 16:06:59 -0700 Subject: [PATCH] A job queue that will allow us to have a configurable number of threads servicing arbitrary jobs in priority order, with an easy way to get counts of how many jobs are pending. --- src/JobQueue.cpp | 156 +++++++++++++++++++++++++++++++++++++++++++++++ src/JobQueue.h | 80 ++++++++++++++++++++++++ 2 files changed, 236 insertions(+) create mode 100644 src/JobQueue.cpp create mode 100644 src/JobQueue.h diff --git a/src/JobQueue.cpp b/src/JobQueue.cpp new file mode 100644 index 0000000000..8d534dd039 --- /dev/null +++ b/src/JobQueue.cpp @@ -0,0 +1,156 @@ +#include "JobQueue.h" + +#include +#include +#include +#include + +const char* Job::toString(JobType t) +{ + switch(t) + { + case jtINVALID: return "invalid"; + case jtVALIDATION_ut: return "untrustedValidation"; + case jtTRANSACTION: return "transaction"; + case jtPROPOSAL_ut: return "untrustedProposal"; + case jtVALIDATION_t: return "trustedValidation"; + case jtPROPOSAL_t: return "trustedProposal"; + case jtADMIN: return "administration"; + case jtDEATH: return "jobOfDeath"; + default: assert(false); return "unknown"; + } +} + +bool Job::operator<(const Job& j) const +{ + if (mType < j.mType) + return true; + if (mType > j.mType) + return false; + return mJobIndex < j.mJobIndex; +} + +bool Job::operator<=(const Job& j) const +{ + if (mType < j.mType) + return true; + if (mType > j.mType) + return false; + return mJobIndex <= j.mJobIndex; +} + +bool Job::operator>(const Job& j) const +{ + if (mType < j.mType) + return false; + if (mType > j.mType) + return true; + return mJobIndex > j.mJobIndex; +} + +bool Job::operator>=(const Job& j) const +{ + if (mType < j.mType) + return false; + if (mType > j.mType) + return true; + return mJobIndex >= j.mJobIndex; +} + +void JobQueue::addJob(JobType type, const boost::function& jobFunc) +{ + assert(type != jtINVALID); + + boost::mutex::scoped_lock sl(mJobLock); + + mJobSet.insert(Job(type, ++mLastJob, jobFunc)); + ++mJobCounts[type]; + mJobCond.notify_one(); +} + +int JobQueue::getJobCount(JobType t) +{ + int ret = 0; + + boost::mutex::scoped_lock sl(mJobLock); + + typedef std::pair jt_int_pair; + BOOST_FOREACH(const jt_int_pair& it, mJobCounts) + if (it.first >= t) + ret += it.second; + return ret; +} + +std::vector< std::pair > JobQueue::getJobCounts() +{ + std::vector< std::pair > ret; + + boost::mutex::scoped_lock sl(mJobLock); + ret.reserve(mJobCounts.size()); + + typedef std::pair jt_int_pair; + BOOST_FOREACH(const jt_int_pair& it, mJobCounts) + ret.push_back(it); + + return ret; +} + +void JobQueue::shutdown() +{ + boost::mutex::scoped_lock sl(mJobLock); + mShuttingDown = true; + mJobCond.notify_all(); + while (mThreadCount != 0) + mJobCond.wait(sl); +} + +void JobQueue::setThreadCount(int c) +{ + boost::mutex::scoped_lock sl(mJobLock); + while (mThreadCount != c) + { + if (mThreadCount < c) + { + ++mThreadCount; + boost::thread t(boost::bind(&JobQueue::threadEntry, this)); + t.detach(); + } + if (mThreadCount > c) + { + if (mJobCounts[jtDEATH] != 0) + mJobCond.wait(sl); + else + { + mJobSet.insert(Job(jtDEATH, 0)); + ++mJobCounts[jtDEATH]; + } + } + } +} + +void JobQueue::threadEntry() +{ + boost::mutex::scoped_lock sl(mJobLock); + while (!mShuttingDown) + { + while (mJobSet.empty() && !mShuttingDown) + mJobCond.wait(sl); + + if (mShuttingDown) + break; + + std::set::iterator it = mJobSet.begin(); + Job job(*it); + mJobSet.erase(it); + --mJobCounts[job.getType()]; + + if (job.getType() == jtDEATH) + break; + + sl.unlock(); + job.doJob(); + sl.lock(); + } + --mThreadCount; + mJobCond.notify_all(); +} diff --git a/src/JobQueue.h b/src/JobQueue.h new file mode 100644 index 0000000000..48211cfb45 --- /dev/null +++ b/src/JobQueue.h @@ -0,0 +1,80 @@ +#ifndef JOB_QUEUE__H +#define JOB_QUEUE__H + +#include +#include +#include + +#include +#include +#include + +#include "types.h" + +enum JobType +{ // must be in priority order, low to high + jtINVALID, + jtVALIDATION_ut, + jtTRANSACTION, + jtPROPOSAL_ut, + jtVALIDATION_t, + jtPROPOSAL_t, + jtADMIN, + jtDEATH, // job of death, used internally +}; + +class Job +{ +protected: + JobType mType; + uint64 mJobIndex; + boost::function mJob; + +public: + Job() : mType(jtINVALID), mJobIndex(0) { ; } + Job(JobType type, uint64 index) : mType(type), mJobIndex(index) { ; } + + Job(JobType type, uint64 index, const boost::function& job) + : mType(type), mJobIndex(index), mJob(job) { ; } + + JobType getType() const { return mType; } + void setIndex(uint64 i) { mJobIndex = i; } + void doJob(void) { mJob(); } + + bool operator<(const Job& j) const; + bool operator>(const Job& j) const; + bool operator<=(const Job& j) const; + bool operator>=(const Job& j) const; + + static const char* toString(JobType); +}; + +class JobQueue +{ +protected: + boost::mutex mJobLock; + boost::condition_variable mJobCond; + + uint64 mLastJob; + std::set mJobSet; + std::map mJobCounts; + int mThreadCount; + bool mShuttingDown; + + + void threadEntry(void); + +public: + + JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) { ; } + + void addJob(JobType type, const boost::function& job); + + int getJobCount(JobType t); // All jobs at or greater than this priority + std::vector< std::pair > getJobCounts(); + + void shutdown(); + void setThreadCount(int c); +}; + +#endif