diff --git a/src/cpp/ripple/HashedObject.cpp b/src/cpp/ripple/HashedObject.cpp index 5ff173307..03bb62415 100644 --- a/src/cpp/ripple/HashedObject.cpp +++ b/src/cpp/ripple/HashedObject.cpp @@ -86,14 +86,14 @@ bool HashedObjectStore::storeLevelDB(HashedObjectType type, uint32 index, { mWritePending = true; theApp->getJobQueue().addJob(jtWRITE, "HashedObject::store", - BIND_TYPE(&HashedObjectStore::bulkWriteLevelDB, this)); + BIND_TYPE(&HashedObjectStore::bulkWriteLevelDB, this, P_1)); } } mNegativeCache.del(hash); return true; } -void HashedObjectStore::bulkWriteLevelDB() +void HashedObjectStore::bulkWriteLevelDB(Job &) { assert(mLevelDB); int setSize = 0; @@ -205,7 +205,7 @@ bool HashedObjectStore::storeSQLite(HashedObjectType type, uint32 index, { mWritePending = true; theApp->getJobQueue().addJob(jtWRITE, "HashedObject::store", - BIND_TYPE(&HashedObjectStore::bulkWriteSQLite, this)); + BIND_TYPE(&HashedObjectStore::bulkWriteSQLite, this, P_1)); } } // else @@ -214,7 +214,7 @@ bool HashedObjectStore::storeSQLite(HashedObjectType type, uint32 index, return true; } -void HashedObjectStore::bulkWriteSQLite() +void HashedObjectStore::bulkWriteSQLite(Job&) { assert(!mLevelDB); while (1) diff --git a/src/cpp/ripple/HashedObject.h b/src/cpp/ripple/HashedObject.h index 7c2ac27f1..c1df75b2c 100644 --- a/src/cpp/ripple/HashedObject.h +++ b/src/cpp/ripple/HashedObject.h @@ -15,6 +15,8 @@ DEFINE_INSTANCE(HashedObject); +class Job; + enum HashedObjectType { hotUNKNOWN = 0, @@ -91,13 +93,13 @@ public: bool storeSQLite(HashedObjectType type, uint32 index, const std::vector& data, const uint256& hash); HashedObject::pointer retrieveSQLite(const uint256& hash); - void bulkWriteSQLite(); + void bulkWriteSQLite(Job&); #ifdef USE_LEVELDB bool storeLevelDB(HashedObjectType type, uint32 index, const std::vector& data, const uint256& hash); HashedObject::pointer retrieveLevelDB(const uint256& hash); - void bulkWriteLevelDB(); + void bulkWriteLevelDB(Job&); #endif diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index df3f3b15c..91520c938 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -26,6 +26,22 @@ Ledger::ref LedgerMaster::getCurrentSnapshot() return mCurrentSnapshot; } +int LedgerMaster::getValidatedLedgerAge() +{ + if (!mValidLedger) + { + cLog(lsDEBUG) << "No validated ledger"; + return 999999; + } + + int64 ret = theApp->getOPs().getCloseTimeNC(); + ret -= static_cast(mValidLedger->getCloseTimeNC()); + ret = std::max(0LL, ret); + + cLog(lsTRACE) << "Validated ledger age is " << ret; + return static_cast(ret); +} + void LedgerMaster::addHeldTransaction(Transaction::ref transaction) { // returns true if transaction was added boost::recursive_mutex::scoped_lock ml(mLock); diff --git a/src/cpp/ripple/LedgerMaster.h b/src/cpp/ripple/LedgerMaster.h index 33b634493..37c3a9630 100644 --- a/src/cpp/ripple/LedgerMaster.h +++ b/src/cpp/ripple/LedgerMaster.h @@ -83,6 +83,7 @@ public: // The published ledger is the last fully validated ledger Ledger::ref getValidatedLedger() { return mPubLedger; } + int getValidatedLedgerAge(); TER doTransaction(SerializedTransaction::ref txn, TransactionEngineParams params, bool& didApply); diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 0e206bd72..3d6ec1519 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -49,6 +49,7 @@ std::string NetworkOPs::strOperatingMode() static const char* paStatusToken[] = { "disconnected", "connected", + "syncing", "tracking", "full" }; @@ -615,6 +616,12 @@ void NetworkOPs::checkState(const boost::system::error_code& result) cLog(lsINFO) << "Node count (" << peerList.size() << ") is sufficient."; } + // Check if the last validated ledger forces a change between these states + if (mMode == omSYNCING) + setMode(omSYNCING); + else if (mMode == omCONNECTED) + setMode(omCONNECTED); + if (!mConsensus) tryStartConsensus(); @@ -638,14 +645,14 @@ void NetworkOPs::tryStartConsensus() // there shouldn't be a newer LCL. We need this information to do the next three // tests. - if ((mMode == omCONNECTED) && !ledgerChange) + if (((mMode == omCONNECTED) || (mMode == omSYNCING)) && !ledgerChange) { // count number of peers that agree with us and UNL nodes whose validations we have for LCL // if the ledger is good enough, go to omTRACKING - TODO if (!mNeedNetworkLedger) setMode(omTRACKING); } - if ((mMode == omTRACKING) && !ledgerChange ) + if (((mMode == omCONNECTED) || (mMode == omTRACKING)) && !ledgerChange) { // check if the ledger is good enough to go to omFULL // Note: Do not go to omFULL if we don't have the previous ledger @@ -654,12 +661,6 @@ void NetworkOPs::tryStartConsensus() setMode(omFULL); } - if (mMode == omFULL) - { - // WRITEME - // check if the ledger is bad enough to go to omTRACKING - } - if ((!mConsensus) && (mMode != omDISCONNECTED)) beginConsensus(networkClosed, mLedgerMaster->getCurrentLedger()); } @@ -1036,7 +1037,21 @@ void NetworkOPs::pubServer() void NetworkOPs::setMode(OperatingMode om) { - if (mMode == om) return; + + if (om == omCONNECTED) + { + if (theApp->getLedgerMaster().getValidatedLedgerAge() < 60) + om = omSYNCING; + } + + if (om == omSYNCING) + { + if (theApp->getLedgerMaster().getValidatedLedgerAge() >= 60) + om = omCONNECTED; + } + + if (mMode == om) + return; if ((om >= omCONNECTED) && (mMode == omDISCONNECTED)) mConnectTime = boost::posix_time::second_clock::universal_time(); @@ -1398,7 +1413,7 @@ void NetworkOPs::pubLedger(Ledger::ref accepted) jvObj["txn_count"] = Json::UInt(alpAccepted->getTxnCount()); - if ((mMode == omFULL) || (mMode == omTRACKING)) + if (mMode >= omSYNCING) jvObj["validated_ledgers"] = theApp->getLedgerMaster().getCompleteLedgers(); NetworkOPs::subMapType::const_iterator it = mSubLedger.begin(); @@ -1731,7 +1746,7 @@ bool NetworkOPs::subLedger(InfoSub::ref isrListener, Json::Value& jvResult) jvResult["reserve_inc"] = Json::UInt(lpClosed->getReserveInc()); } - if (((mMode == omFULL) || (mMode == omTRACKING)) && !isNeedNetworkLedger()) + if ((mMode >= omSYNCING) && !isNeedNetworkLedger()) jvResult["validated_ledgers"] = theApp->getLedgerMaster().getCompleteLedgers(); boost::recursive_mutex::scoped_lock sl(mMonitorLock); diff --git a/src/cpp/ripple/NetworkOPs.h b/src/cpp/ripple/NetworkOPs.h index 45499c08e..76a6cd939 100644 --- a/src/cpp/ripple/NetworkOPs.h +++ b/src/cpp/ripple/NetworkOPs.h @@ -98,8 +98,9 @@ public: { // how we process transactions or account balance requests omDISCONNECTED = 0, // not ready to process requests omCONNECTED = 1, // convinced we are talking to the network - omTRACKING = 2, // convinced we agree with the network - omFULL = 3 // we have the ledger and can even validate + omSYNCING = 2, // fallen slightly behind + omTRACKING = 3, // convinced we agree with the network + omFULL = 4 // we have the ledger and can even validate }; typedef boost::unordered_map subMapType; diff --git a/src/cpp/ripple/Peer.cpp b/src/cpp/ripple/Peer.cpp index 3c557b567..3cd739ba3 100644 --- a/src/cpp/ripple/Peer.cpp +++ b/src/cpp/ripple/Peer.cpp @@ -512,7 +512,7 @@ void Peer::processReadBuffer() ripple::TMGetPeers msg; if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) - recvGetPeers(msg); + recvGetPeers(msg, sl); else cLog(lsWARNING) << "parse error: " << type; } @@ -568,7 +568,7 @@ void Peer::processReadBuffer() event->reName("Peer::transaction"); ripple::TMTransaction msg; if (msg.ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) - recvTransaction(msg); + recvTransaction(msg, sl); else cLog(lsWARNING) << "parse error: " << type; } @@ -634,7 +634,7 @@ void Peer::processReadBuffer() event->reName("Peer::validation"); boost::shared_ptr msg = boost::make_shared(); if (msg->ParseFromArray(&mReadbuf[HEADER_SIZE], mReadbuf.size() - HEADER_SIZE)) - recvValidation(msg); + recvValidation(msg, sl); else cLog(lsWARNING) << "parse error: " << type; } @@ -864,15 +864,15 @@ static void checkTransaction(Job&, int flags, SerializedTransaction::pointer stx #endif } -void Peer::recvTransaction(ripple::TMTransaction& packet) +void Peer::recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder) { + MasterLockHolder.unlock(); Transaction::pointer tx; #ifndef TRUST_NETWORK try { #endif - std::string rawTx = packet.rawtransaction(); - Serializer s(rawTx); + Serializer s(packet.rawtransaction()); SerializerIterator sit(s); SerializedTransaction::pointer stx = boost::make_shared(boost::ref(sit)); @@ -1084,8 +1084,9 @@ static void checkValidation(Job&, SerializedValidation::pointer val, uint256 sig #endif } -void Peer::recvValidation(const boost::shared_ptr& packet) +void Peer::recvValidation(const boost::shared_ptr& packet, ScopedLock& MasterLockHolder) { + MasterLockHolder.unlock(); if (packet->validation().size() < 50) { cLog(lsWARNING) << "Too small validation from peer"; @@ -1137,8 +1138,9 @@ void Peer::recvGetContacts(ripple::TMGetContacts& packet) // Return a list of your favorite people // TODO: filter out all the LAN peers // TODO: filter out the peer you are talking to -void Peer::recvGetPeers(ripple::TMGetPeers& packet) +void Peer::recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder) { + MasterLockHolder.unlock(); std::vector addrs; theApp->getConnectionPool().getTopNAddrs(30, addrs); diff --git a/src/cpp/ripple/Peer.h b/src/cpp/ripple/Peer.h index cfd39595a..67279a10f 100644 --- a/src/cpp/ripple/Peer.h +++ b/src/cpp/ripple/Peer.h @@ -87,12 +87,12 @@ protected: void sendHello(); void recvHello(ripple::TMHello& packet); - void recvTransaction(ripple::TMTransaction& packet); - void recvValidation(const boost::shared_ptr& packet); + void recvTransaction(ripple::TMTransaction& packet, ScopedLock& MasterLockHolder); + void recvValidation(const boost::shared_ptr& packet, ScopedLock& MasterLockHolder); void recvGetValidation(ripple::TMGetValidations& packet); void recvContact(ripple::TMContact& packet); void recvGetContacts(ripple::TMGetContacts& packet); - void recvGetPeers(ripple::TMGetPeers& packet); + void recvGetPeers(ripple::TMGetPeers& packet, ScopedLock& MasterLockHolder); void recvPeers(ripple::TMPeers& packet); void recvGetObjectByHash(const boost::shared_ptr& packet); void recvPing(ripple::TMPing& packet); diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index cf94b0099..a350ae65c 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -2618,11 +2618,13 @@ Json::Value RPCHandler::lookupLedger(Json::Value jvRequest, Ledger::pointer& lpL case LEDGER_CLOSED: lpLedger = theApp->getLedgerMaster().getClosedLedger(); iLedgerIndex = lpLedger->getLedgerSeq(); + assert(lpLedger->isImmutable() && lpLedger->isClosed()); break; case LEDGER_VALIDATED: lpLedger = mNetOps->getValidatedLedger(); iLedgerIndex = lpLedger->getLedgerSeq(); + assert(lpLedger->isImmutable() && lpLedger->isClosed()); break; } @@ -3543,9 +3545,7 @@ Json::Value RPCHandler::doCommand(const Json::Value& jvRequest, int iRole, int & ScopedLock MasterLockHolder(theApp->getMasterLock()); - if (commandsA[i].iOptions & optNetwork - && mNetOps->getOperatingMode() != NetworkOPs::omTRACKING - && mNetOps->getOperatingMode() != NetworkOPs::omFULL) + if ((commandsA[i].iOptions & optNetwork) && (mNetOps->getOperatingMode() < NetworkOPs::omSYNCING)) { cLog(lsINFO) << "Insufficient network mode for RPC: " << mNetOps->strOperatingMode(); diff --git a/src/cpp/ripple/SHAMapSync.h b/src/cpp/ripple/SHAMapSync.h index ea7caee9d..60e55a139 100644 --- a/src/cpp/ripple/SHAMapSync.h +++ b/src/cpp/ripple/SHAMapSync.h @@ -1,5 +1,5 @@ -#ifndef __SHAMAPYSNC__ -#define __SHAMAPSYNC__ +#ifndef SHAMAPSYNC_H +#define SHAMAPSYNC_H #include "SHAMap.h" #include "Application.h"