From 9085efd0268f4e2efdbe8be8099c95e1ecea03a2 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Mon, 29 Oct 2012 18:09:55 -0700 Subject: [PATCH] Tie in the new job queue code. --- src/Application.cpp | 2 ++ src/Application.h | 3 +++ src/JobQueue.cpp | 17 +++++++++++++++-- src/JobQueue.h | 14 ++++++++------ 4 files changed, 28 insertions(+), 8 deletions(-) diff --git a/src/Application.cpp b/src/Application.cpp index b898cbdcc..7c459fc12 100644 --- a/src/Application.cpp +++ b/src/Application.cpp @@ -46,6 +46,7 @@ Application::Application() : { RAND_bytes(mNonce256.begin(), mNonce256.size()); RAND_bytes(reinterpret_cast(&mNonceST), sizeof(mNonceST)); + mJobQueue.setThreadCount(); } extern const char *RpcDBInit[], *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[], *HashNodeDBInit[], *NetNodeDBInit[]; @@ -54,6 +55,7 @@ extern int RpcDBCount, TxnDBCount, LedgerDBCount, WalletDBCount, HashNodeDBCount void Application::stop() { mIOService.stop(); + mJobQueue.shutdown(); mHashedObjectStore.bulkWrite(); mValidations.flush(); mAuxService.stop(); diff --git a/src/Application.h b/src/Application.h index 9a37333fb..ebb078492 100644 --- a/src/Application.h +++ b/src/Application.h @@ -18,6 +18,7 @@ #include "Suppression.h" #include "SNTPClient.h" #include "../database/database.h" +#include "JobQueue.h" class RPCDoor; @@ -53,6 +54,7 @@ class Application SuppressionTable mSuppressions; HashedObjectStore mHashedObjectStore; SNTPClient mSNTPClient; + JobQueue mJobQueue; DatabaseCon *mRpcDB, *mTxnDB, *mLedgerDB, *mWalletDB, *mHashNodeDB, *mNetNodeDB; @@ -90,6 +92,7 @@ public: NodeCache& getTempNodeCache() { return mTempNodeCache; } HashedObjectStore& getHashedObjectStore() { return mHashedObjectStore; } ValidationCollection& getValidations() { return mValidations; } + JobQueue& getJobQueue() { return mJobQueue; } bool isNew(const uint256& s) { return mSuppressions.addSuppression(s); } bool isNew(const uint160& s) { return mSuppressions.addSuppression(s); } bool running() { return mTxnDB != NULL; } diff --git a/src/JobQueue.cpp b/src/JobQueue.cpp index 7cf6456b0..cbb2bee83 100644 --- a/src/JobQueue.cpp +++ b/src/JobQueue.cpp @@ -5,6 +5,10 @@ #include #include +#include "Log.h" + +SETUP_LOG(); + const char* Job::toString(JobType t) { switch(t) @@ -57,7 +61,7 @@ bool Job::operator>=(const Job& j) const return mJobIndex >= j.mJobIndex; } -void JobQueue::addJob(JobType type, const boost::function& jobFunc) +void JobQueue::addJob(JobType type, const boost::function& jobFunc) { assert(type != jtINVALID); @@ -106,6 +110,7 @@ std::vector< std::pair > JobQueue::getJobCounts() void JobQueue::shutdown() { // shut down the job queue without completing pending jobs + cLog(lsINFO) << "Job queue shutting down"; boost::mutex::scoped_lock sl(mJobLock); mShuttingDown = true; mJobCond.notify_all(); @@ -115,7 +120,14 @@ void JobQueue::shutdown() void JobQueue::setThreadCount(int c) { // set the number of thread serving the job queue to precisely this number - assert(c != 0); + if (c == 0) + { + c = boost::thread::hardware_concurrency(); + if (c < 2) + c = 2; + cLog(lsINFO) << "Auto-tuning to " << c << " validation/transaction/proposal threads"; + } + boost::mutex::scoped_lock sl(mJobLock); while (mJobCounts[jtDEATH] != 0) @@ -160,6 +172,7 @@ void JobQueue::threadEntry() break; sl.unlock(); + cLog(lsDEBUG) << "Doing " << Job::toString(job.getType()) << " job"; job.doJob(); sl.lock(); } diff --git a/src/JobQueue.h b/src/JobQueue.h index 866b149d4..ddcfa1017 100644 --- a/src/JobQueue.h +++ b/src/JobQueue.h @@ -11,6 +11,9 @@ #include "types.h" +// Note that this queue should only be used for CPU-bound jobs +// It is primarily intended for signature checking + enum JobType { // must be in priority order, low to high jtINVALID, @@ -28,18 +31,17 @@ class Job protected: JobType mType; uint64 mJobIndex; - boost::function mJob; + boost::function mJob; public: Job() : mType(jtINVALID), mJobIndex(0) { ; } Job(JobType type, uint64 index) : mType(type), mJobIndex(index) { ; } - Job(JobType type, uint64 index, const boost::function& job) + Job(JobType type, uint64 index, const boost::function& job) : mType(type), mJobIndex(index), mJob(job) { ; } JobType getType() const { return mType; } - void setIndex(uint64 i) { mJobIndex = i; } - void doJob(void) { mJob(); } + void doJob(void) { mJob(*this); } bool operator<(const Job& j) const; bool operator>(const Job& j) const; @@ -68,14 +70,14 @@ public: JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) { ; } - void addJob(JobType type, const boost::function& job); + void addJob(JobType type, const boost::function& job); int getJobCount(JobType t); // Jobs at this priority int getJobCountGE(JobType t); // All jobs at or greater than this priority std::vector< std::pair > getJobCounts(); void shutdown(); - void setThreadCount(int c); + void setThreadCount(int c = 0); }; #endif