Fix JobQueue

This commit is contained in:
Vinnie Falco
2014-01-08 13:05:29 -08:00
parent ce3358bdf8
commit 34fb12344c
5 changed files with 52 additions and 26 deletions

View File

@@ -95,12 +95,14 @@ public:
, m_rpcServiceManager (RPC::Manager::New ( , m_rpcServiceManager (RPC::Manager::New (
LogPartition::getJournal <RPCServiceManagerLog> ())) LogPartition::getJournal <RPCServiceManagerLog> ()))
, m_nodeStoreScheduler (*this)
// The JobQueue has to come pretty early since // The JobQueue has to come pretty early since
// almost everything is a Stoppable child of the JobQueue. // almost everything is a Stoppable child of the JobQueue.
// //
, m_jobQueue (JobQueue::New ( , m_jobQueue (JobQueue::New (
m_collectorManager->collector (), m_collectorManager->collector (),
*this, LogPartition::getJournal <JobQueueLog> ())) m_nodeStoreScheduler, LogPartition::getJournal <JobQueueLog> ()))
// The io_service must be a child of the JobQueue since we call addJob // The io_service must be a child of the JobQueue since we call addJob
// in response to newtwork data from peers and also client requests. // in response to newtwork data from peers and also client requests.
@@ -132,16 +134,13 @@ public:
#if ! RIPPLE_USE_RPC_SERVICE_MANAGER #if ! RIPPLE_USE_RPC_SERVICE_MANAGER
, m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service , m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service
#endif #endif
, m_nodeStoreScheduler (*m_jobQueue, *m_jobQueue)
, m_nodeStore (NodeStore::Database::New ("NodeStore.main", m_nodeStoreScheduler, , m_nodeStore (NodeStore::Database::New ("NodeStore.main", m_nodeStoreScheduler,
LogPartition::getJournal <NodeObject> (), LogPartition::getJournal <NodeObject> (),
getConfig ().nodeDatabase, getConfig ().ephemeralNodeDatabase)) getConfig ().nodeDatabase, getConfig ().ephemeralNodeDatabase))
, m_sntpClient (SNTPClient::New (*this)) , m_sntpClient (SNTPClient::New (*this))
, m_inboundLedgers (InboundLedgers::New(*m_jobQueue)) , m_inboundLedgers (InboundLedgers::New (*m_jobQueue))
, m_txQueue (TxQueue::New ()) , m_txQueue (TxQueue::New ())
@@ -166,6 +165,9 @@ public:
, mShutdown (false) , mShutdown (false)
{ {
// VFALCO HACK
m_nodeStoreScheduler.setJobQueue (*m_jobQueue);
bassert (s_instance == nullptr); bassert (s_instance == nullptr);
s_instance = this; s_instance = this;
@@ -910,6 +912,7 @@ private:
std::unique_ptr <RPC::Manager> m_rpcServiceManager; std::unique_ptr <RPC::Manager> m_rpcServiceManager;
// These are Stoppable-related // These are Stoppable-related
NodeStoreScheduler m_nodeStoreScheduler;
std::unique_ptr <JobQueue> m_jobQueue; std::unique_ptr <JobQueue> m_jobQueue;
IoServicePool m_mainIoPool; IoServicePool m_mainIoPool;
std::unique_ptr <SiteFiles::Manager> m_siteFiles; std::unique_ptr <SiteFiles::Manager> m_siteFiles;
@@ -922,7 +925,6 @@ private:
#if ! RIPPLE_USE_RPC_SERVICE_MANAGER #if ! RIPPLE_USE_RPC_SERVICE_MANAGER
RPCServerHandler m_rpcServerHandler; RPCServerHandler m_rpcServerHandler;
#endif #endif
NodeStoreScheduler m_nodeStoreScheduler;
std::unique_ptr <NodeStore::Database> m_nodeStore; std::unique_ptr <NodeStore::Database> m_nodeStore;
std::unique_ptr <SNTPClient> m_sntpClient; std::unique_ptr <SNTPClient> m_sntpClient;
std::unique_ptr <InboundLedgers> m_inboundLedgers; std::unique_ptr <InboundLedgers> m_inboundLedgers;

View File

@@ -17,27 +17,32 @@
*/ */
//============================================================================== //==============================================================================
NodeStoreScheduler::NodeStoreScheduler (Stoppable& parent, JobQueue& jobQueue) NodeStoreScheduler::NodeStoreScheduler (Stoppable& parent)
: Stoppable ("NodeStoreScheduler", parent) : Stoppable ("NodeStoreScheduler", parent)
, m_jobQueue (jobQueue) , m_jobQueue (nullptr)
, m_taskCount (1) // start it off at 1 , m_taskCount (0)
{ {
} }
void NodeStoreScheduler::setJobQueue (JobQueue& jobQueue)
{
m_jobQueue = &jobQueue;
}
void NodeStoreScheduler::onStop () void NodeStoreScheduler::onStop ()
{ {
if (--m_taskCount == 0)
stopped();
} }
void NodeStoreScheduler::onChildrenStopped () void NodeStoreScheduler::onChildrenStopped ()
{ {
bassert (m_taskCount == 0);
stopped ();
} }
void NodeStoreScheduler::scheduleTask (NodeStore::Task& task) void NodeStoreScheduler::scheduleTask (NodeStore::Task& task)
{ {
++m_taskCount; ++m_taskCount;
m_jobQueue.addJob ( m_jobQueue->addJob (
jtWRITE, jtWRITE,
"NodeObject::store", "NodeObject::store",
BIND_TYPE (&NodeStoreScheduler::doTask, BIND_TYPE (&NodeStoreScheduler::doTask,

View File

@@ -26,7 +26,12 @@ class NodeStoreScheduler
, public Stoppable , public Stoppable
{ {
public: public:
NodeStoreScheduler (Stoppable& parent, JobQueue& jobQueue); NodeStoreScheduler (Stoppable& parent);
// VFALCO NOTE This is a temporary hack to solve the problem
// of circular dependency.
//
void setJobQueue (JobQueue& jobQueue);
void onStop (); void onStop ();
void onChildrenStopped (); void onChildrenStopped ();
@@ -35,7 +40,7 @@ public:
private: private:
void doTask (NodeStore::Task& task, Job&); void doTask (NodeStore::Task& task, Job&);
JobQueue& m_jobQueue; JobQueue* m_jobQueue;
std::atomic <int> m_taskCount; std::atomic <int> m_taskCount;
}; };

View File

@@ -54,13 +54,12 @@ enum JobType
// special types not dispatched by the job pool // special types not dispatched by the job pool
jtPEER = 30, jtPEER = 30,
jtDISK = 31, jtDISK = 31,
jtACCEPTLEDGER = 32, jtTXN_PROC = 32,
jtTXN_PROC = 33, jtOB_SETUP = 33,
jtOB_SETUP = 34, jtPATH_FIND = 34,
jtPATH_FIND = 35, jtHO_READ = 35,
jtHO_READ = 36, jtHO_WRITE = 36,
jtHO_WRITE = 37, jtGENERIC = 37, // Used just to measure time
jtGENERIC = 38, // Used just to measure time
}; // CAUTION: If you add new types, add them to Job.cpp too }; // CAUTION: If you add new types, add them to Job.cpp too
// VFALCO TODO move this into the enum so it calculates itself? // VFALCO TODO move this into the enum so it calculates itself?

View File

@@ -114,7 +114,6 @@ public:
m_loads [ jtCLIENT ].setTargetLatency (2000, 5000); m_loads [ jtCLIENT ].setTargetLatency (2000, 5000);
m_loads [ jtPEER ].setTargetLatency (200, 2500); m_loads [ jtPEER ].setTargetLatency (200, 2500);
m_loads [ jtDISK ].setTargetLatency (500, 1000); m_loads [ jtDISK ].setTargetLatency (500, 1000);
m_loads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500);
m_loads [ jtNETOP_CLUSTER ].setTargetLatency (9999, 9999); // once per 10 seconds m_loads [ jtNETOP_CLUSTER ].setTargetLatency (9999, 9999); // once per 10 seconds
m_loads [ jtNETOP_TIMER ].setTargetLatency (999, 999); // once per second m_loads [ jtNETOP_TIMER ].setTargetLatency (999, 999); // once per second
@@ -138,8 +137,24 @@ public:
// do not add jobs to a queue with no threads // do not add jobs to a queue with no threads
bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0); bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
// If this goes off it means that a child didn't follow the Stoppable API rules. {
bassert (! isStopped() && ! areChildrenStopped()); // If this goes off it means that a child didn't follow
// the Stoppable API rules. A job may only be added if:
//
// - The JobQueue has NOT stopped
// AND
// * We are currently processing jobs
// OR
// * We have have pending jobs
// OR
// * Not all children are stopped
//
ScopedLock lock (m_mutex);
bassert (! isStopped() && (
m_processCount>0 ||
! m_jobSet.empty () ||
! areChildrenStopped()));
}
// Don't even add it to the queue if we're stopping // Don't even add it to the queue if we're stopping
// and the job type is marked for skipOnStop. // and the job type is marked for skipOnStop.
@@ -576,6 +591,7 @@ private:
case jtNETOP_CLUSTER: case jtNETOP_CLUSTER:
case jtNETOP_TIMER: case jtNETOP_TIMER:
case jtADMIN: case jtADMIN:
//case jtACCEPT:
return true; return true;
default: default:
@@ -601,14 +617,12 @@ private:
// These are not dispatched by JobQueue // These are not dispatched by JobQueue
case jtPEER: case jtPEER:
case jtDISK: case jtDISK:
case jtACCEPTLEDGER:
case jtTXN_PROC: case jtTXN_PROC:
case jtOB_SETUP: case jtOB_SETUP:
case jtPATH_FIND: case jtPATH_FIND:
case jtHO_READ: case jtHO_READ:
case jtHO_WRITE: case jtHO_WRITE:
case jtGENERIC: case jtGENERIC:
case jtACCEPT:
limit = 0; limit = 0;
break; break;
@@ -634,6 +648,7 @@ private:
case jtPROPOSAL_t: case jtPROPOSAL_t:
case jtSWEEP: case jtSWEEP:
case jtADMIN: case jtADMIN:
case jtACCEPT:
limit = std::numeric_limits <int>::max (); limit = std::numeric_limits <int>::max ();
break; break;