From f0c029ef0d09e773b1216de1497d007912d44f5e Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Sun, 3 Mar 2013 16:24:47 -0800 Subject: [PATCH] Rework the way load is accounted to be more accurate and more specific. --- src/cpp/database/SqliteDatabase.cpp | 2 +- src/cpp/ripple/HashedObject.cpp | 3 ++- src/cpp/ripple/JobQueue.cpp | 4 ++-- src/cpp/ripple/JobQueue.h | 14 +++++++++----- src/cpp/ripple/LedgerMaster.cpp | 6 ++++-- src/cpp/ripple/LoadManager.cpp | 3 +++ src/cpp/ripple/LoadMonitor.cpp | 7 ++++++- src/cpp/ripple/LoadMonitor.h | 10 ++++++++-- src/cpp/ripple/NetworkOPs.cpp | 2 +- src/cpp/ripple/Peer.cpp | 8 ++++---- src/cpp/ripple/ValidationCollection.cpp | 3 ++- src/cpp/ripple/WSHandler.h | 8 +++++--- 12 files changed, 47 insertions(+), 23 deletions(-) diff --git a/src/cpp/database/SqliteDatabase.cpp b/src/cpp/database/SqliteDatabase.cpp index 35062d29e..e9e70fed4 100644 --- a/src/cpp/database/SqliteDatabase.cpp +++ b/src/cpp/database/SqliteDatabase.cpp @@ -223,7 +223,7 @@ void SqliteDatabase::doHook(const char *db, int pages) { walRunning = true; if (mWalQ) - mWalQ->addJob(jtWAL, boost::bind(&SqliteDatabase::runWal, this)); + mWalQ->addJob(jtWAL, std::string("WAL:") + db, boost::bind(&SqliteDatabase::runWal, this)); else boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach(); } diff --git a/src/cpp/ripple/HashedObject.cpp b/src/cpp/ripple/HashedObject.cpp index 6dd34b6ef..489ab6b45 100644 --- a/src/cpp/ripple/HashedObject.cpp +++ b/src/cpp/ripple/HashedObject.cpp @@ -51,7 +51,8 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index, if (!mWritePending) { mWritePending = true; - theApp->getJobQueue().addJob(jtWRITE, boost::bind(&HashedObjectStore::bulkWrite, this)); + theApp->getJobQueue().addJob(jtWRITE, "HasedObject::store", + boost::bind(&HashedObjectStore::bulkWrite, this)); } } // else diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 3dc9386d0..cc4c7ca01 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -98,7 +98,7 @@ bool Job::operator<=(const Job& j) const return mJobIndex <= j.mJobIndex; } -void JobQueue::addJob(JobType type, const boost::function& jobFunc) +void JobQueue::addJob(JobType type, const std::string& name, const boost::function& jobFunc) { assert(type != jtINVALID); @@ -107,7 +107,7 @@ void JobQueue::addJob(JobType type, const boost::function& jobFunc) if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering assert(mThreadCount != 0); // do not add jobs to a queue with no threads - mJobSet.insert(Job(type, ++mLastJob, mJobLoads[type], jobFunc)); + mJobSet.insert(Job(type, name, ++mLastJob, mJobLoads[type], jobFunc)); ++mJobCounts[type]; mJobCond.notify_one(); } diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 10dfbc7dd..386ccf5ff 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -54,6 +54,7 @@ protected: uint64 mJobIndex; boost::function mJob; LoadEvent::pointer mLoadMonitor; + std::string mName; public: @@ -62,12 +63,15 @@ public: Job(JobType type, uint64 index) : mType(type), mJobIndex(index) { ; } - Job(JobType type, uint64 index, LoadMonitor& lm, const boost::function& job) - : mType(type), mJobIndex(index), mJob(job) - { mLoadMonitor = boost::make_shared(boost::ref(lm), true, 1); } + Job(JobType type, const std::string& name, uint64 index, LoadMonitor& lm, const boost::function& job) + : mType(type), mJobIndex(index), mJob(job), mName(name) + { + mLoadMonitor = boost::make_shared(boost::ref(lm), false, 1); + } JobType getType() const { return mType; } - void doJob(void) { mJob(*this); } + void doJob(void) { mLoadMonitor->start(); mJob(*this); mLoadMonitor->setName(mName); } + void rename(const std::string& n) { mName = n; } bool operator<(const Job& j) const; bool operator>(const Job& j) const; @@ -97,7 +101,7 @@ public: JobQueue(); - void addJob(JobType type, const boost::function& job); + void addJob(JobType type, const std::string& name, const boost::function& job); int getJobCount(JobType t); // Jobs at this priority int getJobCountGE(JobType t); // All jobs at or greater than this priority diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index 8f538f8bf..ab87238da 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -186,7 +186,8 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l { cLog(lsTRACE) << "Ledger hash found in database"; mTooFast = true; - theApp->getJobQueue().addJob(jtPUBOLDLEDGER, boost::bind(&LedgerMaster::asyncAccept, this, ledger)); + theApp->getJobQueue().addJob(jtPUBOLDLEDGER, "LedgerMaster::asyncAccept", + boost::bind(&LedgerMaster::asyncAccept, this, ledger)); return true; } @@ -540,7 +541,8 @@ void LedgerMaster::tryPublish() { theApp->getOPs().clearNeedNetworkLedger(); mPubThread = true; - theApp->getJobQueue().addJob(jtPUBLEDGER, boost::bind(&LedgerMaster::pubThread, this)); + theApp->getJobQueue().addJob(jtPUBLEDGER, "Ledger::pubThread", + boost::bind(&LedgerMaster::pubThread, this)); } } diff --git a/src/cpp/ripple/LoadManager.cpp b/src/cpp/ripple/LoadManager.cpp index a476c37e1..dc5ac0891 100644 --- a/src/cpp/ripple/LoadManager.cpp +++ b/src/cpp/ripple/LoadManager.cpp @@ -318,7 +318,10 @@ void LoadManager::threadEntry() bool change; if (theApp->getJobQueue().isOverloaded()) + { + cLog(lsINFO) << theApp->getJobQueue().getJson(0); change = theApp->getFeeTrack().raiseLocalFee(); + } else change = theApp->getFeeTrack().lowerLocalFee(); if (change) diff --git a/src/cpp/ripple/LoadMonitor.cpp b/src/cpp/ripple/LoadMonitor.cpp index 567d08c94..88f304c6d 100644 --- a/src/cpp/ripple/LoadMonitor.cpp +++ b/src/cpp/ripple/LoadMonitor.cpp @@ -1,4 +1,7 @@ #include "LoadMonitor.h" +#include "Log.h" + +SETUP_LOG(); void LoadMonitor::update() { // call with the mutex @@ -52,8 +55,10 @@ void LoadMonitor::addLatency(int latency) mLatencyMSPeak = lp; } -void LoadMonitor::addCountAndLatency(int counts, int latency) +void LoadMonitor::addCountAndLatency(const std::string& name, int counts, int latency) { + if (latency > 1000) + cLog(lsWARNING) << "Job: " << name << " Latency: " << latency; if (latency == 1) latency = 0; boost::mutex::scoped_lock sl(mLock); diff --git a/src/cpp/ripple/LoadMonitor.h b/src/cpp/ripple/LoadMonitor.h index 18cbb228d..4989f1e14 100644 --- a/src/cpp/ripple/LoadMonitor.h +++ b/src/cpp/ripple/LoadMonitor.h @@ -32,7 +32,7 @@ public: void addCount(int counts); void addLatency(int latency); - void addCountAndLatency(int counts, int latency); + void addCountAndLatency(const std::string& name, int counts, int latency); void setTargetLatency(uint64 avg, uint64 pk) { @@ -60,6 +60,7 @@ protected: LoadMonitor& mMonitor; bool mRunning; int mCount; + std::string mName; boost::posix_time::ptime mStartTime; public: @@ -76,6 +77,11 @@ public: stop(); } + void setName(const std::string& name) + { + mName = name; + } + void start() { // okay to call if already started mRunning = true; @@ -86,7 +92,7 @@ public: { assert(mRunning); mRunning = false; - mMonitor.addCountAndLatency(mCount, + mMonitor.addCountAndLatency(mName, mCount, static_cast((boost::posix_time::microsec_clock::universal_time() - mStartTime).total_milliseconds())); } }; diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 1c81e5c40..fd3ff143a 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -1377,7 +1377,7 @@ void NetworkOPs::reportFeeChange() (theApp->getFeeTrack().getLoadFactor() == mLastLoadFactor)) return; - theApp->getJobQueue().addJob(jtCLIENT, boost::bind(&NetworkOPs::pubServer, this)); + theApp->getJobQueue().addJob(jtCLIENT, "reportFeeChange->pubServer", boost::bind(&NetworkOPs::pubServer, this)); } Json::Value NetworkOPs::transJson(const SerializedTransaction& stTxn, TER terResult, bool bAccepted, Ledger::ref lpCurrent, const std::string& strType) diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index 37e9bcb21..e8a8b8815 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -857,7 +857,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) return; } - theApp->getJobQueue().addJob(jtTRANSACTION, + theApp->getJobQueue().addJob(jtTRANSACTION, "recvTransction->checkTransaction", boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr(shared_from_this()))); #ifndef TRUST_NETWORK @@ -986,7 +986,7 @@ void Peer::recvPropose(const boost::shared_ptr& packet) prevLedger.isNonZero() ? prevLedger : consensusLCL, set.proposeseq(), proposeHash, set.closetime(), signerPublic, suppression); - theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, + theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose", boost::bind(&checkPropose, _1, packet, proposal, consensusLCL, mNodePublic, boost::weak_ptr(shared_from_this()))); } @@ -1062,7 +1062,7 @@ void Peer::recvValidation(const boost::shared_ptr& packet) } bool isTrusted = theApp->getUNL().nodeInUNL(val->getSignerPublic()); - theApp->getJobQueue().addJob(isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, + theApp->getJobQueue().addJob(isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation", boost::bind(&checkValidation, _1, val, signingHash, isTrusted, packet, boost::weak_ptr(shared_from_this()))); } @@ -1299,7 +1299,7 @@ void Peer::recvProofWork(ripple::TMProofWork& packet) return; } - theApp->getJobQueue().addJob(jtPROOFWORK, + theApp->getJobQueue().addJob(jtPROOFWORK, "recvProof->doProof", boost::bind(&Peer::doProofOfWork, _1, boost::weak_ptr(shared_from_this()), pow)); return; diff --git a/src/cpp/ripple/ValidationCollection.cpp b/src/cpp/ripple/ValidationCollection.cpp index cfdec56cd..699dc8afb 100644 --- a/src/cpp/ripple/ValidationCollection.cpp +++ b/src/cpp/ripple/ValidationCollection.cpp @@ -293,7 +293,8 @@ void ValidationCollection::condWrite() if (mWriting) return; mWriting = true; - theApp->getJobQueue().addJob(jtWRITE, boost::bind(&ValidationCollection::doWrite, this, _1)); + theApp->getJobQueue().addJob(jtWRITE, "ValidationCollection::doWrite", + boost::bind(&ValidationCollection::doWrite, this, _1)); } void ValidationCollection::doWrite(Job&) diff --git a/src/cpp/ripple/WSHandler.h b/src/cpp/ripple/WSHandler.h index b525859b9..81011b24b 100644 --- a/src/cpp/ripple/WSHandler.h +++ b/src/cpp/ripple/WSHandler.h @@ -152,17 +152,17 @@ public: ptr->preDestroy(); // Must be done before we return // Must be done without holding the websocket send lock - theApp->getJobQueue().addJob(jtCLIENT, + theApp->getJobQueue().addJob(jtCLIENT, "WSClient::destroy", boost::bind(&WSConnection::destroy, ptr)); } void on_message(connection_ptr cpClient, message_ptr mpMessage) { - theApp->getJobQueue().addJob(jtCLIENT, + theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command", boost::bind(&WSServerHandler::do_message, this, _1, cpClient, mpMessage)); } - void do_message(Job&, connection_ptr cpClient, message_ptr mpMessage) + void do_message(Job& job, connection_ptr cpClient, message_ptr mpMessage) { Json::Value jvRequest; Json::Reader jrReader; @@ -190,6 +190,8 @@ public: } else { + if (jvRequest.isMember("command")) + job.rename(std::string("WSClient::") + jvRequest["command"].asString()); boost::shared_ptr< WSConnection > conn; { boost::mutex::scoped_lock sl(mMapLock);