Dispatch HashedObject background writes through our job queue.

This commit is contained in:
JoelKatz
2013-01-28 07:13:05 -08:00
parent e9d405302c
commit e48ef29f8c
3 changed files with 9 additions and 7 deletions

View File

@@ -51,7 +51,7 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index,
if (!mWritePending) if (!mWritePending)
{ {
mWritePending = true; mWritePending = true;
boost::thread(boost::bind(&HashedObjectStore::bulkWrite, this)).detach(); theApp->getJobQueue().addJob(jtWRITE, boost::bind(&HashedObjectStore::bulkWrite, this));
} }
} }
// else // else
@@ -70,7 +70,6 @@ void HashedObjectStore::waitWrite()
void HashedObjectStore::bulkWrite() void HashedObjectStore::bulkWrite()
{ {
LoadEvent::autoptr event(theApp->getJobQueue().getLoadEventAP(jtDISK));
while (1) while (1)
{ {
std::vector< boost::shared_ptr<HashedObject> > set; std::vector< boost::shared_ptr<HashedObject> > set;

View File

@@ -17,6 +17,7 @@ JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false)
mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250); mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250);
mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500); mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500);
mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500); mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500);
mJobLoads[jtWRITE].setTargetLatency(750, 1500);
mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500); mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500);
mJobLoads[jtPROPOSAL_t].setTargetLatency(100, 500); mJobLoads[jtPROPOSAL_t].setTargetLatency(100, 500);
@@ -40,6 +41,7 @@ const char* Job::toString(JobType t)
case jtTRANSACTION: return "transaction"; case jtTRANSACTION: return "transaction";
case jtPUBLEDGER: return "publishLedger"; case jtPUBLEDGER: return "publishLedger";
case jtVALIDATION_t: return "trustedValidation"; case jtVALIDATION_t: return "trustedValidation";
case jtWRITE: return "writeObjects";
case jtTRANSACTION_l: return "localTransaction"; case jtTRANSACTION_l: return "localTransaction";
case jtPROPOSAL_t: return "trustedProposal"; case jtPROPOSAL_t: return "trustedProposal";
case jtADMIN: return "administration"; case jtADMIN: return "administration";
@@ -196,7 +198,7 @@ void JobQueue::setThreadCount(int c)
{ {
c = boost::thread::hardware_concurrency(); c = boost::thread::hardware_concurrency();
if (c < 0) if (c < 0)
c = 0; c = 2;
c += 2; c += 2;
cLog(lsINFO) << "Auto-tuning to " << c << " validation/transaction/proposal threads"; cLog(lsINFO) << "Auto-tuning to " << c << " validation/transaction/proposal threads";
} }

View File

@@ -28,10 +28,11 @@ enum JobType
jtTRANSACTION = 5, // A transaction received from the network jtTRANSACTION = 5, // A transaction received from the network
jtPUBLEDGER = 6, // Publish a fully-accepted ledger jtPUBLEDGER = 6, // Publish a fully-accepted ledger
jtVALIDATION_t = 7, // A validation from a trusted source jtVALIDATION_t = 7, // A validation from a trusted source
jtTRANSACTION_l = 8, // A local transaction jtWRITE = 8, // Write out hashed objects
jtPROPOSAL_t = 9, // A proposal from a trusted source jtTRANSACTION_l = 9, // A local transaction
jtADMIN = 10, // An administrative operation jtPROPOSAL_t = 10, // A proposal from a trusted source
jtDEATH = 11, // job of death, used internally jtADMIN = 11, // An administrative operation
jtDEATH = 12, // job of death, used internally
// special types not dispatched by the job pool // special types not dispatched by the job pool
jtPEER = 17, jtPEER = 17,