diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp index ffec790d3d..6b01cf3a88 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.cpp +++ b/modules/ripple_core/functional/ripple_JobQueue.cpp @@ -4,14 +4,37 @@ */ //============================================================================== +JobQueue::Count::Count () noexcept + : type (jtINVALID) + , waiting (0) + , running (0) + , deferred (0) +{ +} + +JobQueue::Count::Count (JobType type_) noexcept + : type (type_) + , waiting (0) + , running (0) + , deferred (0) +{ +} + +//------------------------------------------------------------------------------ + SETUP_LOG (JobQueue) JobQueue::JobQueue () : m_workers (*this, "JobQueue", 0) , mLastJob (0) { + // Initialize the job counts. + // The 'limit' field in particular will be set based on the limit for (int i = 0; i < NUM_JOB_TYPES; ++i) - mJobCounts[static_cast(i)] = std::make_pair(0, 0); + { + JobType const type (static_cast (i)); + mJobCounts [type] = Count (type); + } mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000); mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000); @@ -50,7 +73,7 @@ void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYP it.first->peekEvent().start(); // start timing how long it stays in the queue - queueJobForRunning (*it.first, lock); + queueJob (*it.first, lock); } int JobQueue::getJobCount (JobType t) @@ -59,7 +82,7 @@ int JobQueue::getJobCount (JobType t) JobCounts::const_iterator c = mJobCounts.find (t); - return (c == mJobCounts.end ()) ? 0 : c->second.first; + return (c == mJobCounts.end ()) ? 0 : c->second.waiting; } int JobQueue::getJobCountTotal (JobType t) @@ -68,7 +91,7 @@ int JobQueue::getJobCountTotal (JobType t) JobCounts::const_iterator c = mJobCounts.find (t); - return (c == mJobCounts.end ()) ? 0 : (c->second.first + c->second.second); + return (c == mJobCounts.end ()) ? 0 : (c->second.waiting + c->second.running); } int JobQueue::getJobCountGE (JobType t) @@ -83,7 +106,7 @@ int JobQueue::getJobCountGE (JobType t) BOOST_FOREACH (jt_int_pair const& it, mJobCounts) { if (it.first >= t) - ret += it.second.first; + ret += it.second.waiting; } return ret; @@ -102,7 +125,8 @@ std::vector< std::pair > > JobQueue::getJobCounts ( BOOST_FOREACH (const jt_int_pair & it, mJobCounts) { - ret.push_back (it); + ret.push_back (std::make_pair (it.second.type, + std::make_pair (it.second.waiting, it.second.running))); } return ret; @@ -136,8 +160,8 @@ Json::Value JobQueue::getJson (int) } else { - jobCount = it->second.first; - threadCount = it->second.second; + jobCount = it->second.waiting; + threadCount = it->second.running; } if ((count != 0) || (jobCount != 0) || (latencyPeak != 0) || (threadCount != 0)) @@ -222,37 +246,6 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode) m_workers.setNumberOfThreads (c); } -//------------------------------------------------------------------------------ -// -// Determines the number of free task slots for the given JobType. -// -// This can return a negative number. -// -// Pre-conditions: -// -// -// Post-conditions: -// -// -// Invariants: -// The calling thread owns the JobLock -// -int JobQueue::freeTaskSlots (JobType type, ScopedLockType const&) -{ - int const limit = getJobLimit (type); - - if (limit != 0) - { - int waiting = mJobCounts [type].first; - int running = mJobCounts [type].second; - - return (limit - running) - waiting; - } - - // The actual number doesn't matter as long as its positive - return 1; -} - //------------------------------------------------------------------------------ // // Signals an added Job for processing. @@ -269,26 +262,28 @@ int JobQueue::freeTaskSlots (JobType type, ScopedLockType const&) // Invariants: // The calling thread owns the JobLock // -void JobQueue::queueJobForRunning (Job const& job, ScopedLockType const& lock) +void JobQueue::queueJob (Job const& job, ScopedLockType& lock) { JobType const type (job.getType ()); check_precondition (type != jtINVALID); + check_precondition (mJobSet.find (job) != mJobSet.end ()); - if (freeTaskSlots (type, lock) > 0) + Count& count (mJobCounts [type]); + + ++count.waiting; + + if (count.waiting + count.running < getJobLimit (type)) { m_workers.addTask (); } else { - // Do nothing. - // When the next job of this type finishes, it will - // call addTask if there are more waiting than the limit + // defer the task until we go below the limit + // + ++count.deferred; } - - // This has to happen after the call to freeTaskSlots - ++mJobCounts [type].first; } //------------------------------------------------------------------------------ @@ -296,8 +291,7 @@ void JobQueue::queueJobForRunning (Job const& job, ScopedLockType const& lock) // Returns the next Job we should run now. // // RunnableJob: -// A Job with no limit for its JobType, or -// The number of running Jobs for its JobType is below the limit. +// A Job in the JobSet whose slots count for its type is greater than zero. // // Pre-conditions: // mJobSet must not be empty. @@ -312,17 +306,21 @@ void JobQueue::queueJobForRunning (Job const& job, ScopedLockType const& lock) // Invariants: // The calling thread owns the JobLock // -void JobQueue::getNextJobToRun (Job& job, ScopedLockType const&) +void JobQueue::getNextJob (Job& job, ScopedLockType&) { check_precondition (! mJobSet.empty ()); JobSet::const_iterator iter; for (iter = mJobSet.begin (); iter != mJobSet.end (); ++iter) { - // Check the requirements for RunnableJob - if (getJobLimit (iter->getType ()) <= 0 || - (mJobCounts [iter->getType ()].second < getJobLimit (iter->getType ()))) + Count& count (mJobCounts [iter->getType ()]); + + bassert (count.running <= getJobLimit (count.type)); + + // Run this job if we're running below the limit. + if (count.running < getJobLimit (count.type)) { + bassert (count.waiting > 0); break; } } @@ -331,14 +329,16 @@ void JobQueue::getNextJobToRun (Job& job, ScopedLockType const&) JobType const type = iter->getType (); + Count& count (mJobCounts [type]); + check_postcondition (type != JobType::jtINVALID); job = *iter; mJobSet.erase (iter); - --(mJobCounts [type].first); - ++(mJobCounts [type].second); + --count.waiting; + ++count.running; } //------------------------------------------------------------------------------ @@ -356,25 +356,29 @@ void JobQueue::getNextJobToRun (Job& job, ScopedLockType const&) // Invariants: // // -void JobQueue::setRunningJobFinished (Job const& job) +void JobQueue::finishJob (Job const& job) { JobType const type = job.getType (); - ScopedLockType lock (mJobLock); - - check_precondition (mJobSet.find (job) == mJobSet.end ()); - check_precondition (type != JobType::jtINVALID); - - // If there were previously no free slots and we would - // free one up, and there are any other jobs of this type - // waiting, then go ahead and signal a task - // - if (freeTaskSlots (type, lock) == 0 && mJobCounts [type].first > 0) { - m_workers.addTask (); - } + ScopedLockType lock (mJobLock); - --(mJobCounts [type].second); + check_precondition (mJobSet.find (job) == mJobSet.end ()); + check_precondition (type != JobType::jtINVALID); + + Count& count (mJobCounts [type]); + + // Queue a deferred task + if (count.deferred > 0) + { + bassert (count.running + count.waiting >= getJobLimit (type)); + + --count.deferred; + m_workers.addTask (); + } + + --count.running; + } } //------------------------------------------------------------------------------ @@ -394,7 +398,11 @@ void JobQueue::processTask () { Job job; - getNextJobToRun (job, ScopedLockType (mJobLock)); + { + ScopedLockType lock (mJobLock); + + getNextJob (job, lock); + } JobType const type (job.getType ()); @@ -406,7 +414,7 @@ void JobQueue::processTask () job.doJob (); - setRunningJobFinished (job); + finishJob (job); // Note that when Job::~Job is called, the last reference // to the associated LoadEvent object (in the Job) may be destroyed. @@ -415,11 +423,12 @@ void JobQueue::processTask () //------------------------------------------------------------------------------ // Returns the limit of running jobs for the given job type. -// A value of zero means no limit. +// For jobs with no limit, we return the largest int. Hopefully that +// will be enough. // int JobQueue::getJobLimit (JobType type) { - int limit = 0; + int limit = std::numeric_limits ::max (); switch (type) { @@ -434,7 +443,6 @@ int JobQueue::getJobLimit (JobType type) case jtHO_WRITE: case jtGENERIC: default: - bassertfalse; limit = 0; break; @@ -452,7 +460,7 @@ int JobQueue::getJobLimit (JobType type) case jtPROPOSAL_t: case jtSWEEP: case jtADMIN: - limit = 0; + limit = std::numeric_limits ::max (); break; case jtLEDGER_DATA: limit = 2; break; diff --git a/modules/ripple_core/functional/ripple_JobQueue.h b/modules/ripple_core/functional/ripple_JobQueue.h index 5fabc21e15..f8f228fccb 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.h +++ b/modules/ripple_core/functional/ripple_JobQueue.h @@ -10,7 +10,21 @@ class JobQueue : private Workers::Callback { public: - typedef std::map > JobCounts; + // Statistics on a particular JobType + struct Count + { + Count () noexcept; + explicit Count (JobType type) noexcept; + + JobType type; // The type of Job these counts reflect + int waiting; // The number waiting + int running; // How many are running + int deferred; // Number of jobs we didn't signal due to limits + }; + + typedef std::map JobCounts; + + //-------------------------------------------------------------------------- JobQueue (); @@ -22,11 +36,15 @@ public: void addJob (JobType type, const std::string& name, const FUNCTION_TYPE& job); int getJobCount (JobType t); // Jobs waiting at this priority + int getJobCountTotal (JobType t); // Jobs waiting plus running at this priority + int getJobCountGE (JobType t); // All waiting jobs at or greater than this priority + std::vector< std::pair > > getJobCounts (); // jobs waiting, threads doing void shutdown (); + void setThreadCount (int c, bool const standaloneMode); // VFALCO TODO Rename these to newLoadEventMeasurement or something similar @@ -46,6 +64,7 @@ public: } bool isOverloaded (); + Json::Value getJson (int c = 0); private: @@ -53,10 +72,9 @@ private: typedef JobLockType::scoped_lock ScopedLockType; typedef std::set JobSet; - int freeTaskSlots (JobType type, ScopedLockType const&); - void queueJobForRunning (Job const& job, ScopedLockType const&); - void getNextJobToRun (Job& job, ScopedLockType const&); - void setRunningJobFinished (Job const& job); + void queueJob (Job const& job, ScopedLockType&); + void getNextJob (Job& job, ScopedLockType&); + void finishJob (Job const& job); void processTask ();