PathRequests object to own all path requests.

This commit is contained in:
JoelKatz
2013-12-26 22:40:24 -08:00
committed by Vinnie Falco
parent a2109b4bda
commit 4591658160
6 changed files with 191 additions and 102 deletions

View File

@@ -1034,8 +1034,7 @@ public:
try try
{ {
// VFALCO TODO Fix this global variable getApp().getPathRequests().updateAll (lastLedger, job.getCancelCallback ());
PathRequest::updateAll (lastLedger, job.getCancelCallback ());
} }
catch (SHAMapMissingNode&) catch (SHAMapMissingNode&)
{ {

View File

@@ -45,6 +45,8 @@ class LoadManagerLog;
template <> char const* LogPartition::getPartitionName <LoadManagerLog> () { return "LoadManager"; } template <> char const* LogPartition::getPartitionName <LoadManagerLog> () { return "LoadManager"; }
class ResourceManagerLog; class ResourceManagerLog;
template <> char const* LogPartition::getPartitionName <ResourceManagerLog> () { return "ResourceManager"; } template <> char const* LogPartition::getPartitionName <ResourceManagerLog> () { return "ResourceManager"; }
class PathRequestLog;
template <> char const* LogPartition::getPartitionName <PathRequestLog> () { return "PathRequest"; }
template <> char const* LogPartition::getPartitionName <CollectorManager> () { return "Collector"; } template <> char const* LogPartition::getPartitionName <CollectorManager> () { return "Collector"; }
@@ -118,6 +120,9 @@ public:
, m_orderBookDB (*m_jobQueue) , m_orderBookDB (*m_jobQueue)
, m_pathRequests ( new PathRequests (
LogPartition::getJournal <PathRequestLog> (), m_collectorManager->collector ()))
, m_ledgerMaster (LedgerMaster::New ( , m_ledgerMaster (LedgerMaster::New (
*m_jobQueue, LogPartition::getJournal <LedgerMaster> ())) *m_jobQueue, LogPartition::getJournal <LedgerMaster> ()))
@@ -270,6 +275,11 @@ public:
return m_orderBookDB; return m_orderBookDB;
} }
PathRequests& getPathRequests ()
{
return *m_pathRequests;
}
SLECache& getSLECache () SLECache& getSLECache ()
{ {
return m_sleCache; return m_sleCache;
@@ -918,6 +928,7 @@ private:
std::unique_ptr <SiteFiles::Manager> m_siteFiles; std::unique_ptr <SiteFiles::Manager> m_siteFiles;
// VFALCO TODO Make OrderBookDB abstract // VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB; OrderBookDB m_orderBookDB;
std::unique_ptr <PathRequests> m_pathRequests;
std::unique_ptr <LedgerMaster> m_ledgerMaster; std::unique_ptr <LedgerMaster> m_ledgerMaster;
std::unique_ptr <NetworkOPs> m_networkOPs; std::unique_ptr <NetworkOPs> m_networkOPs;
std::unique_ptr <UniqueNodeList> m_deprecatedUNL; std::unique_ptr <UniqueNodeList> m_deprecatedUNL;

View File

@@ -45,6 +45,7 @@ class SerializedLedgerEntry;
class TransactionMaster; class TransactionMaster;
class TxQueue; class TxQueue;
class LocalCredentials; class LocalCredentials;
class PathRequests;
class DatabaseCon; class DatabaseCon;
@@ -103,6 +104,7 @@ public:
virtual TxQueue& getTxQueue () = 0; virtual TxQueue& getTxQueue () = 0;
virtual LocalCredentials& getLocalCredentials () = 0; virtual LocalCredentials& getLocalCredentials () = 0;
virtual Resource::Manager& getResourceManager () = 0; virtual Resource::Manager& getResourceManager () = 0;
virtual PathRequests& getPathRequests () = 0;
virtual DatabaseCon* getRpcDB () = 0; virtual DatabaseCon* getRpcDB () = 0;
virtual DatabaseCon* getTxnDB () = 0; virtual DatabaseCon* getTxnDB () = 0;

View File

@@ -19,26 +19,27 @@
SETUP_LOG (PathRequest) SETUP_LOG (PathRequest)
// VFALCO TODO Move these globals into a PathRequests collection inteface PathRequest::PathRequest (const boost::shared_ptr<InfoSub>& subscriber, int id, PathRequests& owner)
PathRequest::StaticLockType PathRequest::sLock ("PathRequest", __FILE__, __LINE__);
std::vector <PathRequest::wptr> PathRequest::sRequests;
RippleLineCache::pointer PathRequest::sLineCache;
std::atomic <int> PathRequest::s_last_id (0);
PathRequest::PathRequest (const boost::shared_ptr<InfoSub>& subscriber)
: mLock (this, "PathRequest", __FILE__, __LINE__) : mLock (this, "PathRequest", __FILE__, __LINE__)
, mOwner (owner)
, wpSubscriber (subscriber) , wpSubscriber (subscriber)
, jvStatus (Json::objectValue) , jvStatus (Json::objectValue)
, bValid (false) , bValid (false)
, iLastIndex (0) , iLastIndex (0)
, iLastLevel (0) , iLastLevel (0)
, bLastSuccess (false) , bLastSuccess (false)
, iIdentifier (++s_last_id) , iIdentifier (id)
{ {
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " created"; if (journal().debug)
journal().debug << iIdentifier << " created";
ptCreated = boost::posix_time::microsec_clock::universal_time (); ptCreated = boost::posix_time::microsec_clock::universal_time ();
} }
Journal& PathRequest::journal ()
{
return mOwner.journal ();
}
static std::string const get_milli_diff (boost::posix_time::ptime const& after, boost::posix_time::ptime const& before) static std::string const get_milli_diff (boost::posix_time::ptime const& after, boost::posix_time::ptime const& before)
{ {
return lexicalCastThrow <std::string> (static_cast <unsigned> ((after - before).total_milliseconds())); return lexicalCastThrow <std::string> (static_cast <unsigned> ((after - before).total_milliseconds()));
@@ -64,7 +65,8 @@ PathRequest::~PathRequest()
full += get_milli_diff (ptFullReply, ptCreated); full += get_milli_diff (ptFullReply, ptCreated);
full += "ms"; full += "ms";
} }
WriteLog (lsINFO, PathRequest) << iIdentifier << " complete:" << fast << full << if (journal().info)
journal().info << iIdentifier << " complete:" << fast << full <<
" total:" << get_milli_diff(ptCreated) << "ms"; " total:" << get_milli_diff(ptCreated) << "ms";
} }
@@ -148,50 +150,39 @@ bool PathRequest::isValid (Ledger::ref lrLedger)
return bValid; return bValid;
} }
Json::Value PathRequest::doCreate (Ledger::ref lrLedger, const Json::Value& value) Json::Value PathRequest::doCreate (Ledger::ref lrLedger, RippleLineCache::ref& cache,
const Json::Value& value, bool& valid)
{ {
assert (lrLedger->isClosed ());
// Get the ledger and line cache we should use
Ledger::pointer ledger = lrLedger;
RippleLineCache::pointer cache;
{
StaticScopedLockType sl (sLock, __FILE__, __LINE__);
cache = getLineCache (ledger, false);
}
Json::Value status; Json::Value status;
bool mValid;
if (parseJson (value, true) != PFR_PJ_INVALID)
{ {
if (parseJson (value, true) != PFR_PJ_INVALID) bValid = isValid (lrLedger);
{
mValid = isValid (ledger);
if (mValid) if (bValid)
status = doUpdate (cache, true); status = doUpdate (cache, true);
else
status = jvStatus;
}
else else
{
mValid = false;
status = jvStatus; status = jvStatus;
}
}
if (mValid)
{
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " valid: " << raSrcAccount.humanAccountID () <<
" -> " << raDstAccount.humanAccountID ();
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " Deliver: " << saDstAmount.getFullText ();
StaticScopedLockType sl (sLock, __FILE__, __LINE__);
sRequests.push_back (shared_from_this ());
} }
else else
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " invalid"; {
bValid = false;
status = jvStatus;
}
if (journal().debug)
{
if (bValid)
{
journal().debug << iIdentifier << " valid: " << raSrcAccount.humanAccountID () <<
journal().debug << iIdentifier << " Deliver: " << saDstAmount.getFullText ();
}
else
journal().debug << iIdentifier << " invalid";
}
valid = bValid;
return status; return status;
} }
@@ -289,7 +280,7 @@ int PathRequest::parseJson (const Json::Value& jvParams, bool complete)
} }
Json::Value PathRequest::doClose (const Json::Value&) Json::Value PathRequest::doClose (const Json::Value&)
{ {
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " closed"; journal().debug << iIdentifier << " closed";
ScopedLockType sl (mLock, __FILE__, __LINE__); ScopedLockType sl (mLock, __FILE__, __LINE__);
return jvStatus; return jvStatus;
} }
@@ -308,7 +299,8 @@ void PathRequest::resetLevel (int l)
Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast) Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
{ {
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " update " << (fast ? "fast" : "normal"); journal().debug << iIdentifier << " update " << (fast ? "fast" : "normal");
ScopedLockType sl (mLock, __FILE__, __LINE__); ScopedLockType sl (mLock, __FILE__, __LINE__);
if (!isValid (cache->getLedger ())) if (!isValid (cache->getLedger ()))
@@ -372,7 +364,7 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
--iLevel; --iLevel;
} }
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " processing at level " << iLevel; journal().debug << iIdentifier << " processing at level " << iLevel;
bool found = false; bool found = false;
@@ -380,7 +372,8 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
{ {
{ {
STAmount test (currIssuer.first, currIssuer.second, 1); STAmount test (currIssuer.first, currIssuer.second, 1);
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " Trying to find paths: " << test.getFullText (); if (journal().debug)
journal().debug << iIdentifier << " Trying to find paths: " << test.getFullText ();
} }
bool valid; bool valid;
STPathSet& spsPaths = mContext[currIssuer]; STPathSet& spsPaths = mContext[currIssuer];
@@ -399,7 +392,7 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
currIssuer.second.isNonZero () ? currIssuer.second : currIssuer.second.isNonZero () ? currIssuer.second :
(currIssuer.first.isZero () ? ACCOUNT_XRP : raSrcAccount.getAccountID ()), 1); (currIssuer.first.isZero () ? ACCOUNT_XRP : raSrcAccount.getAccountID ()), 1);
saMaxAmount.negate (); saMaxAmount.negate ();
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " Paths found, calling rippleCalc"; journal().debug << iIdentifier << " Paths found, calling rippleCalc";
TER terResult = RippleCalc::rippleCalc (lesSandbox, saMaxAmountAct, saDstAmountAct, TER terResult = RippleCalc::rippleCalc (lesSandbox, saMaxAmountAct, saDstAmountAct,
vpsExpanded, saMaxAmount, saDstAmount, vpsExpanded, saMaxAmount, saDstAmount,
raDstAccount.getAccountID (), raSrcAccount.getAccountID (), raDstAccount.getAccountID (), raSrcAccount.getAccountID (),
@@ -408,14 +401,14 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
if ((extraPath.size() > 0) && ((terResult == terNO_LINE) || (terResult == tecPATH_PARTIAL))) if ((extraPath.size() > 0) && ((terResult == terNO_LINE) || (terResult == tecPATH_PARTIAL)))
{ {
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " Trying with an extra path element"; journal().debug << iIdentifier << " Trying with an extra path element";
spsPaths.addPath(extraPath); spsPaths.addPath(extraPath);
vpsExpanded.clear (); vpsExpanded.clear ();
terResult = RippleCalc::rippleCalc (lesSandbox, saMaxAmountAct, saDstAmountAct, terResult = RippleCalc::rippleCalc (lesSandbox, saMaxAmountAct, saDstAmountAct,
vpsExpanded, saMaxAmount, saDstAmount, vpsExpanded, saMaxAmount, saDstAmount,
raDstAccount.getAccountID (), raSrcAccount.getAccountID (), raDstAccount.getAccountID (), raSrcAccount.getAccountID (),
spsPaths, false, false, false, true); spsPaths, false, false, false, true);
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " Extra path element gives " << transHuman (terResult); journal().debug << iIdentifier << " Extra path element gives " << transHuman (terResult);
} }
if (terResult == tesSUCCESS) if (terResult == tesSUCCESS)
@@ -428,12 +421,12 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
} }
else else
{ {
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " rippleCalc returns " << transHuman (terResult); journal().debug << iIdentifier << " rippleCalc returns " << transHuman (terResult);
} }
} }
else else
{ {
WriteLog (lsDEBUG, PathRequest) << iIdentifier << " No paths found"; journal().debug << iIdentifier << " No paths found";
} }
} }
@@ -441,22 +434,33 @@ Json::Value PathRequest::doUpdate (RippleLineCache::ref cache, bool fast)
bLastSuccess = found; bLastSuccess = found;
if (fast && ptQuickReply.is_not_a_date_time()) if (fast && ptQuickReply.is_not_a_date_time())
{
ptQuickReply = boost::posix_time::microsec_clock::universal_time(); ptQuickReply = boost::posix_time::microsec_clock::universal_time();
mOwner.reportFast ((ptQuickReply-ptCreated).total_milliseconds());
}
else if (!fast && ptFullReply.is_not_a_date_time()) else if (!fast && ptFullReply.is_not_a_date_time())
{
ptFullReply = boost::posix_time::microsec_clock::universal_time(); ptFullReply = boost::posix_time::microsec_clock::universal_time();
mOwner.reportFull ((ptFullReply-ptCreated).total_milliseconds());
}
jvStatus["alternatives"] = jvArray; jvStatus["alternatives"] = jvArray;
return jvStatus; return jvStatus;
} }
InfoSub::pointer PathRequest::getSubscriber ()
{
return wpSubscriber.lock ();
}
/** Get the current RippleLineCache, updating it if necessary. /** Get the current RippleLineCache, updating it if necessary.
Get the correct ledger to use. Get the correct ledger to use.
*/ */
RippleLineCache::pointer PathRequest::getLineCache (Ledger::pointer& ledger, bool authoritative) RippleLineCache::pointer PathRequests::getLineCache (Ledger::pointer& ledger, bool authoritative)
{ {
StaticScopedLockType sl (sLock, __FILE__, __LINE__); ScopedLockType sl (mLock, __FILE__, __LINE__);
uint32 lineSeq = sLineCache ? sLineCache->getLedger()->getLedgerSeq() : 0; uint32 lineSeq = mLineCache ? mLineCache->getLedger()->getLedgerSeq() : 0;
uint32 lgrSeq = ledger->getLedgerSeq(); uint32 lgrSeq = ledger->getLedgerSeq();
if ( (lineSeq == 0) || // no ledger if ( (lineSeq == 0) || // no ledger
@@ -465,18 +469,18 @@ RippleLineCache::pointer PathRequest::getLineCache (Ledger::pointer& ledger, boo
(lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason (lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason
{ {
ledger = boost::make_shared<Ledger>(*ledger, false); // Take a snapshot of the ledger ledger = boost::make_shared<Ledger>(*ledger, false); // Take a snapshot of the ledger
sLineCache = boost::make_shared<RippleLineCache> (ledger); mLineCache = boost::make_shared<RippleLineCache> (ledger);
} }
else else
{ {
ledger = sLineCache->getLedger(); ledger = mLineCache->getLedger();
} }
return sLineCache; return mLineCache;
} }
void PathRequest::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel) void PathRequests::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
{ {
std::vector<wptr> requests; std::vector<PathRequest::wptr> requests;
LoadEvent::autoptr event (getApp().getJobQueue().getLoadEventAP(jtPATH_FIND, "PathRequest::updateAll")); LoadEvent::autoptr event (getApp().getJobQueue().getLoadEventAP(jtPATH_FIND, "PathRequest::updateAll"));
@@ -484,15 +488,15 @@ void PathRequest::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
Ledger::pointer ledger = inLedger; Ledger::pointer ledger = inLedger;
RippleLineCache::pointer cache; RippleLineCache::pointer cache;
{ {
StaticScopedLockType sl (sLock, __FILE__, __LINE__); ScopedLockType sl (mLock, __FILE__, __LINE__);
requests = sRequests; requests = mRequests;
cache = getLineCache (ledger, true); cache = getLineCache (ledger, true);
} }
bool newRequests = getApp().getLedgerMaster().isNewPathRequest(); bool newRequests = getApp().getLedgerMaster().isNewPathRequest();
bool mustBreak = false; bool mustBreak = false;
WriteLog (lsTRACE, PathRequest) << "updateAll seq=" << ledger->getLedgerSeq() << ", " << journal().trace << "updateAll seq=" << ledger->getLedgerSeq() << ", " <<
requests.size() << " requests"; requests.size() << " requests";
int processed = 0, removed = 0; int processed = 0, removed = 0;
@@ -500,15 +504,15 @@ void PathRequest::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
{ {
{ // Get the latest requests, cache, and ledger { // Get the latest requests, cache, and ledger
StaticScopedLockType sl (sLock, __FILE__, __LINE__); ScopedLockType sl (mLock, __FILE__, __LINE__);
if (sRequests.empty()) if (mRequests.empty())
return; return;
// Newest request is last in sRequests, but we want to serve it first // Newest request is last in mRequests, but we want to serve it first
requests.empty(); requests.empty();
requests.reserve(sRequests.size ()); requests.reserve (mRequests.size ());
BOOST_REVERSE_FOREACH (wptr& req, sRequests) BOOST_REVERSE_FOREACH (PathRequest::wptr& req, mRequests)
{ {
requests.push_back (req); requests.push_back (req);
} }
@@ -516,7 +520,7 @@ void PathRequest::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
cache = getLineCache (ledger, false); cache = getLineCache (ledger, false);
} }
BOOST_FOREACH (wref wRequest, requests) BOOST_FOREACH (PathRequest::wref wRequest, requests)
{ {
if (shouldCancel()) if (shouldCancel())
break; break;
@@ -530,8 +534,7 @@ void PathRequest::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
remove = false; remove = false;
else else
{ {
InfoSub::pointer ipSub = pRequest->wpSubscriber.lock (); InfoSub::pointer ipSub = pRequest->getSubscriber ();
if (ipSub) if (ipSub)
{ {
Json::Value update = pRequest->doUpdate (cache, false); Json::Value update = pRequest->doUpdate (cache, false);
@@ -547,17 +550,17 @@ void PathRequest::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
{ {
PathRequest::pointer pRequest = wRequest.lock (); PathRequest::pointer pRequest = wRequest.lock ();
StaticScopedLockType sl (sLock, __FILE__, __LINE__); ScopedLockType sl (mLock, __FILE__, __LINE__);
// Remove any dangling weak pointers or weak pointers that refer to this path request. // Remove any dangling weak pointers or weak pointers that refer to this path request.
std::vector<wptr>::iterator it = sRequests.begin(); std::vector<PathRequest::wptr>::iterator it = mRequests.begin();
while (it != sRequests.end()) while (it != mRequests.end())
{ {
PathRequest::pointer itRequest = it->lock (); PathRequest::pointer itRequest = it->lock ();
if (!itRequest || (itRequest == pRequest)) if (!itRequest || (itRequest == pRequest))
{ {
++removed; ++removed;
it = sRequests.erase (it); it = mRequests.erase (it);
} }
else else
++it; ++it;
@@ -588,6 +591,39 @@ void PathRequest::updateAll (Ledger::ref inLedger, CancelCallback shouldCancel)
} }
while (!shouldCancel ()); while (!shouldCancel ());
WriteLog (lsDEBUG, PathRequest) << "updateAll complete " << processed << " process and " << journal().debug << "updateAll complete " << processed << " process and " <<
removed << " removed"; removed << " removed";
} }
Json::Value PathRequests::makePathRequest(
boost::shared_ptr <InfoSub> const& subscriber,
const boost::shared_ptr<Ledger>& inLedger,
const Json::Value& requestJson)
{
PathRequest::pointer req = boost::make_shared<PathRequest> (subscriber, ++mLastIdentifier, *this);
Ledger::pointer ledger = inLedger;
RippleLineCache::pointer cache;
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
cache = getLineCache (ledger, false);
}
bool valid = false;
Json::Value result = req->doCreate (ledger, cache, requestJson, valid);
if (valid)
{
{
ScopedLockType sl (mLock, __FILE__, __LINE__);
mRequests.push_back (req);
}
subscriber->setPathRequest (req);
getApp().getLedgerMaster().newPathRequest();
}
return result;
}
// vim:ts=4

View File

@@ -24,6 +24,7 @@
// The request issuer must maintain a strong pointer // The request issuer must maintain a strong pointer
class RippleLineCache; class RippleLineCache;
class PathRequests;
// Return values from parseJson <0 = invalid, >0 = valid // Return values from parseJson <0 = invalid, >0 = valid
#define PFR_PJ_INVALID -1 #define PFR_PJ_INVALID -1
@@ -44,7 +45,7 @@ public:
public: public:
// VFALCO TODO Break the cyclic dependency on InfoSub // VFALCO TODO Break the cyclic dependency on InfoSub
explicit PathRequest (boost::shared_ptr <InfoSub> const& subscriber); explicit PathRequest (boost::shared_ptr <InfoSub> const& subscriber, int id, PathRequests&);
~PathRequest (); ~PathRequest ();
bool isValid (const boost::shared_ptr<Ledger>&); bool isValid (const boost::shared_ptr<Ledger>&);
@@ -52,13 +53,13 @@ public:
bool needsUpdate (bool newOnly, LedgerIndex index); bool needsUpdate (bool newOnly, LedgerIndex index);
Json::Value getStatus (); Json::Value getStatus ();
Json::Value doCreate (const boost::shared_ptr<Ledger>&, const Json::Value&); Json::Value doCreate (const boost::shared_ptr<Ledger>&, const RippleLineCache::pointer&,
const Json::Value&, bool&);
Json::Value doClose (const Json::Value&); Json::Value doClose (const Json::Value&);
Json::Value doStatus (const Json::Value&); Json::Value doStatus (const Json::Value&);
Json::Value doUpdate (const boost::shared_ptr<RippleLineCache>&, bool fast); // update jvStatus Json::Value doUpdate (const boost::shared_ptr<RippleLineCache>&, bool fast); // update jvStatus
InfoSub::pointer getSubscriber ();
static void updateAll (const boost::shared_ptr<Ledger>& ledger, CancelCallback shouldCancel); Journal& journal ();
static RippleLineCache::pointer getLineCache (Ledger::pointer& ledger, bool authoritative);
private: private:
void setValid (); void setValid ();
@@ -69,6 +70,8 @@ private:
typedef LockType::ScopedLockType ScopedLockType; typedef LockType::ScopedLockType ScopedLockType;
LockType mLock; LockType mLock;
PathRequests& mOwner;
boost::weak_ptr<InfoSub> wpSubscriber; // Who this request came from boost::weak_ptr<InfoSub> wpSubscriber; // Who this request came from
Json::Value jvId; Json::Value jvId;
Json::Value jvStatus; // Last result Json::Value jvStatus; // Last result
@@ -78,7 +81,7 @@ private:
RippleAddress raDstAccount; RippleAddress raDstAccount;
STAmount saDstAmount; STAmount saDstAmount;
std::set<currIssuer_t> sciSourceCurrencies; std::set<currIssuer_t> sciSourceCurrencies;
std::vector<Json::Value> vjvBridges; // std::vector<Json::Value> vjvBridges;
std::map<currIssuer_t, STPathSet> mContext; std::map<currIssuer_t, STPathSet> mContext;
bool bValid; bool bValid;
@@ -88,22 +91,69 @@ private:
bool bLastSuccess; bool bLastSuccess;
int iIdentifier; int iIdentifier;
static std::atomic <int> s_last_id;
boost::posix_time::ptime ptCreated; boost::posix_time::ptime ptCreated;
boost::posix_time::ptime ptQuickReply; boost::posix_time::ptime ptQuickReply;
boost::posix_time::ptime ptFullReply; boost::posix_time::ptime ptFullReply;
};
class PathRequests
{
public:
PathRequests (Journal journal, std::shared_ptr <insight::Collector> const& collector)
: mJournal (journal)
, mLastIdentifier (0)
, mLock ("PathRequests", __FILE__, __LINE__)
{
mFast = collector->make_event ("pathfind_fast");
mFull = collector->make_event ("pathfind_full");
}
void updateAll (const boost::shared_ptr<Ledger>& ledger, CancelCallback shouldCancel);
RippleLineCache::pointer getLineCache (Ledger::pointer& ledger, bool authoritative);
Json::Value makePathRequest (
boost::shared_ptr <InfoSub> const& subscriber,
const boost::shared_ptr<Ledger>& ledger,
const Json::Value& request);
Journal& journal ()
{
return mJournal;
}
void reportFast (int milliseconds)
{
mFast.notify (static_cast < insight::Event::value_type> (milliseconds));
}
void reportFull (int milliseconds)
{
mFull.notify (static_cast < insight::Event::value_type> (milliseconds));
}
private:
Journal mJournal;
insight::Event mFast;
insight::Event mFull;
// Track all requests // Track all requests
static std::vector<wptr> sRequests; std::vector<PathRequest::wptr> mRequests;
// Use a RippleLineCache // Use a RippleLineCache
static RippleLineCache::pointer sLineCache; RippleLineCache::pointer mLineCache;
Atomic<int> mLastIdentifier;
typedef RippleRecursiveMutex LockType;
typedef LockType::ScopedLockType ScopedLockType;
LockType mLock;
typedef RippleRecursiveMutex StaticLockType;
typedef LockType::ScopedLockType StaticScopedLockType;
static StaticLockType sLock;
}; };
#endif #endif

View File

@@ -1443,16 +1443,7 @@ Json::Value RPCHandler::doPathFind (Json::Value params, Resource::Charge& loadTy
{ {
loadType = Resource::feeHighBurdenRPC; loadType = Resource::feeHighBurdenRPC;
mInfoSub->clearPathRequest (); mInfoSub->clearPathRequest ();
PathRequest::pointer request = boost::make_shared<PathRequest> (mInfoSub); return getApp().getPathRequests().makePathRequest (mInfoSub, lpLedger, params);
Json::Value result = request->doCreate (lpLedger, params);
if (request->isValid ())
{
mInfoSub->setPathRequest (request);
getApp().getLedgerMaster ().newPathRequest ();
}
return result;
} }
if (sSubCommand == "close") if (sSubCommand == "close")
@@ -1556,7 +1547,7 @@ Json::Value RPCHandler::doRipplePathFind (Json::Value params, Resource::Charge&
else else
{ // Use the default ledger and cache { // Use the default ledger and cache
lpLedger = mNetOps->getValidatedLedger(); lpLedger = mNetOps->getValidatedLedger();
cache = PathRequest::getLineCache(lpLedger, false); cache = getApp().getPathRequests().getLineCache(lpLedger, false);
} }
masterLockHolder.unlock (); // As long as we have a locked copy of the ledger, we can unlock. masterLockHolder.unlock (); // As long as we have a locked copy of the ledger, we can unlock.