diff --git a/ripple2010.vcxproj b/ripple2010.vcxproj
index 374a65bd6..7867ca8f9 100644
--- a/ripple2010.vcxproj
+++ b/ripple2010.vcxproj
@@ -122,6 +122,7 @@
+
diff --git a/ripple2010.vcxproj.filters b/ripple2010.vcxproj.filters
index 9e03588ae..5650c2763 100644
--- a/ripple2010.vcxproj.filters
+++ b/ripple2010.vcxproj.filters
@@ -345,6 +345,9 @@
Source Files
+
+ Source Files
+
diff --git a/src/cpp/ripple/HashedObject.cpp b/src/cpp/ripple/HashedObject.cpp
index aa5dc5661..7505a4c24 100644
--- a/src/cpp/ripple/HashedObject.cpp
+++ b/src/cpp/ripple/HashedObject.cpp
@@ -60,6 +60,7 @@ void HashedObjectStore::waitWrite()
void HashedObjectStore::bulkWrite()
{
+ LoadEvent::pointer event = theApp->getJobQueue().getLoadEvent(jtDISK);
while (1)
{
std::vector< boost::shared_ptr > set;
diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp
index cbb2bee83..a72e8ddae 100644
--- a/src/cpp/ripple/JobQueue.cpp
+++ b/src/cpp/ripple/JobQueue.cpp
@@ -1,6 +1,5 @@
#include "JobQueue.h"
-#include
#include
#include
#include
@@ -21,6 +20,9 @@ const char* Job::toString(JobType t)
case jtPROPOSAL_t: return "trustedProposal";
case jtADMIN: return "administration";
case jtDEATH: return "jobOfDeath";
+ case jtCLIENT: return "clientCommand";
+ case jtPEER: return "peerCommand";
+ case jtDISK: return "diskAccess";
default: assert(false); return "unknown";
}
}
@@ -68,7 +70,7 @@ void JobQueue::addJob(JobType type, const boost::function& jobFunc)
boost::mutex::scoped_lock sl(mJobLock);
assert(mThreadCount != 0); // do not add jobs to a queue with no threads
- mJobSet.insert(Job(type, ++mLastJob, jobFunc));
+ mJobSet.insert(Job(type, ++mLastJob, mJobLoads[type], jobFunc));
++mJobCounts[type];
mJobCond.notify_one();
}
@@ -108,6 +110,43 @@ std::vector< std::pair > JobQueue::getJobCounts()
return ret;
}
+Json::Value JobQueue::getJson(int)
+{
+ Json::Value ret(Json::objectValue);
+ boost::mutex::scoped_lock sl(mJobLock);
+
+ ret["threads"] = mThreadCount;
+
+ Json::Value priorities = Json::arrayValue;
+ for (int i = 0; i < NUM_JOB_TYPES; ++i)
+ {
+ uint64 count, latencyAvg, latencyPeak, jobCount;
+ mJobLoads[i].getCountAndLatency(count, latencyAvg, latencyPeak);
+ std::map::iterator it = mJobCounts.find(static_cast(i));
+ if (it == mJobCounts.end())
+ jobCount = 0;
+ else
+ jobCount = it->second;
+ if ((count != 0) || (jobCount != 0) || (latencyPeak != 0))
+ {
+ Json::Value pri(Json::objectValue);
+ pri["job_type"] = Job::toString(static_cast(i));
+ if (jobCount != 0)
+ pri["waiting"] = static_cast(jobCount);
+ if (count != 0)
+ pri["per_second"] = static_cast(count);
+ if (latencyPeak != 0)
+ pri["peak_latency"] = static_cast(latencyPeak);
+ if (latencyAvg != 0)
+ pri["avg_latency"] = static_cast(latencyAvg);
+ priorities.append(pri);
+ }
+ }
+ ret["job_types"] = priorities;
+
+ return ret;
+}
+
void JobQueue::shutdown()
{ // shut down the job queue without completing pending jobs
cLog(lsINFO) << "Job queue shutting down";
diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h
index bdd23bc69..339804d20 100644
--- a/src/cpp/ripple/JobQueue.h
+++ b/src/cpp/ripple/JobQueue.h
@@ -8,26 +8,36 @@
#include
#include
#include
+#include
+
+#include "../json/value.h"
#include "types.h"
+#include "LoadMonitor.h"
// Note that this queue should only be used for CPU-bound jobs
// It is primarily intended for signature checking
enum JobType
{ // must be in priority order, low to high
- jtINVALID,
- jtVALIDATION_ut, // A validation from an untrusted source
- jtCLIENTOP_ut, // A client operation from a non-local/untrusted source
- jtTRANSACTION, // A transaction received from the network
- jtPROPOSAL_ut, // A proposal from an untrusted source
- jtCLIENTOP_t, // A client operation from a trusted source
- jtVALIDATION_t, // A validation from a trusted source
- jtTRANSACTION_l, // A local transaction
- jtPROPOSAL_t, // A proposal from a trusted source
- jtADMIN, // An administrative operation
- jtDEATH, // job of death, used internally
+ jtINVALID = -1,
+ jtVALIDATION_ut = 0, // A validation from an untrusted source
+ jtCLIENTOP_ut = 1, // A client operation from a non-local/untrusted source
+ jtTRANSACTION = 2, // A transaction received from the network
+ jtPROPOSAL_ut = 3, // A proposal from an untrusted source
+ jtCLIENTOP_t = 4, // A client operation from a trusted source
+ jtVALIDATION_t = 5, // A validation from a trusted source
+ jtTRANSACTION_l = 6, // A local transaction
+ jtPROPOSAL_t = 7, // A proposal from a trusted source
+ jtADMIN = 8, // An administrative operation
+ jtDEATH = 9, // job of death, used internally
+
+// special types not dispatched by the job pool
+ jtCLIENT = 10,
+ jtPEER = 11,
+ jtDISK = 12,
};
+#define NUM_JOB_TYPES 16
class Job
{
@@ -35,13 +45,18 @@ protected:
JobType mType;
uint64 mJobIndex;
boost::function mJob;
+ LoadEvent::pointer mLoadMonitor;
public:
- Job() : mType(jtINVALID), mJobIndex(0) { ; }
- Job(JobType type, uint64 index) : mType(type), mJobIndex(index) { ; }
- Job(JobType type, uint64 index, const boost::function& job)
- : mType(type), mJobIndex(index), mJob(job) { ; }
+ Job() : mType(jtINVALID), mJobIndex(0) { ; }
+
+ Job(JobType type, uint64 index) : mType(type), mJobIndex(index)
+ { ; }
+
+ Job(JobType type, uint64 index, LoadMonitor& lm, const boost::function& job)
+ : mType(type), mJobIndex(index), mJob(job)
+ { mLoadMonitor = boost::make_shared(boost::ref(lm), true, 1); }
JobType getType() const { return mType; }
void doJob(void) { mJob(*this); }
@@ -57,14 +72,15 @@ public:
class JobQueue
{
protected:
- boost::mutex mJobLock;
- boost::condition_variable mJobCond;
+ boost::mutex mJobLock;
+ boost::condition_variable mJobCond;
- uint64 mLastJob;
- std::set mJobSet;
- std::map mJobCounts;
- int mThreadCount;
- bool mShuttingDown;
+ uint64 mLastJob;
+ std::set mJobSet;
+ std::map mJobCounts;
+ LoadMonitor mJobLoads[NUM_JOB_TYPES];
+ int mThreadCount;
+ bool mShuttingDown;
void threadEntry(void);
@@ -81,6 +97,11 @@ public:
void shutdown();
void setThreadCount(int c = 0);
+
+ LoadEvent::pointer getLoadEvent(JobType t)
+ { return boost::make_shared(boost::ref(mJobLoads[t]), true, 1); }
+
+ Json::Value getJson(int c = 0);
};
#endif
diff --git a/src/cpp/ripple/Ledger.cpp b/src/cpp/ripple/Ledger.cpp
index 03e0a48d1..8f6d87e79 100644
--- a/src/cpp/ripple/Ledger.cpp
+++ b/src/cpp/ripple/Ledger.cpp
@@ -341,6 +341,7 @@ uint256 Ledger::getHash()
void Ledger::saveAcceptedLedger(bool fromConsensus)
{ // can be called in a different thread
+ LoadEvent::pointer event = theApp->getJobQueue().getLoadEvent(jtDISK);
cLog(lsTRACE) << "saveAcceptedLedger " << (fromConsensus ? "fromConsensus " : "fromAcquire ") << getLedgerSeq();
static boost::format ledgerExists("SELECT LedgerSeq FROM Ledgers where LedgerSeq = %d;");
static boost::format deleteLedger("DELETE FROM Ledgers WHERE LedgerSeq = %d;");
diff --git a/src/cpp/ripple/LoadMonitor.cpp b/src/cpp/ripple/LoadMonitor.cpp
index 76e23ccf4..2011035ae 100644
--- a/src/cpp/ripple/LoadMonitor.cpp
+++ b/src/cpp/ripple/LoadMonitor.cpp
@@ -1,6 +1,6 @@
#include "LoadMonitor.h"
-void LoadMonitor::LoadMonitor::update()
+void LoadMonitor::update()
{ // call with the mutex
time_t now = time(NULL);
@@ -11,7 +11,8 @@ void LoadMonitor::LoadMonitor::update()
{ // way out of date
mCounts = 0;
mLatencyEvents = 0;
- mLatencyMS = 0;
+ mLatencyMSAvg = 0;
+ mLatencyMSPeak = 0;
mLastUpdate = now;
return;
}
@@ -19,9 +20,10 @@ void LoadMonitor::LoadMonitor::update()
do
{ // do exponential decay
++mLastUpdate;
- mCounts -= (mCounts / 4);
- mLatencyEvents -= (mLatencyEvents / 4);
- mLatencyMS -= (mLatencyMS / 4);
+ mCounts -= ((mCounts + 3) / 4);
+ mLatencyEvents -= ((mLatencyEvents + 3) / 4);
+ mLatencyMSAvg -= (mLatencyMSAvg / 4);
+ mLatencyMSPeak -= (mLatencyMSPeak / 4);
} while (mLastUpdate < now);
}
@@ -35,24 +37,39 @@ void LoadMonitor::addCount(int counts)
void LoadMonitor::addLatency(int latency)
{
+ if (latency == 1)
+ latency = 0;
boost::mutex::scoped_lock sl(mLock);
update();
+
++mLatencyEvents;
- mLatencyMS += latency;
+ mLatencyMSAvg += latency;
+ mLatencyMSPeak += latency;
+
+ int lp = mLatencyEvents * latency * 4;
+ if (mLatencyMSPeak < lp)
+ mLatencyMSPeak = lp;
}
void LoadMonitor::addCountAndLatency(int counts, int latency)
{
+ if (latency == 1)
+ latency = 0;
boost::mutex::scoped_lock sl(mLock);
update();
mCounts += counts;
++mLatencyEvents;
- mLatencyMS += latency;
+ mLatencyMSAvg += latency;
+ mLatencyMSPeak += latency;
+
+ int lp = mLatencyEvents * latency * 4;
+ if (mLatencyMSPeak < lp)
+ mLatencyMSPeak = lp;
}
-void LoadMonitor::getCountAndLatency(uint64& count, uint64& latency)
+void LoadMonitor::getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak)
{
boost::mutex::scoped_lock sl(mLock);
@@ -61,6 +78,13 @@ void LoadMonitor::getCountAndLatency(uint64& count, uint64& latency)
count = mCounts / 4;
if (mLatencyEvents == 0)
- latency = 0;
- else latency = mLatencyMS / (mLatencyEvents * 4);
+ {
+ latencyAvg = 0;
+ latencyPeak = 0;
+ }
+ else
+ {
+ latencyAvg = mLatencyMSAvg / (mLatencyEvents * 4);
+ latencyPeak = mLatencyMSPeak / (mLatencyEvents * 4);
+ }
}
diff --git a/src/cpp/ripple/LoadMonitor.h b/src/cpp/ripple/LoadMonitor.h
index 17d7cf8a5..08c2afbf2 100644
--- a/src/cpp/ripple/LoadMonitor.h
+++ b/src/cpp/ripple/LoadMonitor.h
@@ -4,6 +4,7 @@
#include
#include
+#include
#include "types.h"
@@ -12,32 +13,31 @@
class LoadMonitor
{
protected:
- std::string mName;
uint64 mCounts;
uint64 mLatencyEvents;
- uint64 mLatencyMS;
+ uint64 mLatencyMSAvg;
+ uint64 mLatencyMSPeak;
time_t mLastUpdate;
boost::mutex mLock;
void update();
public:
- LoadMonitor(const std::string& n) : mName(n), mCounts(0), mLatencyEvents(0), mLatencyMS(0)
+ LoadMonitor() : mCounts(0), mLatencyEvents(0), mLatencyMSAvg(0), mLatencyMSPeak(0)
{ mLastUpdate = time(NULL); }
- void setName(const std::string& n) { mName = n; }
-
- const std::string& getName() const { return mName; }
-
void addCount(int counts);
void addLatency(int latency);
void addCountAndLatency(int counts, int latency);
- void getCountAndLatency(uint64& count, uint64& latency);
+ void getCountAndLatency(uint64& count, uint64& latencyAvg, uint64& latencyPeak);
};
class LoadEvent
{
+public:
+ typedef boost::shared_ptr pointer;
+
protected:
LoadMonitor& mMonitor;
bool mRunning;
diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp
index 89a5d77e7..ed4847f9b 100644
--- a/src/cpp/ripple/NetworkOPs.cpp
+++ b/src/cpp/ripple/NetworkOPs.cpp
@@ -957,22 +957,7 @@ Json::Value NetworkOPs::getServerInfo()
if (mConsensus)
info["consensus"] = mConsensus->getJson();
- typedef std::pair jt_int_pair;
- bool anyJobs = false;
- Json::Value jobs = Json::arrayValue;
- std::vector< std::pair > jobCounts = theApp->getJobQueue().getJobCounts();
- BOOST_FOREACH(jt_int_pair& it, jobCounts)
- {
- if (it.second != 0)
- {
- Json::Value o = Json::objectValue;
- o[Job::toString(it.first)] = it.second;
- jobs.append(o);
- anyJobs = true;
- }
- }
- if (anyJobs)
- info["jobs"] = jobs;
+ info["load"] = theApp->getJobQueue().getJson();
return info;
}
diff --git a/src/cpp/ripple/PaymentTransactor.cpp b/src/cpp/ripple/PaymentTransactor.cpp
index 7577341cb..e2e6bceed 100644
--- a/src/cpp/ripple/PaymentTransactor.cpp
+++ b/src/cpp/ripple/PaymentTransactor.cpp
@@ -4,12 +4,15 @@
#define RIPPLE_PATHS_MAX 3
-// TODO: only have the higher fee if the account doesn't in fact exist
+// only have the higher fee if the account doesn't in fact exist
void PaymentTransactor::calculateFee()
{
if (mTxn.getFlags() & tfCreateAccount)
{
- mFeeDue = theConfig.FEE_ACCOUNT_CREATE;
+ const uint160 uDstAccountID = mTxn.getFieldAccount160(sfDestination);
+ SLE::pointer sleDst = mEngine->entryCache(ltACCOUNT_ROOT, Ledger::getAccountRootIndex(uDstAccountID));
+ if(!sleDst) mFeeDue = theConfig.FEE_ACCOUNT_CREATE;
+ else Transactor::calculateFee();
}else Transactor::calculateFee();
}
@@ -146,7 +149,13 @@ TER PaymentTransactor::doApply()
else
{
mTxnAccount->setFieldAmount(sfBalance, saSrcXRPBalance - saDstAmount);
- sleDst->setFieldAmount(sfBalance, sleDst->getFieldAmount(sfBalance) + saDstAmount);
+ // re-arm the password change fee if we can and need to
+ if ( (sleDst->getFlags() & lsfPasswordSpent) &&
+ (saDstAmount > theConfig.FEE_DEFAULT) )
+ {
+ sleDst->setFieldAmount(sfBalance, sleDst->getFieldAmount(sfBalance) + saDstAmount-theConfig.FEE_DEFAULT);
+ sleDst->clearFlag(lsfPasswordSpent);
+ }else sleDst->setFieldAmount(sfBalance, sleDst->getFieldAmount(sfBalance) + saDstAmount);
terResult = tesSUCCESS;
}
diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp
index f364df838..76e396b3e 100644
--- a/src/cpp/ripple/Peer.cpp
+++ b/src/cpp/ripple/Peer.cpp
@@ -373,8 +373,11 @@ void Peer::processReadBuffer()
// std::cerr << "Peer::processReadBuffer: " << mIpPort.first << " " << mIpPort.second << std::endl;
- // If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent.
+ LoadEvent::pointer event = theApp->getJobQueue().getLoadEvent(jtPEER);
+
boost::recursive_mutex::scoped_lock sl(theApp->getMasterLock());
+
+ // If connected and get a mtHELLO or if not connected and get a non-mtHELLO, wrong message was sent.
if (mHelloed == (type == ripple::mtHELLO))
{
cLog(lsWARNING) << "Wrong message type: " << type;
diff --git a/src/cpp/ripple/RegularKeySetTransactor.cpp b/src/cpp/ripple/RegularKeySetTransactor.cpp
index eb89bed28..f74b8f5a9 100644
--- a/src/cpp/ripple/RegularKeySetTransactor.cpp
+++ b/src/cpp/ripple/RegularKeySetTransactor.cpp
@@ -4,43 +4,30 @@
SETUP_LOG();
-// TODO:
-TER RegularKeySetTransactor::checkSig()
-{
- // Transaction's signing public key must be for the source account.
- // To prove the master private key made this transaction.
- if (mSigningPubKey.getAccountID() != mTxnAccountID)
- {
- // Signing Pub Key must be for Source Account ID.
- cLog(lsWARNING) << "sourceAccountID: " << mSigningPubKey.humanAccountID();
- cLog(lsWARNING) << "txn accountID: " << mTxn.getSourceAccount().humanAccountID();
- return temBAD_SET_ID;
- }
- return tesSUCCESS;
-}
-
-// TODO: this should be default fee if flag isn't set
void RegularKeySetTransactor::calculateFee()
{
- mFeeDue = 0;
+ Transactor::calculateFee();
+
+ if ( !(mTxnAccount->getFlags() & lsfPasswordSpent) &&
+ (mSigningPubKey.getAccountID() == mTxnAccountID))
+ { // flag is armed and they signed with the right account
+
+ mSourceBalance = mTxnAccount->getFieldAmount(sfBalance);
+ if(mSourceBalance < mFeeDue) mFeeDue = 0;
+ }
}
-// TODO: change to take a fee if there is one there
TER RegularKeySetTransactor::doApply()
{
std::cerr << "doRegularKeySet>" << std::endl;
- if (mTxnAccount->getFlags() & lsfPasswordSpent)
+ if(mFeeDue.isZero())
{
- std::cerr << "doRegularKeySet: Delay transaction: Funds already spent." << std::endl;
-
- return terFUNDS_SPENT;
+ mTxnAccount->setFlag(lsfPasswordSpent);
}
- mTxnAccount->setFlag(lsfPasswordSpent);
-
uint160 uAuthKeyID=mTxn.getFieldAccount160(sfRegularKey);
mTxnAccount->setFieldAccount(sfRegularKey, uAuthKeyID);
diff --git a/src/cpp/ripple/RegularKeySetTransactor.h b/src/cpp/ripple/RegularKeySetTransactor.h
index a6df0b356..35d744c8f 100644
--- a/src/cpp/ripple/RegularKeySetTransactor.h
+++ b/src/cpp/ripple/RegularKeySetTransactor.h
@@ -6,6 +6,5 @@ class RegularKeySetTransactor : public Transactor
public:
RegularKeySetTransactor(const SerializedTransaction& txn,TransactionEngineParams params, TransactionEngine* engine) : Transactor(txn,params,engine) {}
TER checkFee();
- TER checkSig();
TER doApply();
};
diff --git a/src/cpp/ripple/ValidationCollection.cpp b/src/cpp/ripple/ValidationCollection.cpp
index acb6c2c4b..cab64a950 100644
--- a/src/cpp/ripple/ValidationCollection.cpp
+++ b/src/cpp/ripple/ValidationCollection.cpp
@@ -289,6 +289,7 @@ void ValidationCollection::condWrite()
void ValidationCollection::doWrite()
{
+ LoadEvent::pointer event = theApp->getJobQueue().getLoadEvent(jtDISK);
static boost::format insVal("INSERT INTO LedgerValidations "
"(LedgerHash,NodePubKey,Flags,SignTime,Signature) VALUES ('%s','%s','%u','%u',%s);");
diff --git a/src/cpp/ripple/WSHandler.h b/src/cpp/ripple/WSHandler.h
index 43c9a9ce6..306647c5c 100644
--- a/src/cpp/ripple/WSHandler.h
+++ b/src/cpp/ripple/WSHandler.h
@@ -1,6 +1,8 @@
#ifndef __WSHANDLER__
#define __WSHANDLER__
+#include "Application.h"
+
class WSConnection;
// A single instance of this object is made.
@@ -87,10 +89,11 @@ public:
void on_message(connection_ptr cpClient, message_ptr mpMessage)
{
+ LoadEvent::pointer event = theApp->getJobQueue().getLoadEvent(jtCLIENT);
Json::Value jvRequest;
Json::Reader jrReader;
- cLog(lsDEBUG) << "Ws:: Receiving '" << mpMessage->get_payload() << "'";
+ cLog(lsDEBUG) << "Ws:: Receiving '" << mpMessage->get_payload() << "'";
if (mpMessage->get_opcode() != websocketpp::frame::opcode::TEXT)
{
diff --git a/test/send-test.js b/test/send-test.js
index 043855714..21d261c7a 100644
--- a/test/send-test.js
+++ b/test/send-test.js
@@ -15,13 +15,14 @@ var serverDelay = 1500;
buster.testRunner.timeout = 5000;
+
/*
-buster.testCase("Simple", {
+buster.testCase("Fee Changes", {
'setUp' : testutils.build_setup({no_server: true}), //
'tearDown' : testutils.build_teardown(),
- "simple." :
- function (done) { buster.assert(1);
+ "varying the fee for Payment" :
+ function (done) {
this.remote.transaction()
.payment('root', 'alice', "10000")
@@ -36,8 +37,8 @@ buster.testCase("Simple", {
}).submit();
}
- }); */
-
+ });
+ */
buster.testCase("Sending", {
'setUp' : testutils.build_setup(),
'tearDown' : testutils.build_teardown(),