From b6871cba8d0d3e1fc62f5b829f6167e1735955d1 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Thu, 2 May 2013 16:13:36 -0700 Subject: [PATCH] Dispatch new pathfinding requests. --- src/cpp/ripple/JobQueue.cpp | 1 + src/cpp/ripple/JobQueue.h | 21 +++++++++++---------- src/cpp/ripple/LedgerMaster.cpp | 30 ++++++++++++++++++++++++++++++ src/cpp/ripple/LedgerMaster.h | 4 +++- 4 files changed, 45 insertions(+), 11 deletions(-) diff --git a/src/cpp/ripple/JobQueue.cpp b/src/cpp/ripple/JobQueue.cpp index e89b2184d..f6ce0bcf4 100644 --- a/src/cpp/ripple/JobQueue.cpp +++ b/src/cpp/ripple/JobQueue.cpp @@ -43,6 +43,7 @@ const char* Job::toString(JobType t) case jtPROOFWORK: return "proofOfWork"; case jtPROPOSAL_ut: return "untrustedProposal"; case jtLEDGER_DATA: return "ledgerData"; + case jtUPDATE_PF: return "updatePaths"; case jtCLIENT: return "clientCommand"; case jtTRANSACTION: return "transaction"; case jtPUBLEDGER: return "publishNewLedger"; diff --git a/src/cpp/ripple/JobQueue.h b/src/cpp/ripple/JobQueue.h index 14710d95f..76d444b92 100644 --- a/src/cpp/ripple/JobQueue.h +++ b/src/cpp/ripple/JobQueue.h @@ -28,16 +28,17 @@ enum JobType jtPROOFWORK = 4, // A proof of work demand from another server jtPROPOSAL_ut = 5, // A proposal from an untrusted source jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring - jtCLIENT = 7, // A websocket command from the client - jtTRANSACTION = 8, // A transaction received from the network - jtPUBLEDGER = 9, // Publish a fully-accepted ledger - jtWAL = 10, // Write-ahead logging - jtVALIDATION_t = 11, // A validation from a trusted source - jtWRITE = 12, // Write out hashed objects - jtTRANSACTION_l = 13, // A local transaction - jtPROPOSAL_t = 14, // A proposal from a trusted source - jtADMIN = 15, // An administrative operation - jtDEATH = 16, // job of death, used internally + jtUPDATE_PF = 7, // Update pathfinding requests + jtCLIENT = 8, // A websocket command from the client + jtTRANSACTION = 9, // A transaction received from the network + jtPUBLEDGER = 10, // Publish a fully-accepted ledger + jtWAL = 11, // Write-ahead logging + jtVALIDATION_t = 12, // A validation from a trusted source + jtWRITE = 13, // Write out hashed objects + jtTRANSACTION_l = 14, // A local transaction + jtPROPOSAL_t = 15, // A proposal from a trusted source + jtADMIN = 16, // An administrative operation + jtDEATH = 17, // job of death, used internally // special types not dispatched by the job pool jtPEER = 24, diff --git a/src/cpp/ripple/LedgerMaster.cpp b/src/cpp/ripple/LedgerMaster.cpp index 377650973..d9095ca6b 100644 --- a/src/cpp/ripple/LedgerMaster.cpp +++ b/src/cpp/ripple/LedgerMaster.cpp @@ -6,6 +6,7 @@ #include "Application.h" #include "RippleAddress.h" #include "Log.h" +#include "PFRequest.h" SETUP_LOG(); @@ -633,6 +634,7 @@ void LedgerMaster::tryPublish() void LedgerMaster::pubThread() { std::list ledgers; + bool published = false; while (1) { @@ -644,6 +646,12 @@ void LedgerMaster::pubThread() if (ledgers.empty()) { mPubThread = false; + if (published && !mPathFindThread) + { + mPathFindThread = true; + theApp->getJobQueue().addJob(jtUPDATE_PF, "updatePaths", + BIND_TYPE(&LedgerMaster::updatePaths, this)); + } return; } } @@ -653,8 +661,30 @@ void LedgerMaster::pubThread() cLog(lsDEBUG) << "Publishing ledger " << l->getLedgerSeq(); setFullLedger(l); // OPTIMIZEME: This is actually more work than we need to do theApp->getOPs().pubLedger(l); + published = true; } } } +void LedgerMaster::updatePaths() +{ + Ledger::pointer lastLedger; + + do + { + { + boost::recursive_mutex::scoped_lock ml(mLock); + if (lastLedger.get() == mPubLedger.get()) + { + mPathFindThread = false; + return; + } + lastLedger = mPubLedger; + } + + PFRequest::updateAll(lastLedger); + + } while(1); +} + // vim:ts=4 diff --git a/src/cpp/ripple/LedgerMaster.h b/src/cpp/ripple/LedgerMaster.h index 6c9d87122..45310a1a2 100644 --- a/src/cpp/ripple/LedgerMaster.h +++ b/src/cpp/ripple/LedgerMaster.h @@ -45,6 +45,7 @@ protected: std::list mPubLedgers; // List of ledgers to publish bool mPubThread; // Publish thread is running + bool mPathFindThread; // Pathfind thread is running void applyFutureTransactions(uint32 ledgerIndex); bool isValidTransaction(Transaction::ref trans); @@ -54,11 +55,12 @@ protected: void asyncAccept(Ledger::pointer); void missingAcquireComplete(LedgerAcquire::pointer); void pubThread(); + void updatePaths(); public: LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0), - mMinValidations(0), mLastValidateSeq(0), mPubThread(false) + mMinValidations(0), mLastValidateSeq(0), mPubThread(false), mPathFindThread(false) { ; } uint32 getCurrentLedgerIndex();