diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index cbb2bee83f..cdf126e234 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -68,7 +68,7 @@ void JobQueue::addJob(JobType type, const boost::function& jobFunc) boost::mutex::scoped_lock sl(mJobLock); assert(mThreadCount != 0); // do not add jobs to a queue with no threads - mJobSet.insert(Job(type, ++mLastJob, jobFunc)); + mJobSet.insert(Job(type, ++mLastJob, mJobLoads[type], jobFunc)); ++mJobCounts[type]; mJobCond.notify_one(); } @@ -108,6 +108,43 @@ std::vector< std::pair > JobQueue::getJobCounts() return ret; } +Json::Value JobQueue::getJson(int) +{ + Json::Value ret(Json::objectValue); + boost::mutex::scoped_lock sl(mJobLock); + + ret["threads"] = mThreadCount; + + Json::Value priorities = Json::arrayValue; + for (int i = 0; i < NUM_JOB_TYPES; ++i) + { + uint64 count, latencyAvg, latencyPeak, jobCount; + mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak); + std::map::iterator it = mJobCounts.find(static_cast(i)); + if (it == mJobCounts.end()) + jobCount = 0; + else + jobCount = it->second; + if ((count != 0) || (jobCount != 0) || (latencyPeak != 0)) + { + Json::Value pri(Json::objectValue); + pri["priority_level"] = Job::toString(static_cast(i)); + if (count != 0) + pri["waiting"] = static_cast(jobCount); + if (jobCount != 0) + pri["per_second"] = static_cast(count); + if (latencyPeak != 0) + pri["peak_latency"] = static_cast(latencyPeak); + if (latencyAvg != 0) + pri["avg_latency"] = static_cast(latencyAvg); + priorities.append(pri); + } + } + ret["priorities"] = priorities; + + return ret; +} + void JobQueue::shutdown() { // shut down the job queue without completing pending jobs cLog(lsINFO) << "Job queue shutting down"; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index bdd23bc690..060c9f1da8 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -8,26 +8,31 @@ #include #include #include +#include + +#include "../json/value.h" #include "types.h" +#include "LoadMonitor.h" // Note that this queue should only be used for CPU-bound jobs // It is primarily intended for signature checking enum JobType { // must be in priority order, low to high - jtINVALID, - jtVALIDATION_ut, // A validation from an untrusted source - jtCLIENTOP_ut, // A client operation from a non-local/untrusted source - jtTRANSACTION, // A transaction received from the network - jtPROPOSAL_ut, // A proposal from an untrusted source - jtCLIENTOP_t, // A client operation from a trusted source - jtVALIDATION_t, // A validation from a trusted source - jtTRANSACTION_l, // A local transaction - jtPROPOSAL_t, // A proposal from a trusted source - jtADMIN, // An administrative operation - jtDEATH, // job of death, used internally + jtINVALID = -1, + jtVALIDATION_ut = 0, // A validation from an untrusted source + jtCLIENTOP_ut = 1, // A client operation from a non-local/untrusted source + jtTRANSACTION = 2, // A transaction received from the network + jtPROPOSAL_ut = 3, // A proposal from an untrusted source + jtCLIENTOP_t = 4, // A client operation from a trusted source + jtVALIDATION_t = 5, // A validation from a trusted source + jtTRANSACTION_l = 6, // A local transaction + jtPROPOSAL_t = 7, // A proposal from a trusted source + jtADMIN = 8, // An administrative operation + jtDEATH = 9, // job of death, used internally }; +#define NUM_JOB_TYPES 10 class Job { @@ -35,13 +40,18 @@ protected: JobType mType; uint64 mJobIndex; boost::function mJob; + LoadEvent::pointer mLoadMonitor; public: - Job() : mType(jtINVALID), mJobIndex(0) { ; } - Job(JobType type, uint64 index) : mType(type), mJobIndex(index) { ; } - Job(JobType type, uint64 index, const boost::function& job) - : mType(type), mJobIndex(index), mJob(job) { ; } + Job() : mType(jtINVALID), mJobIndex(0) { ; } + + Job(JobType type, uint64 index) : mType(type), mJobIndex(index) + { ; } + + Job(JobType type, uint64 index, LoadMonitor& lm, const boost::function& job) + : mType(type), mJobIndex(index), mJob(job) + { mLoadMonitor = boost::make_shared(boost::ref(lm), true, 1); } JobType getType() const { return mType; } void doJob(void) { mJob(*this); } @@ -57,14 +67,15 @@ public: class JobQueue { protected: - boost::mutex mJobLock; - boost::condition_variable mJobCond; + boost::mutex mJobLock; + boost::condition_variable mJobCond; - uint64 mLastJob; - std::set mJobSet; - std::map mJobCounts; - int mThreadCount; - bool mShuttingDown; + uint64 mLastJob; + std::set mJobSet; + std::map mJobCounts; + LoadMonitor mJobLoads[NUM_JOB_TYPES]; + int mThreadCount; + bool mShuttingDown; void threadEntry(void); @@ -81,6 +92,8 @@ public: void shutdown(); void setThreadCount(int c = 0); + + Json::Value getJson(int c = 0); }; #endif diff --git a/src/cpp/ripple/LoadMonitor.cpp b/src/cpp/ripple/LoadMonitor.cpp index 76e23ccf43..55470eab73 100644 --- a/src/cpp/ripple/LoadMonitor.cpp +++ b/src/cpp/ripple/LoadMonitor.cpp @@ -11,7 +11,8 @@ void LoadMonitor::LoadMonitor::update() { // way out of date mCounts = 0; mLatencyEvents = 0; - mLatencyMS = 0; + mLatencyMSAvg = 0; + mLatencyMSPeak = 0; mLastUpdate = now; return; } @@ -21,7 +22,8 @@ void LoadMonitor::LoadMonitor::update() ++mLastUpdate; mCounts -= (mCounts / 4); mLatencyEvents -= (mLatencyEvents / 4); - mLatencyMS -= (mLatencyMS / 4); + mLatencyMSAvg -= (mLatencyMSAvg / 4); + mLatencyMSPeak -= (mLatencyMSPeak / 4); } while (mLastUpdate < now); } @@ -38,8 +40,14 @@ void LoadMonitor::addLatency(int latency) boost::mutex::scoped_lock sl(mLock); update(); + ++mLatencyEvents; - mLatencyMS += latency; + mLatencyMSAvg += latency; + mLatencyMSPeak += latency; + + int lp = mLatencyEvents * latency * 4; + if (mLatencyMSPeak < lp) + mLatencyMSPeak = lp; } void LoadMonitor::addCountAndLatency(int counts, int latency) @@ -49,10 +57,15 @@ void LoadMonitor::addCountAndLatency(int counts, int latency) update(); mCounts += counts; ++mLatencyEvents; - mLatencyMS += latency; + mLatencyMSAvg += latency; + mLatencyMSPeak += latency; + + int lp = mLatencyEvents * latency * 4; + if (mLatencyMSPeak < lp) + mLatencyMSPeak = lp; } -void LoadMonitor::getCountAndLatency(uint64& count, uint64& latency) +void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak) { boost::mutex::scoped_lock sl(mLock); @@ -61,6 +74,13 @@ void LoadMonitor::getCountAndLatency(uint64& count, uint64& latency) count = mCounts / 4; if (mLatencyEvents == 0) - latency = 0; - else latency = mLatencyMS / (mLatencyEvents * 4); + { + latencyAvg = 0; + latencyPeak = 0; + } + else + { + latencyAvg = mLatencyMSAvg / (mLatencyEvents * 4); + latencyPeak = mLatencyMSPeak / (mLatencyEvents * 4); + } } diff --git a/src/cpp/ripple/LoadMonitor.h b/src/cpp/ripple/LoadMonitor.h index 17d7cf8a52..08c2afbf21 100644 --- a/src/cpp/ripple/LoadMonitor.h +++ b/src/cpp/ripple/LoadMonitor.h @@ -4,6 +4,7 @@ #include #include +#include #include "types.h" @@ -12,32 +13,31 @@ class LoadMonitor { protected: - std::string mName; uint64 mCounts; uint64 mLatencyEvents; - uint64 mLatencyMS; + uint64 mLatencyMSAvg; + uint64 mLatencyMSPeak; time_t mLastUpdate; boost::mutex mLock; void update(); public: - LoadMonitor(const std::string& n) : mName(n), mCounts(0), mLatencyEvents(0), mLatencyMS(0) + LoadMonitor() : mCounts(0), mLatencyEvents(0), mLatencyMSAvg(0), mLatencyMSPeak(0) { mLastUpdate = time(NULL); } - void setName(const std::string& n) { mName = n; } - - const std::string& getName() const { return mName; } - void addCount(int counts); void addLatency(int latency); void addCountAndLatency(int counts, int latency); - void getCountAndLatency(uint64& count, uint64& latency); + void getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak); }; class LoadEvent { +public: + typedef boost::shared_ptr pointer; + protected: LoadMonitor& mMonitor; bool mRunning; diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 89a5d77e76..546e759094 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -957,22 +957,7 @@ Json::Value NetworkOPs::getServerInfo() if (mConsensus) info["consensus"] = mConsensus->getJson(); - typedef std::pair jt_int_pair; - bool anyJobs = false; - Json::Value jobs = Json::arrayValue; - std::vector< std::pair > jobCounts = theApp->getJobQueue().getJobCounts(); - BOOST_FOREACH(jt_int_pair& it, jobCounts) - { - if (it.second != 0) - { - Json::Value o = Json::objectValue; - o[Job::toString(it.first)] = it.second; - jobs.append(o); - anyJobs = true; - } - } - if (anyJobs) - info["jobs"] = jobs; + info["jobs"] = theApp->getJobQueue().getJson(); return info; }