diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 42f0aa209..44ed3bcee 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -8,6 +8,25 @@ SETUP_LOG(); +JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) +{ + mJobLoads[jtVALIDATION_ut].setTargetLatency(2000, 5000); + mJobLoads[jtPROOFWORK].setTargetLatency(2000, 5000); + mJobLoads[jtTRANSACTION].setTargetLatency(250, 1000); + mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250); + mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500); + mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500); + mJobLoads[jtPROPOSAL_t].setTargetLatency(100, 500); + + mJobLoads[jtCLIENT].setTargetLatency(250, 1000); + mJobLoads[jtPEER].setTargetLatency(200, 1250); + mJobLoads[jtDISK].setTargetLatency(500, 1000); + mJobLoads[jtRPC].setTargetLatency(250, 750); + mJobLoads[jtACCEPTLEDGER].setTargetLatency(1000, 2500); + mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500); +} + + const char* Job::toString(JobType t) { switch(t) @@ -124,7 +143,8 @@ Json::Value JobQueue::getJson(int) for (int i = 0; i < NUM_JOB_TYPES; ++i) { uint64 count, latencyAvg, latencyPeak, jobCount; - mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak); + bool isOver; + mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak, isOver); std::map::iterator it = mJobCounts.find(static_cast(i)); if (it == mJobCounts.end()) jobCount = 0; @@ -133,6 +153,8 @@ Json::Value JobQueue::getJson(int) if ((count != 0) || (jobCount != 0) || (latencyPeak != 0)) { Json::Value pri(Json::objectValue); + if (isOver) + pri["over_target"] = "true"; pri["job_type"] = Job::toString(static_cast(i)); if (jobCount != 0) pri["waiting"] = static_cast(jobCount); diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 6dc495da7..884b9bf7b 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -22,11 +22,9 @@ enum JobType { // must be in priority order, low to high jtINVALID = -1, jtVALIDATION_ut = 0, // A validation from an untrusted source - jtCLIENTOP_ut = 1, // A client operation from a non-local/untrusted source jtPROOFWORK = 2, // A proof of work demand from another server jtTRANSACTION = 3, // A transaction received from the network jtPROPOSAL_ut = 4, // A proposal from an untrusted source - jtCLIENTOP_t = 5, // A client operation from a trusted source jtVALIDATION_t = 6, // A validation from a trusted source jtTRANSACTION_l = 7, // A local transaction jtPROPOSAL_t = 8, // A proposal from a trusted source @@ -40,6 +38,7 @@ enum JobType jtRPC = 19, jtACCEPTLEDGER = 20, jtPUBLEDGER = 21, + jtTXN_PROC = 22, }; #define NUM_JOB_TYPES 24 @@ -91,7 +90,7 @@ protected: public: - JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) { ; } + JobQueue(); void addJob(JobType type, const boost::function& job); diff --git a/src/cpp/ripple/LoadMonitor.cpp b/src/cpp/ripple/LoadMonitor.cpp index 2011035ae..cffb6fa8d 100644 --- a/src/cpp/ripple/LoadMonitor.cpp +++ b/src/cpp/ripple/LoadMonitor.cpp @@ -69,7 +69,7 @@ void LoadMonitor::addCountAndLatency(int counts, int latency) mLatencyMSPeak = lp; } -void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak) +void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver) { boost::mutex::scoped_lock sl(mLock); @@ -87,4 +87,5 @@ void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyAvg = mLatencyMSAvg / (mLatencyEvents * 4); latencyPeak = mLatencyMSPeak / (mLatencyEvents * 4); } + isOver = isOverTarget(); } diff --git a/src/cpp/ripple/LoadMonitor.h b/src/cpp/ripple/LoadMonitor.h index 6eb259a79..2f4219462 100644 --- a/src/cpp/ripple/LoadMonitor.h +++ b/src/cpp/ripple/LoadMonitor.h @@ -17,20 +17,35 @@ protected: uint64 mLatencyEvents; uint64 mLatencyMSAvg; uint64 mLatencyMSPeak; + uint64 mTargetLatencyAvg; + uint64 mTargetLatencyPk; time_t mLastUpdate; boost::mutex mLock; void update(); public: - LoadMonitor() : mCounts(0), mLatencyEvents(0), mLatencyMSAvg(0), mLatencyMSPeak(0) + LoadMonitor() : mCounts(0), mLatencyEvents(0), mLatencyMSAvg(0), mLatencyMSPeak(0), + mTargetLatencyAvg(0), mTargetLatencyPk(0) { mLastUpdate = time(NULL); } void addCount(int counts); void addLatency(int latency); void addCountAndLatency(int counts, int latency); - void getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak); + void setTargetLatency(uint64 avg, uint64 pk) + { + mTargetLatencyAvg = avg; + mTargetLatencyPk = pk; + } + + bool isOverTarget() + { + return (mTargetLatencyPk && (mLatencyMSPeak > mTargetLatencyPk)) || + (mTargetLatencyAvg && (mLatencyMSAvg > mTargetLatencyAvg)); + } + + void getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver); }; class LoadEvent diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 9a39d3126..b78eae467 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -167,6 +167,7 @@ Transaction::pointer NetworkOPs::submitTransactionSync(const Transaction::pointe Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback) { + LoadEvent::autoptr ev = theApp->getJobQueue().getLoadEventAP(jtTXN_PROC); int newFlags = theApp->getSuppression().getFlags(trans->getID()); if ((newFlags & SF_BAD) != 0)