mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-30 16:05:51 +00:00
A small change with big consequences. Allow job queue threads to moonlight as I/O threads.
This commit is contained in:
@@ -48,7 +48,7 @@ DatabaseCon::~DatabaseCon()
|
||||
Application::Application() :
|
||||
mIOWork(mIOService), mAuxWork(mAuxService), mUNL(mIOService), mNetOps(mIOService, &mLedgerMaster),
|
||||
mTempNodeCache("NodeCache", 16384, 90), mHashedObjectStore(16384, 300), mSLECache("LedgerEntryCache", 4096, 120),
|
||||
mSNTPClient(mAuxService), mFeeTrack(),
|
||||
mSNTPClient(mAuxService), mJobQueue(mIOService), mFeeTrack(),
|
||||
mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL),
|
||||
mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL),
|
||||
mSweepTimer(mAuxService), mShutdown(false)
|
||||
|
||||
@@ -6,10 +6,12 @@
|
||||
|
||||
#include "Log.h"
|
||||
#include "Config.h"
|
||||
#include "Application.h"
|
||||
|
||||
SETUP_LOG();
|
||||
|
||||
JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false)
|
||||
JobQueue::JobQueue(boost::asio::io_service& svc)
|
||||
: mLastJob(0), mThreadCount(0), mShuttingDown(false), mIOThreadCount(0), mMaxIOThreadCount(1), mIOService(svc)
|
||||
{
|
||||
mJobLoads[jtPUBOLDLEDGER].setTargetLatency(10000, 15000);
|
||||
mJobLoads[jtVALIDATION_ut].setTargetLatency(2000, 5000);
|
||||
@@ -240,6 +242,8 @@ void JobQueue::setThreadCount(int c)
|
||||
|
||||
boost::mutex::scoped_lock sl(mJobLock);
|
||||
|
||||
mMaxIOThreadCount = 1 + (c / 3);
|
||||
|
||||
while (mJobCounts[jtDEATH].first != 0)
|
||||
mJobCond.wait(sl);
|
||||
|
||||
@@ -261,6 +265,26 @@ void JobQueue::setThreadCount(int c)
|
||||
mJobCond.notify_one(); // in case we sucked up someone else's signal
|
||||
}
|
||||
|
||||
void JobQueue::IOThread(boost::mutex::scoped_lock& sl)
|
||||
{ // call with a lock
|
||||
++mIOThreadCount;
|
||||
sl.unlock();
|
||||
NameThread("IO+");
|
||||
try
|
||||
{
|
||||
do
|
||||
NameThread("IO+");
|
||||
while ((mIOService.poll_one() == 1) && !theApp->isShutdown());
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
cLog(lsWARNING) << "Exception in IOThread";
|
||||
}
|
||||
NameThread("waiting");
|
||||
sl.lock();
|
||||
--mIOThreadCount;
|
||||
}
|
||||
|
||||
void JobQueue::threadEntry()
|
||||
{ // do jobs until asked to stop
|
||||
boost::mutex::scoped_lock sl(mJobLock);
|
||||
@@ -268,7 +292,12 @@ void JobQueue::threadEntry()
|
||||
{
|
||||
NameThread("waiting");
|
||||
while (mJobSet.empty() && !mShuttingDown)
|
||||
{
|
||||
if ((mIOThreadCount < mMaxIOThreadCount) && !theApp->isShutdown())
|
||||
IOThread(sl);
|
||||
else
|
||||
mJobCond.wait(sl);
|
||||
}
|
||||
|
||||
if (mShuttingDown)
|
||||
break;
|
||||
|
||||
@@ -8,6 +8,7 @@
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/thread/condition_variable.hpp>
|
||||
#include <boost/make_shared.hpp>
|
||||
#include <boost/asio.hpp>
|
||||
#include <boost/ref.hpp>
|
||||
|
||||
#include "../json/value.h"
|
||||
@@ -93,14 +94,19 @@ protected:
|
||||
int mThreadCount;
|
||||
bool mShuttingDown;
|
||||
|
||||
int mIOThreadCount;
|
||||
int mMaxIOThreadCount;
|
||||
boost::asio::io_service& mIOService;
|
||||
|
||||
std::map<JobType, std::pair<int, int > > mJobCounts;
|
||||
|
||||
|
||||
void threadEntry(void);
|
||||
void threadEntry();
|
||||
void IOThread(boost::mutex::scoped_lock&);
|
||||
|
||||
public:
|
||||
|
||||
JobQueue();
|
||||
JobQueue(boost::asio::io_service&);
|
||||
|
||||
void addJob(JobType type, const std::string& name, const FUNCTION_TYPE<void(Job&)>& job);
|
||||
|
||||
|
||||
Reference in New Issue
Block a user