diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index f62cb0632d..79e179d8b6 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -48,7 +48,7 @@ DatabaseCon::~DatabaseCon() Application::Application() : mIOWork(mIOService), mAuxWork(mAuxService), mUNL(mIOService), mNetOps(mIOService, &mLedgerMaster), mTempNodeCache("NodeCache", 16384, 90), mHashedObjectStore(16384, 300), mSLECache("LedgerEntryCache", 4096, 120), - mSNTPClient(mAuxService), mFeeTrack(), + mSNTPClient(mAuxService), mJobQueue(mIOService), mFeeTrack(), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL), mSweepTimer(mAuxService), mShutdown(false) diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 303035fe44..d169a9ddad 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -6,10 +6,12 @@ #include "Log.h" #include "Config.h" +#include "Application.h" SETUP_LOG(); -JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) +JobQueue::JobQueue(boost::asio::io_service& svc) + : mLastJob(0), mThreadCount(0), mShuttingDown(false), mIOThreadCount(0), mMaxIOThreadCount(1), mIOService(svc) { mJobLoads[jtPUBOLDLEDGER].setTargetLatency(10000, 15000); mJobLoads[jtVALIDATION_ut].setTargetLatency(2000, 5000); @@ -240,6 +242,8 @@ void JobQueue::setThreadCount(int c) boost::mutex::scoped_lock sl(mJobLock); + mMaxIOThreadCount = 1 + (c / 3); + while (mJobCounts[jtDEATH].first != 0) mJobCond.wait(sl); @@ -261,6 +265,26 @@ void JobQueue::setThreadCount(int c) mJobCond.notify_one(); // in case we sucked up someone else's signal } +void JobQueue::IOThread(boost::mutex::scoped_lock& sl) +{ // call with a lock + ++mIOThreadCount; + sl.unlock(); + NameThread("IO+"); + try + { + do + NameThread("IO+"); + while ((mIOService.poll_one() == 1) && !theApp->isShutdown()); + } + catch (...) + { + cLog(lsWARNING) << "Exception in IOThread"; + } + NameThread("waiting"); + sl.lock(); + --mIOThreadCount; +} + void JobQueue::threadEntry() { // do jobs until asked to stop boost::mutex::scoped_lock sl(mJobLock); @@ -268,7 +292,12 @@ void JobQueue::threadEntry() { NameThread("waiting"); while (mJobSet.empty() && !mShuttingDown) - mJobCond.wait(sl); + { + if ((mIOThreadCount < mMaxIOThreadCount) && !theApp->isShutdown()) + IOThread(sl); + else + mJobCond.wait(sl); + } if (mShuttingDown) break; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 705744db66..62e058c009 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -8,6 +8,7 @@ #include #include #include +#include #include #include "../json/value.h" @@ -93,14 +94,19 @@ protected: int mThreadCount; bool mShuttingDown; + int mIOThreadCount; + int mMaxIOThreadCount; + boost::asio::io_service& mIOService; + std::map > mJobCounts; - void threadEntry(void); + void threadEntry(); + void IOThread(boost::mutex::scoped_lock&); public: - JobQueue(); + JobQueue(boost::asio::io_service&); void addJob(JobType type, const std::string& name, const FUNCTION_TYPE& job);