Tie in the new job queue code.

This commit is contained in:
JoelKatz
2012-10-29 18:09:55 -07:00
parent 8fc8553fc6
commit 9085efd026
4 changed files with 28 additions and 8 deletions

View File

@@ -46,6 +46,7 @@ Application::Application() :
{
RAND_bytes(mNonce256.begin(), mNonce256.size());
RAND_bytes(reinterpret_cast<unsigned char *>(&mNonceST), sizeof(mNonceST));
mJobQueue.setThreadCount();
}
extern const char *RpcDBInit[], *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[], *HashNodeDBInit[], *NetNodeDBInit[];
@@ -54,6 +55,7 @@ extern int RpcDBCount, TxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount
void Application::stop()
{
mIOService.stop();
mJobQueue.shutdown();
mHashedObjectStore.bulkWrite();
mValidations.flush();
mAuxService.stop();

View File

@@ -18,6 +18,7 @@
#include "Suppression.h"
#include "SNTPClient.h"
#include "../database/database.h"
#include "JobQueue.h"
class RPCDoor;
@@ -53,6 +54,7 @@ class Application
SuppressionTable mSuppressions;
HashedObjectStore mHashedObjectStore;
SNTPClient mSNTPClient;
JobQueue mJobQueue;
DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB;
@@ -90,6 +92,7 @@ public:
NodeCache& getTempNodeCache() { return mTempNodeCache; }
HashedObjectStore& getHashedObjectStore() { return mHashedObjectStore; }
ValidationCollection& getValidations() { return mValidations; }
JobQueue& getJobQueue() { return mJobQueue; }
bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); }
bool isNew(const uint160& s) { return mSuppressions.addSuppression(s); }
bool running() { return mTxnDB != NULL; }

View File

@@ -5,6 +5,10 @@
#include <boost/bind.hpp>
#include <boost/thread.hpp>
#include "Log.h"
SETUP_LOG();
const char* Job::toString(JobType t)
{
switch(t)
@@ -57,7 +61,7 @@ bool Job::operator>=(const Job& j) const
return mJobIndex >= j.mJobIndex;
}
void JobQueue::addJob(JobType type, const boost::function<void(void)>& jobFunc)
void JobQueue::addJob(JobType type, const boost::function<void(Job&)>& jobFunc)
{
assert(type != jtINVALID);
@@ -106,6 +110,7 @@ std::vector< std::pair<JobType, int> > JobQueue::getJobCounts()
void JobQueue::shutdown()
{ // shut down the job queue without completing pending jobs
cLog(lsINFO) << "Job queue shutting down";
boost::mutex::scoped_lock sl(mJobLock);
mShuttingDown = true;
mJobCond.notify_all();
@@ -115,7 +120,14 @@ void JobQueue::shutdown()
void JobQueue::setThreadCount(int c)
{ // set the number of thread serving the job queue to precisely this number
assert(c != 0);
if (c == 0)
{
c = boost::thread::hardware_concurrency();
if (c < 2)
c = 2;
cLog(lsINFO) << "Auto-tuning to " << c << " validation/transaction/proposal threads";
}
boost::mutex::scoped_lock sl(mJobLock);
while (mJobCounts[jtDEATH] != 0)
@@ -160,6 +172,7 @@ void JobQueue::threadEntry()
break;
sl.unlock();
cLog(lsDEBUG) << "Doing " << Job::toString(job.getType()) << " job";
job.doJob();
sl.lock();
}

View File

@@ -11,6 +11,9 @@
#include "types.h"
// Note that this queue should only be used for CPU-bound jobs
// It is primarily intended for signature checking
enum JobType
{ // must be in priority order, low to high
jtINVALID,
@@ -28,18 +31,17 @@ class Job
protected:
JobType mType;
uint64 mJobIndex;
boost::function<void(void)> mJob;
boost::function<void(Job&)> mJob;
public:
Job() : mType(jtINVALID), mJobIndex(0) { ; }
Job(JobType type, uint64 index) : mType(type), mJobIndex(index) { ; }
Job(JobType type, uint64 index, const boost::function<void(void)>& job)
Job(JobType type, uint64 index, const boost::function<void(Job&)>& job)
: mType(type), mJobIndex(index), mJob(job) { ; }
JobType getType() const { return mType; }
void setIndex(uint64 i) { mJobIndex = i; }
void doJob(void) { mJob(); }
void doJob(void) { mJob(*this); }
bool operator<(const Job& j) const;
bool operator>(const Job& j) const;
@@ -68,14 +70,14 @@ public:
JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) { ; }
void addJob(JobType type, const boost::function<void(void)>& job);
void addJob(JobType type, const boost::function<void(Job&)>& job);
int getJobCount(JobType t); // Jobs at this priority
int getJobCountGE(JobType t); // All jobs at or greater than this priority
std::vector< std::pair<JobType, int> > getJobCounts();
void shutdown();
void setThreadCount(int c);
void setThreadCount(int c = 0);
};
#endif