diff --git a/src/cpp/database/SqliteDatabase.cpp b/src/cpp/database/SqliteDatabase.cpp index 35062d29e0..e9e70fed4a 100644 --- a/src/cpp/database/SqliteDatabase.cpp +++ b/src/cpp/database/SqliteDatabase.cpp @@ -223,7 +223,7 @@ void SqliteDatabase::doHook(const char *db, int pages) { walRunning = true; if (mWalQ) - mWalQ->addJob(jtWAL, boost::bind(&SqliteDatabase::runWal, this)); + mWalQ->addJob(jtWAL, std::string("WAL:") + db, boost::bind(&SqliteDatabase::runWal, this)); else boost::thread(boost::bind(&SqliteDatabase::runWal, this)).detach(); } diff --git a/src/cpp/ripple/Application.cpp b/src/cpp/ripple/Application.cpp index 3081fb3743..fac0ce76a6 100644 --- a/src/cpp/ripple/Application.cpp +++ b/src/cpp/ripple/Application.cpp @@ -45,7 +45,7 @@ Application::Application() : mSNTPClient(mAuxService), mRPCHandler(&mNetOps), mFeeTrack(), mRpcDB(NULL), mTxnDB(NULL), mLedgerDB(NULL), mWalletDB(NULL), mHashNodeDB(NULL), mNetNodeDB(NULL), mConnectionPool(mIOService), mPeerDoor(NULL), mRPCDoor(NULL), mWSPublicDoor(NULL), mWSPrivateDoor(NULL), - mSweepTimer(mAuxService) + mSweepTimer(mAuxService), mShutdown(false) { getRand(mNonce256.begin(), mNonce256.size()); getRand(reinterpret_cast(&mNonceST), sizeof(mNonceST)); @@ -58,6 +58,7 @@ bool Instance::running = true; void Application::stop() { cLog(lsINFO) << "Received shutdown request"; + mShutdown = true; mIOService.stop(); mHashedObjectStore.bulkWrite(); mValidations.flush(); diff --git a/src/cpp/ripple/Application.h b/src/cpp/ripple/Application.h index b8ef316499..cafa5b7c77 100644 --- a/src/cpp/ripple/Application.h +++ b/src/cpp/ripple/Application.h @@ -87,6 +87,8 @@ class Application std::map mPeerMap; boost::recursive_mutex mPeerMapLock; + volatile bool mShutdown; + void startNewLedger(); void loadOldLedger(const std::string&); @@ -140,6 +142,7 @@ public: uint256 getNonce256() { return mNonce256; } std::size_t getNonceST() { return mNonceST; } + bool isShutdown() { return mShutdown; } void setup(); void run(); void stop(); diff --git a/src/cpp/ripple/ConnectionPool.cpp b/src/cpp/ripple/ConnectionPool.cpp index 7bbfc9e158..3d184d4cda 100644 --- a/src/cpp/ripple/ConnectionPool.cpp +++ b/src/cpp/ripple/ConnectionPool.cpp @@ -235,14 +235,10 @@ void ConnectionPool::policyHandler(const boost::system::error_code& ecResult) int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& msg) { int sentTo = 0; - boost::recursive_mutex::scoped_lock sl(mPeerLock); - - BOOST_FOREACH(const vtConMap& pair, mConnectedMap) + std::vector peerVector = getPeerVector(); + BOOST_FOREACH(Peer::ref peer, peerVector) { - Peer::ref peer = pair.second; - if (!peer) - std::cerr << "CP::RM null peer in list" << std::endl; - else if ((!fromPeer || !(peer.get() == fromPeer)) && peer->isConnected()) + if ((!fromPeer || !(peer.get() == fromPeer)) && peer->isConnected()) { ++sentTo; peer->sendPacket(msg); @@ -254,11 +250,9 @@ int ConnectionPool::relayMessage(Peer* fromPeer, const PackedMessage::pointer& m void ConnectionPool::relayMessageBut(const std::set& fromPeers, const PackedMessage::pointer& msg) { // Relay message to all but the specified peers - boost::recursive_mutex::scoped_lock sl(mPeerLock); - - BOOST_FOREACH(const vtConMap& pair, mConnectedMap) + std::vector peerVector = getPeerVector(); + BOOST_FOREACH(Peer::ref peer, peerVector) { - Peer::ref peer = pair.second; if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) == 0)) peer->sendPacket(msg); } @@ -267,13 +261,11 @@ void ConnectionPool::relayMessageBut(const std::set& fromPeers, const Pa void ConnectionPool::relayMessageTo(const std::set& fromPeers, const PackedMessage::pointer& msg) { // Relay message to the specified peers - boost::recursive_mutex::scoped_lock sl(mPeerLock); - - BOOST_FOREACH(const uint64& peerID, fromPeers) + std::vector peerVector = getPeerVector(); + BOOST_FOREACH(Peer::ref peer, peerVector) { - const boost::unordered_map::iterator& it = mPeerIdMap.find(peerID); - if ((it != mPeerIdMap.end()) && it->second->isConnected()) - it->second->sendPacket(msg); + if (peer->isConnected() && (fromPeers.count(peer->getPeerId()) != 0)) + peer->sendPacket(msg); } } diff --git a/src/cpp/ripple/DBInit.cpp b/src/cpp/ripple/DBInit.cpp index bdc6c29b5e..fcf5a3a84a 100644 --- a/src/cpp/ripple/DBInit.cpp +++ b/src/cpp/ripple/DBInit.cpp @@ -20,7 +20,7 @@ const char *TxnDBInit[] = { TxnMeta BLOB \ );", "CREATE TABLE AccountTransactions ( \ - TransID CHARACTER(64), \ + TransID CHARACTER(64) PRIMARY KEY, \ Account CHARACTER(64), \ LedgerSeq BIGINT UNSIGNED \ );", diff --git a/src/cpp/ripple/HashedObject.cpp b/src/cpp/ripple/HashedObject.cpp index 6dd34b6ef2..3515b5f55c 100644 --- a/src/cpp/ripple/HashedObject.cpp +++ b/src/cpp/ripple/HashedObject.cpp @@ -51,7 +51,8 @@ bool HashedObjectStore::store(HashedObjectType type, uint32 index, if (!mWritePending) { mWritePending = true; - theApp->getJobQueue().addJob(jtWRITE, boost::bind(&HashedObjectStore::bulkWrite, this)); + theApp->getJobQueue().addJob(jtWRITE, "HashedObject::store", + boost::bind(&HashedObjectStore::bulkWrite, this)); } } // else diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index 3dc9386d0e..c673a45577 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -98,7 +98,7 @@ bool Job::operator<=(const Job& j) const return mJobIndex <= j.mJobIndex; } -void JobQueue::addJob(JobType type, const boost::function& jobFunc) +void JobQueue::addJob(JobType type, const std::string& name, const boost::function& jobFunc) { assert(type != jtINVALID); @@ -107,7 +107,7 @@ void JobQueue::addJob(JobType type, const boost::function& jobFunc) if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering assert(mThreadCount != 0); // do not add jobs to a queue with no threads - mJobSet.insert(Job(type, ++mLastJob, mJobLoads[type], jobFunc)); + mJobSet.insert(Job(type, name, ++mLastJob, mJobLoads[type], jobFunc)); ++mJobCounts[type]; mJobCond.notify_one(); } @@ -176,9 +176,9 @@ Json::Value JobQueue::getJson(int) if (count != 0) pri["per_second"] = static_cast(count); if (latencyPeak != 0) - pri["peak_latency"] = static_cast(latencyPeak); + pri["peak_time"] = static_cast(latencyPeak); if (latencyAvg != 0) - pri["avg_latency"] = static_cast(latencyAvg); + pri["avg_time"] = static_cast(latencyAvg); priorities.append(pri); } } diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 10dfbc7ddc..386ccf5ff4 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -54,6 +54,7 @@ protected: uint64 mJobIndex; boost::function mJob; LoadEvent::pointer mLoadMonitor; + std::string mName; public: @@ -62,12 +63,15 @@ public: Job(JobType type, uint64 index) : mType(type), mJobIndex(index) { ; } - Job(JobType type, uint64 index, LoadMonitor& lm, const boost::function& job) - : mType(type), mJobIndex(index), mJob(job) - { mLoadMonitor = boost::make_shared(boost::ref(lm), true, 1); } + Job(JobType type, const std::string& name, uint64 index, LoadMonitor& lm, const boost::function& job) + : mType(type), mJobIndex(index), mJob(job), mName(name) + { + mLoadMonitor = boost::make_shared(boost::ref(lm), false, 1); + } JobType getType() const { return mType; } - void doJob(void) { mJob(*this); } + void doJob(void) { mLoadMonitor->start(); mJob(*this); mLoadMonitor->setName(mName); } + void rename(const std::string& n) { mName = n; } bool operator<(const Job& j) const; bool operator>(const Job& j) const; @@ -97,7 +101,7 @@ public: JobQueue(); - void addJob(JobType type, const boost::function& job); + void addJob(JobType type, const std::string& name, const boost::function& job); int getJobCount(JobType t); // Jobs at this priority int getJobCountGE(JobType t); // All jobs at or greater than this priority diff --git a/src/cpp/ripple/Ledger.cpp b/src/cpp/ripple/Ledger.cpp index 0ca0b7d93c..e34b6d678a 100644 --- a/src/cpp/ripple/Ledger.cpp +++ b/src/cpp/ripple/Ledger.cpp @@ -465,7 +465,7 @@ void Ledger::saveAcceptedLedger(bool fromConsensus, LoadEvent::pointer event) if (!accts.empty()) { - std::string sql = "INSERT INTO AccountTransactions (TransID, Account, LedgerSeq) VALUES "; + std::string sql = "INSERT OR REPLACE INTO AccountTransactions (TransID, Account, LedgerSeq) VALUES "; bool first = true; for (std::vector::const_iterator it = accts.begin(), end = accts.end(); it != end; ++it) { @@ -781,6 +781,45 @@ bool Ledger::getHashesByIndex(uint32 ledgerIndex, uint256& ledgerHash, uint256& #endif } +std::map< uint32, std::pair > Ledger::getHashesByIndex(uint32 minSeq, uint32 maxSeq) +{ +#ifndef NO_SQLITE_PREPARE + std::map< uint32, std::pair > ret; + DatabaseCon *con = theApp->getLedgerDB(); + ScopedLock sl(con->getDBLock()); + + static SqliteStatement pSt(con->getDB()->getSqliteDB(), + "SELECT LedgerSeq,LedgerHash,PrevHash FROM Ledgers INDEXED BY SeqLedger " + "WHERE LedgerSeq >= ? AND LedgerSeq <= ?;"); + + std::pair hashes; + + pSt.bind(1, minSeq); + pSt.bind(2, maxSeq); + + do + { + int r = pSt.step(); + if (pSt.isDone(r)) + { + pSt.reset(); + return ret; + } + if (!pSt.isRow(r)) + { + pSt.reset(); + return ret; + } + hashes.first.SetHex(pSt.peekString(1), true); + hashes.second.SetHex(pSt.peekString(2), true); + ret[pSt.getUInt32(0)] = hashes; + } while(1); + +#else +#error SQLite prepare is required +#endif +} + Ledger::pointer Ledger::getLastFullLedger() { try diff --git a/src/cpp/ripple/Ledger.h b/src/cpp/ripple/Ledger.h index 62de1c8195..34d88b8fd7 100644 --- a/src/cpp/ripple/Ledger.h +++ b/src/cpp/ripple/Ledger.h @@ -202,6 +202,7 @@ public: static Ledger::pointer loadByHash(const uint256& ledgerHash); static uint256 getHashByIndex(uint32 index); static bool getHashesByIndex(uint32 index, uint256& ledgerHash, uint256& parentHash); + static std::map< uint32, std::pair > getHashesByIndex(uint32 minSeq, uint32 maxSeq); void pendSave(bool fromConsensus); // next/prev function diff --git a/src/cpp/ripple/LedgerConsensus.cpp b/src/cpp/ripple/LedgerConsensus.cpp index b9816ede8f..623cd542fa 100644 --- a/src/cpp/ripple/LedgerConsensus.cpp +++ b/src/cpp/ripple/LedgerConsensus.cpp @@ -392,7 +392,7 @@ void LedgerConsensus::checkLCL() { case lcsPRE_CLOSE: status = "PreClose"; break; case lcsESTABLISH: status = "Establish"; break; - case lcsFINISHED: status = "Finised"; break; + case lcsFINISHED: status = "Finished"; break; case lcsACCEPTED: status = "Accepted"; break; default: status = "unknown"; } @@ -400,6 +400,7 @@ void LedgerConsensus::checkLCL() cLog(lsWARNING) << "View of consensus changed during " << status << " (" << netLgrCount << ") status=" << status << ", " << (mHaveCorrectLCL ? "CorrectLCL" : "IncorrectLCL"); cLog(lsWARNING) << mPrevLedgerHash << " to " << netLgr; + cLog(lsWARNING) << mPreviousLedger->getJson(0); if (sLog(lsDEBUG)) { diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index a3f35cdf26..ab87238da1 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -145,6 +145,8 @@ void LedgerMaster::asyncAccept(Ledger::pointer ledger) uint32 seq = ledger->getLedgerSeq(); uint256 prevHash = ledger->getParentHash(); + std::map< uint32, std::pair > ledgerHashes; + while (seq > 0) { { @@ -155,10 +157,20 @@ void LedgerMaster::asyncAccept(Ledger::pointer ledger) break; } - uint256 tHash, pHash; - if (!Ledger::getHashesByIndex(seq, tHash, pHash) || (tHash != prevHash)) + std::map< uint32, std::pair >::iterator it = ledgerHashes.find(seq); + if (it == ledgerHashes.end()) + { + if (theApp->isShutdown()) + return; + ledgerHashes = Ledger::getHashesByIndex((seq < 500) ? 0 : (seq - 499), seq); + it = ledgerHashes.find(seq); + if (it == ledgerHashes.end()) + break; + } + + if (it->second.first != prevHash) break; - prevHash = pHash; + prevHash = it->second.second; } resumeAcquiring(); @@ -174,7 +186,8 @@ bool LedgerMaster::acquireMissingLedger(Ledger::ref origLedger, const uint256& l { cLog(lsTRACE) << "Ledger hash found in database"; mTooFast = true; - theApp->getJobQueue().addJob(jtPUBOLDLEDGER, boost::bind(&LedgerMaster::asyncAccept, this, ledger)); + theApp->getJobQueue().addJob(jtPUBOLDLEDGER, "LedgerMaster::asyncAccept", + boost::bind(&LedgerMaster::asyncAccept, this, ledger)); return true; } @@ -528,7 +541,8 @@ void LedgerMaster::tryPublish() { theApp->getOPs().clearNeedNetworkLedger(); mPubThread = true; - theApp->getJobQueue().addJob(jtPUBLEDGER, boost::bind(&LedgerMaster::pubThread, this)); + theApp->getJobQueue().addJob(jtPUBLEDGER, "Ledger::pubThread", + boost::bind(&LedgerMaster::pubThread, this)); } } diff --git a/src/cpp/ripple/LoadManager.cpp b/src/cpp/ripple/LoadManager.cpp index a476c37e19..dc5ac0891e 100644 --- a/src/cpp/ripple/LoadManager.cpp +++ b/src/cpp/ripple/LoadManager.cpp @@ -318,7 +318,10 @@ void LoadManager::threadEntry() bool change; if (theApp->getJobQueue().isOverloaded()) + { + cLog(lsINFO) << theApp->getJobQueue().getJson(0); change = theApp->getFeeTrack().raiseLocalFee(); + } else change = theApp->getFeeTrack().lowerLocalFee(); if (change) diff --git a/src/cpp/ripple/LoadMonitor.cpp b/src/cpp/ripple/LoadMonitor.cpp index 567d08c948..1302339d13 100644 --- a/src/cpp/ripple/LoadMonitor.cpp +++ b/src/cpp/ripple/LoadMonitor.cpp @@ -1,4 +1,7 @@ #include "LoadMonitor.h" +#include "Log.h" + +SETUP_LOG(); void LoadMonitor::update() { // call with the mutex @@ -52,8 +55,12 @@ void LoadMonitor::addLatency(int latency) mLatencyMSPeak = lp; } -void LoadMonitor::addCountAndLatency(int counts, int latency) +void LoadMonitor::addCountAndLatency(const std::string& name, int counts, int latency) { + if (latency > 1000) + { + cLog(lsWARNING) << "Job: " << name << " ExecutionTime: " << latency; + } if (latency == 1) latency = 0; boost::mutex::scoped_lock sl(mLock); diff --git a/src/cpp/ripple/LoadMonitor.h b/src/cpp/ripple/LoadMonitor.h index 18cbb228dd..4989f1e14d 100644 --- a/src/cpp/ripple/LoadMonitor.h +++ b/src/cpp/ripple/LoadMonitor.h @@ -32,7 +32,7 @@ public: void addCount(int counts); void addLatency(int latency); - void addCountAndLatency(int counts, int latency); + void addCountAndLatency(const std::string& name, int counts, int latency); void setTargetLatency(uint64 avg, uint64 pk) { @@ -60,6 +60,7 @@ protected: LoadMonitor& mMonitor; bool mRunning; int mCount; + std::string mName; boost::posix_time::ptime mStartTime; public: @@ -76,6 +77,11 @@ public: stop(); } + void setName(const std::string& name) + { + mName = name; + } + void start() { // okay to call if already started mRunning = true; @@ -86,7 +92,7 @@ public: { assert(mRunning); mRunning = false; - mMonitor.addCountAndLatency(mCount, + mMonitor.addCountAndLatency(mName, mCount, static_cast((boost::posix_time::microsec_clock::universal_time() - mStartTime).total_milliseconds())); } }; diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 61b2ded308..fd3ff143a4 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -426,30 +426,6 @@ Transaction::pointer NetworkOPs::findTransactionByID(const uint256& transactionI return Transaction::load(transactionID); } -#if 0 -int NetworkOPs::findTransactionsBySource(const uint256& uLedger, std::list& txns, - const RippleAddress& sourceAccount, uint32 minSeq, uint32 maxSeq) -{ - AccountState::pointer state = getAccountState(uLedger, sourceAccount); - if (!state) return 0; - if (minSeq > state->getSeq()) return 0; - if (maxSeq > state->getSeq()) maxSeq = state->getSeq(); - if (maxSeq > minSeq) return 0; - - int count = 0; - for(unsigned int i = minSeq; i <= maxSeq; ++i) - { - Transaction::pointer txn = Transaction::findFrom(sourceAccount, i); - if(txn) - { - txns.push_back(txn); - ++count; - } - } - return count; -} -#endif - int NetworkOPs::findTransactionsByDestination(std::list& txns, const RippleAddress& destinationAccount, uint32 startLedgerSeq, uint32 endLedgerSeq, int maxTransactions) { @@ -1401,7 +1377,7 @@ void NetworkOPs::reportFeeChange() (theApp->getFeeTrack().getLoadFactor() == mLastLoadFactor)) return; - theApp->getJobQueue().addJob(jtCLIENT, boost::bind(&NetworkOPs::pubServer, this)); + theApp->getJobQueue().addJob(jtCLIENT, "reportFeeChange->pubServer", boost::bind(&NetworkOPs::pubServer, this)); } Json::Value NetworkOPs::transJson(const SerializedTransaction& stTxn, TER terResult, bool bAccepted, Ledger::ref lpCurrent, const std::string& strType) diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index 37e9bcb210..e8a8b88154 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -857,7 +857,7 @@ void Peer::recvTransaction(ripple::TMTransaction& packet) return; } - theApp->getJobQueue().addJob(jtTRANSACTION, + theApp->getJobQueue().addJob(jtTRANSACTION, "recvTransction->checkTransaction", boost::bind(&checkTransaction, _1, flags, stx, boost::weak_ptr(shared_from_this()))); #ifndef TRUST_NETWORK @@ -986,7 +986,7 @@ void Peer::recvPropose(const boost::shared_ptr& packet) prevLedger.isNonZero() ? prevLedger : consensusLCL, set.proposeseq(), proposeHash, set.closetime(), signerPublic, suppression); - theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, + theApp->getJobQueue().addJob(isTrusted ? jtPROPOSAL_t : jtPROPOSAL_ut, "recvPropose->checkPropose", boost::bind(&checkPropose, _1, packet, proposal, consensusLCL, mNodePublic, boost::weak_ptr(shared_from_this()))); } @@ -1062,7 +1062,7 @@ void Peer::recvValidation(const boost::shared_ptr& packet) } bool isTrusted = theApp->getUNL().nodeInUNL(val->getSignerPublic()); - theApp->getJobQueue().addJob(isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, + theApp->getJobQueue().addJob(isTrusted ? jtVALIDATION_t : jtVALIDATION_ut, "recvValidation->checkValidation", boost::bind(&checkValidation, _1, val, signingHash, isTrusted, packet, boost::weak_ptr(shared_from_this()))); } @@ -1299,7 +1299,7 @@ void Peer::recvProofWork(ripple::TMProofWork& packet) return; } - theApp->getJobQueue().addJob(jtPROOFWORK, + theApp->getJobQueue().addJob(jtPROOFWORK, "recvProof->doProof", boost::bind(&Peer::doProofOfWork, _1, boost::weak_ptr(shared_from_this()), pow)); return; diff --git a/src/cpp/ripple/SHAMap.cpp b/src/cpp/ripple/SHAMap.cpp index 294cad85c7..77051f17ff 100644 --- a/src/cpp/ripple/SHAMap.cpp +++ b/src/cpp/ripple/SHAMap.cpp @@ -23,8 +23,14 @@ DECLARE_INSTANCE(SHAMapTreeNode); void SHAMapNode::setHash() const { - std::size_t h = theApp->getNonceST() + mDepth; + std::size_t h = theApp->getNonceST() + (mDepth * 0x9e3779b9); mHash = mNodeID.hash_combine(h); +#if 0 + const unsigned int *ptr = reinterpret_cast(mNodeID.begin()); + for (int i = (mDepth + 3) / 4; i != 0; --i) + boost::hash_combine(h, *ptr++); + mHash = h; +#endif } std::size_t hash_value(const SHAMapNode& mn) @@ -64,6 +70,7 @@ SHAMap::SHAMap(SHAMapType t, const uint256& hash) : mSeq(1), mState(smsSynching) SHAMap::pointer SHAMap::snapShot(bool isMutable) { // Return a new SHAMap that is an immutable snapshot of this one // Initially nodes are shared, but CoW is forced on both ledgers + boost::recursive_mutex::scoped_lock sl(mLock); SHAMap::pointer ret = boost::make_shared(mType); SHAMap& newMap = *ret; newMap.mSeq = ++mSeq; @@ -158,10 +165,11 @@ SHAMapTreeNode::pointer SHAMap::walkTo(const uint256& id, bool modify) while (!inNode->isLeaf()) { int branch = inNode->selectBranch(id); - if (inNode->isEmptyBranch(branch)) - return inNode; uint256 childHash = inNode->getChildHash(branch); + if (childHash.isZero()) + return inNode; + try { inNode = getNode(inNode->getChildNodeID(branch), childHash, false); @@ -205,7 +213,6 @@ SHAMapTreeNode::pointer SHAMap::getNode(const SHAMapNode& id, const uint256& has std::cerr << "ID: " << id << std::endl; std::cerr << "TgtHash " << hash << std::endl; std::cerr << "NodHash " << node->getNodeHash() << std::endl; - dump(); throw std::runtime_error("invalid node"); } #endif diff --git a/src/cpp/ripple/SHAMap.h b/src/cpp/ripple/SHAMap.h index 5e2eb8d093..ff42121f0f 100644 --- a/src/cpp/ripple/SHAMap.h +++ b/src/cpp/ripple/SHAMap.h @@ -43,20 +43,21 @@ public: SHAMapNode() : mDepth(0), mHash(0) { ; } SHAMapNode(int depth, const uint256& hash); - virtual ~SHAMapNode() { ; } int getDepth() const { return mDepth; } const uint256& getNodeID() const { return mNodeID; } - bool isValid() const { return (mDepth >= 0) && (mDepth < 64); } + bool isValid() const { return (mDepth >= 0) && (mDepth < 64); } + bool isRoot() const { return mDepth == 0; } size_t getHash() const { if (mHash == 0) setHash(); return mHash; } - virtual bool isPopulated() const { return false; } + virtual bool isPopulated() const { return false; } SHAMapNode getParentNodeID() const { assert(mDepth); return SHAMapNode(mDepth - 1, mNodeID); } + SHAMapNode getChildNodeID(int m) const; int selectBranch(const uint256& hash) const; @@ -68,7 +69,6 @@ public: bool operator!=(const uint256&) const; bool operator<=(const SHAMapNode&) const; bool operator>=(const SHAMapNode&) const; - bool isRoot() const { return mDepth == 0; } virtual std::string getString() const; void dump() const; diff --git a/src/cpp/ripple/SerializedTransaction.cpp b/src/cpp/ripple/SerializedTransaction.cpp index 7486e0329d..0aec1aacb8 100644 --- a/src/cpp/ripple/SerializedTransaction.cpp +++ b/src/cpp/ripple/SerializedTransaction.cpp @@ -233,7 +233,7 @@ std::string SerializedTransaction::getSQLInsertReplaceHeader() std::string SerializedTransaction::getMetaSQLInsertHeader() { - return "INSERT INTO Transactions " + getMetaSQLValueHeader() + " VALUES "; + return "INSERT OR REPLACE INTO Transactions " + getMetaSQLValueHeader() + " VALUES "; } std::string SerializedTransaction::getSQL(uint32 inLedger, char status) const diff --git a/src/cpp/ripple/Transaction.cpp b/src/cpp/ripple/Transaction.cpp index 4ae2a4636a..7d4369e745 100644 --- a/src/cpp/ripple/Transaction.cpp +++ b/src/cpp/ripple/Transaction.cpp @@ -249,16 +249,6 @@ Transaction::pointer Transaction::load(const uint256& id) return transactionFromSQL(sql); } -Transaction::pointer Transaction::findFrom(const RippleAddress& fromID, uint32 seq) -{ - std::string sql = "SELECT LedgerSeq,Status,RawTxn FROM Transactions WHERE FromID='"; - sql.append(fromID.humanAccountID()); - sql.append("' AND FromSeq='"); - sql.append(boost::lexical_cast(seq)); - sql.append("';"); - return transactionFromSQL(sql); -} - bool Transaction::convertToTransactions(uint32 firstLedgerSeq, uint32 secondLedgerSeq, bool checkFirstTransactions, bool checkSecondTransactions, const SHAMap::SHAMapDiff& inMap, std::map >& outMap) diff --git a/src/cpp/ripple/Transaction.h b/src/cpp/ripple/Transaction.h index d000dea5aa..0ec8eadba9 100644 --- a/src/cpp/ripple/Transaction.h +++ b/src/cpp/ripple/Transaction.h @@ -99,7 +99,6 @@ public: // database functions void save(); static Transaction::pointer load(const uint256& id); - static Transaction::pointer findFrom(const RippleAddress& fromID, uint32 seq); // conversion function static bool convertToTransactions(uint32 ourLedgerSeq, uint32 otherLedgerSeq, diff --git a/src/cpp/ripple/ValidationCollection.cpp b/src/cpp/ripple/ValidationCollection.cpp index cfdec56cd8..699dc8afb4 100644 --- a/src/cpp/ripple/ValidationCollection.cpp +++ b/src/cpp/ripple/ValidationCollection.cpp @@ -293,7 +293,8 @@ void ValidationCollection::condWrite() if (mWriting) return; mWriting = true; - theApp->getJobQueue().addJob(jtWRITE, boost::bind(&ValidationCollection::doWrite, this, _1)); + theApp->getJobQueue().addJob(jtWRITE, "ValidationCollection::doWrite", + boost::bind(&ValidationCollection::doWrite, this, _1)); } void ValidationCollection::doWrite(Job&) diff --git a/src/cpp/ripple/WSHandler.h b/src/cpp/ripple/WSHandler.h index b525859b94..81011b24b5 100644 --- a/src/cpp/ripple/WSHandler.h +++ b/src/cpp/ripple/WSHandler.h @@ -152,17 +152,17 @@ public: ptr->preDestroy(); // Must be done before we return // Must be done without holding the websocket send lock - theApp->getJobQueue().addJob(jtCLIENT, + theApp->getJobQueue().addJob(jtCLIENT, "WSClient::destroy", boost::bind(&WSConnection::destroy, ptr)); } void on_message(connection_ptr cpClient, message_ptr mpMessage) { - theApp->getJobQueue().addJob(jtCLIENT, + theApp->getJobQueue().addJob(jtCLIENT, "WSClient::command", boost::bind(&WSServerHandler::do_message, this, _1, cpClient, mpMessage)); } - void do_message(Job&, connection_ptr cpClient, message_ptr mpMessage) + void do_message(Job& job, connection_ptr cpClient, message_ptr mpMessage) { Json::Value jvRequest; Json::Reader jrReader; @@ -190,6 +190,8 @@ public: } else { + if (jvRequest.isMember("command")) + job.rename(std::string("WSClient::") + jvRequest["command"].asString()); boost::shared_ptr< WSConnection > conn; { boost::mutex::scoped_lock sl(mMapLock);