Job queue simplificaiton

This commit is contained in:
JoelKatz
2013-03-06 19:30:08 -08:00
parent dc3d82272e
commit 0007d76923
6 changed files with 18 additions and 23 deletions

View File

@@ -26,7 +26,6 @@ JobQueue::JobQueue() : mLastJob(0), mThreadCount(0), mShuttingDown(false)
mJobLoads[jtCLIENT].setTargetLatency(2000, 5000); mJobLoads[jtCLIENT].setTargetLatency(2000, 5000);
mJobLoads[jtPEER].setTargetLatency(200, 1250); mJobLoads[jtPEER].setTargetLatency(200, 1250);
mJobLoads[jtDISK].setTargetLatency(500, 1000); mJobLoads[jtDISK].setTargetLatency(500, 1000);
mJobLoads[jtRPC].setTargetLatency(1250, 1750);
mJobLoads[jtACCEPTLEDGER].setTargetLatency(1000, 2500); mJobLoads[jtACCEPTLEDGER].setTargetLatency(1000, 2500);
} }
@@ -53,7 +52,6 @@ const char* Job::toString(JobType t)
case jtPEER: return "peerCommand"; case jtPEER: return "peerCommand";
case jtDISK: return "diskAccess"; case jtDISK: return "diskAccess";
case jtRPC: return "rpc";
case jtACCEPTLEDGER: return "acceptLedger"; case jtACCEPTLEDGER: return "acceptLedger";
case jtTXN_PROC: return "processTransaction"; case jtTXN_PROC: return "processTransaction";
case jtOB_SETUP: return "orderBookSetup"; case jtOB_SETUP: return "orderBookSetup";

View File

@@ -39,11 +39,10 @@ enum JobType
// special types not dispatched by the job pool // special types not dispatched by the job pool
jtPEER = 24, jtPEER = 24,
jtDISK = 25, jtDISK = 25,
jtRPC = 26, jtACCEPTLEDGER = 26,
jtACCEPTLEDGER = 27, jtTXN_PROC = 27,
jtTXN_PROC = 28, jtOB_SETUP = 28,
jtOB_SETUP = 29, jtPATH_FIND = 29
jtPATH_FIND = 30
}; // CAUTION: If you add new types, add them to JobType.cpp too }; // CAUTION: If you add new types, add them to JobType.cpp too
#define NUM_JOB_TYPES 32 #define NUM_JOB_TYPES 32
@@ -66,7 +65,7 @@ public:
Job(JobType type, const std::string& name, uint64 index, LoadMonitor& lm, const boost::function<void(Job&)>& job) Job(JobType type, const std::string& name, uint64 index, LoadMonitor& lm, const boost::function<void(Job&)>& job)
: mType(type), mJobIndex(index), mJob(job), mName(name) : mType(type), mJobIndex(index), mJob(job), mName(name)
{ {
mLoadMonitor = boost::make_shared<LoadEvent>(boost::ref(lm), name, false, 1); mLoadMonitor = boost::make_shared<LoadEvent>(boost::ref(lm), name, false);
} }
JobType getType() const { return mType; } JobType getType() const { return mType; }
@@ -112,9 +111,9 @@ public:
void setThreadCount(int c = 0); void setThreadCount(int c = 0);
LoadEvent::pointer getLoadEvent(JobType t, const std::string& name) LoadEvent::pointer getLoadEvent(JobType t, const std::string& name)
{ return boost::make_shared<LoadEvent>(boost::ref(mJobLoads[t]), name, true, 1); } { return boost::make_shared<LoadEvent>(boost::ref(mJobLoads[t]), name, true); }
LoadEvent::autoptr getLoadEventAP(JobType t, const std::string& name) LoadEvent::autoptr getLoadEventAP(JobType t, const std::string& name)
{ return LoadEvent::autoptr(new LoadEvent(mJobLoads[t], name, true, 1)); } { return LoadEvent::autoptr(new LoadEvent(mJobLoads[t], name, true)); }
int isOverloaded(); int isOverloaded();
Json::Value getJson(int c = 0); Json::Value getJson(int c = 0);

View File

@@ -30,12 +30,12 @@ void LoadMonitor::update()
} while (mLastUpdate < now); } while (mLastUpdate < now);
} }
void LoadMonitor::addCount(int counts) void LoadMonitor::addCount()
{ {
boost::mutex::scoped_lock sl(mLock); boost::mutex::scoped_lock sl(mLock);
update(); update();
mCounts += counts; ++mCounts;
} }
void LoadMonitor::addLatency(int latency) void LoadMonitor::addLatency(int latency)
@@ -55,7 +55,7 @@ void LoadMonitor::addLatency(int latency)
mLatencyMSPeak = lp; mLatencyMSPeak = lp;
} }
void LoadMonitor::addCountAndLatency(const std::string& name, int counts, int latency) void LoadMonitor::addCountAndLatency(const std::string& name, int latency)
{ {
if (latency > 500) if (latency > 500)
{ {
@@ -63,10 +63,11 @@ void LoadMonitor::addCountAndLatency(const std::string& name, int counts, int la
} }
if (latency == 1) if (latency == 1)
latency = 0; latency = 0;
boost::mutex::scoped_lock sl(mLock); boost::mutex::scoped_lock sl(mLock);
update(); update();
mCounts += counts; ++mCounts;
++mLatencyEvents; ++mLatencyEvents;
mLatencyMSAvg += latency; mLatencyMSAvg += latency;
mLatencyMSPeak += latency; mLatencyMSPeak += latency;

View File

@@ -30,9 +30,9 @@ public:
mTargetLatencyAvg(0), mTargetLatencyPk(0) mTargetLatencyAvg(0), mTargetLatencyPk(0)
{ mLastUpdate = upTime(); } { mLastUpdate = upTime(); }
void addCount(int counts); void addCount();
void addLatency(int latency); void addLatency(int latency);
void addCountAndLatency(const std::string& name, int counts, int latency); void addCountAndLatency(const std::string& name, int latency);
void setTargetLatency(uint64 avg, uint64 pk) void setTargetLatency(uint64 avg, uint64 pk)
{ {
@@ -59,13 +59,12 @@ public:
protected: protected:
LoadMonitor& mMonitor; LoadMonitor& mMonitor;
bool mRunning; bool mRunning;
int mCount;
std::string mName; std::string mName;
boost::posix_time::ptime mStartTime; boost::posix_time::ptime mStartTime;
public: public:
LoadEvent(LoadMonitor& monitor, const std::string& name, bool shouldStart, int count) : LoadEvent(LoadMonitor& monitor, const std::string& name, bool shouldStart) :
mMonitor(monitor), mRunning(false), mCount(count), mName(name) mMonitor(monitor), mRunning(false), mName(name)
{ {
mStartTime = boost::posix_time::microsec_clock::universal_time(); mStartTime = boost::posix_time::microsec_clock::universal_time();
if (shouldStart) if (shouldStart)
@@ -93,7 +92,7 @@ public:
{ {
assert(mRunning); assert(mRunning);
mRunning = false; mRunning = false;
mMonitor.addCountAndLatency(mName, mCount, mMonitor.addCountAndLatency(mName,
static_cast<int>((boost::posix_time::microsec_clock::universal_time() - mStartTime).total_milliseconds())); static_cast<int>((boost::posix_time::microsec_clock::universal_time() - mStartTime).total_milliseconds()));
} }
}; };

View File

@@ -913,7 +913,7 @@ static void checkPropose(Job& job, boost::shared_ptr<ripple::TMProposeSet> packe
memcpy(prevLedger.begin(), set.previousledger().data(), 256 / 8); memcpy(prevLedger.begin(), set.previousledger().data(), 256 / 8);
if (!proposal->checkSign(set.signature())) if (!proposal->checkSign(set.signature()))
{ {
cLog(lsWARNING) << "proposal with previous ledger fails signature check"; cLog(lsWARNING) << "proposal with previous ledger fails signature check: " << getIP();
Peer::punishPeer(peer, LT_InvalidSignature); Peer::punishPeer(peer, LT_InvalidSignature);
return; return;
} }

View File

@@ -2893,8 +2893,6 @@ Json::Value RPCHandler::doCommand(const Json::Value& jvRequest, int iRole)
cLog(lsTRACE) << "COMMAND:" << strCommand; cLog(lsTRACE) << "COMMAND:" << strCommand;
cLog(lsTRACE) << "REQUEST:" << jvRequest; cLog(lsTRACE) << "REQUEST:" << jvRequest;
LoadEvent::autoptr le(theApp->getJobQueue().getLoadEventAP(jtRPC, "RPC"));
mRole = iRole; mRole = iRole;
static struct { static struct {