mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Latency targets.
This commit is contained in:
@@ -8,6 +8,25 @@
|
|||||||
|
|
||||||
SETUP_LOG();
|
SETUP_LOG();
|
||||||
|
|
||||||
|
JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false)
|
||||||
|
{
|
||||||
|
mJobLoads[jtVALIDATION_ut].setTargetLatency(2000, 5000);
|
||||||
|
mJobLoads[jtPROOFWORK].setTargetLatency(2000, 5000);
|
||||||
|
mJobLoads[jtTRANSACTION].setTargetLatency(250, 1000);
|
||||||
|
mJobLoads[jtPROPOSAL_ut].setTargetLatency(500, 1250);
|
||||||
|
mJobLoads[jtVALIDATION_t].setTargetLatency(500, 1500);
|
||||||
|
mJobLoads[jtTRANSACTION_l].setTargetLatency(100, 500);
|
||||||
|
mJobLoads[jtPROPOSAL_t].setTargetLatency(100, 500);
|
||||||
|
|
||||||
|
mJobLoads[jtCLIENT].setTargetLatency(250, 1000);
|
||||||
|
mJobLoads[jtPEER].setTargetLatency(200, 1250);
|
||||||
|
mJobLoads[jtDISK].setTargetLatency(500, 1000);
|
||||||
|
mJobLoads[jtRPC].setTargetLatency(250, 750);
|
||||||
|
mJobLoads[jtACCEPTLEDGER].setTargetLatency(1000, 2500);
|
||||||
|
mJobLoads[jtPUBLEDGER].setTargetLatency(1000, 2500);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
const char* Job::toString(JobType t)
|
const char* Job::toString(JobType t)
|
||||||
{
|
{
|
||||||
switch(t)
|
switch(t)
|
||||||
@@ -124,7 +143,8 @@ Json::Value JobQueue::getJson(int)
|
|||||||
for (int i = 0; i < NUM_JOB_TYPES; ++i)
|
for (int i = 0; i < NUM_JOB_TYPES; ++i)
|
||||||
{
|
{
|
||||||
uint64 count, latencyAvg, latencyPeak, jobCount;
|
uint64 count, latencyAvg, latencyPeak, jobCount;
|
||||||
mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak);
|
bool isOver;
|
||||||
|
mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak, isOver);
|
||||||
std::map<JobType, int>::iterator it = mJobCounts.find(static_cast<JobType>(i));
|
std::map<JobType, int>::iterator it = mJobCounts.find(static_cast<JobType>(i));
|
||||||
if (it == mJobCounts.end())
|
if (it == mJobCounts.end())
|
||||||
jobCount = 0;
|
jobCount = 0;
|
||||||
@@ -133,6 +153,8 @@ Json::Value JobQueue::getJson(int)
|
|||||||
if ((count != 0) || (jobCount != 0) || (latencyPeak != 0))
|
if ((count != 0) || (jobCount != 0) || (latencyPeak != 0))
|
||||||
{
|
{
|
||||||
Json::Value pri(Json::objectValue);
|
Json::Value pri(Json::objectValue);
|
||||||
|
if (isOver)
|
||||||
|
pri["over_target"] = "true";
|
||||||
pri["job_type"] = Job::toString(static_cast<JobType>(i));
|
pri["job_type"] = Job::toString(static_cast<JobType>(i));
|
||||||
if (jobCount != 0)
|
if (jobCount != 0)
|
||||||
pri["waiting"] = static_cast<int>(jobCount);
|
pri["waiting"] = static_cast<int>(jobCount);
|
||||||
|
|||||||
@@ -22,11 +22,9 @@ enum JobType
|
|||||||
{ // must be in priority order, low to high
|
{ // must be in priority order, low to high
|
||||||
jtINVALID = -1,
|
jtINVALID = -1,
|
||||||
jtVALIDATION_ut = 0, // A validation from an untrusted source
|
jtVALIDATION_ut = 0, // A validation from an untrusted source
|
||||||
jtCLIENTOP_ut = 1, // A client operation from a non-local/untrusted source
|
|
||||||
jtPROOFWORK = 2, // A proof of work demand from another server
|
jtPROOFWORK = 2, // A proof of work demand from another server
|
||||||
jtTRANSACTION = 3, // A transaction received from the network
|
jtTRANSACTION = 3, // A transaction received from the network
|
||||||
jtPROPOSAL_ut = 4, // A proposal from an untrusted source
|
jtPROPOSAL_ut = 4, // A proposal from an untrusted source
|
||||||
jtCLIENTOP_t = 5, // A client operation from a trusted source
|
|
||||||
jtVALIDATION_t = 6, // A validation from a trusted source
|
jtVALIDATION_t = 6, // A validation from a trusted source
|
||||||
jtTRANSACTION_l = 7, // A local transaction
|
jtTRANSACTION_l = 7, // A local transaction
|
||||||
jtPROPOSAL_t = 8, // A proposal from a trusted source
|
jtPROPOSAL_t = 8, // A proposal from a trusted source
|
||||||
@@ -40,6 +38,7 @@ enum JobType
|
|||||||
jtRPC = 19,
|
jtRPC = 19,
|
||||||
jtACCEPTLEDGER = 20,
|
jtACCEPTLEDGER = 20,
|
||||||
jtPUBLEDGER = 21,
|
jtPUBLEDGER = 21,
|
||||||
|
jtTXN_PROC = 22,
|
||||||
};
|
};
|
||||||
#define NUM_JOB_TYPES 24
|
#define NUM_JOB_TYPES 24
|
||||||
|
|
||||||
@@ -91,7 +90,7 @@ protected:
|
|||||||
|
|
||||||
public:
|
public:
|
||||||
|
|
||||||
JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false) { ; }
|
JobQueue();
|
||||||
|
|
||||||
void addJob(JobType type, const boost::function<void(Job&)>& job);
|
void addJob(JobType type, const boost::function<void(Job&)>& job);
|
||||||
|
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ void LoadMonitor::addCountAndLatency(int counts, int latency)
|
|||||||
mLatencyMSPeak = lp;
|
mLatencyMSPeak = lp;
|
||||||
}
|
}
|
||||||
|
|
||||||
void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak)
|
void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver)
|
||||||
{
|
{
|
||||||
boost::mutex::scoped_lock sl(mLock);
|
boost::mutex::scoped_lock sl(mLock);
|
||||||
|
|
||||||
@@ -87,4 +87,5 @@ void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64&
|
|||||||
latencyAvg = mLatencyMSAvg / (mLatencyEvents * 4);
|
latencyAvg = mLatencyMSAvg / (mLatencyEvents * 4);
|
||||||
latencyPeak = mLatencyMSPeak / (mLatencyEvents * 4);
|
latencyPeak = mLatencyMSPeak / (mLatencyEvents * 4);
|
||||||
}
|
}
|
||||||
|
isOver = isOverTarget();
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -17,20 +17,35 @@ protected:
|
|||||||
uint64 mLatencyEvents;
|
uint64 mLatencyEvents;
|
||||||
uint64 mLatencyMSAvg;
|
uint64 mLatencyMSAvg;
|
||||||
uint64 mLatencyMSPeak;
|
uint64 mLatencyMSPeak;
|
||||||
|
uint64 mTargetLatencyAvg;
|
||||||
|
uint64 mTargetLatencyPk;
|
||||||
time_t mLastUpdate;
|
time_t mLastUpdate;
|
||||||
boost::mutex mLock;
|
boost::mutex mLock;
|
||||||
|
|
||||||
void update();
|
void update();
|
||||||
|
|
||||||
public:
|
public:
|
||||||
LoadMonitor() : mCounts(0), mLatencyEvents(0), mLatencyMSAvg(0), mLatencyMSPeak(0)
|
LoadMonitor() : mCounts(0), mLatencyEvents(0), mLatencyMSAvg(0), mLatencyMSPeak(0),
|
||||||
|
mTargetLatencyAvg(0), mTargetLatencyPk(0)
|
||||||
{ mLastUpdate = time(NULL); }
|
{ mLastUpdate = time(NULL); }
|
||||||
|
|
||||||
void addCount(int counts);
|
void addCount(int counts);
|
||||||
void addLatency(int latency);
|
void addLatency(int latency);
|
||||||
void addCountAndLatency(int counts, int latency);
|
void addCountAndLatency(int counts, int latency);
|
||||||
|
|
||||||
void getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak);
|
void setTargetLatency(uint64 avg, uint64 pk)
|
||||||
|
{
|
||||||
|
mTargetLatencyAvg = avg;
|
||||||
|
mTargetLatencyPk = pk;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool isOverTarget()
|
||||||
|
{
|
||||||
|
return (mTargetLatencyPk && (mLatencyMSPeak > mTargetLatencyPk)) ||
|
||||||
|
(mTargetLatencyAvg && (mLatencyMSAvg > mTargetLatencyAvg));
|
||||||
|
}
|
||||||
|
|
||||||
|
void getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver);
|
||||||
};
|
};
|
||||||
|
|
||||||
class LoadEvent
|
class LoadEvent
|
||||||
|
|||||||
@@ -167,6 +167,7 @@ Transaction::pointer NetworkOPs::submitTransactionSync(const Transaction::pointe
|
|||||||
|
|
||||||
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback)
|
Transaction::pointer NetworkOPs::processTransaction(Transaction::pointer trans, stCallback callback)
|
||||||
{
|
{
|
||||||
|
LoadEvent::autoptr ev = theApp->getJobQueue().getLoadEventAP(jtTXN_PROC);
|
||||||
|
|
||||||
int newFlags = theApp->getSuppression().getFlags(trans->getID());
|
int newFlags = theApp->getSuppression().getFlags(trans->getID());
|
||||||
if ((newFlags & SF_BAD) != 0)
|
if ((newFlags & SF_BAD) != 0)
|
||||||
|
|||||||
Reference in New Issue
Block a user