diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp index 627abeecbf..508a8d66c0 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.cpp +++ b/modules/ripple_core/functional/ripple_JobQueue.cpp @@ -4,6 +4,10 @@ */ //============================================================================== +SETUP_LOG (JobQueue) + +//------------------------------------------------------------------------------ + JobQueue::Count::Count () noexcept : type (jtINVALID) , waiting (0) @@ -22,19 +26,26 @@ JobQueue::Count::Count (JobType type_) noexcept //------------------------------------------------------------------------------ -SETUP_LOG (JobQueue) +JobQueue::State::State () + : lastJob (0) +{ +} + +//------------------------------------------------------------------------------ JobQueue::JobQueue () - : mLock (this, "JobQueue", __FILE__, __LINE__) - , m_workers (*this, "JobQueue", 0) - , mLastJob (0) + : m_workers (*this, "JobQueue", 0) { - // Initialize the job counts. - // The 'limit' field in particular will be set based on the limit - for (int i = 0; i < NUM_JOB_TYPES; ++i) { - JobType const type (static_cast (i)); - mJobCounts [type] = Count (type); + SharedState::WriteAccess state (m_state); + + // Initialize the job counts. + // The 'limit' field in particular will be set based on the limit + for (int i = 0; i < NUM_JOB_TYPES; ++i) + { + JobType const type (static_cast (i)); + state->jobCounts [type] = Count (type); + } } mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000); @@ -53,6 +64,9 @@ JobQueue::JobQueue () mJobLoads [ jtPEER ].setTargetLatency (200, 2500); mJobLoads [ jtDISK ].setTargetLatency (500, 1000); mJobLoads [ jtACCEPTLEDGER ].setTargetLatency (1000, 2500); + + mJobLoads [ jtNETOP_CLUSTER ].setTargetLatency (9999, 9999); // once per 10 seconds + mJobLoads [ jtNETOP_TIMER ].setTargetLatency (999, 999); // once per second } JobQueue::~JobQueue () @@ -61,38 +75,40 @@ JobQueue::~JobQueue () void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE& jobFunc) { - assert (type != jtINVALID); - - ScopedLockType lock (mLock, __FILE__, __LINE__); + bassert (type != jtINVALID); // FIXME: Workaround incorrect client shutdown ordering // do not add jobs to a queue with no threads bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0); + SharedState::WriteAccess state (m_state); + std::pair< std::set ::iterator, bool > it = - mJobSet.insert (Job (type, name, ++mLastJob, mJobLoads[type], jobFunc)); + state->jobSet.insert (Job ( + type, name, ++state->lastJob, mJobLoads[type], jobFunc)); - it.first->peekEvent().start(); // start timing how long it stays in the queue + // start timing how long it stays in the queue + it.first->peekEvent().start(); - queueJob (*it.first, lock); + queueJob (*it.first, state); } int JobQueue::getJobCount (JobType t) { - ScopedLockType lock (mLock, __FILE__, __LINE__); + SharedState::ReadAccess state (m_state); - JobCounts::const_iterator c = mJobCounts.find (t); + JobCounts::const_iterator c = state->jobCounts.find (t); - return (c == mJobCounts.end ()) ? 0 : c->second.waiting; + return (c == state->jobCounts.end ()) ? 0 : c->second.waiting; } int JobQueue::getJobCountTotal (JobType t) { - ScopedLockType lock (mLock, __FILE__, __LINE__); + SharedState::ReadAccess state (m_state); - JobCounts::const_iterator c = mJobCounts.find (t); + JobCounts::const_iterator c = state->jobCounts.find (t); - return (c == mJobCounts.end ()) ? 0 : (c->second.waiting + c->second.running); + return (c == state->jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running); } int JobQueue::getJobCountGE (JobType t) @@ -100,11 +116,11 @@ int JobQueue::getJobCountGE (JobType t) // return the number of jobs at this priority level or greater int ret = 0; - ScopedLockType lock (mLock, __FILE__, __LINE__); + SharedState::ReadAccess state (m_state); typedef JobCounts::value_type jt_int_pair; - BOOST_FOREACH (jt_int_pair const& it, mJobCounts) + BOOST_FOREACH (jt_int_pair const& it, state->jobCounts) { if (it.first >= t) ret += it.second.waiting; @@ -118,13 +134,13 @@ std::vector< std::pair > > JobQueue::getJobCounts ( // return all jobs at all priority levels std::vector< std::pair > > ret; - ScopedLockType lock (mLock, __FILE__, __LINE__); + SharedState::ReadAccess state (m_state); - ret.reserve (mJobCounts.size ()); + ret.reserve (state->jobCounts.size ()); typedef JobCounts::value_type jt_int_pair; - BOOST_FOREACH (const jt_int_pair & it, mJobCounts) + BOOST_FOREACH (const jt_int_pair & it, state->jobCounts) { ret.push_back (std::make_pair (it.second.type, std::make_pair (it.second.waiting, it.second.running))); @@ -137,24 +153,31 @@ Json::Value JobQueue::getJson (int) { Json::Value ret (Json::objectValue); - ScopedLockType lock (mLock, __FILE__, __LINE__); - ret["threads"] = m_workers.getNumberOfThreads (); Json::Value priorities = Json::arrayValue; + SharedState::ReadAccess state (m_state); + for (int i = 0; i < NUM_JOB_TYPES; ++i) { - if (static_cast(i) == jtGENERIC) + JobType const type (static_cast (i)); + + if (type == jtGENERIC) continue; - uint64 count, latencyAvg, latencyPeak; - int jobCount, threadCount; + uint64 count; + uint64 latencyAvg; + uint64 latencyPeak; + int jobCount; + int threadCount; bool isOver; - mJobLoads[i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver); - JobCounts::const_iterator it = mJobCounts.find (static_cast (i)); - if (it == mJobCounts.end ()) + mJobLoads [i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver); + + JobCounts::const_iterator it = state->jobCounts.find (type); + + if (it == state->jobCounts.end ()) { jobCount = 0; threadCount = 0; @@ -172,7 +195,7 @@ Json::Value JobQueue::getJson (int) if (isOver) pri["over_target"] = true; - pri["job_type"] = Job::toString (static_cast (i)); + pri["job_type"] = Job::toString (type); if (jobCount != 0) pri["waiting"] = jobCount; @@ -202,8 +225,6 @@ bool JobQueue::isOverloaded () { int count = 0; - ScopedLockType lock (mLock, __FILE__, __LINE__); - for (int i = 0; i < NUM_JOB_TYPES; ++i) if (mJobLoads[i].isOver ()) ++count; @@ -263,15 +284,14 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode) // Invariants: // The calling thread owns the JobLock // -void JobQueue::queueJob (Job const& job, ScopedLockType& lock) +void JobQueue::queueJob (Job const& job, SharedState::WriteAccess& state) { JobType const type (job.getType ()); - check_precondition (type != jtINVALID); + bassert (type != jtINVALID); + bassert (state->jobSet.find (job) != state->jobSet.end ()); - check_precondition (mJobSet.find (job) != mJobSet.end ()); - - Count& count (mJobCounts [type]); + Count& count (state->jobCounts [type]); if (count.waiting + count.running < getJobLimit (type)) { @@ -306,14 +326,14 @@ void JobQueue::queueJob (Job const& job, ScopedLockType& lock) // Invariants: // The calling thread owns the JobLock // -void JobQueue::getNextJob (Job& job, ScopedLockType&) +void JobQueue::getNextJob (Job& job, SharedState::WriteAccess& state) { - check_precondition (! mJobSet.empty ()); + bassert (! state->jobSet.empty ()); JobSet::const_iterator iter; - for (iter = mJobSet.begin (); iter != mJobSet.end (); ++iter) + for (iter = state->jobSet.begin (); iter != state->jobSet.end (); ++iter) { - Count& count (mJobCounts [iter->getType ()]); + Count& count (state->jobCounts [iter->getType ()]); bassert (count.running <= getJobLimit (count.type)); @@ -325,17 +345,17 @@ void JobQueue::getNextJob (Job& job, ScopedLockType&) } } - check_precondition (iter != mJobSet.end ()); + bassert (iter != state->jobSet.end ()); JobType const type = iter->getType (); - Count& count (mJobCounts [type]); + Count& count (state->jobCounts [type]); - check_postcondition (type != jtINVALID); + bassert (type != jtINVALID); job = *iter; - mJobSet.erase (iter); + state->jobSet.erase (iter); --count.waiting; ++count.running; @@ -361,14 +381,14 @@ void JobQueue::finishJob (Job const& job) JobType const type = job.getType (); { - ScopedLockType lock (mLock, __FILE__, __LINE__); + SharedState::WriteAccess state (m_state); - check_precondition (mJobSet.find (job) == mJobSet.end ()); - check_precondition (type != jtINVALID); + bassert (state->jobSet.find (job) == state->jobSet.end ()); + bassert (type != jtINVALID); - Count& count (mJobCounts [type]); + Count& count (state->jobCounts [type]); - // Queue a deferred task + // Queue a deferred task if possible if (count.deferred > 0) { bassert (count.running + count.waiting >= getJobLimit (type)); @@ -399,9 +419,9 @@ void JobQueue::processTask () Job job; { - ScopedLockType lock (mLock, __FILE__, __LINE__); + SharedState::WriteAccess state (m_state); - getNextJob (job, lock); + getNextJob (job, state); } JobType const type (job.getType ()); @@ -442,10 +462,14 @@ int JobQueue::getJobLimit (JobType type) case jtHO_READ: case jtHO_WRITE: case jtGENERIC: - default: limit = 0; break; + default: + // Someone added a JobType but forgot to set a limit. + // Did they also forget to add it to ripple_Job.cpp? + bassertfalse; + case jtVALIDATION_ut: case jtPROOFWORK: case jtTRANSACTION_l: @@ -468,6 +492,15 @@ int JobQueue::getJobLimit (JobType type) case jtPACK: limit = 1; break; case jtPUBOLDLEDGER: limit = 2; break; case jtTXN_DATA: limit = 1; break; + + // If either of the next two are processing so slowly + // or we are so busy we have two of them at once, it + // indicates a serious problem! + // + case jtNETOP_TIMER: + case jtNETOP_CLUSTER: + limit = 1; + break; }; return limit; diff --git a/modules/ripple_core/functional/ripple_JobQueue.h b/modules/ripple_core/functional/ripple_JobQueue.h index 69b581e92f..a80aa66ae6 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.h +++ b/modules/ripple_core/functional/ripple_JobQueue.h @@ -68,26 +68,30 @@ public: Json::Value getJson (int c = 0); private: - typedef RippleMutex LockType; - typedef LockType::ScopedLockType ScopedLockType; - LockType mLock; - typedef std::set JobSet; - void queueJob (Job const& job, ScopedLockType&); - void getNextJob (Job& job, ScopedLockType&); - void finishJob (Job const& job); + struct State + { + State (); + uint64 lastJob; + JobSet jobSet; + JobCounts jobCounts; + }; + + typedef SharedData SharedState; + + void queueJob (Job const& job, SharedState::WriteAccess& state); + void getNextJob (Job& job, SharedState::WriteAccess& state); + void finishJob (Job const& job); void processTask (); static int getJobLimit (JobType type); private: + SharedState m_state; Workers m_workers; - uint64 mLastJob; - JobSet mJobSet; LoadMonitor mJobLoads [NUM_JOB_TYPES]; - JobCounts mJobCounts; }; #endif