Dispatch new pathfinding requests.

This commit is contained in:
JoelKatz
2013-05-02 16:13:36 -07:00
parent 79a879a3b8
commit b6871cba8d
4 changed files with 45 additions and 11 deletions

View File

@@ -43,6 +43,7 @@ const char* Job::toString(JobType t)
case jtPROOFWORK: return "proofOfWork";
case jtPROPOSAL_ut: return "untrustedProposal";
case jtLEDGER_DATA: return "ledgerData";
case jtUPDATE_PF: return "updatePaths";
case jtCLIENT: return "clientCommand";
case jtTRANSACTION: return "transaction";
case jtPUBLEDGER: return "publishNewLedger";

View File

@@ -28,16 +28,17 @@ enum JobType
jtPROOFWORK = 4, // A proof of work demand from another server
jtPROPOSAL_ut = 5, // A proposal from an untrusted source
jtLEDGER_DATA = 6, // Received data for a ledger we're acquiring
jtCLIENT = 7, // A websocket command from the client
jtTRANSACTION = 8, // A transaction received from the network
jtPUBLEDGER = 9, // Publish a fully-accepted ledger
jtWAL = 10, // Write-ahead logging
jtVALIDATION_t = 11, // A validation from a trusted source
jtWRITE = 12, // Write out hashed objects
jtTRANSACTION_l = 13, // A local transaction
jtPROPOSAL_t = 14, // A proposal from a trusted source
jtADMIN = 15, // An administrative operation
jtDEATH = 16, // job of death, used internally
jtUPDATE_PF = 7, // Update pathfinding requests
jtCLIENT = 8, // A websocket command from the client
jtTRANSACTION = 9, // A transaction received from the network
jtPUBLEDGER = 10, // Publish a fully-accepted ledger
jtWAL = 11, // Write-ahead logging
jtVALIDATION_t = 12, // A validation from a trusted source
jtWRITE = 13, // Write out hashed objects
jtTRANSACTION_l = 14, // A local transaction
jtPROPOSAL_t = 15, // A proposal from a trusted source
jtADMIN = 16, // An administrative operation
jtDEATH = 17, // job of death, used internally
// special types not dispatched by the job pool
jtPEER = 24,

View File

@@ -6,6 +6,7 @@
#include "Application.h"
#include "RippleAddress.h"
#include "Log.h"
#include "PFRequest.h"
SETUP_LOG();
@@ -633,6 +634,7 @@ void LedgerMaster::tryPublish()
void LedgerMaster::pubThread()
{
std::list<Ledger::pointer> ledgers;
bool published = false;
while (1)
{
@@ -644,6 +646,12 @@ void LedgerMaster::pubThread()
if (ledgers.empty())
{
mPubThread = false;
if (published && !mPathFindThread)
{
mPathFindThread = true;
theApp->getJobQueue().addJob(jtUPDATE_PF, "updatePaths",
BIND_TYPE(&LedgerMaster::updatePaths, this));
}
return;
}
}
@@ -653,8 +661,30 @@ void LedgerMaster::pubThread()
cLog(lsDEBUG) << "Publishing ledger " << l->getLedgerSeq();
setFullLedger(l); // OPTIMIZEME: This is actually more work than we need to do
theApp->getOPs().pubLedger(l);
published = true;
}
}
}
void LedgerMaster::updatePaths()
{
Ledger::pointer lastLedger;
do
{
{
boost::recursive_mutex::scoped_lock ml(mLock);
if (lastLedger.get() == mPubLedger.get())
{
mPathFindThread = false;
return;
}
lastLedger = mPubLedger;
}
PFRequest::updateAll(lastLedger);
} while(1);
}
// vim:ts=4

View File

@@ -45,6 +45,7 @@ protected:
std::list<Ledger::pointer> mPubLedgers; // List of ledgers to publish
bool mPubThread; // Publish thread is running
bool mPathFindThread; // Pathfind thread is running
void applyFutureTransactions(uint32 ledgerIndex);
bool isValidTransaction(Transaction::ref trans);
@@ -54,11 +55,12 @@ protected:
void asyncAccept(Ledger::pointer);
void missingAcquireComplete(LedgerAcquire::pointer);
void pubThread();
void updatePaths();
public:
LedgerMaster() : mHeldTransactions(uint256()), mMissingSeq(0),
mMinValidations(0), mLastValidateSeq(0), mPubThread(false)
mMinValidations(0), mLastValidateSeq(0), mPubThread(false), mPathFindThread(false)
{ ; }
uint32 getCurrentLedgerIndex();