diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index 569d4538e..bf5cbfe62 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -95,12 +95,14 @@ public: , m_rpcServiceManager (RPC::Manager::New ( LogPartition::getJournal ())) + , m_nodeStoreScheduler (*this) + // The JobQueue has to come pretty early since // almost everything is a Stoppable child of the JobQueue. // , m_jobQueue (JobQueue::New ( m_collectorManager->collector (), - *this, LogPartition::getJournal ())) + m_nodeStoreScheduler, LogPartition::getJournal ())) // The io_service must be a child of the JobQueue since we call addJob // in response to newtwork data from peers and also client requests. @@ -132,16 +134,13 @@ public: #if ! RIPPLE_USE_RPC_SERVICE_MANAGER , m_rpcServerHandler (*m_networkOPs, *m_resourceManager) // passive object, not a Service #endif - - , m_nodeStoreScheduler (*m_jobQueue, *m_jobQueue) - , m_nodeStore (NodeStore::Database::New ("NodeStore.main", m_nodeStoreScheduler, LogPartition::getJournal (), getConfig ().nodeDatabase, getConfig ().ephemeralNodeDatabase)) , m_sntpClient (SNTPClient::New (*this)) - , m_inboundLedgers (InboundLedgers::New(*m_jobQueue)) + , m_inboundLedgers (InboundLedgers::New (*m_jobQueue)) , m_txQueue (TxQueue::New ()) @@ -166,6 +165,9 @@ public: , mShutdown (false) { + // VFALCO HACK + m_nodeStoreScheduler.setJobQueue (*m_jobQueue); + bassert (s_instance == nullptr); s_instance = this; @@ -910,6 +912,7 @@ private: std::unique_ptr m_rpcServiceManager; // These are Stoppable-related + NodeStoreScheduler m_nodeStoreScheduler; std::unique_ptr m_jobQueue; IoServicePool m_mainIoPool; std::unique_ptr m_siteFiles; @@ -922,7 +925,6 @@ private: #if ! RIPPLE_USE_RPC_SERVICE_MANAGER RPCServerHandler m_rpcServerHandler; #endif - NodeStoreScheduler m_nodeStoreScheduler; std::unique_ptr m_nodeStore; std::unique_ptr m_sntpClient; std::unique_ptr m_inboundLedgers; diff --git a/src/ripple_app/main/NodeStoreScheduler.cpp b/src/ripple_app/main/NodeStoreScheduler.cpp index 3e18dc0dc..49ec6b32c 100644 --- a/src/ripple_app/main/NodeStoreScheduler.cpp +++ b/src/ripple_app/main/NodeStoreScheduler.cpp @@ -17,27 +17,32 @@ */ //============================================================================== -NodeStoreScheduler::NodeStoreScheduler (Stoppable& parent, JobQueue& jobQueue) +NodeStoreScheduler::NodeStoreScheduler (Stoppable& parent) : Stoppable ("NodeStoreScheduler", parent) - , m_jobQueue (jobQueue) - , m_taskCount (1) // start it off at 1 + , m_jobQueue (nullptr) + , m_taskCount (0) { } +void NodeStoreScheduler::setJobQueue (JobQueue& jobQueue) +{ + m_jobQueue = &jobQueue; +} + void NodeStoreScheduler::onStop () { - if (--m_taskCount == 0) - stopped(); } void NodeStoreScheduler::onChildrenStopped () { + bassert (m_taskCount == 0); + stopped (); } void NodeStoreScheduler::scheduleTask (NodeStore::Task& task) { ++m_taskCount; - m_jobQueue.addJob ( + m_jobQueue->addJob ( jtWRITE, "NodeObject::store", BIND_TYPE (&NodeStoreScheduler::doTask, diff --git a/src/ripple_app/main/NodeStoreScheduler.h b/src/ripple_app/main/NodeStoreScheduler.h index 75a40ddd6..8a9701558 100644 --- a/src/ripple_app/main/NodeStoreScheduler.h +++ b/src/ripple_app/main/NodeStoreScheduler.h @@ -26,7 +26,12 @@ class NodeStoreScheduler , public Stoppable { public: - NodeStoreScheduler (Stoppable& parent, JobQueue& jobQueue); + NodeStoreScheduler (Stoppable& parent); + + // VFALCO NOTE This is a temporary hack to solve the problem + // of circular dependency. + // + void setJobQueue (JobQueue& jobQueue); void onStop (); void onChildrenStopped (); @@ -35,7 +40,7 @@ public: private: void doTask (NodeStore::Task& task, Job&); - JobQueue& m_jobQueue; + JobQueue* m_jobQueue; std::atomic m_taskCount; }; diff --git a/src/ripple_core/functional/Job.h b/src/ripple_core/functional/Job.h index b4c7ebb8e..3a9041602 100644 --- a/src/ripple_core/functional/Job.h +++ b/src/ripple_core/functional/Job.h @@ -54,13 +54,12 @@ enum JobType // special types not dispatched by the job pool jtPEER = 30, jtDISK = 31, - jtACCEPTLEDGER = 32, - jtTXN_PROC = 33, - jtOB_SETUP = 34, - jtPATH_FIND = 35, - jtHO_READ = 36, - jtHO_WRITE = 37, - jtGENERIC = 38, // Used just to measure time + jtTXN_PROC = 32, + jtOB_SETUP = 33, + jtPATH_FIND = 34, + jtHO_READ = 35, + jtHO_WRITE = 36, + jtGENERIC = 37, // Used just to measure time }; // CAUTION: If you add new types, add them to Job.cpp too // VFALCO TODO move this into the enum so it calculates itself? diff --git a/src/ripple_core/functional/JobQueue.cpp b/src/ripple_core/functional/JobQueue.cpp index 76de148e2..b00ea9506 100644 --- a/src/ripple_core/functional/JobQueue.cpp +++ b/src/ripple_core/functional/JobQueue.cpp @@ -114,7 +114,6 @@ public: m_loads [ jtCLIENT ].setTargetLatency (2000, 5000); m_loads [ jtPEER ].setTargetLatency (200, 2500); m_loads [ jtDISK ].setTargetLatency (500, 1000); - m_loads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500); m_loads [ jtNETOP_CLUSTER ].setTargetLatency (9999, 9999); // once per 10 seconds m_loads [ jtNETOP_TIMER ].setTargetLatency (999, 999); // once per second @@ -138,8 +137,24 @@ public: // do not add jobs to a queue with no threads bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0); - // If this goes off it means that a child didn't follow the Stoppable API rules. - bassert (! isStopped() && ! areChildrenStopped()); + { + // If this goes off it means that a child didn't follow + // the Stoppable API rules. A job may only be added if: + // + // - The JobQueue has NOT stopped + // AND + // * We are currently processing jobs + // OR + // * We have have pending jobs + // OR + // * Not all children are stopped + // + ScopedLock lock (m_mutex); + bassert (! isStopped() && ( + m_processCount>0 || + ! m_jobSet.empty () || + ! areChildrenStopped())); + } // Don't even add it to the queue if we're stopping // and the job type is marked for skipOnStop. @@ -576,6 +591,7 @@ private: case jtNETOP_CLUSTER: case jtNETOP_TIMER: case jtADMIN: + //case jtACCEPT: return true; default: @@ -601,14 +617,12 @@ private: // These are not dispatched by JobQueue case jtPEER: case jtDISK: - case jtACCEPTLEDGER: case jtTXN_PROC: case jtOB_SETUP: case jtPATH_FIND: case jtHO_READ: case jtHO_WRITE: case jtGENERIC: - case jtACCEPT: limit = 0; break; @@ -634,6 +648,7 @@ private: case jtPROPOSAL_t: case jtSWEEP: case jtADMIN: + case jtACCEPT: limit = std::numeric_limits ::max (); break;