Rework the way load is accounted to be more accurate and more specific.

This commit is contained in:
JoelKatz
2013-03-03 16:24:47 -08:00
parent a3dcc36e05
commit f0c029ef0d
12 changed files with 47 additions and 23 deletions

View File

@@ -223,7 +223,7 @@ void SqliteDatabase::doHook(const char *db, int pages)
{
walRunning = true;
if (mWalQ)
mWalQ->addJob(jtWAL, boost::bind(&SqliteDatabase::runWal, this));
mWalQ->addJob(jtWAL, std::string("WAL:") + db, boost::bind(&SqliteDatabase::runWal, this));
else
boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach();
}

View File

@@ -51,7 +51,8 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index,
if (!mWritePending)
{
mWritePending = true;
theApp->getJobQueue().addJob(jtWRITE, boost::bind(&HashedObjectStore::bulkWrite, this));
theApp->getJobQueue().addJob(jtWRITE, "HasedObject::store",
boost::bind(&HashedObjectStore::bulkWrite, this));
}
}
// else

View File

@@ -98,7 +98,7 @@ bool Job::operator<=(const Job& j) const
return mJobIndex <= j.mJobIndex;
}
void JobQueue::addJob(JobType type, const boost::function<void(Job&)>& jobFunc)
void JobQueue::addJob(JobType type, const std::string& name, const boost::function<void(Job&)>& jobFunc)
{
assert(type != jtINVALID);
@@ -107,7 +107,7 @@ void JobQueue::addJob(JobType type, const boost::function<void(Job&)>& jobFunc)
if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering
assert(mThreadCount != 0); // do not add jobs to a queue with no threads
mJobSet.insert(Job(type, ++mLastJob, mJobLoads[type], jobFunc));
mJobSet.insert(Job(type, name, ++mLastJob, mJobLoads[type], jobFunc));
++mJobCounts[type];
mJobCond.notify_one();
}

View File

@@ -54,6 +54,7 @@ protected:
uint64 mJobIndex;
boost::function<void(Job&)> mJob;
LoadEvent::pointer mLoadMonitor;
std::string mName;
public:
@@ -62,12 +63,15 @@ public:
Job(JobType type, uint64 index) : mType(type), mJobIndex(index)
{ ; }
Job(JobType type, uint64 index, LoadMonitor& lm, const boost::function<void(Job&)>& job)
: mType(type), mJobIndex(index), mJob(job)
{ mLoadMonitor = boost::make_shared<LoadEvent>(boost::ref(lm), true, 1); }
Job(JobType type, const std::string& name, uint64 index, LoadMonitor& lm, const boost::function<void(Job&)>& job)
: mType(type), mJobIndex(index), mJob(job), mName(name)
{
mLoadMonitor = boost::make_shared<LoadEvent>(boost::ref(lm), false, 1);
}
JobType getType() const { return mType; }
void doJob(void) { mJob(*this); }
void doJob(void) { mLoadMonitor->start(); mJob(*this); mLoadMonitor->setName(mName); }
void rename(const std::string& n) { mName = n; }
bool operator<(const Job& j) const;
bool operator>(const Job& j) const;
@@ -97,7 +101,7 @@ public:
JobQueue();
void addJob(JobType type, const boost::function<void(Job&)>& job);
void addJob(JobType type, const std::string& name, const boost::function<void(Job&)>& job);
int getJobCount(JobType t); // Jobs at this priority
int getJobCountGE(JobType t); // All jobs at or greater than this priority

View File

@@ -186,7 +186,8 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l
{
cLog(lsTRACE) << "Ledger hash found in database";
mTooFast = true;
theApp->getJobQueue().addJob(jtPUBOLDLEDGER, boost::bind(&LedgerMaster::asyncAccept, this, ledger));
theApp->getJobQueue().addJob(jtPUBOLDLEDGER, "LedgerMaster::asyncAccept",
boost::bind(&LedgerMaster::asyncAccept, this, ledger));
return true;
}
@@ -540,7 +541,8 @@ void LedgerMaster::tryPublish()
{
theApp->getOPs().clearNeedNetworkLedger();
mPubThread = true;
theApp->getJobQueue().addJob(jtPUBLEDGER, boost::bind(&LedgerMaster::pubThread, this));
theApp->getJobQueue().addJob(jtPUBLEDGER, "Ledger::pubThread",
boost::bind(&LedgerMaster::pubThread, this));
}
}

View File

@@ -318,7 +318,10 @@ void LoadManager::threadEntry()
bool change;
if (theApp->getJobQueue().isOverloaded())
{
cLog(lsINFO) << theApp->getJobQueue().getJson(0);
change = theApp->getFeeTrack().raiseLocalFee();
}
else
change = theApp->getFeeTrack().lowerLocalFee();
if (change)

View File

@@ -1,4 +1,7 @@
#include "LoadMonitor.h"
#include "Log.h"
SETUP_LOG();
void LoadMonitor::update()
{ // call with the mutex
@@ -52,8 +55,10 @@ void LoadMonitor::addLatency(int latency)
mLatencyMSPeak = lp;
}
void LoadMonitor::addCountAndLatency(int counts, int latency)
void LoadMonitor::addCountAndLatency(const std::string& name, int counts, int latency)
{
if (latency > 1000)
cLog(lsWARNING) << "Job: " << name << " Latency: " << latency;
if (latency == 1)
latency = 0;
boost::mutex::scoped_lock sl(mLock);

View File

@@ -32,7 +32,7 @@ public:
void addCount(int counts);
void addLatency(int latency);
void addCountAndLatency(int counts, int latency);
void addCountAndLatency(const std::string& name, int counts, int latency);
void setTargetLatency(uint64 avg, uint64 pk)
{
@@ -60,6 +60,7 @@ protected:
LoadMonitor& mMonitor;
bool mRunning;
int mCount;
std::string mName;
boost::posix_time::ptime mStartTime;
public:
@@ -76,6 +77,11 @@ public:
stop();
}
void setName(const std::string& name)
{
mName = name;
}
void start()
{ // okay to call if already started
mRunning = true;
@@ -86,7 +92,7 @@ public:
{
assert(mRunning);
mRunning = false;
mMonitor.addCountAndLatency(mCount,
mMonitor.addCountAndLatency(mName, mCount,
static_cast<int>((boost::posix_time::microsec_clock::universal_time() - mStartTime).total_milliseconds()));
}
};

View File

@@ -1377,7 +1377,7 @@ void NetworkOPs::reportFeeChange()
(theApp->getFeeTrack().getLoadFactor() == mLastLoadFactor))
return;
theApp->getJobQueue().addJob(jtCLIENT, boost::bind(&NetworkOPs::pubServer, this));
theApp->getJobQueue().addJob(jtCLIENT, "reportFeeChange->pubServer", boost::bind(&NetworkOPs::pubServer, this));
}
Json::Value NetworkOPs::transJson(const SerializedTransaction& stTxn, TER terResult, bool bAccepted, Ledger::ref lpCurrent, const std::string& strType)

View File

@@ -857,7 +857,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet)
return;
}
theApp->getJobQueue().addJob(jtTRANSACTION,
theApp->getJobQueue().addJob(jtTRANSACTION, "recvTransction->checkTransaction",
boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr<Peer>(shared_from_this())));
#ifndef TRUST_NETWORK
@@ -986,7 +986,7 @@ void Peer::recvPropose(const boost::shared_ptr<ripple::TMProposeSet>& packet)
prevLedger.isNonZero() ? prevLedger : consensusLCL,
set.proposeseq(), proposeHash, set.closetime(), signerPublic, suppression);
theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut,
theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose",
boost::bind(&checkPropose, _1, packet, proposal, consensusLCL,
mNodePublic, boost::weak_ptr<Peer>(shared_from_this())));
}
@@ -1062,7 +1062,7 @@ void Peer::recvValidation(const boost::shared_ptr<ripple::TMValidation>& packet)
}
bool isTrusted = theApp->getUNL().nodeInUNL(val->getSignerPublic());
theApp->getJobQueue().addJob(isTrusted ? jtVALIDATION_t : jtVALIDATION_ut,
theApp->getJobQueue().addJob(isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation",
boost::bind(&checkValidation, _1, val, signingHash, isTrusted, packet,
boost::weak_ptr<Peer>(shared_from_this())));
}
@@ -1299,7 +1299,7 @@ void Peer::recvProofWork(ripple::TMProofWork& packet)
return;
}
theApp->getJobQueue().addJob(jtPROOFWORK,
theApp->getJobQueue().addJob(jtPROOFWORK, "recvProof->doProof",
boost::bind(&Peer::doProofOfWork, _1, boost::weak_ptr<Peer>(shared_from_this()), pow));
return;

View File

@@ -293,7 +293,8 @@ void ValidationCollection::condWrite()
if (mWriting)
return;
mWriting = true;
theApp->getJobQueue().addJob(jtWRITE, boost::bind(&ValidationCollection::doWrite, this, _1));
theApp->getJobQueue().addJob(jtWRITE, "ValidationCollection::doWrite",
boost::bind(&ValidationCollection::doWrite, this, _1));
}
void ValidationCollection::doWrite(Job&)

View File

@@ -152,17 +152,17 @@ public:
ptr->preDestroy(); // Must be done before we return
// Must be done without holding the websocket send lock
theApp->getJobQueue().addJob(jtCLIENT,
theApp->getJobQueue().addJob(jtCLIENT, "WSClient::destroy",
boost::bind(&WSConnection<endpoint_type>::destroy, ptr));
}
void on_message(connection_ptr cpClient, message_ptr mpMessage)
{
theApp->getJobQueue().addJob(jtCLIENT,
theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command",
boost::bind(&WSServerHandler<endpoint_type>::do_message, this, _1, cpClient, mpMessage));
}
void do_message(Job&, connection_ptr cpClient, message_ptr mpMessage)
void do_message(Job& job, connection_ptr cpClient, message_ptr mpMessage)
{
Json::Value jvRequest;
Json::Reader jrReader;
@@ -190,6 +190,8 @@ public:
}
else
{
if (jvRequest.isMember("command"))
job.rename(std::string("WSClient::") + jvRequest["command"].asString());
boost::shared_ptr< WSConnection<endpoint_type> > conn;
{
boost::mutex::scoped_lock sl(mMapLock);