Rework dispatch of new API PF requests.

This commit is contained in:
JoelKatz
2013-05-03 09:33:58 -07:00
parent 0e9973d3a8
commit 9e47107255
5 changed files with 57 additions and 11 deletions

View File

@@ -628,6 +628,13 @@ void LedgerMaster::tryPublish()
mPubThread = true;
theApp->getJobQueue().addJob(jtPUBLEDGER, "Ledger::pubThread",
BIND_TYPE(&LedgerMaster::pubThread, this));
mPathFindNewLedger = true;
if (!mPathFindThread)
{
mPathFindThread = true;
theApp->getJobQueue().addJob(jtUPDATE_PF, "updatePaths",
BIND_TYPE(&LedgerMaster::updatePaths, this));
}
}
}
@@ -669,22 +676,44 @@ void LedgerMaster::pubThread()
void LedgerMaster::updatePaths()
{
Ledger::pointer lastLedger;
do
{
bool newOnly = false;
{
boost::recursive_mutex::scoped_lock ml(mLock);
if (lastLedger.get() == mPubLedger.get())
if (mPathFindNewLedger || (lastLedger && (lastLedger.get() != mPubLedger.get())))
lastLedger = mPubLedger;
else if (mPathFindNewRequest)
{
newOnly = true;
lastLedger = boost::make_shared<Ledger>(*mCurrentLedger, false);
}
else
{
mPathFindThread = false;
return;
}
lastLedger = mPubLedger;
mPathFindNewLedger = false;
mPathFindNewRequest = false;
}
PFRequest::updateAll(lastLedger);
PFRequest::updateAll(lastLedger, newOnly);
} while(1);
}
void LedgerMaster::newPFRequest()
{
boost::recursive_mutex::scoped_lock ml(mLock);
mPathFindNewRequest = true;
if (!mPathFindThread)
{
mPathFindThread = true;
theApp->getJobQueue().addJob(jtUPDATE_PF, "updatePaths",
BIND_TYPE(&LedgerMaster::updatePaths, this));
}
}
// vim:ts=4

View File

@@ -45,7 +45,10 @@ protected:
std::list<Ledger::pointer> mPubLedgers; // List of ledgers to publish
bool mPubThread; // Publish thread is running
bool mPathFindThread; // Pathfind thread is running
bool mPathFindNewLedger;
bool mPathFindNewRequest;
void applyFutureTransactions(uint32 ledgerIndex);
bool isValidTransaction(Transaction::ref trans);
@@ -60,7 +63,8 @@ protected:
public:
LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0),
mMinValidations(0), mLastValidateSeq(0), mPubThread(false), mPathFindThread(false)
mMinValidations(0), mLastValidateSeq(0), mPubThread(false),
mPathFindThread(false), mPathFindNewLedger(false), mPathFindNewRequest(false)
{ ; }
uint32 getCurrentLedgerIndex();
@@ -157,6 +161,7 @@ public:
void checkAccept(const uint256& hash);
void checkAccept(const uint256& hash, uint32 seq);
void tryPublish();
void newPFRequest();
static bool shouldAcquire(uint32 currentLedgerID, uint32 ledgerHistory, uint32 targetLedger);
};

View File

@@ -12,7 +12,7 @@ boost::recursive_mutex PFRequest::sLock;
std::set<PFRequest::wptr> PFRequest::sRequests;
PFRequest::PFRequest(const boost::shared_ptr<InfoSub>& subscriber) :
wpSubscriber(subscriber), jvStatus(Json::objectValue), bValid(false)
wpSubscriber(subscriber), jvStatus(Json::objectValue), bValid(false), bNew(true)
{
;
}
@@ -23,6 +23,12 @@ bool PFRequest::isValid()
return bValid;
}
bool PFRequest::isNew()
{
boost::recursive_mutex::scoped_lock sl(mLock);
return bNew;
}
bool PFRequest::isValid(Ledger::ref lrLedger)
{
boost::recursive_mutex::scoped_lock sl(mLock);
@@ -198,6 +204,8 @@ bool PFRequest::doUpdate(RLCache::ref cache, bool fast)
jvStatus = Json::objectValue;
if (!isValid(cache->getLedger()))
return false;
if (!fast)
bNew = false;
std::set<currIssuer_t> sourceCurrencies(sciSourceCurrencies);
if (sourceCurrencies.empty())
@@ -248,11 +256,10 @@ bool PFRequest::doUpdate(RLCache::ref cache, bool fast)
return true;
}
void PFRequest::updateAll(const boost::shared_ptr<Ledger>& ledger)
void PFRequest::updateAll(Ledger::ref ledger, bool newOnly)
{
assert(ledger->isImmutable());
std::set<wptr> requests;
{
boost::recursive_mutex::scoped_lock sl(sLock);
requests = sRequests;
@@ -267,7 +274,7 @@ void PFRequest::updateAll(const boost::shared_ptr<Ledger>& ledger)
{
bool remove = true;
PFRequest::pointer pRequest = wRequest.lock();
if (pRequest)
if (pRequest && (!newOnly || pRequest->isNew()))
{
InfoSub::pointer ipSub = pRequest->wpSubscriber.lock();
if (ipSub)
@@ -275,7 +282,7 @@ void PFRequest::updateAll(const boost::shared_ptr<Ledger>& ledger)
Json::Value update;
{
boost::recursive_mutex::scoped_lock sl(pRequest->mLock);
pRequest->doUpdate(cache, true);
pRequest->doUpdate(cache, false);
update = pRequest->jvStatus;
}
update["type"] = "path_find";

View File

@@ -52,6 +52,7 @@ protected:
std::vector<Json::Value> vjvBridges;
bool bValid;
bool bNew;
// Track all requests
static std::set<wptr> sRequests;
@@ -66,6 +67,7 @@ public:
bool isValid(const boost::shared_ptr<Ledger>&);
bool isValid();
bool isNew();
Json::Value getStatus();
Json::Value doCreate(const boost::shared_ptr<Ledger>&, const Json::Value&);
@@ -74,7 +76,7 @@ public:
bool doUpdate(const boost::shared_ptr<RLCache>&, bool fast); // update jvStatus
static void updateAll(const boost::shared_ptr<Ledger> &);
static void updateAll(const boost::shared_ptr<Ledger>& ledger, bool newOnly);
};
#endif

View File

@@ -1185,7 +1185,10 @@ Json::Value RPCHandler::doPathFind(Json::Value jvRequest, int& cost, ScopedLock&
PFRequest::pointer request = boost::make_shared<PFRequest>(mInfoSub);
Json::Value result = request->doCreate(mNetOps->getClosedLedger(), jvRequest);
if (request->isValid())
{
mInfoSub->setPFRequest(request);
theApp->getLedgerMaster().newPFRequest();
}
return result;
}