Pathfinding improvements:

* Make each path request track whether it needs updating.
* Improve new request handling, reverse order for processing requests.
* Break to handle new requests immediately.
* Make mPathFindThread an integer rather than a bool. Allow two threads.
* For old pathfinding, if the ledger is unspecified, use the PathRequest's RippleLineCache.
* Log new pathfinding request latencies.
* Suspend processing requests if server is backed up.
This commit is contained in:
JoelKatz
2013-12-09 00:09:06 -08:00
committed by Vinnie Falco
parent 087301933a
commit de85a7c2bd
5 changed files with 240 additions and 127 deletions

View File

@@ -68,7 +68,7 @@ public:
bool mAdvanceWork; // Publish thread has work to do
int mFillInProgress;
bool mPathFindThread; // Pathfind thread is running
int mPathFindThread; // Pathfinder jobs dispatched
bool mPathFindNewLedger;
bool mPathFindNewRequest;
@@ -90,7 +90,7 @@ public:
, mAdvanceThread (false)
, mAdvanceWork (false)
, mFillInProgress (0)
, mPathFindThread (false)
, mPathFindThread (0)
, mPathFindNewRequest (false)
, mPubLedgerClose (0)
, mPubLedgerSeq (0)
@@ -844,13 +844,7 @@ public:
}
getApp().getOPs().clearNeedNetworkLedger();
if (!mPathFindThread)
{
mPathFindThread = true;
getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths",
BIND_TYPE (&LedgerMasterImp::updatePaths, this, P_1));
}
newPFWork ("pf:newLedger");
}
if (progress)
mAdvanceWork = true;
@@ -1000,47 +994,55 @@ public:
void updatePaths (Job& job)
{
Ledger::pointer lastLedger;
if (getApp().getOPs().isNeedNetworkLedger ())
{
ScopedLockType ml (m_mutex, __FILE__, __LINE__);
mPathFindThread = false;
if (getApp().getOPs().isNeedNetworkLedger () || !mCurrentLedger)
{
--mPathFindThread;
return;
}
}
while (! job.shouldCancel())
{
bool newOnly = true;
bool hasNew = mPathFindNewRequest;
Ledger::pointer lastLedger;
{
ScopedLockType ml (m_mutex, __FILE__, __LINE__);
if (!mPathLedger || (mPathLedger->getLedgerSeq() < mValidLedger->getLedgerSeq()))
if (mValidLedger &&
(!mPathLedger || (mPathLedger->getLedgerSeq() != mValidLedger->getLedgerSeq())))
{ // We have a new valid ledger since the last full pathfinding
newOnly = false;
mPathLedger = mValidLedger;
lastLedger = mPathLedger;
}
else if (mPathFindNewRequest)
{ // We have a new request but no new ledger
newOnly = true;
lastLedger = boost::make_shared<Ledger> (boost::ref (*mCurrentLedger), false);
}
else
{ // Nothing to do
mPathFindThread = false;
--mPathFindThread;
return;
}
}
mPathFindNewRequest = false;
if (!getConfig().RUN_STANDALONE)
{ // don't pathfind with a ledger that's more than 60 seconds old
int64 age = getApp().getOPs().getCloseTimeNC();
age -= static_cast<int64> (lastLedger->getCloseTimeNC());
if (age > 60)
{
WriteLog (lsDEBUG, LedgerMaster) << "Published ledger too old for updating paths";
--mPathFindThread;
return;
}
}
try
{
// VFALCO TODO Fix this global variable
PathRequest::updateAll (lastLedger, newOnly, hasNew, job.getCancelCallback ());
PathRequest::updateAll (lastLedger, job.getCancelCallback ());
}
catch (SHAMapMissingNode&)
{
@@ -1055,12 +1057,16 @@ public:
ScopedLockType ml (m_mutex, __FILE__, __LINE__);
mPathFindNewRequest = true;
if (!mPathFindThread)
{
mPathFindThread = true;
getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths",
BIND_TYPE (&LedgerMasterImp::updatePaths, this, P_1));
newPFWork("pf:newRequest");
}
/** Report whether the new-path-request flag is set, clearing it as a side effect.

    The flag (mPathFindNewRequest) is read and reset while m_mutex is held.

    @return true if the flag was set on entry, false otherwise. In both cases
            the flag is false when this function returns.
*/
bool isNewPathRequest ()
{
    ScopedLockType ml (m_mutex, __FILE__, __LINE__);

    // Remember the state on entry, then leave the flag cleared.
    bool const wasFlagged = mPathFindNewRequest;
    mPathFindNewRequest = false;
    return wasFlagged;
}
// If the order book is radically updated, we need to reprocess all pathfinding requests
@@ -1069,10 +1075,17 @@ public:
ScopedLockType ml (m_mutex, __FILE__, __LINE__);
mPathLedger.reset();
if (!mPathFindThread)
newPFWork("pf:newOBDB");
}
/** A thread needs to be dispatched to handle pathfinding work of some kind
*/
void newPFWork (const char *name)
{
mPathFindThread = true;
getApp().getJobQueue ().addJob (jtUPDATE_PF, "updatePaths",
if (mPathFindThread < 2)
{
++mPathFindThread;
getApp().getJobQueue().addJob (jtUPDATE_PF, name,
BIND_TYPE (&LedgerMasterImp::updatePaths, this, P_1));
}
}

View File

@@ -129,6 +129,7 @@ public:
virtual void tryAdvance () = 0;
virtual void newPathRequest () = 0;
virtual bool isNewPathRequest () = 0;
virtual void newOrderBookDB () = 0;
virtual bool fixIndex (LedgerIndex ledgerIndex, LedgerHash const& ledgerHash) = 0;

View File

@@ -21,7 +21,7 @@ SETUP_LOG (PathRequest)
// VFALCO TODO Move these globals into a PathRequests collection interface
PathRequest::StaticLockType PathRequest::sLock ("PathRequest", __FILE__, __LINE__);
std::set <PathRequest::wptr> PathRequest::sRequests;
std::vector <PathRequest::wptr> PathRequest::sRequests;
RippleLineCache::pointer PathRequest::sLineCache;
std::atomic <int> PathRequest::s_last_id (0);
@@ -30,17 +30,42 @@ PathRequest::PathRequest (const boost::shared_ptr<InfoSub>& subscriber)
, wpSubscriber (subscriber)
, jvStatus (Json::objectValue)
, bValid (false)
, bNew (true)
, iLastIndex (0)
, iLastLevel (0)
, bLastSuccess (false)
, iIdentifier (++s_last_id)
{
WriteLog (lsINFO, PathRequest) << iIdentifier << " created";
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " created";
ptCreated = boost::posix_time::microsec_clock::universal_time ();
}
/** Render the time elapsed between two timestamps as a millisecond count.

    @param after  Timestamp the interval ends at.
    @param before Timestamp the interval starts at.
    @return (after - before) in total milliseconds, cast to unsigned and
            converted to a string with lexicalCastThrow.
*/
static std::string const get_milli_diff (boost::posix_time::ptime const& after, boost::posix_time::ptime const& before)
{
    boost::posix_time::time_duration const elapsed = after - before;
    unsigned elapsedMs = static_cast <unsigned> (elapsed.total_milliseconds ());
    return lexicalCastThrow <std::string> (elapsedMs);
}
/** Render the time elapsed since a timestamp as a millisecond count.

    Samples the current universal time from the microsecond clock and
    forwards to the two-argument overload.

    @param before Timestamp the interval starts at.
    @return The string produced by get_milli_diff (now, before).
*/
static std::string const get_milli_diff (boost::posix_time::ptime const& before)
{
    boost::posix_time::ptime const now = boost::posix_time::microsec_clock::universal_time ();
    return get_milli_diff (now, before);
}
PathRequest::~PathRequest()
{
WriteLog (lsINFO, PathRequest) << iIdentifier << " destroyed";
std::string fast, full;
if (!ptQuickReply.is_not_a_date_time())
{
fast = " fast:";
fast += get_milli_diff (ptQuickReply, ptCreated);
fast += "ms";
}
if (!ptFullReply.is_not_a_date_time())
{
full = " full:";
full += get_milli_diff (ptFullReply, ptCreated);
full += "ms";
}
WriteLog (lsINFO, PathRequest) << iIdentifier << " complete:" << fast << full <<
" total:" << get_milli_diff(ptCreated) << "ms";
}
bool PathRequest::isValid ()
@@ -49,10 +74,23 @@ bool PathRequest::isValid ()
return bValid;
}
bool PathRequest::isNew ()
bool PathRequest::needsUpdate (bool newOnly, LedgerIndex index)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
return bNew;
if (newOnly)
{ // we only want to handle new requests
if (iLastIndex != 0)
return false;
iLastIndex = 1;
return true;
}
else
{
if (iLastIndex >= index)
return false;
iLastIndex = index;
return true;
}
}
bool PathRequest::isValid (Ledger::ref lrLedger)
@@ -119,7 +157,7 @@ Json::Value PathRequest::doCreate (Ledger::ref lrLedger, const Json::Value& valu
RippleLineCache::pointer cache;
{
StaticScopedLockType sl (sLock, __FILE__, __LINE__);
cache = getLineCache (ledger);
cache = getLineCache (ledger, false);
}
Json::Value status;
@@ -131,27 +169,28 @@ Json::Value PathRequest::doCreate (Ledger::ref lrLedger, const Json::Value& valu
mValid = isValid (ledger);
if (mValid)
{
doUpdate (cache, true);
}
status = doUpdate (cache, true);
else
status = jvStatus;
}
else
{
mValid = false;
status = jvStatus;
}
}
if (mValid)
{
WriteLog (lsINFO, PathRequest) << iIdentifier << " valid: " << raSrcAccount.humanAccountID () <<
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " valid: " << raSrcAccount.humanAccountID () <<
" -> " << raDstAccount.humanAccountID ();
WriteLog (lsINFO, PathRequest) << iIdentifier << " Deliver: " << saDstAmount.getFullText ();
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " Deliver: " << saDstAmount.getFullText ();
StaticScopedLockType sl (sLock, __FILE__, __LINE__);
sRequests.insert (shared_from_this ());
sRequests.push_back (shared_from_this ());
}
else
WriteLog (lsINFO, PathRequest) << iIdentifier << " invalid";
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " invalid";
return status;
}
@@ -267,17 +306,14 @@ void PathRequest::resetLevel (int l)
iLastLevel = l;
}
bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
{
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " update " << (fast ? "fast" : "normal");
ScopedLockType sl (mLock, __FILE__, __LINE__);
jvStatus = Json::objectValue;
if (!isValid (cache->getLedger ()))
return false;
if (!fast)
bNew = false;
return jvStatus;
jvStatus = Json::objectValue;
std::set<currIssuer_t> sourceCurrencies (sciSourceCurrencies);
@@ -312,12 +348,8 @@ bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
if (iLevel == 0)
{ // first pass
if (loaded)
if (loaded || fast)
iLevel = getConfig().PATH_SEARCH_FAST;
else if (!fast)
iLevel = getConfig().PATH_SEARCH_OLD;
else if (getConfig().PATH_SEARCH < getConfig().PATH_SEARCH_MAX)
iLevel = getConfig().PATH_SEARCH + 1; // start with an extra boost
else
iLevel = getConfig().PATH_SEARCH;
}
@@ -326,8 +358,6 @@ bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
iLevel = getConfig().PATH_SEARCH;
if (loaded && (iLevel > getConfig().PATH_SEARCH_FAST))
--iLevel;
else if (!loaded && (iLevel < getConfig().PATH_SEARCH))
++iLevel;
}
else if (bLastSuccess)
{ // decrement, if possible
@@ -356,7 +386,7 @@ bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
STPathSet& spsPaths = mContext[currIssuer];
Pathfinder pf (cache, raSrcAccount, raDstAccount,
currIssuer.first, currIssuer.second, saDstAmount, valid);
CondLog (!valid, lsINFO, PathRequest) << iIdentifier << " PF request not valid";
CondLog (!valid, lsDEBUG, PathRequest) << iIdentifier << " PF request not valid";
STPath extraPath;
if (valid && pf.findPaths (iLevel, 4, spsPaths, extraPath))
@@ -398,34 +428,42 @@ bool PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
}
else
{
WriteLog (lsINFO, PathRequest) << iIdentifier << " rippleCalc returns " << transHuman (terResult);
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " rippleCalc returns " << transHuman (terResult);
}
}
else
{
WriteLog (lsINFO, PathRequest) << iIdentifier << " No paths found";
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " No paths found";
}
}
iLastLevel = iLevel;
bLastSuccess = found;
if (fast && ptQuickReply.is_not_a_date_time())
ptQuickReply = boost::posix_time::microsec_clock::universal_time();
else if (!fast && ptFullReply.is_not_a_date_time())
ptFullReply = boost::posix_time::microsec_clock::universal_time();
jvStatus["alternatives"] = jvArray;
return true;
return jvStatus;
}
/** Get the current RippleLineCache, updating it if necessary.
Get the correct ledger to use.
Call with a lock
*/
RippleLineCache::pointer PathRequest::getLineCache (Ledger::pointer& ledger)
RippleLineCache::pointer PathRequest::getLineCache (Ledger::pointer& ledger, bool authoritative)
{
uint32 lineSeq = sLineCache ? sLineCache->getLedger()->getLedgerSeq() : 0;
if ( (lineSeq == 0) || // No cache
(lineSeq < ledger->getLedgerSeq()) || // Cache is out of date
(lineSeq > (ledger->getLedgerSeq() + 16))) // We jumped way back somehow
uint32 lgrSeq = ledger->getLedgerSeq();
if ( (lineSeq == 0) || // no ledger
(authoritative && (lgrSeq > lineSeq)) || // newer authoritative ledger
(authoritative && ((lgrSeq + 8) < lineSeq)) || // we jumped way back for some reason
(lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason
{
ledger = boost::make_shared<Ledger>(*ledger, false);
ledger = boost::make_shared<Ledger>(*ledger, false); // Take a snapshot of the ledger
sLineCache = boost::make_shared<RippleLineCache> (ledger);
}
else
@@ -435,9 +473,9 @@ RippleLineCache::pointer PathRequest::getLineCache (Ledger::pointer& ledger)
return sLineCache;
}
void PathRequest::updateAll (Ledger::ref inLedger, bool newOnly, bool hasNew, CancelCallback shouldCancel)
void PathRequest::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
{
std::set<wptr> requests;
std::vector<wptr> requests;
LoadEvent::autoptr event (getApp().getJobQueue().getLoadEventAP(jtPATH_FIND, "PathRequest::updateAll"));
@@ -447,16 +485,35 @@ void PathRequest::updateAll (Ledger::ref inLedger, bool newOnly, bool hasNew, Ca
{
StaticScopedLockType sl (sLock, __FILE__, __LINE__);
requests = sRequests;
cache = getLineCache (ledger);
cache = getLineCache (ledger, true);
}
if (requests.empty ())
bool newRequests = getApp().getLedgerMaster().isNewPathRequest();
bool mustBreak = false;
WriteLog (lsTRACE, PathRequest) << "updateAll seq=" << ledger->getLedgerSeq() << ", " <<
requests.size() << " requests";
int processed = 0, removed = 0;
do
{
{ // Get the latest requests, cache, and ledger
StaticScopedLockType sl (sLock, __FILE__, __LINE__);
if (sRequests.empty())
return;
WriteLog (lsDEBUG, PathRequest) << "updateAll seq=" << ledger->getLedgerSeq() <<
(newOnly ? " newOnly, " : " all, ") << requests.size() << " requests";
// Newest request is last in sRequests, but we want to serve it first.
// Rebuild the working list from scratch on every pass: assign() replaces
// whatever the previous pass left in `requests` with a reversed copy of
// sRequests. (The earlier `requests.empty()` call only queried the vector
// and discarded the answer, so stale entries were kept and the reversed
// copy was appended after them.)
requests.assign (sRequests.rbegin (), sRequests.rend ());
int processed = 0, removed = 0;
cache = getLineCache (ledger, false);
}
BOOST_FOREACH (wref wRequest, requests)
{
@@ -468,11 +525,7 @@ void PathRequest::updateAll (Ledger::ref inLedger, bool newOnly, bool hasNew, Ca
if (pRequest)
{
// Drop old requests level to get new ones done faster
if (hasNew)
pRequest->resetLevel(getConfig().PATH_SEARCH);
if (newOnly && !pRequest->isNew ())
if (!pRequest->needsUpdate (newRequests, ledger->getLedgerSeq ()))
remove = false;
else
{
@@ -480,12 +533,7 @@ void PathRequest::updateAll (Ledger::ref inLedger, bool newOnly, bool hasNew, Ca
if (ipSub)
{
Json::Value update;
{
ScopedLockType sl (pRequest->mLock, __FILE__, __LINE__);
pRequest->doUpdate (cache, false);
update = pRequest->jvStatus;
}
Json::Value update = pRequest->doUpdate (cache, false);
update["type"] = "path_find";
ipSub->send (update, false);
remove = false;
@@ -496,11 +544,49 @@ void PathRequest::updateAll (Ledger::ref inLedger, bool newOnly, bool hasNew, Ca
if (remove)
{
++removed;
PathRequest::pointer pRequest = wRequest.lock ();
StaticScopedLockType sl (sLock, __FILE__, __LINE__);
sRequests.erase (wRequest);
// Remove any dangling weak pointers or weak pointers that refer to this path request.
std::vector<wptr>::iterator it = sRequests.begin();
while (it != sRequests.end())
{
PathRequest::pointer itRequest = it->lock ();
if (!itRequest || (itRequest == pRequest))
{
++removed;
it = sRequests.erase (it);
}
else
++it;
}
}
mustBreak = !newRequests && getApp().getLedgerMaster().isNewPathRequest();
if (mustBreak) // We weren't handling new requests and then there was a new request
break;
}
if (mustBreak)
{ // a new request came in while we were working
newRequests = true;
}
else if (newRequests)
{ // we only did new requests, so we always need a last pass
newRequests = getApp().getLedgerMaster().isNewPathRequest();
}
else
{ // check if there are any new requests, otherwise we are done
newRequests = getApp().getLedgerMaster().isNewPathRequest();
if (!newRequests) // We did a full pass and there are no new requests
return;
}
}
while (!shouldCancel ());
WriteLog (lsDEBUG, PathRequest) << "updateAll complete " << processed << " process and " <<
removed << " removed";
}

View File

@@ -49,22 +49,21 @@ public:
bool isValid (const boost::shared_ptr<Ledger>&);
bool isValid ();
bool isNew ();
bool needsUpdate (bool newOnly, LedgerIndex index);
Json::Value getStatus ();
Json::Value doCreate (const boost::shared_ptr<Ledger>&, const Json::Value&);
Json::Value doClose (const Json::Value&);
Json::Value doStatus (const Json::Value&);
Json::Value doUpdate (const boost::shared_ptr<RippleLineCache>&, bool fast); // update jvStatus
bool doUpdate (const boost::shared_ptr<RippleLineCache>&, bool fast); // update jvStatus
static void updateAll (const boost::shared_ptr<Ledger>& ledger, bool newOnly, bool hasNew, CancelCallback shouldCancel);
static void updateAll (const boost::shared_ptr<Ledger>& ledger, CancelCallback shouldCancel);
static RippleLineCache::pointer getLineCache (Ledger::pointer& ledger, bool authoritative);
private:
void setValid ();
void resetLevel (int level);
int parseJson (const Json::Value&, bool complete);
static RippleLineCache::pointer getLineCache (Ledger::pointer& ledger);
typedef RippleRecursiveMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
@@ -83,7 +82,7 @@ private:
std::map<currIssuer_t, STPathSet> mContext;
bool bValid;
bool bNew;
LedgerIndex iLastIndex;
int iLastLevel;
bool bLastSuccess;
@@ -92,8 +91,12 @@ private:
static std::atomic <int> s_last_id;
boost::posix_time::ptime ptCreated;
boost::posix_time::ptime ptQuickReply;
boost::posix_time::ptime ptFullReply;
// Track all requests
static std::set<wptr> sRequests;
static std::vector<wptr> sRequests;
// Use a RippleLineCache
static RippleLineCache::pointer sLineCache;

View File

@@ -1479,12 +1479,7 @@ Json::Value RPCHandler::doPathFind (Json::Value params, Resource::Charge& loadTy
return rpcError (rpcINVALID_PARAMS);
}
// TODO:
// - Add support for specifying non-endpoint issuer.
// - Return fully expanded path with proof.
// - Allows clients to verify path exists.
// - Return canonicalized path.
// - From a trusted server, allows clients to use path without manipulation.
// This interface is deprecated.
Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge& loadType, Application::ScopedLockType& masterLockHolder)
{
int jc = getApp().getJobQueue ().getJobCountGE (jtCLIENT);
@@ -1500,10 +1495,15 @@ Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge&
RippleAddress raDst;
STAmount saDstAmount;
Ledger::pointer lpLedger;
Json::Value jvResult = lookupLedger (params, lpLedger);
Json::Value jvResult;
if (getConfig().RUN_STANDALONE || params.isMember("ledger") || params.isMember("ledger_index") || params.isMember("ledger_hash"))
{ // The caller specified a ledger
jvResult = lookupLedger (params, lpLedger);
if (!lpLedger)
return jvResult;
}
if (!params.isMember ("source_account"))
{
@@ -1546,7 +1546,18 @@ Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge&
else
{
loadType = Resource::feeHighBurdenRPC;
Ledger::pointer lSnapShot = boost::make_shared<Ledger> (boost::ref (*lpLedger), false);
RippleLineCache::pointer cache;
if (lpLedger)
{ // The caller specified a ledger
lpLedger = boost::make_shared<Ledger> (boost::ref (*lpLedger), false);
cache = boost::make_shared<RippleLineCache>(lpLedger);
}
else
{ // Use the default ledger and cache
lpLedger = mNetOps->getValidatedLedger();
cache = PathRequest::getLineCache(lpLedger, false);
}
masterLockHolder.unlock (); // As long as we have a locked copy of the ledger, we can unlock.
@@ -1583,7 +1594,6 @@ Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge&
jvResult["destination_account"] = raDst.humanAccountID ();
Json::Value jvArray (Json::arrayValue);
RippleLineCache::pointer cache = boost::make_shared<RippleLineCache> (lSnapShot);
for (unsigned int i = 0; i != jvSrcCurrencies.size (); ++i)
{
@@ -1646,7 +1656,7 @@ Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge&
1);
saMaxAmount.negate ();
LedgerEntrySet lesSandbox (lSnapShot, tapNONE);
LedgerEntrySet lesSandbox (lpLedger, tapNONE);
TER terResult =
RippleCalc::rippleCalc (