From 9e4710725546b012948b051d1b3d049955f4161d Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Fri, 3 May 2013 09:33:58 -0700 Subject: [PATCH] Rework dispatch of new API PF requests. --- src/cpp/ripple/LedgerMaster.cpp | 35 ++++++++++++++++++++++++++++++--- src/cpp/ripple/LedgerMaster.h | 7 ++++++- src/cpp/ripple/PFRequest.cpp | 19 ++++++++++++------ src/cpp/ripple/PFRequest.h | 4 +++- src/cpp/ripple/RPCHandler.cpp | 3 +++ 5 files changed, 57 insertions(+), 11 deletions(-) diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index d9095ca6b0..97f04e1518 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -628,6 +628,13 @@ void LedgerMaster::tryPublish() mPubThread = true; theApp->getJobQueue().addJob(jtPUBLEDGER, "Ledger::pubThread", BIND_TYPE(&LedgerMaster::pubThread, this)); + mPathFindNewLedger = true; + if (!mPathFindThread) + { + mPathFindThread = true; + theApp->getJobQueue().addJob(jtUPDATE_PF, "updatePaths", + BIND_TYPE(&LedgerMaster::updatePaths, this)); + } } } @@ -669,22 +676,44 @@ void LedgerMaster::pubThread() void LedgerMaster::updatePaths() { Ledger::pointer lastLedger; - do { + bool newOnly = false; + { boost::recursive_mutex::scoped_lock ml(mLock); - if (lastLedger.get() == mPubLedger.get()) + if (mPathFindNewLedger || (lastLedger && (lastLedger.get() != mPubLedger.get()))) + lastLedger = mPubLedger; + else if (mPathFindNewRequest) + { + newOnly = true; + lastLedger = boost::make_shared(*mCurrentLedger, false); + } + else { mPathFindThread = false; return; } lastLedger = mPubLedger; + mPathFindNewLedger = false; + mPathFindNewRequest = false; } - PFRequest::updateAll(lastLedger); + PFRequest::updateAll(lastLedger, newOnly); } while(1); } +void LedgerMaster::newPFRequest() +{ + boost::recursive_mutex::scoped_lock ml(mLock); + mPathFindNewRequest = true; + if (!mPathFindThread) + { + mPathFindThread = true; + theApp->getJobQueue().addJob(jtUPDATE_PF, "updatePaths", + BIND_TYPE(&LedgerMaster::updatePaths, this)); + } +} + // vim:ts=4 diff --git a/src/cpp/ripple/LedgerMaster.h b/src/cpp/ripple/LedgerMaster.h index 45310a1a21..2d872bbdca 100644 --- a/src/cpp/ripple/LedgerMaster.h +++ b/src/cpp/ripple/LedgerMaster.h @@ -45,7 +45,10 @@ protected: std::list mPubLedgers; // List of ledgers to publish bool mPubThread; // Publish thread is running + bool mPathFindThread; // Pathfind thread is running + bool mPathFindNewLedger; + bool mPathFindNewRequest; void applyFutureTransactions(uint32 ledgerIndex); bool isValidTransaction(Transaction::ref trans); @@ -60,7 +63,8 @@ protected: public: LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0), - mMinValidations(0), mLastValidateSeq(0), mPubThread(false), mPathFindThread(false) + mMinValidations(0), mLastValidateSeq(0), mPubThread(false), + mPathFindThread(false), mPathFindNewLedger(false), mPathFindNewRequest(false) { ; } uint32 getCurrentLedgerIndex(); @@ -157,6 +161,7 @@ public: void checkAccept(const uint256& hash); void checkAccept(const uint256& hash, uint32 seq); void tryPublish(); + void newPFRequest(); static bool shouldAcquire(uint32 currentLedgerID, uint32 ledgerHistory, uint32 targetLedger); }; diff --git a/src/cpp/ripple/PFRequest.cpp b/src/cpp/ripple/PFRequest.cpp index 743152d7c6..59f2c36683 100644 --- a/src/cpp/ripple/PFRequest.cpp +++ b/src/cpp/ripple/PFRequest.cpp @@ -12,7 +12,7 @@ boost::recursive_mutex PFRequest::sLock; std::set PFRequest::sRequests; PFRequest::PFRequest(const boost::shared_ptr& subscriber) : - wpSubscriber(subscriber), jvStatus(Json::objectValue), bValid(false) + wpSubscriber(subscriber), jvStatus(Json::objectValue), bValid(false), bNew(true) { ; } @@ -23,6 +23,12 @@ bool PFRequest::isValid() return bValid; } +bool PFRequest::isNew() +{ + boost::recursive_mutex::scoped_lock sl(mLock); + return bNew; +} + bool PFRequest::isValid(Ledger::ref lrLedger) { boost::recursive_mutex::scoped_lock sl(mLock); @@ -198,6 +204,8 @@ bool PFRequest::doUpdate(RLCache::ref cache, bool fast) jvStatus = Json::objectValue; if (!isValid(cache->getLedger())) return false; + if (!fast) + bNew = false; std::set sourceCurrencies(sciSourceCurrencies); if (sourceCurrencies.empty()) @@ -248,11 +256,10 @@ bool PFRequest::doUpdate(RLCache::ref cache, bool fast) return true; } -void PFRequest::updateAll(const boost::shared_ptr& ledger) +void PFRequest::updateAll(Ledger::ref ledger, bool newOnly) { - assert(ledger->isImmutable()); - std::set requests; + { boost::recursive_mutex::scoped_lock sl(sLock); requests = sRequests; @@ -267,7 +274,7 @@ void PFRequest::updateAll(const boost::shared_ptr& ledger) { bool remove = true; PFRequest::pointer pRequest = wRequest.lock(); - if (pRequest) + if (pRequest && (!newOnly || pRequest->isNew())) { InfoSub::pointer ipSub = pRequest->wpSubscriber.lock(); if (ipSub) @@ -275,7 +282,7 @@ void PFRequest::updateAll(const boost::shared_ptr& ledger) Json::Value update; { boost::recursive_mutex::scoped_lock sl(pRequest->mLock); - pRequest->doUpdate(cache, true); + pRequest->doUpdate(cache, false); update = pRequest->jvStatus; } update["type"] = "path_find"; diff --git a/src/cpp/ripple/PFRequest.h b/src/cpp/ripple/PFRequest.h index fbfa6ab182..402bb02cc0 100644 --- a/src/cpp/ripple/PFRequest.h +++ b/src/cpp/ripple/PFRequest.h @@ -52,6 +52,7 @@ protected: std::vector vjvBridges; bool bValid; + bool bNew; // Track all requests static std::set sRequests; @@ -66,6 +67,7 @@ public: bool isValid(const boost::shared_ptr&); bool isValid(); + bool isNew(); Json::Value getStatus(); Json::Value doCreate(const boost::shared_ptr&, const Json::Value&); @@ -74,7 +76,7 @@ public: bool doUpdate(const boost::shared_ptr&, bool fast); // update jvStatus - static void updateAll(const boost::shared_ptr &); + static void updateAll(const boost::shared_ptr& ledger, bool newOnly); }; #endif diff --git a/src/cpp/ripple/RPCHandler.cpp b/src/cpp/ripple/RPCHandler.cpp index 28f9684611..6988ffa602 100644 --- a/src/cpp/ripple/RPCHandler.cpp +++ b/src/cpp/ripple/RPCHandler.cpp @@ -1185,7 +1185,10 @@ Json::Value RPCHandler::doPathFind(Json::Value jvRequest, int& cost, ScopedLock& PFRequest::pointer request = boost::make_shared(mInfoSub); Json::Value result = request->doCreate(mNetOps->getClosedLedger(), jvRequest); if (request->isValid()) + { mInfoSub->setPFRequest(request); + theApp->getLedgerMaster().newPFRequest(); + } return result; }