diff --git a/modules/ripple_app/ledger/LedgerMaster.cpp b/modules/ripple_app/ledger/LedgerMaster.cpp index a775195ab3..64993b0dc4 100644 --- a/modules/ripple_app/ledger/LedgerMaster.cpp +++ b/modules/ripple_app/ledger/LedgerMaster.cpp @@ -503,10 +503,7 @@ void LedgerMaster::resumeAcquiring () if (nextLedger) acquireMissingLedger (nextLedger, nextLedger->getParentHash (), nextLedger->getLedgerSeq () - 1); else - { - mCompleteLedgers.clearValue (prevMissing); WriteLog (lsINFO, LedgerMaster) << "We have a gap at: " << prevMissing + 1; - } } } diff --git a/modules/ripple_app/ledger/ripple_InboundLedger.cpp b/modules/ripple_app/ledger/ripple_InboundLedger.cpp index c38ed45474..f1bcb5e106 100644 --- a/modules/ripple_app/ledger/ripple_InboundLedger.cpp +++ b/modules/ripple_app/ledger/ripple_InboundLedger.cpp @@ -13,7 +13,7 @@ SETUP_LOG (InboundLedger) #define LEDGER_TIMEOUT_AGGRESSIVE 6 // how many timeouts before we get aggressive InboundLedger::InboundLedger (uint256 const& hash, uint32 seq) - : PeerSet (hash, LEDGER_ACQUIRE_TIMEOUT) + : PeerSet (hash, LEDGER_ACQUIRE_TIMEOUT, false) , mHaveBase (false) , mHaveState (false) , mHaveTransactions (false) @@ -138,7 +138,7 @@ bool InboundLedger::tryLocal () return mComplete; } -void InboundLedger::onTimer (bool progress) +void InboundLedger::onTimer (bool progress, boost::recursive_mutex::scoped_lock&) { mRecentTXNodes.clear (); mRecentASNodes.clear (); @@ -844,7 +844,7 @@ std::vector InboundLedger::getNeededHashes () if (!mHaveTransactions) { TransactionStateSF filter (mLedger->getLedgerSeq ()); - std::vector v = mLedger->getNeededAccountStateHashes (4, &filter); + std::vector v = mLedger->getNeededTransactionHashes (4, &filter); BOOST_FOREACH (uint256 const & h, v) { ret.push_back (std::make_pair (protocol::TMGetObjectByHash::otTRANSACTION_NODE, h)); diff --git a/modules/ripple_app/ledger/ripple_InboundLedger.h b/modules/ripple_app/ledger/ripple_InboundLedger.h index 3988b51be8..d994252869 100644 --- a/modules/ripple_app/ledger/ripple_InboundLedger.h +++ b/modules/ripple_app/ledger/ripple_InboundLedger.h @@ -92,7 +92,7 @@ public: private: void done (); - void onTimer (bool progress); + void onTimer (bool progress, boost::recursive_mutex::scoped_lock& peerSetLock); void newPeer (Peer::ref peer) { diff --git a/modules/ripple_app/ledger/ripple_InboundLedgers.cpp b/modules/ripple_app/ledger/ripple_InboundLedgers.cpp index 2bd0f82be6..b6f091a311 100644 --- a/modules/ripple_app/ledger/ripple_InboundLedgers.cpp +++ b/modules/ripple_app/ledger/ripple_InboundLedgers.cpp @@ -23,7 +23,7 @@ InboundLedger::pointer InboundLedgers::findCreate (uint256 const& hash, uint32 s ptr->addPeers (); ptr->setTimer (); // Cannot call in constructor } - else + else if (ptr->isComplete ()) { Ledger::pointer ledger = ptr->getLedger (); ledger->setClosed (); diff --git a/modules/ripple_basics/utility/ripple_Sustain.cpp b/modules/ripple_basics/utility/ripple_Sustain.cpp index a5b0b88e43..ac6514490d 100644 --- a/modules/ripple_basics/utility/ripple_Sustain.cpp +++ b/modules/ripple_basics/utility/ripple_Sustain.cpp @@ -34,7 +34,7 @@ std::string StopSustain () return "Terminating monitor"; } -std::string DoSustain () +std::string DoSustain (std::string logFile) { int childCount = 0; pManager = getpid (); @@ -72,7 +72,9 @@ std::string DoSustain () while (kill (pChild, 0) == 0); rename ("core", boost::str (boost::format ("core.%d") % static_cast (pChild)).c_str ()); - rename ("debug.log", boost::str (boost::format ("debug.log.%d") % static_cast (pChild)).c_str ()); + if (!logFile.empty()) + rename (logFile.c_str(), + boost::str (boost::format ("%s.%d") % logFile % static_cast (pChild)).c_str ()); } } @@ -82,7 +84,7 @@ bool HaveSustain () { return false; } -std::string DoSustain () +std::string DoSustain (std::string) { return std::string (); } diff --git a/modules/ripple_basics/utility/ripple_Sustain.h b/modules/ripple_basics/utility/ripple_Sustain.h index 2c2a2ce46b..cbd542266c 100644 --- a/modules/ripple_basics/utility/ripple_Sustain.h +++ b/modules/ripple_basics/utility/ripple_Sustain.h @@ -15,6 +15,6 @@ // extern bool HaveSustain (); extern std::string StopSustain (); -extern std::string DoSustain (); +extern std::string DoSustain (std::string logFile); #endif diff --git a/modules/ripple_core/functional/ripple_Job.cpp b/modules/ripple_core/functional/ripple_Job.cpp index 2c4b435b88..3fa8ae36be 100644 --- a/modules/ripple_core/functional/ripple_Job.cpp +++ b/modules/ripple_core/functional/ripple_Job.cpp @@ -40,15 +40,9 @@ JobType Job::getType () const void Job::doJob () { - m_loadEvent->start (); + m_loadEvent->reName (mName); mJob (*this); - - // VFALCO TODO Isn't there a way to construct the load event with - // the proper name? This way the load event object - // can have the invariant "name is always set" - // - m_loadEvent->reName (mName); } void Job::rename (std::string const& newName) @@ -61,6 +55,11 @@ int Job::getLimit () const return m_limit; } +LoadEvent& Job::peekEvent() const +{ + return *m_loadEvent; +} + const char* Job::toString (JobType t) { switch (t) @@ -131,6 +130,9 @@ const char* Job::toString (JobType t) case jtTXN_PROC: return "processTransaction"; + case jtTXN_DATA: + return "fetchTxnData"; + case jtOB_SETUP: return "orderBookSetup"; @@ -143,6 +145,9 @@ const char* Job::toString (JobType t) case jtHO_WRITE: return "nodeWrite"; + case jtSWEEP: + return "sweep"; + default: assert (false); return "unknown"; diff --git a/modules/ripple_core/functional/ripple_Job.h b/modules/ripple_core/functional/ripple_Job.h index 55d54f2f3e..1e97ee4ee0 100644 --- a/modules/ripple_core/functional/ripple_Job.h +++ b/modules/ripple_core/functional/ripple_Job.h @@ -17,20 +17,21 @@ enum JobType jtPUBOLDLEDGER = 2, // An old ledger has been accepted jtVALIDATION_ut = 3, // A validation from an untrusted source jtPROOFWORK = 4, // A proof of work demand from another server - jtPROPOSAL_ut = 5, // A proposal from an untrusted source - jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring - jtUPDATE_PF = 7, // Update pathfinding requests - jtCLIENT = 8, // A websocket command from the client - jtTRANSACTION = 9, // A transaction received from the network - jtPUBLEDGER = 10, // Publish a fully-accepted ledger - jtWAL = 11, // Write-ahead logging - jtVALIDATION_t = 12, // A validation from a trusted source - jtWRITE = 13, // Write out hashed objects - jtTRANSACTION_l = 14, // A local transaction - jtPROPOSAL_t = 15, // A proposal from a trusted source - jtSWEEP = 16, // Sweep for stale structures - jtADMIN = 17, // An administrative operation - jtDEATH = 18, // job of death, used internally + jtTRANSACTION_l = 5, // A local transaction + jtPROPOSAL_ut = 6, // A proposal from an untrusted source + jtLEDGER_DATA = 7, // Received data for a ledger we're acquiring + jtUPDATE_PF = 8, // Update pathfinding requests + jtCLIENT = 9, // A websocket command from the client + jtTRANSACTION = 10, // A transaction received from the network + jtPUBLEDGER = 11, // Publish a fully-accepted ledger + jtTXN_DATA = 12, // Fetch a proposed set + jtWAL = 13, // Write-ahead logging + jtVALIDATION_t = 14, // A validation from a trusted source + jtWRITE = 15, // Write out hashed objects + jtPROPOSAL_t = 16, // A proposal from a trusted source + jtSWEEP = 17, // Sweep for stale structures + jtADMIN = 18, // An administrative operation + jtDEATH = 19, // job of death, used internally // special types not dispatched by the job pool jtPEER = 24, @@ -81,6 +82,8 @@ public: int getLimit () const; + LoadEvent& peekEvent() const; + // These comparison operators make the jobs sort in priority order in the job set bool operator< (const Job& j) const; bool operator> (const Job& j) const; diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp index 5d88673ea0..30e0e52df1 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.cpp +++ b/modules/ripple_core/functional/ripple_JobQueue.cpp @@ -44,7 +44,9 @@ void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, co if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering assert (mThreadCount != 0); // do not add jobs to a queue with no threads - mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc)); + std::pair< std::set ::iterator, bool > it = + mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc)); + it.first->peekEvent().start(); // start timing how long it stays in the queue ++mJobCounts[type].first; mJobCond.notify_one (); } diff --git a/src/cpp/ripple/NetworkOPs.cpp b/src/cpp/ripple/NetworkOPs.cpp index 438d8cb9a7..9418db998b 100644 --- a/src/cpp/ripple/NetworkOPs.cpp +++ b/src/cpp/ripple/NetworkOPs.cpp @@ -1272,15 +1272,6 @@ NetworkOPs::getAccountTxs (const RippleAddress& account, int32 minLedger, int32 ledger->pendSave(false); } - if (rawMeta.getLength() == 0) - { // Work around a bug that could leave the metadata missing - uint32 seq = static_cast(db->getBigInt("AccountTransactions.LedgerSeq")); - WriteLog(lsWARNING, NetworkOPs) << "Recovering ledger " << seq << ", txn " << txn->getID(); - Ledger::pointer ledger = getLedgerBySeq(seq); - if (ledger) - ledger->pendSave(false); - } - TransactionMetaSet::pointer meta = boost::make_shared (txn->getID (), txn->getLedger (), rawMeta.getData ()); ret.push_back (std::pair (txn, meta)); } diff --git a/src/cpp/ripple/ripple_Main.cpp b/src/cpp/ripple/ripple_Main.cpp index 8f79353ee5..580d75abb4 100644 --- a/src/cpp/ripple/ripple_Main.cpp +++ b/src/cpp/ripple/ripple_Main.cpp @@ -299,7 +299,7 @@ int rippleMain (int argc, char** argv) if (HaveSustain () && !iResult && !vm.count ("parameters") && !vm.count ("fg") && !vm.count ("standalone") && !vm.count ("unittest")) { - std::string logMe = DoSustain (); + std::string logMe = DoSustain (theConfig.DEBUG_LOGFILE.c_str()); if (!logMe.empty ()) Log (lsWARNING) << logMe; diff --git a/src/cpp/ripple/ripple_PeerSet.cpp b/src/cpp/ripple/ripple_PeerSet.cpp index 9f9f02b956..de58e4bfa4 100644 --- a/src/cpp/ripple/ripple_PeerSet.cpp +++ b/src/cpp/ripple/ripple_PeerSet.cpp @@ -6,7 +6,7 @@ class InboundLedger; -PeerSet::PeerSet (uint256 const& hash, int interval) +PeerSet::PeerSet (uint256 const& hash, int interval, bool txnData) : mHash (hash) , mTimerInterval (interval) , mTimeouts (0) @@ -14,6 +14,7 @@ PeerSet::PeerSet (uint256 const& hash, int interval) , mFailed (false) , mProgress (true) , mAggressive (false) + , mTxnData (txnData) , mTimer (getApp().getIOService ()) { mLastAction = UptimeTimer::getInstance ().getElapsedSeconds (); @@ -53,12 +54,12 @@ void PeerSet::invokeOnTimer () { ++mTimeouts; WriteLog (lsWARNING, InboundLedger) << "Timeout(" << mTimeouts << ") pc=" << mPeers.size () << " acquiring " << mHash; - onTimer (false); + onTimer (false, sl); } else { mProgress = false; - onTimer (true); + onTimer (true, sl); } if (!isDone ()) @@ -74,16 +75,24 @@ void PeerSet::TimerEntry (boost::weak_ptr wptr, const boost::system::er if (ptr) { - int jc = getApp().getJobQueue ().getJobCountTotal (jtLEDGER_DATA); - - if (jc > 4) + if (ptr->mTxnData) { - WriteLog (lsDEBUG, InboundLedger) << "Deferring PeerSet timer due to load"; - ptr->setTimer (); + getApp().getJobQueue ().addLimitJob (jtTXN_DATA, "timerEntry", 2, + BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr)); } else - getApp().getJobQueue ().addLimitJob (jtLEDGER_DATA, "timerEntry", 2, - BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr)); + { + int jc = getApp().getJobQueue ().getJobCountTotal (jtLEDGER_DATA); + + if (jc > 4) + { + WriteLog (lsDEBUG, InboundLedger) << "Deferring PeerSet timer due to load"; + ptr->setTimer (); + } + else + getApp().getJobQueue ().addLimitJob (jtLEDGER_DATA, "timerEntry", 2, + BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr)); + } } } diff --git a/src/cpp/ripple/ripple_PeerSet.h b/src/cpp/ripple/ripple_PeerSet.h index ba56da2c46..b61336fb2f 100644 --- a/src/cpp/ripple/ripple_PeerSet.h +++ b/src/cpp/ripple/ripple_PeerSet.h @@ -67,11 +67,11 @@ private: // VFALCO TODO try to make some of these private protected: - PeerSet (uint256 const& hash, int interval); + PeerSet (uint256 const& hash, int interval, bool txnData); virtual ~PeerSet () { } virtual void newPeer (Peer::ref) = 0; - virtual void onTimer (bool progress) = 0; + virtual void onTimer (bool progress, boost::recursive_mutex::scoped_lock&) = 0; virtual boost::weak_ptr pmDowncast () = 0; void setComplete () @@ -95,6 +95,7 @@ protected: bool mFailed; bool mProgress; bool mAggressive; + bool mTxnData; int mLastAction; diff --git a/src/cpp/ripple/ripple_TransactionAcquire.cpp b/src/cpp/ripple/ripple_TransactionAcquire.cpp index f7cbef4fb7..ac1accaa6d 100644 --- a/src/cpp/ripple/ripple_TransactionAcquire.cpp +++ b/src/cpp/ripple/ripple_TransactionAcquire.cpp @@ -11,7 +11,9 @@ SETUP_LOG (TransactionAcquire) typedef std::map::value_type u160_prop_pair; typedef std::map::value_type u256_lct_pair; -TransactionAcquire::TransactionAcquire (uint256 const& hash) : PeerSet (hash, TX_ACQUIRE_TIMEOUT), mHaveRoot (false) +TransactionAcquire::TransactionAcquire (uint256 const& hash) + : PeerSet (hash, TX_ACQUIRE_TIMEOUT, true) + , mHaveRoot (false) { mMap = boost::make_shared (smtTRANSACTION, hash); } @@ -46,23 +48,30 @@ void TransactionAcquire::done () getApp().getIOService ().post (BIND_TYPE (&TACompletionHandler, mHash, map)); } -void TransactionAcquire::onTimer (bool progress) +void TransactionAcquire::onTimer (bool progress, boost::recursive_mutex::scoped_lock& psl) { bool aggressive = false; if (getTimeouts () > 10) { WriteLog (lsWARNING, TransactionAcquire) << "Ten timeouts on TX set " << getHash (); + psl.unlock(); { Application::ScopedLockType lock (getApp().getMasterLock (), __FILE__, __LINE__); if (getApp().getOPs ().stillNeedTXSet (mHash)) { - WriteLog (lsWARNING, TransactionAcquire) << "Still need it"; - mTimeouts = 0; - aggressive = true; - } + boost::recursive_mutex::scoped_lock sl (getApp().getMasterLock ()); + + if (getApp().getOPs ().stillNeedTXSet (mHash)) + { + WriteLog (lsWARNING, TransactionAcquire) << "Still need it"; + mTimeouts = 0; + aggressive = true; + } + } } + psl.lock(); if (!aggressive) { diff --git a/src/cpp/ripple/ripple_TransactionAcquire.h b/src/cpp/ripple/ripple_TransactionAcquire.h index de057dd96d..dcfa7a81b4 100644 --- a/src/cpp/ripple/ripple_TransactionAcquire.h +++ b/src/cpp/ripple/ripple_TransactionAcquire.h @@ -38,7 +38,7 @@ private: SHAMap::pointer mMap; bool mHaveRoot; - void onTimer (bool progress); + void onTimer (bool progress, boost::recursive_mutex::scoped_lock& peerSetLock); void newPeer (Peer::ref peer) { trigger (peer);