diff --git a/src/ripple_app/ledger/LedgerMaster.cpp b/src/ripple_app/ledger/LedgerMaster.cpp index 3df3bbba71..60a01b1d4f 100644 --- a/src/ripple_app/ledger/LedgerMaster.cpp +++ b/src/ripple_app/ledger/LedgerMaster.cpp @@ -68,7 +68,7 @@ public: bool mAdvanceWork; // Publish thread has work to do int mFillInProgress; - bool mPathFindThread; // Pathfind thread is running + int mPathFindThread; // Pathfinder jobs dispatched bool mPathFindNewLedger; bool mPathFindNewRequest; @@ -90,7 +90,7 @@ public: , mAdvanceThread (false) , mAdvanceWork (false) , mFillInProgress (0) - , mPathFindThread (false) + , mPathFindThread (0) , mPathFindNewRequest (false) , mPubLedgerClose (0) , mPubLedgerSeq (0) @@ -844,13 +844,7 @@ public: } getApp().getOPs().clearNeedNetworkLedger(); - - if (!mPathFindThread) - { - mPathFindThread = true; - getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths", - BIND_TYPE (&LedgerMasterImp::updatePaths, this, P_1)); - } + newPFWork ("pf:newLedger"); } if (progress) mAdvanceWork = true; @@ -1000,47 +994,55 @@ public: void updatePaths (Job& job) { - Ledger::pointer lastLedger; - - if (getApp().getOPs().isNeedNetworkLedger ()) { ScopedLockType ml (m_mutex, __FILE__, __LINE__); - mPathFindThread = false; - return; + if (getApp().getOPs().isNeedNetworkLedger () || !mCurrentLedger) + { + --mPathFindThread; + return; + } } + while (! job.shouldCancel()) { - bool newOnly = true; - bool hasNew = mPathFindNewRequest; - + Ledger::pointer lastLedger; { ScopedLockType ml (m_mutex, __FILE__, __LINE__); - if (!mPathLedger || (mPathLedger->getLedgerSeq() < mValidLedger->getLedgerSeq())) + if (mValidLedger && + (!mPathLedger || (mPathLedger->getLedgerSeq() != mValidLedger->getLedgerSeq()))) { // We have a new valid ledger since the last full pathfinding - newOnly = false; mPathLedger = mValidLedger; lastLedger = mPathLedger; } else if (mPathFindNewRequest) { // We have a new request but no new ledger - newOnly = true; lastLedger = boost::make_shared (boost::ref (*mCurrentLedger), false); } else { // Nothing to do - mPathFindThread = false; + --mPathFindThread; return; } + } - mPathFindNewRequest = false; + if (!getConfig().RUN_STANDALONE) + { // don't pathfind with a ledger that's more than 60 seconds old + int64 age = getApp().getOPs().getCloseTimeNC(); + age -= static_cast (lastLedger->getCloseTimeNC()); + if (age > 60) + { + WriteLog (lsDEBUG, LedgerMaster) << "Published ledger too old for updating paths"; + --mPathFindThread; + return; + } } try { // VFALCO TODO Fix this global variable - PathRequest::updateAll (lastLedger, newOnly, hasNew, job.getCancelCallback ()); + PathRequest::updateAll (lastLedger, job.getCancelCallback ()); } catch (SHAMapMissingNode&) { @@ -1055,12 +1057,16 @@ public: ScopedLockType ml (m_mutex, __FILE__, __LINE__); mPathFindNewRequest = true; - if (!mPathFindThread) - { - mPathFindThread = true; - getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths", - BIND_TYPE (&LedgerMasterImp::updatePaths, this, P_1)); - } + newPFWork("pf:newRequest"); + } + + bool isNewPathRequest () + { + ScopedLockType ml (m_mutex, __FILE__, __LINE__); + if (!mPathFindNewRequest) + return false; + mPathFindNewRequest = false; + return true; } // If the order book is radically updated, we need to reprocess all pathfinding requests @@ -1069,10 +1075,17 @@ public: ScopedLockType ml (m_mutex, __FILE__, __LINE__); mPathLedger.reset(); - if (!mPathFindThread) + newPFWork("pf:newOBDB"); + } + + /** A thread needs to be dispatched to handle pathfinding work of some kind + */ + void newPFWork (const char *name) + { + if (mPathFindThread < 2) { - mPathFindThread = true; - getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths", + ++mPathFindThread; + getApp().getJobQueue().addJob (jtUPDATE_PF, name, BIND_TYPE (&LedgerMasterImp::updatePaths, this, P_1)); } } diff --git a/src/ripple_app/ledger/LedgerMaster.h b/src/ripple_app/ledger/LedgerMaster.h index 308faef5b0..c03e9a4093 100644 --- a/src/ripple_app/ledger/LedgerMaster.h +++ b/src/ripple_app/ledger/LedgerMaster.h @@ -129,6 +129,7 @@ public: virtual void tryAdvance () = 0; virtual void newPathRequest () = 0; + virtual bool isNewPathRequest () = 0; virtual void newOrderBookDB () = 0; virtual bool fixIndex (LedgerIndex ledgerIndex, LedgerHash const& ledgerHash) = 0; diff --git a/src/ripple_app/paths/PathRequest.cpp b/src/ripple_app/paths/PathRequest.cpp index 54554ffa69..80c9c8c5a2 100644 --- a/src/ripple_app/paths/PathRequest.cpp +++ b/src/ripple_app/paths/PathRequest.cpp @@ -21,7 +21,7 @@ SETUP_LOG (PathRequest) // VFALCO TODO Move these globals into a PathRequests collection inteface PathRequest::StaticLockType PathRequest::sLock ("PathRequest", __FILE__, __LINE__); -std::set PathRequest::sRequests; +std::vector PathRequest::sRequests; RippleLineCache::pointer PathRequest::sLineCache; std::atomic PathRequest::s_last_id (0); @@ -30,17 +30,42 @@ PathRequest::PathRequest (const boost::shared_ptr& subscriber) , wpSubscriber (subscriber) , jvStatus (Json::objectValue) , bValid (false) - , bNew (true) + , iLastIndex (0) , iLastLevel (0) , bLastSuccess (false) , iIdentifier (++s_last_id) { - WriteLog (lsINFO, PathRequest) << iIdentifier << " created"; + WriteLog (lsDEBUG, PathRequest) << iIdentifier << " created"; + ptCreated = boost::posix_time::microsec_clock::universal_time (); +} + +static std::string const get_milli_diff (boost::posix_time::ptime const& after, boost::posix_time::ptime const& before) +{ + return lexicalCastThrow (static_cast ((after - before).total_milliseconds())); +} + +static std::string const get_milli_diff (boost::posix_time::ptime const& before) +{ + return get_milli_diff(boost::posix_time::microsec_clock::universal_time(), before); } PathRequest::~PathRequest() { - WriteLog (lsINFO, PathRequest) << iIdentifier << " destroyed"; + std::string fast, full; + if (!ptQuickReply.is_not_a_date_time()) + { + fast = " fast:"; + fast += get_milli_diff (ptQuickReply, ptCreated); + fast += "ms"; + } + if (!ptFullReply.is_not_a_date_time()) + { + full = " full:"; + full += get_milli_diff (ptFullReply, ptCreated); + full += "ms"; + } + WriteLog (lsINFO, PathRequest) << iIdentifier << " complete:" << fast << full << + " total:" << get_milli_diff(ptCreated) << "ms"; } bool PathRequest::isValid () @@ -49,10 +74,23 @@ bool PathRequest::isValid () return bValid; } -bool PathRequest::isNew () +bool PathRequest::needsUpdate (bool newOnly, LedgerIndex index) { ScopedLockType sl (mLock, __FILE__, __LINE__); - return bNew; + if (newOnly) + { // we only want to handle new requests + if (iLastIndex != 0) + return false; + iLastIndex = 1; + return true; + } + else + { + if (iLastIndex >= index) + return false; + iLastIndex = index; + return true; + } } bool PathRequest::isValid (Ledger::ref lrLedger) @@ -119,7 +157,7 @@ Json::Value PathRequest::doCreate (Ledger::ref lrLedger, const Json::Value& valu RippleLineCache::pointer cache; { StaticScopedLockType sl (sLock, __FILE__, __LINE__); - cache = getLineCache (ledger); + cache = getLineCache (ledger, false); } Json::Value status; @@ -131,27 +169,28 @@ Json::Value PathRequest::doCreate (Ledger::ref lrLedger, const Json::Value& valu mValid = isValid (ledger); if (mValid) - { - doUpdate (cache, true); - } + status = doUpdate (cache, true); + else + status = jvStatus; } else + { mValid = false; - - status = jvStatus; + status = jvStatus; + } } if (mValid) { - WriteLog (lsINFO, PathRequest) << iIdentifier << " valid: " << raSrcAccount.humanAccountID () << + WriteLog (lsDEBUG, PathRequest) << iIdentifier << " valid: " << raSrcAccount.humanAccountID () << " -> " << raDstAccount.humanAccountID (); - WriteLog (lsINFO, PathRequest) << iIdentifier << " Deliver: " << saDstAmount.getFullText (); + WriteLog (lsDEBUG, PathRequest) << iIdentifier << " Deliver: " << saDstAmount.getFullText (); StaticScopedLockType sl (sLock, __FILE__, __LINE__); - sRequests.insert (shared_from_this ()); + sRequests.push_back (shared_from_this ()); } else - WriteLog (lsINFO, PathRequest) << iIdentifier << " invalid"; + WriteLog (lsDEBUG, PathRequest) << iIdentifier << " invalid"; return status; } @@ -267,17 +306,14 @@ void PathRequest::resetLevel (int l) iLastLevel = l; } -bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast) +Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast) { WriteLog (lsDEBUG, PathRequest) << iIdentifier << " update " << (fast ? "fast" : "normal"); ScopedLockType sl (mLock, __FILE__, __LINE__); - jvStatus = Json::objectValue; if (!isValid (cache->getLedger ())) - return false; - - if (!fast) - bNew = false; + return jvStatus; + jvStatus = Json::objectValue; std::set sourceCurrencies (sciSourceCurrencies); @@ -312,12 +348,8 @@ bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast) if (iLevel == 0) { // first pass - if (loaded) + if (loaded || fast) iLevel = getConfig().PATH_SEARCH_FAST; - else if (!fast) - iLevel = getConfig().PATH_SEARCH_OLD; - else if (getConfig().PATH_SEARCH < getConfig().PATH_SEARCH_MAX) - iLevel = getConfig().PATH_SEARCH + 1; // start with an extra boost else iLevel = getConfig().PATH_SEARCH; } @@ -326,8 +358,6 @@ bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast) iLevel = getConfig().PATH_SEARCH; if (loaded && (iLevel > getConfig().PATH_SEARCH_FAST)) --iLevel; - else if (!loaded && (iLevel < getConfig().PATH_SEARCH)) - ++iLevel; } else if (bLastSuccess) { // decrement, if possible @@ -356,7 +386,7 @@ bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast) STPathSet& spsPaths = mContext[currIssuer]; Pathfinder pf (cache, raSrcAccount, raDstAccount, currIssuer.first, currIssuer.second, saDstAmount, valid); - CondLog (!valid, lsINFO, PathRequest) << iIdentifier << " PF request not valid"; + CondLog (!valid, lsDEBUG, PathRequest) << iIdentifier << " PF request not valid"; STPath extraPath; if (valid && pf.findPaths (iLevel, 4, spsPaths, extraPath)) @@ -398,34 +428,42 @@ bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast) } else { - WriteLog (lsINFO, PathRequest) << iIdentifier << " rippleCalc returns " << transHuman (terResult); + WriteLog (lsDEBUG, PathRequest) << iIdentifier << " rippleCalc returns " << transHuman (terResult); } } else { - WriteLog (lsINFO, PathRequest) << iIdentifier << " No paths found"; + WriteLog (lsDEBUG, PathRequest) << iIdentifier << " No paths found"; } } iLastLevel = iLevel; bLastSuccess = found; + if (fast && ptQuickReply.is_not_a_date_time()) + ptQuickReply = boost::posix_time::microsec_clock::universal_time(); + else if (!fast && ptFullReply.is_not_a_date_time()) + ptFullReply = boost::posix_time::microsec_clock::universal_time(); + jvStatus["alternatives"] = jvArray; - return true; + return jvStatus; } /** Get the current RippleLineCache, updating it if necessary. Get the correct ledger to use. Call with a lock */ -RippleLineCache::pointer PathRequest::getLineCache (Ledger::pointer& ledger) +RippleLineCache::pointer PathRequest::getLineCache (Ledger::pointer& ledger, bool authoritative) { uint32 lineSeq = sLineCache ? sLineCache->getLedger()->getLedgerSeq() : 0; - if ( (lineSeq == 0) || // No cache - (lineSeq < ledger->getLedgerSeq()) || // Cache is out of date - (lineSeq > (ledger->getLedgerSeq() + 16))) // We jumped way back somehow + uint32 lgrSeq = ledger->getLedgerSeq(); + + if ( (lineSeq == 0) || // no ledger + (authoritative && (lgrSeq > lineSeq)) || // newer authoritative ledger + (authoritative && ((lgrSeq + 8) < lineSeq)) || // we jumped way back for some reason + (lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason { - ledger = boost::make_shared(*ledger, false); + ledger = boost::make_shared(*ledger, false); // Take a snapshot of the ledger sLineCache = boost::make_shared (ledger); } else @@ -435,9 +473,9 @@ RippleLineCache::pointer PathRequest::getLineCache (Ledger::pointer& ledger) return sLineCache; } -void PathRequest::updateAll (Ledger::ref inLedger, bool newOnly, bool hasNew, CancelCallback shouldCancel) +void PathRequest::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel) { - std::set requests; + std::vector requests; LoadEvent::autoptr event (getApp().getJobQueue().getLoadEventAP(jtPATH_FIND, "PathRequest::updateAll")); @@ -447,60 +485,108 @@ void PathRequest::updateAll (Ledger::ref inLedger, bool newOnly, bool hasNew, Ca { StaticScopedLockType sl (sLock, __FILE__, __LINE__); requests = sRequests; - cache = getLineCache (ledger); + cache = getLineCache (ledger, true); } - if (requests.empty ()) - return; - - WriteLog (lsDEBUG, PathRequest) << "updateAll seq=" << ledger->getLedgerSeq() << - (newOnly ? " newOnly, " : " all, ") << requests.size() << " requests"; + bool newRequests = getApp().getLedgerMaster().isNewPathRequest(); + bool mustBreak = false; + WriteLog (lsTRACE, PathRequest) << "updateAll seq=" << ledger->getLedgerSeq() << ", " << + requests.size() << " requests"; int processed = 0, removed = 0; - BOOST_FOREACH (wref wRequest, requests) + do { - if (shouldCancel()) - break; - bool remove = true; - PathRequest::pointer pRequest = wRequest.lock (); + { // Get the latest requests, cache, and ledger + StaticScopedLockType sl (sLock, __FILE__, __LINE__); - if (pRequest) - { - // Drop old requests level to get new ones done faster - if (hasNew) - pRequest->resetLevel(getConfig().PATH_SEARCH); + if (sRequests.empty()) + return; - if (newOnly && !pRequest->isNew ()) - remove = false; - else + // Newest request is last in sRequests, but we want to serve it first + requests.empty(); + requests.reserve(sRequests.size ()); + BOOST_REVERSE_FOREACH (wptr& req, sRequests) { - InfoSub::pointer ipSub = pRequest->wpSubscriber.lock (); + requests.push_back (req); + } - if (ipSub) - { - Json::Value update; - { - ScopedLockType sl (pRequest->mLock, __FILE__, __LINE__); - pRequest->doUpdate (cache, false); - update = pRequest->jvStatus; - } - update["type"] = "path_find"; - ipSub->send (update, false); + cache = getLineCache (ledger, false); + } + + BOOST_FOREACH (wref wRequest, requests) + { + if (shouldCancel()) + break; + + bool remove = true; + PathRequest::pointer pRequest = wRequest.lock (); + + if (pRequest) + { + if (!pRequest->needsUpdate (newRequests, ledger->getLedgerSeq ())) remove = false; - ++processed; + else + { + InfoSub::pointer ipSub = pRequest->wpSubscriber.lock (); + + if (ipSub) + { + Json::Value update = pRequest->doUpdate (cache, false); + update["type"] = "path_find"; + ipSub->send (update, false); + remove = false; + ++processed; + } } } + + if (remove) + { + PathRequest::pointer pRequest = wRequest.lock (); + + StaticScopedLockType sl (sLock, __FILE__, __LINE__); + + // Remove any dangling weak pointers or weak pointers that refer to this path request. + std::vector::iterator it = sRequests.begin(); + while (it != sRequests.end()) + { + PathRequest::pointer itRequest = it->lock (); + if (!itRequest || (itRequest == pRequest)) + { + ++removed; + it = sRequests.erase (it); + } + else + ++it; + } + } + + mustBreak = !newRequests && getApp().getLedgerMaster().isNewPathRequest(); + if (mustBreak) // We weren't handling new requests and then there was a new request + break; + } - if (remove) - { - ++removed; - StaticScopedLockType sl (sLock, __FILE__, __LINE__); - sRequests.erase (wRequest); + if (mustBreak) + { // a new request came in while we were working + newRequests = true; } + else if (newRequests) + { // we only did new requests, so we always need a last pass + newRequests = getApp().getLedgerMaster().isNewPathRequest(); + } + else + { // check if there are any new requests, otherwise we are done + newRequests = getApp().getLedgerMaster().isNewPathRequest(); + if (!newRequests) // We did a full pass and there are no new requests + return; + } + } + while (!shouldCancel ()); + WriteLog (lsDEBUG, PathRequest) << "updateAll complete " << processed << " process and " << removed << " removed"; } diff --git a/src/ripple_app/paths/PathRequest.h b/src/ripple_app/paths/PathRequest.h index cb1ce726ff..3e308ddb52 100644 --- a/src/ripple_app/paths/PathRequest.h +++ b/src/ripple_app/paths/PathRequest.h @@ -49,22 +49,21 @@ public: bool isValid (const boost::shared_ptr&); bool isValid (); - bool isNew (); + bool needsUpdate (bool newOnly, LedgerIndex index); Json::Value getStatus (); Json::Value doCreate (const boost::shared_ptr&, const Json::Value&); Json::Value doClose (const Json::Value&); Json::Value doStatus (const Json::Value&); + Json::Value doUpdate (const boost::shared_ptr&, bool fast); // update jvStatus - bool doUpdate (const boost::shared_ptr&, bool fast); // update jvStatus - - static void updateAll (const boost::shared_ptr& ledger, bool newOnly, bool hasNew, CancelCallback shouldCancel); + static void updateAll (const boost::shared_ptr& ledger, CancelCallback shouldCancel); + static RippleLineCache::pointer getLineCache (Ledger::pointer& ledger, bool authoritative); private: void setValid (); void resetLevel (int level); int parseJson (const Json::Value&, bool complete); - static RippleLineCache::pointer getLineCache (Ledger::pointer& ledger); typedef RippleRecursiveMutex LockType; typedef LockType::ScopedLockType ScopedLockType; @@ -83,7 +82,7 @@ private: std::map mContext; bool bValid; - bool bNew; + LedgerIndex iLastIndex; int iLastLevel; bool bLastSuccess; @@ -92,8 +91,12 @@ private: static std::atomic s_last_id; + boost::posix_time::ptime ptCreated; + boost::posix_time::ptime ptQuickReply; + boost::posix_time::ptime ptFullReply; + // Track all requests - static std::set sRequests; + static std::vector sRequests; // Use a RippleLineCache static RippleLineCache::pointer sLineCache; diff --git a/src/ripple_app/rpc/RPCHandler.cpp b/src/ripple_app/rpc/RPCHandler.cpp index c71b35f012..c11b33fead 100644 --- a/src/ripple_app/rpc/RPCHandler.cpp +++ b/src/ripple_app/rpc/RPCHandler.cpp @@ -1479,12 +1479,7 @@ Json::Value RPCHandler::doPathFind (Json::Value params, Resource::Charge& loadTy return rpcError (rpcINVALID_PARAMS); } -// TODO: -// - Add support for specifying non-endpoint issuer. -// - Return fully expanded path with proof. -// - Allows clients to verify path exists. -// - Return canonicalized path. -// - From a trusted server, allows clients to use path without manipulation. +// This interface is deprecated. Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge& loadType, Application::ScopedLockType& masterLockHolder) { int jc = getApp().getJobQueue ().getJobCountGE (jtCLIENT); @@ -1500,10 +1495,15 @@ Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge& RippleAddress raDst; STAmount saDstAmount; Ledger::pointer lpLedger; - Json::Value jvResult = lookupLedger (params, lpLedger); - if (!lpLedger) - return jvResult; + Json::Value jvResult; + + if (getConfig().RUN_STANDALONE || params.isMember("ledger") || params.isMember("ledger_index") || params.isMember("ledger_hash")) + { // The caller specified a ledger + jvResult = lookupLedger (params, lpLedger); + if (!lpLedger) + return jvResult; + } if (!params.isMember ("source_account")) { @@ -1546,7 +1546,18 @@ Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge& else { loadType = Resource::feeHighBurdenRPC; - Ledger::pointer lSnapShot = boost::make_shared (boost::ref (*lpLedger), false); + RippleLineCache::pointer cache; + + if (lpLedger) + { // The caller specified a ledger + lpLedger = boost::make_shared (boost::ref (*lpLedger), false); + cache = boost::make_shared(lpLedger); + } + else + { // Use the default ledger and cache + lpLedger = mNetOps->getValidatedLedger(); + cache = PathRequest::getLineCache(lpLedger, false); + } masterLockHolder.unlock (); // As long as we have a locked copy of the ledger, we can unlock. @@ -1583,7 +1594,6 @@ Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge& jvResult["destination_account"] = raDst.humanAccountID (); Json::Value jvArray (Json::arrayValue); - RippleLineCache::pointer cache = boost::make_shared (lSnapShot); for (unsigned int i = 0; i != jvSrcCurrencies.size (); ++i) { @@ -1646,7 +1656,7 @@ Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge& 1); saMaxAmount.negate (); - LedgerEntrySet lesSandbox (lSnapShot, tapNONE); + LedgerEntrySet lesSandbox (lpLedger, tapNONE); TER terResult = RippleCalc::rippleCalc (