mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-25 05:25:55 +00:00
Start to hook this stuff up.
This commit is contained in:
@@ -68,7 +68,7 @@ void JobQueue::addJob(JobType type, const boost::function<void(Job&)>& jobFunc)
|
|||||||
boost::mutex::scoped_lock sl(mJobLock);
|
boost::mutex::scoped_lock sl(mJobLock);
|
||||||
assert(mThreadCount != 0); // do not add jobs to a queue with no threads
|
assert(mThreadCount != 0); // do not add jobs to a queue with no threads
|
||||||
|
|
||||||
mJobSet.insert(Job(type, ++mLastJob, jobFunc));
|
mJobSet.insert(Job(type, ++mLastJob, mJobLoads[type], jobFunc));
|
||||||
++mJobCounts[type];
|
++mJobCounts[type];
|
||||||
mJobCond.notify_one();
|
mJobCond.notify_one();
|
||||||
}
|
}
|
||||||
@@ -108,6 +108,43 @@ std::vector< std::pair<JobType, int> > JobQueue::getJobCounts()
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Json::Value JobQueue::getJson(int)
|
||||||
|
{
|
||||||
|
Json::Value ret(Json::objectValue);
|
||||||
|
boost::mutex::scoped_lock sl(mJobLock);
|
||||||
|
|
||||||
|
ret["threads"] = mThreadCount;
|
||||||
|
|
||||||
|
Json::Value priorities = Json::arrayValue;
|
||||||
|
for (int i = 0; i < NUM_JOB_TYPES; ++i)
|
||||||
|
{
|
||||||
|
uint64 count, latencyAvg, latencyPeak, jobCount;
|
||||||
|
mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak);
|
||||||
|
std::map<JobType, int>::iterator it = mJobCounts.find(static_cast<JobType>(i));
|
||||||
|
if (it == mJobCounts.end())
|
||||||
|
jobCount = 0;
|
||||||
|
else
|
||||||
|
jobCount = it->second;
|
||||||
|
if ((count != 0) || (jobCount != 0) || (latencyPeak != 0))
|
||||||
|
{
|
||||||
|
Json::Value pri(Json::objectValue);
|
||||||
|
pri["priority_level"] = Job::toString(static_cast<JobType>(i));
|
||||||
|
if (count != 0)
|
||||||
|
pri["waiting"] = static_cast<int>(jobCount);
|
||||||
|
if (jobCount != 0)
|
||||||
|
pri["per_second"] = static_cast<int>(count);
|
||||||
|
if (latencyPeak != 0)
|
||||||
|
pri["peak_latency"] = static_cast<int>(latencyPeak);
|
||||||
|
if (latencyAvg != 0)
|
||||||
|
pri["avg_latency"] = static_cast<int>(latencyAvg);
|
||||||
|
priorities.append(pri);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret["priorities"] = priorities;
|
||||||
|
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
void JobQueue::shutdown()
|
void JobQueue::shutdown()
|
||||||
{ // shut down the job queue without completing pending jobs
|
{ // shut down the job queue without completing pending jobs
|
||||||
cLog(lsINFO) << "Job queue shutting down";
|
cLog(lsINFO) << "Job queue shutting down";
|
||||||
|
|||||||
@@ -8,26 +8,31 @@
|
|||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
#include <boost/thread/condition_variable.hpp>
|
#include <boost/thread/condition_variable.hpp>
|
||||||
#include <boost/function.hpp>
|
#include <boost/function.hpp>
|
||||||
|
#include <boost/make_shared.hpp>
|
||||||
|
|
||||||
|
#include "../json/value.h"
|
||||||
|
|
||||||
#include "types.h"
|
#include "types.h"
|
||||||
|
#include "LoadMonitor.h"
|
||||||
|
|
||||||
// Note that this queue should only be used for CPU-bound jobs
|
// Note that this queue should only be used for CPU-bound jobs
|
||||||
// It is primarily intended for signature checking
|
// It is primarily intended for signature checking
|
||||||
|
|
||||||
enum JobType
|
enum JobType
|
||||||
{ // must be in priority order, low to high
|
{ // must be in priority order, low to high
|
||||||
jtINVALID,
|
jtINVALID = -1,
|
||||||
jtVALIDATION_ut, // A validation from an untrusted source
|
jtVALIDATION_ut = 0, // A validation from an untrusted source
|
||||||
jtCLIENTOP_ut, // A client operation from a non-local/untrusted source
|
jtCLIENTOP_ut = 1, // A client operation from a non-local/untrusted source
|
||||||
jtTRANSACTION, // A transaction received from the network
|
jtTRANSACTION = 2, // A transaction received from the network
|
||||||
jtPROPOSAL_ut, // A proposal from an untrusted source
|
jtPROPOSAL_ut = 3, // A proposal from an untrusted source
|
||||||
jtCLIENTOP_t, // A client operation from a trusted source
|
jtCLIENTOP_t = 4, // A client operation from a trusted source
|
||||||
jtVALIDATION_t, // A validation from a trusted source
|
jtVALIDATION_t = 5, // A validation from a trusted source
|
||||||
jtTRANSACTION_l, // A local transaction
|
jtTRANSACTION_l = 6, // A local transaction
|
||||||
jtPROPOSAL_t, // A proposal from a trusted source
|
jtPROPOSAL_t = 7, // A proposal from a trusted source
|
||||||
jtADMIN, // An administrative operation
|
jtADMIN = 8, // An administrative operation
|
||||||
jtDEATH, // job of death, used internally
|
jtDEATH = 9, // job of death, used internally
|
||||||
};
|
};
|
||||||
|
#define NUM_JOB_TYPES 10
|
||||||
|
|
||||||
class Job
|
class Job
|
||||||
{
|
{
|
||||||
@@ -35,13 +40,18 @@ protected:
|
|||||||
JobType mType;
|
JobType mType;
|
||||||
uint64 mJobIndex;
|
uint64 mJobIndex;
|
||||||
boost::function<void(Job&)> mJob;
|
boost::function<void(Job&)> mJob;
|
||||||
|
LoadEvent::pointer mLoadMonitor;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
Job() : mType(jtINVALID), mJobIndex(0) { ; }
|
|
||||||
Job(JobType type, uint64 index) : mType(type), mJobIndex(index) { ; }
|
|
||||||
|
|
||||||
Job(JobType type, uint64 index, const boost::function<void(Job&)>& job)
|
Job() : mType(jtINVALID), mJobIndex(0) { ; }
|
||||||
: mType(type), mJobIndex(index), mJob(job) { ; }
|
|
||||||
|
Job(JobType type, uint64 index) : mType(type), mJobIndex(index)
|
||||||
|
{ ; }
|
||||||
|
|
||||||
|
Job(JobType type, uint64 index, LoadMonitor& lm, const boost::function<void(Job&)>& job)
|
||||||
|
: mType(type), mJobIndex(index), mJob(job)
|
||||||
|
{ mLoadMonitor = boost::make_shared<LoadEvent>(boost::ref(lm), true, 1); }
|
||||||
|
|
||||||
JobType getType() const { return mType; }
|
JobType getType() const { return mType; }
|
||||||
void doJob(void) { mJob(*this); }
|
void doJob(void) { mJob(*this); }
|
||||||
@@ -63,6 +73,7 @@ protected:
|
|||||||
uint64 mLastJob;
|
uint64 mLastJob;
|
||||||
std::set<Job> mJobSet;
|
std::set<Job> mJobSet;
|
||||||
std::map<JobType, int> mJobCounts;
|
std::map<JobType, int> mJobCounts;
|
||||||
|
LoadMonitor mJobLoads[NUM_JOB_TYPES];
|
||||||
int mThreadCount;
|
int mThreadCount;
|
||||||
bool mShuttingDown;
|
bool mShuttingDown;
|
||||||
|
|
||||||
@@ -81,6 +92,8 @@ public:
|
|||||||
|
|
||||||
void shutdown();
|
void shutdown();
|
||||||
void setThreadCount(int c = 0);
|
void setThreadCount(int c = 0);
|
||||||
|
|
||||||
|
Json::Value getJson(int c = 0);
|
||||||
};
|
};
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|||||||
@@ -11,7 +11,8 @@ void LoadMonitor::LoadMonitor::update()
|
|||||||
{ // way out of date
|
{ // way out of date
|
||||||
mCounts = 0;
|
mCounts = 0;
|
||||||
mLatencyEvents = 0;
|
mLatencyEvents = 0;
|
||||||
mLatencyMS = 0;
|
mLatencyMSAvg = 0;
|
||||||
|
mLatencyMSPeak = 0;
|
||||||
mLastUpdate = now;
|
mLastUpdate = now;
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -21,7 +22,8 @@ void LoadMonitor::LoadMonitor::update()
|
|||||||
++mLastUpdate;
|
++mLastUpdate;
|
||||||
mCounts -= (mCounts / 4);
|
mCounts -= (mCounts / 4);
|
||||||
mLatencyEvents -= (mLatencyEvents / 4);
|
mLatencyEvents -= (mLatencyEvents / 4);
|
||||||
mLatencyMS -= (mLatencyMS / 4);
|
mLatencyMSAvg -= (mLatencyMSAvg / 4);
|
||||||
|
mLatencyMSPeak -= (mLatencyMSPeak / 4);
|
||||||
} while (mLastUpdate < now);
|
} while (mLastUpdate < now);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -38,8 +40,14 @@ void LoadMonitor::addLatency(int latency)
|
|||||||
boost::mutex::scoped_lock sl(mLock);
|
boost::mutex::scoped_lock sl(mLock);
|
||||||
|
|
||||||
update();
|
update();
|
||||||
|
|
||||||
++mLatencyEvents;
|
++mLatencyEvents;
|
||||||
mLatencyMS += latency;
|
mLatencyMSAvg += latency;
|
||||||
|
mLatencyMSPeak += latency;
|
||||||
|
|
||||||
|
int lp = mLatencyEvents * latency * 4;
|
||||||
|
if (mLatencyMSPeak < lp)
|
||||||
|
mLatencyMSPeak = lp;
|
||||||
}
|
}
|
||||||
|
|
||||||
void LoadMonitor::addCountAndLatency(int counts, int latency)
|
void LoadMonitor::addCountAndLatency(int counts, int latency)
|
||||||
@@ -49,10 +57,15 @@ void LoadMonitor::addCountAndLatency(int counts, int latency)
|
|||||||
update();
|
update();
|
||||||
mCounts += counts;
|
mCounts += counts;
|
||||||
++mLatencyEvents;
|
++mLatencyEvents;
|
||||||
mLatencyMS += latency;
|
mLatencyMSAvg += latency;
|
||||||
|
mLatencyMSPeak += latency;
|
||||||
|
|
||||||
|
int lp = mLatencyEvents * latency * 4;
|
||||||
|
if (mLatencyMSPeak < lp)
|
||||||
|
mLatencyMSPeak = lp;
|
||||||
}
|
}
|
||||||
|
|
||||||
void LoadMonitor::getCountAndLatency(uint64& count, uint64& latency)
|
void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock sl(mLock);
|
boost::mutex::scoped_lock sl(mLock);
|
||||||
|
|
||||||
@@ -61,6 +74,13 @@ void LoadMonitor::getCountAndLatency(uint64& count, uint64& latency)
|
|||||||
count = mCounts / 4;
|
count = mCounts / 4;
|
||||||
|
|
||||||
if (mLatencyEvents == 0)
|
if (mLatencyEvents == 0)
|
||||||
latency = 0;
|
{
|
||||||
else latency = mLatencyMS / (mLatencyEvents * 4);
|
latencyAvg = 0;
|
||||||
|
latencyPeak = 0;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
latencyAvg = mLatencyMSAvg / (mLatencyEvents * 4);
|
||||||
|
latencyPeak = mLatencyMSPeak / (mLatencyEvents * 4);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@
|
|||||||
#include <string>
|
#include <string>
|
||||||
|
|
||||||
#include <boost/thread/mutex.hpp>
|
#include <boost/thread/mutex.hpp>
|
||||||
|
#include <boost/shared_ptr.hpp>
|
||||||
|
|
||||||
#include "types.h"
|
#include "types.h"
|
||||||
|
|
||||||
@@ -12,32 +13,31 @@
|
|||||||
class LoadMonitor
|
class LoadMonitor
|
||||||
{
|
{
|
||||||
protected:
|
protected:
|
||||||
std::string mName;
|
|
||||||
uint64 mCounts;
|
uint64 mCounts;
|
||||||
uint64 mLatencyEvents;
|
uint64 mLatencyEvents;
|
||||||
uint64 mLatencyMS;
|
uint64 mLatencyMSAvg;
|
||||||
|
uint64 mLatencyMSPeak;
|
||||||
time_t mLastUpdate;
|
time_t mLastUpdate;
|
||||||
boost::mutex mLock;
|
boost::mutex mLock;
|
||||||
|
|
||||||
void update();
|
void update();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
LoadMonitor(const std::string& n) : mName(n), mCounts(0), mLatencyEvents(0), mLatencyMS(0)
|
LoadMonitor() : mCounts(0), mLatencyEvents(0), mLatencyMSAvg(0), mLatencyMSPeak(0)
|
||||||
{ mLastUpdate = time(NULL); }
|
{ mLastUpdate = time(NULL); }
|
||||||
|
|
||||||
void setName(const std::string& n) { mName = n; }
|
|
||||||
|
|
||||||
const std::string& getName() const { return mName; }
|
|
||||||
|
|
||||||
void addCount(int counts);
|
void addCount(int counts);
|
||||||
void addLatency(int latency);
|
void addLatency(int latency);
|
||||||
void addCountAndLatency(int counts, int latency);
|
void addCountAndLatency(int counts, int latency);
|
||||||
|
|
||||||
void getCountAndLatency(uint64& count, uint64& latency);
|
void getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak);
|
||||||
};
|
};
|
||||||
|
|
||||||
class LoadEvent
|
class LoadEvent
|
||||||
{
|
{
|
||||||
|
public:
|
||||||
|
typedef boost::shared_ptr<LoadEvent> pointer;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
LoadMonitor& mMonitor;
|
LoadMonitor& mMonitor;
|
||||||
bool mRunning;
|
bool mRunning;
|
||||||
|
|||||||
@@ -957,22 +957,7 @@ Json::Value NetworkOPs::getServerInfo()
|
|||||||
if (mConsensus)
|
if (mConsensus)
|
||||||
info["consensus"] = mConsensus->getJson();
|
info["consensus"] = mConsensus->getJson();
|
||||||
|
|
||||||
typedef std::pair<JobType, int> jt_int_pair;
|
info["jobs"] = theApp->getJobQueue().getJson();
|
||||||
bool anyJobs = false;
|
|
||||||
Json::Value jobs = Json::arrayValue;
|
|
||||||
std::vector< std::pair<JobType, int> > jobCounts = theApp->getJobQueue().getJobCounts();
|
|
||||||
BOOST_FOREACH(jt_int_pair& it, jobCounts)
|
|
||||||
{
|
|
||||||
if (it.second != 0)
|
|
||||||
{
|
|
||||||
Json::Value o = Json::objectValue;
|
|
||||||
o[Job::toString(it.first)] = it.second;
|
|
||||||
jobs.append(o);
|
|
||||||
anyJobs = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (anyJobs)
|
|
||||||
info["jobs"] = jobs;
|
|
||||||
|
|
||||||
return info;
|
return info;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user