Pathfinding dispatch improvements

* Simplify decision whether to update a path
* Prevent pathfinding threads from blocking each other
This commit is contained in:
JoelKatz
2014-05-02 11:23:34 -07:00
committed by Vinnie Falco
parent 11d0c89a59
commit 112d383698
3 changed files with 38 additions and 28 deletions

View File

@@ -29,7 +29,8 @@ const boost::shared_ptr<InfoSub>& subscriber, int id, PathRequests& owner,
, wpSubscriber (subscriber)
, jvStatus (Json::objectValue)
, bValid (false)
, iLastIndex (0)
, mLastIndex (0)
, mInProgress (false)
, iLastLevel (0)
, bLastSuccess (false)
, iIdentifier (id)
@@ -77,38 +78,43 @@ bool PathRequest::isValid ()
bool PathRequest::isNew ()
{
// does this path request still need its first full path
return iLastIndex.load() == 0;
ScopedLockType sl (mIndexLock);
// does this path request still need its first full path
return mLastIndex == 0;
}
bool PathRequest::needsUpdate (bool newOnly, LedgerIndex index)
{
LedgerIndex lastIndex = iLastIndex.load();
for (;;)
ScopedLockType sl (mIndexLock);
if (mInProgress)
{
if (newOnly)
{
// Is this request new
if (lastIndex != 0)
return false;
// This thread marks this request handled
if (iLastIndex.compare_exchange_weak (lastIndex, 1,
std::memory_order_release, std::memory_order_relaxed))
return true;
}
else
{
// Has the request already been handled?
if (lastIndex >= index)
return false;
// This thread marks this request handled
if (iLastIndex.compare_exchange_weak (lastIndex, index,
std::memory_order_release, std::memory_order_relaxed))
return true;
}
// Another thread is handling this
return false;
}
if (newOnly && (mLastIndex != 0))
{
// Only handling new requests, this isn't new
return false;
}
if (mLastIndex >= index)
{
return false;
}
mInProgress = true;
return true;
}
void PathRequest::updateComplete ()
{
ScopedLockType sl (mIndexLock);
assert (mInProgress);
mInProgress = false;
}
bool PathRequest::isValid (RippleLineCache::ref crCache)

View File

@@ -57,6 +57,7 @@ public:
bool isValid ();
bool isNew ();
bool needsUpdate (bool newOnly, LedgerIndex index);
void updateComplete ();
Json::Value getStatus ();
Json::Value doCreate (const boost::shared_ptr<Ledger>&, const RippleLineCache::pointer&,
@@ -93,7 +94,9 @@ private:
bool bValid;
std::atomic<LedgerIndex> iLastIndex;
LockType mIndexLock;
LedgerIndex mLastIndex;
bool mInProgress;
int iLastLevel;
bool bLastSuccess;

View File

@@ -90,6 +90,7 @@ void PathRequests::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
if (!ipSub->getConsumer ().warn ())
{
Json::Value update = pRequest->doUpdate (cache, false);
pRequest->updateComplete ();
update["type"] = "path_find";
ipSub->send (update, false);
remove = false;