Track uptime. Update local fee schedule based on load manager.

This commit is contained in:
JoelKatz
2013-02-05 21:33:42 -08:00
parent 1b2c0b9242
commit 5aa810404b
8 changed files with 128 additions and 3 deletions

View File

@@ -52,6 +52,7 @@ Application::Application() :
mJobQueue.setThreadCount();
mSweepTimer.expires_from_now(boost::posix_time::seconds(10));
mSweepTimer.async_wait(boost::bind(&Application::sweep, this));
mLoadMgr.init();
}
extern const char *RpcDBInit[], *TxnDBInit[], *LedgerDBInit[], *WalletDBInit[], *HashNodeDBInit[], *NetNodeDBInit[];

View File

@@ -181,6 +181,16 @@ Json::Value JobQueue::getJson(int)
return ret;
}
int JobQueue::isOverloaded()
{
int count = 0;
boost::mutex::scoped_lock sl(mJobLock);
for (int i = 0; i < NUM_JOB_TYPES; ++i)
if (mJobLoads[i].isOver())
++count;
return count;
}
void JobQueue::shutdown()
{ // shut down the job queue without completing pending jobs
cLog(lsINFO) << "Job queue shutting down";

View File

@@ -108,6 +108,7 @@ public:
LoadEvent::autoptr getLoadEventAP(JobType t)
{ return LoadEvent::autoptr(new LoadEvent(mJobLoads[t], true, 1)); }
int isOverloaded();
Json::Value getJson(int c = 0);
};

View File

@@ -1,15 +1,18 @@
#include "LoadManager.h"
#include <boost/test/unit_test.hpp>
#include <boost/thread.hpp>
#include <boost/date_time/posix_time/posix_time.hpp>
#include "Log.h"
#include "Config.h"
#include "Application.h"
SETUP_LOG();
LoadManager::LoadManager(int creditRate, int creditLimit, int debitWarn, int debitLimit) :
mCreditRate(creditRate), mCreditLimit(creditLimit), mDebitWarn(debitWarn), mDebitLimit(debitLimit),
mCosts(LT_MAX)
mShutdown(false), mUptime(0), mCosts(LT_MAX)
{
addLoadCost(LoadCost(LT_InvalidRequest, 10, LC_CPU | LC_Network));
addLoadCost(LoadCost(LT_RequestNoReply, 1, LC_CPU | LC_Disk));
@@ -25,6 +28,30 @@ LoadManager::LoadManager(int creditRate, int creditLimit, int debitWarn, int deb
addLoadCost(LoadCost(LT_CheapQuery, 1, LC_CPU));
}
void LoadManager::init()
{
boost::thread(boost::bind(&LoadManager::threadEntry, this)).detach();
}
LoadManager::~LoadManager()
{
{
boost::mutex::scoped_lock sl(mLock);
mShutdown = true;
}
do
{
boost::this_thread::sleep(boost::posix_time::milliseconds(100));
{
boost::mutex::scoped_lock sl(mLock);
if (!mShutdown)
return;
}
}
while (1);
}
int LoadManager::getCreditRate() const
{
@@ -239,6 +266,45 @@ Json::Value LoadFeeTrack::getJson(uint64 baseFee, uint32 referenceFeeUnits)
return j;
}
int LoadManager::getUptime()
{
boost::mutex::scoped_lock sl(mLock);
return mUptime;
}
void LoadManager::threadEntry()
{
boost::posix_time::ptime t = boost::posix_time::microsec_clock::universal_time();
while (1)
{
{
boost::mutex::scoped_lock sl(mLock);
if (mShutdown)
{
mShutdown = false;
return;
}
++mUptime;
}
if (theApp->getJobQueue().isOverloaded())
theApp->getFeeTrack().raiseLocalFee();
else
theApp->getFeeTrack().lowerLocalFee();
t += boost::posix_time::seconds(1);
boost::posix_time::time_duration when = t - boost::posix_time::microsec_clock::universal_time();
if (when.is_negative())
{
cLog(lsWARNING) << "Backwards time jump";
t = boost::posix_time::microsec_clock::universal_time();
}
else
boost::this_thread::sleep(when);
}
}
BOOST_AUTO_TEST_SUITE(LoadManager_test)
BOOST_AUTO_TEST_CASE(LoadFeeTrack_test)

View File

@@ -86,6 +86,9 @@ protected:
int mDebitWarn; // when a source drops below this, we warn
int mDebitLimit; // when a source drops below this, we cut it off (should be negative)
bool mShutdown;
int mUptime;
mutable boost::mutex mLock;
void canonicalize(LoadSource&, const time_t now) const;
@@ -94,9 +97,13 @@ protected:
void addLoadCost(const LoadCost& c) { mCosts[static_cast<int>(c.mType)] = c; }
void threadEntry();
public:
LoadManager(int creditRate = 10, int creditLimit = 50, int debitWarn = -50, int debitLimit = -100);
~LoadManager();
void init();
int getCreditRate() const;
int getCreditLimit() const;
@@ -113,6 +120,7 @@ public:
bool adjust(LoadSource&, LoadType l) const;
int getCost(LoadType t) { return mCosts[static_cast<int>(t)].mCost; }
int getUptime();
};
class LoadFeeTrack

View File

@@ -69,6 +69,18 @@ void LoadMonitor::addCountAndLatency(int counts, int latency)
mLatencyMSPeak = lp;
}
bool LoadMonitor::isOver()
{
boost::mutex::scoped_lock sl(mLock);
update();
if (mLatencyEvents == 0)
return 0;
return isOverTarget(mLatencyMSAvg / (mLatencyEvents * 4), mLatencyMSPeak / (mLatencyEvents * 4));
}
void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver)
{
boost::mutex::scoped_lock sl(mLock);
@@ -89,3 +101,5 @@ void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64&
}
isOver = isOverTarget(latencyAvg, latencyPeak);
}
// vim:ts=4

View File

@@ -35,8 +35,8 @@ public:
void setTargetLatency(uint64 avg, uint64 pk)
{
mTargetLatencyAvg = avg * 4;
mTargetLatencyPk = pk * 4;
mTargetLatencyAvg = avg;
mTargetLatencyPk = pk;
}
bool isOverTarget(uint64 avg, uint64 peak)
@@ -46,6 +46,7 @@ public:
}
void getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak, bool& isOver);
bool isOver();
};
class LoadEvent

View File

@@ -1786,6 +1786,21 @@ Json::Value RPCHandler::doLogin(Json::Value jvRequest)
}
#endif
static void textTime(std::string& text, int& seconds, const char *unitName, int unitVal)
{
int i = seconds / unitVal;
if (i == 0)
return;
seconds -= unitVal * i;
if (!text.empty())
text += ", ";
text += boost::lexical_cast<std::string>(i);
text += " ";
text += unitName;
if (i > 1)
text += "s";
}
// {
// min_count: <number> // optional, defaults to 10
// }
@@ -1807,6 +1822,15 @@ Json::Value RPCHandler::doGetCounts(Json::Value jvRequest)
if (dbKB > 0)
ret["dbKB"] = dbKB;
std::string uptime;
int s = theApp->getLoadManager().getUptime();
textTime(uptime, s, "year", 365*24*60*60);
textTime(uptime, s, "day", 24*60*60);
textTime(uptime, s, "hour", 24*60);
textTime(uptime, s, "minute", 60);
textTime(uptime, s, "second", 1);
ret["uptime"] = uptime;
return ret;
}