Move PathRequests to separate files

This commit is contained in:
Vinnie Falco
2014-01-21 09:05:44 -05:00
parent fe83f471f5
commit c95dccfec6
8 changed files with 314 additions and 263 deletions

View File

@@ -19,8 +19,11 @@
SETUP_LOG (PathRequest)
PathRequest::PathRequest (const boost::shared_ptr<InfoSub>& subscriber, int id, PathRequests& owner)
: mLock (this, "PathRequest", __FILE__, __LINE__)
PathRequest::PathRequest (
const boost::shared_ptr<InfoSub>& subscriber, int id, PathRequests& owner,
Journal journal)
: m_journal (journal)
, mLock (this, "PathRequest", __FILE__, __LINE__)
, mOwner (owner)
, wpSubscriber (subscriber)
, jvStatus (Json::objectValue)
@@ -30,16 +33,11 @@ PathRequest::PathRequest (const boost::shared_ptr<InfoSub>& subscriber, int id,
, bLastSuccess (false)
, iIdentifier (id)
{
if (journal().debug)
journal().debug << iIdentifier << " created";
if (m_journal.debug)
m_journal.debug << iIdentifier << " created";
ptCreated = boost::posix_time::microsec_clock::universal_time ();
}
Journal& PathRequest::journal ()
{
return mOwner.journal ();
}
static std::string const get_milli_diff (boost::posix_time::ptime const& after, boost::posix_time::ptime const& before)
{
return lexicalCastThrow <std::string> (static_cast <unsigned> ((after - before).total_milliseconds()));
@@ -65,8 +63,8 @@ PathRequest::~PathRequest()
full += get_milli_diff (ptFullReply, ptCreated);
full += "ms";
}
if (journal().info)
journal().info << iIdentifier << " complete:" << fast << full <<
if (m_journal.info)
m_journal.info << iIdentifier << " complete:" << fast << full <<
" total:" << get_milli_diff(ptCreated) << "ms";
}
@@ -171,15 +169,15 @@ Json::Value PathRequest::doCreate (Ledger::ref lrLedger, RippleLineCache::ref& c
status = jvStatus;
}
if (journal().debug)
if (m_journal.debug)
{
if (bValid)
{
journal().debug << iIdentifier << " valid: " << raSrcAccount.humanAccountID () <<
journal().debug << iIdentifier << " Deliver: " << saDstAmount.getFullText ();
m_journal.debug << iIdentifier << " valid: " << raSrcAccount.humanAccountID () <<
m_journal.debug << iIdentifier << " Deliver: " << saDstAmount.getFullText ();
}
else
journal().debug << iIdentifier << " invalid";
m_journal.debug << iIdentifier << " invalid";
}
valid = bValid;
@@ -280,7 +278,7 @@ int PathRequest::parseJson (const Json::Value& jvParams, bool complete)
}
Json::Value PathRequest::doClose (const Json::Value&)
{
journal().debug << iIdentifier << " closed";
m_journal.debug << iIdentifier << " closed";
ScopedLockType sl (mLock, __FILE__, __LINE__);
return jvStatus;
}
@@ -299,7 +297,7 @@ void PathRequest::resetLevel (int l)
Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
{
journal().debug << iIdentifier << " update " << (fast ? "fast" : "normal");
m_journal.debug << iIdentifier << " update " << (fast ? "fast" : "normal");
ScopedLockType sl (mLock, __FILE__, __LINE__);
@@ -364,7 +362,7 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
--iLevel;
}
journal().debug << iIdentifier << " processing at level " << iLevel;
m_journal.debug << iIdentifier << " processing at level " << iLevel;
bool found = false;
@@ -372,8 +370,8 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
{
{
STAmount test (currIssuer.first, currIssuer.second, 1);
if (journal().debug)
journal().debug << iIdentifier << " Trying to find paths: " << test.getFullText ();
if (m_journal.debug)
m_journal.debug << iIdentifier << " Trying to find paths: " << test.getFullText ();
}
bool valid;
STPathSet& spsPaths = mContext[currIssuer];
@@ -392,7 +390,7 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
currIssuer.second.isNonZero () ? currIssuer.second :
(currIssuer.first.isZero () ? ACCOUNT_XRP : raSrcAccount.getAccountID ()), 1);
saMaxAmount.negate ();
journal().debug << iIdentifier << " Paths found, calling rippleCalc";
m_journal.debug << iIdentifier << " Paths found, calling rippleCalc";
TER terResult = RippleCalc::rippleCalc (lesSandbox, saMaxAmountAct, saDstAmountAct,
vpsExpanded, saMaxAmount, saDstAmount,
raDstAccount.getAccountID (), raSrcAccount.getAccountID (),
@@ -401,14 +399,14 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
if ((extraPath.size() > 0) && ((terResult == terNO_LINE) || (terResult == tecPATH_PARTIAL)))
{
journal().debug << iIdentifier << " Trying with an extra path element";
m_journal.debug << iIdentifier << " Trying with an extra path element";
spsPaths.addPath(extraPath);
vpsExpanded.clear ();
terResult = RippleCalc::rippleCalc (lesSandbox, saMaxAmountAct, saDstAmountAct,
vpsExpanded, saMaxAmount, saDstAmount,
raDstAccount.getAccountID (), raSrcAccount.getAccountID (),
spsPaths, false, false, false, true);
journal().debug << iIdentifier << " Extra path element gives " << transHuman (terResult);
m_journal.debug << iIdentifier << " Extra path element gives " << transHuman (terResult);
}
if (terResult == tesSUCCESS)
@@ -421,12 +419,12 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
}
else
{
journal().debug << iIdentifier << " rippleCalc returns " << transHuman (terResult);
m_journal.debug << iIdentifier << " rippleCalc returns " << transHuman (terResult);
}
}
else
{
journal().debug << iIdentifier << " No paths found";
m_journal.debug << iIdentifier << " No paths found";
}
}
@@ -453,177 +451,3 @@ InfoSub::pointer PathRequest::getSubscriber ()
return wpSubscriber.lock ();
}
/** Get the current RippleLineCache, updating it if necessary.
Get the correct ledger to use.
*/
RippleLineCache::pointer PathRequests::getLineCache (Ledger::pointer& ledger, bool authoritative)
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
uint32 lineSeq = mLineCache ? mLineCache->getLedger()->getLedgerSeq() : 0;
uint32 lgrSeq = ledger->getLedgerSeq();
if ( (lineSeq == 0) || // no ledger
(authoritative && (lgrSeq > lineSeq)) || // newer authoritative ledger
(authoritative && ((lgrSeq + 8) < lineSeq)) || // we jumped way back for some reason
(lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason
{
ledger = boost::make_shared<Ledger>(*ledger, false); // Take a snapshot of the ledger
mLineCache = boost::make_shared<RippleLineCache> (ledger);
}
else
{
ledger = mLineCache->getLedger();
}
return mLineCache;
}
void PathRequests::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
{
std::vector<PathRequest::wptr> requests;
LoadEvent::autoptr event (getApp().getJobQueue().getLoadEventAP(jtPATH_FIND, "PathRequest::updateAll"));
// Get the ledger and cache we should be using
Ledger::pointer ledger = inLedger;
RippleLineCache::pointer cache;
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
requests = mRequests;
cache = getLineCache (ledger, true);
}
bool newRequests = getApp().getLedgerMaster().isNewPathRequest();
bool mustBreak = false;
journal().trace << "updateAll seq=" << ledger->getLedgerSeq() << ", " <<
requests.size() << " requests";
int processed = 0, removed = 0;
do
{
{ // Get the latest requests, cache, and ledger
ScopedLockType sl (mLock, __FILE__, __LINE__);
if (mRequests.empty())
return;
// Newest request is last in mRequests, but we want to serve it first
requests.empty();
requests.reserve (mRequests.size ());
BOOST_REVERSE_FOREACH (PathRequest::wptr& req, mRequests)
{
requests.push_back (req);
}
cache = getLineCache (ledger, false);
}
BOOST_FOREACH (PathRequest::wref wRequest, requests)
{
if (shouldCancel())
break;
bool remove = true;
PathRequest::pointer pRequest = wRequest.lock ();
if (pRequest)
{
if (!pRequest->needsUpdate (newRequests, ledger->getLedgerSeq ()))
remove = false;
else
{
InfoSub::pointer ipSub = pRequest->getSubscriber ();
if (ipSub)
{
Json::Value update = pRequest->doUpdate (cache, false);
update["type"] = "path_find";
ipSub->send (update, false);
remove = false;
++processed;
}
}
}
if (remove)
{
PathRequest::pointer pRequest = wRequest.lock ();
ScopedLockType sl (mLock, __FILE__, __LINE__);
// Remove any dangling weak pointers or weak pointers that refer to this path request.
std::vector<PathRequest::wptr>::iterator it = mRequests.begin();
while (it != mRequests.end())
{
PathRequest::pointer itRequest = it->lock ();
if (!itRequest || (itRequest == pRequest))
{
++removed;
it = mRequests.erase (it);
}
else
++it;
}
}
mustBreak = !newRequests && getApp().getLedgerMaster().isNewPathRequest();
if (mustBreak) // We weren't handling new requests and then there was a new request
break;
}
if (mustBreak)
{ // a new request came in while we were working
newRequests = true;
}
else if (newRequests)
{ // we only did new requests, so we always need a last pass
newRequests = getApp().getLedgerMaster().isNewPathRequest();
}
else
{ // check if there are any new requests, otherwise we are done
newRequests = getApp().getLedgerMaster().isNewPathRequest();
if (!newRequests) // We did a full pass and there are no new requests
return;
}
}
while (!shouldCancel ());
journal().debug << "updateAll complete " << processed << " process and " <<
removed << " removed";
}
Json::Value PathRequests::makePathRequest(
boost::shared_ptr <InfoSub> const& subscriber,
const boost::shared_ptr<Ledger>& inLedger,
const Json::Value& requestJson)
{
PathRequest::pointer req = boost::make_shared<PathRequest> (subscriber, ++mLastIdentifier, *this);
Ledger::pointer ledger = inLedger;
RippleLineCache::pointer cache;
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
cache = getLineCache (ledger, false);
}
bool valid = false;
Json::Value result = req->doCreate (ledger, cache, requestJson, valid);
if (valid)
{
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mRequests.push_back (req);
}
subscriber->setPathRequest (req);
getApp().getLedgerMaster().newPathRequest();
}
return result;
}
// vim:ts=4