From 5aa810404b2df974acfc027bfe6e4db419629ee9 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Tue, 5 Feb 2013 21:33:42 -0800 Subject: [PATCH] Track uptime. Update local fee schedule based on load manager. --- src/cpp/ripple/Application.cpp | 1 + src/cpp/ripple/JobQueue.cpp | 10 +++++ src/cpp/ripple/JobQueue.h | 1 + src/cpp/ripple/LoadManager.cpp | 68 +++++++++++++++++++++++++++++++++- src/cpp/ripple/LoadManager.h | 8 ++++ src/cpp/ripple/LoadMonitor.cpp | 14 +++++++ src/cpp/ripple/LoadMonitor.h | 5 ++- src/cpp/ripple/RPCHandler.cpp | 24 ++++++++++++ 8 files changed, 128 insertions(+), 3 deletions(-) diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index 2f5d9f68a1..072ceb0b71 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -52,6 +52,7 @@ Application::Application() : mJobQueue.setThreadCount(); mSweepTimer.expires_from_now(boost::posix_time::seconds(10)); mSweepTimer.async_wait(boost::bind(&Application::sweep, this)); + mLoadMgr.init(); } extern const char *RpcDBInit[], *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[], *HashNodeDBInit[], *NetNodeDBInit[]; diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 5d0106f381..3a795fd141 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -181,6 +181,16 @@ Json::Value JobQueue::getJson(int) return ret; } +int JobQueue::isOverloaded() +{ + int count = 0; + boost::mutex::scoped_lock sl(mJobLock); + for (int i = 0; i < NUM_JOB_TYPES; ++i) + if (mJobLoads[i].isOver()) + ++count; + return count; +} + void JobQueue::shutdown() { // shut down the job queue without completing pending jobs cLog(lsINFO) << "Job queue shutting down"; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 1df31aca7e..22bfefad95 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -108,6 +108,7 @@ public: LoadEvent::autoptr getLoadEventAP(JobType t) { return LoadEvent::autoptr(new LoadEvent(mJobLoads[t], true, 1)); } + int isOverloaded(); Json::Value getJson(int c = 0); }; diff --git a/src/cpp/ripple/LoadManager.cpp b/src/cpp/ripple/LoadManager.cpp index daacb52153..f8454c663d 100644 --- a/src/cpp/ripple/LoadManager.cpp +++ b/src/cpp/ripple/LoadManager.cpp @@ -1,15 +1,18 @@ #include "LoadManager.h" #include +#include +#include #include "Log.h" #include "Config.h" +#include "Application.h" SETUP_LOG(); LoadManager::LoadManager(int creditRate, int creditLimit, int debitWarn, int debitLimit) : mCreditRate(creditRate), mCreditLimit(creditLimit), mDebitWarn(debitWarn), mDebitLimit(debitLimit), - mCosts(LT_MAX) + mShutdown(false), mUptime(0), mCosts(LT_MAX) { addLoadCost(LoadCost(LT_InvalidRequest, 10, LC_CPU | LC_Network)); addLoadCost(LoadCost(LT_RequestNoReply, 1, LC_CPU | LC_Disk)); @@ -25,6 +28,30 @@ LoadManager::LoadManager(int creditRate, int creditLimit, int debitWarn, int deb addLoadCost(LoadCost(LT_CheapQuery, 1, LC_CPU)); } +void LoadManager::init() +{ + boost::thread(boost::bind(&LoadManager::threadEntry, this)).detach(); +} + +LoadManager::~LoadManager() +{ + { + boost::mutex::scoped_lock sl(mLock); + mShutdown = true; + } + + do + { + boost::this_thread::sleep(boost::posix_time::milliseconds(100)); + { + boost::mutex::scoped_lock sl(mLock); + if (!mShutdown) + return; + } + } + while (1); +} + int LoadManager::getCreditRate() const { @@ -239,6 +266,45 @@ Json::Value LoadFeeTrack::getJson(uint64 baseFee, uint32 referenceFeeUnits) return j; } +int LoadManager::getUptime() +{ + boost::mutex::scoped_lock sl(mLock); + return mUptime; +} + +void LoadManager::threadEntry() +{ + boost::posix_time::ptime t = boost::posix_time::microsec_clock::universal_time(); + while (1) + { + { + boost::mutex::scoped_lock sl(mLock); + if (mShutdown) + { + mShutdown = false; + return; + } + ++mUptime; + } + + if (theApp->getJobQueue().isOverloaded()) + theApp->getFeeTrack().raiseLocalFee(); + else + theApp->getFeeTrack().lowerLocalFee(); + + t += boost::posix_time::seconds(1); + boost::posix_time::time_duration when = t - boost::posix_time::microsec_clock::universal_time(); + + if (when.is_negative()) + { + cLog(lsWARNING) << "Backwards time jump"; + t = boost::posix_time::microsec_clock::universal_time(); + } + else + boost::this_thread::sleep(when); + } +} + BOOST_AUTO_TEST_SUITE(LoadManager_test) BOOST_AUTO_TEST_CASE(LoadFeeTrack_test) diff --git a/src/cpp/ripple/LoadManager.h b/src/cpp/ripple/LoadManager.h index 0cecd6c1e1..071d0f1436 100644 --- a/src/cpp/ripple/LoadManager.h +++ b/src/cpp/ripple/LoadManager.h @@ -86,6 +86,9 @@ protected: int mDebitWarn; // when a source drops below this, we warn int mDebitLimit; // when a source drops below this, we cut it off (should be negative) + bool mShutdown; + int mUptime; + mutable boost::mutex mLock; void canonicalize(LoadSource&, const time_t now) const; @@ -94,9 +97,13 @@ protected: void addLoadCost(const LoadCost& c) { mCosts[static_cast(c.mType)] = c; } + void threadEntry(); + public: LoadManager(int creditRate = 10, int creditLimit = 50, int debitWarn = -50, int debitLimit = -100); + ~LoadManager(); + void init(); int getCreditRate() const; int getCreditLimit() const; @@ -113,6 +120,7 @@ public: bool adjust(LoadSource&, LoadType l) const; int getCost(LoadType t) { return mCosts[static_cast(t)].mCost; } + int getUptime(); }; class LoadFeeTrack diff --git a/src/cpp/ripple/LoadMonitor.cpp b/src/cpp/ripple/LoadMonitor.cpp index b014c330ec..312de87a72 100644 --- a/src/cpp/ripple/LoadMonitor.cpp +++ b/src/cpp/ripple/LoadMonitor.cpp @@ -69,6 +69,18 @@ void LoadMonitor::addCountAndLatency(int counts, int latency) mLatencyMSPeak = lp; } +bool LoadMonitor::isOver() +{ + boost::mutex::scoped_lock sl(mLock); + + update(); + + if (mLatencyEvents == 0) + return 0; + + return isOverTarget(mLatencyMSAvg / (mLatencyEvents * 4), mLatencyMSPeak / (mLatencyEvents * 4)); +} + void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver) { boost::mutex::scoped_lock sl(mLock); @@ -89,3 +101,5 @@ void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& } isOver = isOverTarget(latencyAvg, latencyPeak); } + +// vim:ts=4 diff --git a/src/cpp/ripple/LoadMonitor.h b/src/cpp/ripple/LoadMonitor.h index ba210b75c3..efda49b53b 100644 --- a/src/cpp/ripple/LoadMonitor.h +++ b/src/cpp/ripple/LoadMonitor.h @@ -35,8 +35,8 @@ public: void setTargetLatency(uint64 avg, uint64 pk) { - mTargetLatencyAvg = avg * 4; - mTargetLatencyPk = pk * 4; + mTargetLatencyAvg = avg; + mTargetLatencyPk = pk; } bool isOverTarget(uint64 avg, uint64 peak) @@ -46,6 +46,7 @@ public: } void getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver); + bool isOver(); }; class LoadEvent diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index 8fa825b86a..7def72ca93 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -1786,6 +1786,21 @@ Json::Value RPCHandler::doLogin(Json::Value jvRequest) } #endif +static void textTime(std::string& text, int& seconds, const char *unitName, int unitVal) +{ + int i = seconds / unitVal; + if (i == 0) + return; + seconds -= unitVal * i; + if (!text.empty()) + text += ", "; + text += boost::lexical_cast(i); + text += " "; + text += unitName; + if (i > 1) + text += "s"; +} + // { // min_count: // optional, defaults to 10 // } @@ -1807,6 +1822,15 @@ Json::Value RPCHandler::doGetCounts(Json::Value jvRequest) if (dbKB > 0) ret["dbKB"] = dbKB; + std::string uptime; + int s = theApp->getLoadManager().getUptime(); + textTime(uptime, s, "year", 365*24*60*60); + textTime(uptime, s, "day", 24*60*60); + textTime(uptime, s, "hour", 24*60); + textTime(uptime, s, "minute", 60); + textTime(uptime, s, "second", 1); + ret["uptime"] = uptime; + return ret; }