From 4a949a9cbdd7fa3e14a41a5f9019826e7f6e3638 Mon Sep 17 00:00:00 2001
From: Vinnie Falco
Date: Mon, 12 Aug 2013 10:00:54 -0700
Subject: [PATCH] Refactor JobQueue to fix a bad state and clean up job limits

---
 modules/ripple_app/ledger/Ledger.cpp          |   2 +-
 modules/ripple_app/peers/ripple_Peer.cpp      |   4 +-
 modules/ripple_app/peers/ripple_PeerSet.cpp   |   4 +-
 modules/ripple_core/functional/ripple_Job.cpp |  12 -
 modules/ripple_core/functional/ripple_Job.h   |   5 -
 .../functional/ripple_JobQueue.cpp            | 315 +++++++++++++-----
 .../ripple_core/functional/ripple_JobQueue.h  |  18 +-
 7 files changed, 253 insertions(+), 107 deletions(-)

diff --git a/modules/ripple_app/ledger/Ledger.cpp b/modules/ripple_app/ledger/Ledger.cpp
index 6770b22919..72db0fdede 100644
--- a/modules/ripple_app/ledger/Ledger.cpp
+++ b/modules/ripple_app/ledger/Ledger.cpp
@@ -1839,7 +1839,7 @@ bool Ledger::pendSaveValidated (bool isSynchronous, bool isCurrent)
     }
     else
     {
-        getApp().getJobQueue ().addLimitJob (jtPUBOLDLEDGER, "Ledger::pendOldSave", 2,
+        getApp().getJobQueue ().addJob (jtPUBOLDLEDGER, "Ledger::pendOldSave",
             BIND_TYPE (&Ledger::saveValidatedLedgerAsync, shared_from_this (), P_1, isCurrent));
     }

diff --git a/modules/ripple_app/peers/ripple_Peer.cpp b/modules/ripple_app/peers/ripple_Peer.cpp
index 9337c828ac..91d01fbf4c 100644
--- a/modules/ripple_app/peers/ripple_Peer.cpp
+++ b/modules/ripple_app/peers/ripple_Peer.cpp
@@ -2160,7 +2160,7 @@ void PeerImp::recvLedger (const boost::shared_ptr<protocol::TMLedgerData>& packe
     }

     if (getApp().getInboundLedgers ().awaitLedgerData (hash))
-        getApp().getJobQueue ().addLimitJob (jtLEDGER_DATA, "gotLedgerData", 2,
+        getApp().getJobQueue ().addJob (jtLEDGER_DATA, "gotLedgerData",
             BIND_TYPE (&InboundLedgers::gotLedgerData, &getApp().getInboundLedgers (), P_1,
                 hash, packet_ptr, boost::weak_ptr<Peer> (shared_from_this ())));
     else
@@ -2399,7 +2399,7 @@ void PeerImp::doFetchPack (const boost::shared_ptr<protocol::TMGetObjectByHash>&
         return;
     }

-    getApp().getJobQueue ().addLimitJob (jtPACK, "MakeFetchPack", 1,
+    getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack",
         BIND_TYPE (&NetworkOPs::makeFetchPack, &getApp().getOPs (), P_1,
             boost::weak_ptr<Peer> (shared_from_this ()), packet, wantLedger, haveLedger,
             UptimeTimer::getInstance ().getElapsedSeconds ()));
 }

diff --git a/modules/ripple_app/peers/ripple_PeerSet.cpp b/modules/ripple_app/peers/ripple_PeerSet.cpp
index af153ab666..9ef897fa77 100644
--- a/modules/ripple_app/peers/ripple_PeerSet.cpp
+++ b/modules/ripple_app/peers/ripple_PeerSet.cpp
@@ -77,7 +77,7 @@ void PeerSet::TimerEntry (boost::weak_ptr<PeerSet> wptr, const boost::system::er
     {
         if (ptr->mTxnData)
         {
-            getApp().getJobQueue ().addLimitJob (jtTXN_DATA, "timerEntry", 2,
+            getApp().getJobQueue ().addJob (jtTXN_DATA, "timerEntry",
                 BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr));
         }
         else
@@ -90,7 +90,7 @@
             ptr->setTimer ();
         }
         else
-            getApp().getJobQueue ().addLimitJob (jtLEDGER_DATA, "timerEntry", 2,
+            getApp().getJobQueue ().addJob (jtLEDGER_DATA, "timerEntry",
                 BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr));
     }
 }

diff --git a/modules/ripple_core/functional/ripple_Job.cpp b/modules/ripple_core/functional/ripple_Job.cpp
index 370e6b6730..c6d8d7df75 100644
--- a/modules/ripple_core/functional/ripple_Job.cpp
+++ b/modules/ripple_core/functional/ripple_Job.cpp
@@ -7,20 +7,17 @@
 Job::Job ()
     : mType (jtINVALID)
     , mJobIndex (0)
-    , m_limit (0)
 {
 }

 Job::Job (JobType type, uint64 index)
     : mType (type)
     , mJobIndex (index)
-    , m_limit (0)
 {
 }

 Job::Job (JobType type,
           std::string const& name,
-          int limit,
           uint64 index,
           LoadMonitor& lm,
           FUNCTION_TYPE const& job)
@@ -28,7 +25,6 @@ Job::Job (JobType type,
     , mJobIndex (index)
     , mJob (job)
     , mName (name)
-    , m_limit(limit)
 {
     m_loadEvent = boost::make_shared <LoadEvent> (boost::ref (lm), name, false);
 }
@@ -50,11 +46,6 @@ void Job::rename (std::string const& newName)
     mName = newName;
 }

-int Job::getLimit () const
-{
-    return m_limit;
-}
-
 LoadEvent& Job::peekEvent() const
 {
     return *m_loadEvent;
@@ -115,9 +106,6 @@ const char* Job::toString (JobType t)
     case jtADMIN:
         return "administration";

-    case jtDEATH:
-        return "jobOfDeath";
-
     case jtPEER:
         return "peerCommand";

diff --git a/modules/ripple_core/functional/ripple_Job.h b/modules/ripple_core/functional/ripple_Job.h
index 1e97ee4ee0..5e77253728 100644
--- a/modules/ripple_core/functional/ripple_Job.h
+++ b/modules/ripple_core/functional/ripple_Job.h
@@ -31,7 +31,6 @@ enum JobType
     jtPROPOSAL_t = 16, // A proposal from a trusted source
     jtSWEEP = 17,      // Sweep for stale structures
     jtADMIN = 18,      // An administrative operation
-    jtDEATH = 19,      // job of death, used internally

     // special types not dispatched by the job pool
     jtPEER = 24,
@@ -69,7 +68,6 @@ public:
     // VFALCO TODO try to remove the dependency on LoadMonitor.
     Job (JobType type,
          std::string const& name,
-         int limit,
          uint64 index,
          LoadMonitor& lm,
          FUNCTION_TYPE const& job);
@@ -80,8 +78,6 @@ public:

     void rename (const std::string& n);

-    int getLimit () const;
-
     LoadEvent& peekEvent() const;

     // These comparison operators make the jobs sort in priority order in the job set
@@ -98,7 +94,6 @@ private:
     FUNCTION_TYPE mJob;
     LoadEvent::pointer m_loadEvent;
     std::string mName;
-    int m_limit;
 };

 #endif

diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp
index 7dd87f3b4d..ffec790d3d 100644
--- a/modules/ripple_core/functional/ripple_JobQueue.cpp
+++ b/modules/ripple_core/functional/ripple_JobQueue.cpp
@@ -36,44 +36,37 @@ JobQueue::~JobQueue ()
 }

 void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE& jobFunc)
-{
-    addLimitJob (type, name, 0, jobFunc);
-}
-
-void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE& jobFunc)
 {
     assert (type != jtINVALID);

-    boost::mutex::scoped_lock sl (mJobLock);
+    ScopedLockType lock (mJobLock);

     // FIXME: Workaround incorrect client shutdown ordering
     // do not add jobs to a queue with no threads
     bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);

     std::pair< std::set <Job>::iterator, bool > it =
-        mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc));
+        mJobSet.insert (Job (type, name, ++mLastJob, mJobLoads[type], jobFunc));

     it.first->peekEvent().start(); // start timing how long it stays in the queue

-    ++mJobCounts[type].first;
-
-    m_workers.addTask ();
+    queueJobForRunning (*it.first, lock);
 }

 int JobQueue::getJobCount (JobType t)
 {
-    boost::mutex::scoped_lock sl (mJobLock);
+    ScopedLockType lock (mJobLock);

-    JobCounts::iterator c = mJobCounts.find (t);
+    JobCounts::const_iterator c = mJobCounts.find (t);

     return (c == mJobCounts.end ()) ? 0 : c->second.first;
 }

 int JobQueue::getJobCountTotal (JobType t)
 {
-    boost::mutex::scoped_lock sl (mJobLock);
+    ScopedLockType lock (mJobLock);

-    JobCounts::iterator c = mJobCounts.find (t);
+    JobCounts::const_iterator c = mJobCounts.find (t);

     return (c == mJobCounts.end ()) ? 0 : (c->second.first + c->second.second);
 }

@@ -83,7 +76,7 @@ int JobQueue::getJobCountGE (JobType t)
     // return the number of jobs at this priority level or greater
     int ret = 0;

-    boost::mutex::scoped_lock sl (mJobLock);
+    ScopedLockType lock (mJobLock);

     typedef JobCounts::value_type jt_int_pair;

@@ -101,7 +94,7 @@ std::vector< std::pair <JobType, std::pair <int, int> > > JobQueue::getJobCounts (
     // return all jobs at all priority levels
     std::vector< std::pair <JobType, std::pair <int, int> > > ret;

-    boost::mutex::scoped_lock sl (mJobLock);
+    ScopedLockType lock (mJobLock);

     ret.reserve (mJobCounts.size ());

@@ -119,7 +112,7 @@ Json::Value JobQueue::getJson (int)
 {
     Json::Value ret (Json::objectValue);

-    boost::mutex::scoped_lock sl (mJobLock);
+    ScopedLockType lock (mJobLock);

     ret["threads"] = m_workers.getNumberOfThreads ();

@@ -134,7 +127,7 @@ Json::Value JobQueue::getJson (int)
         int jobCount, threadCount;
         bool isOver;
         mJobLoads[i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
-        JobCounts::iterator it = mJobCounts.find (static_cast <JobType> (i));
+        JobCounts::const_iterator it = mJobCounts.find (static_cast <JobType> (i));

         if (it == mJobCounts.end ())
         {
@@ -184,7 +177,7 @@ bool JobQueue::isOverloaded ()
 {
     int count = 0;

-    boost::mutex::scoped_lock sl (mJobLock);
+    ScopedLockType lock (mJobLock);

     for (int i = 0; i < NUM_JOB_TYPES; ++i)
         if (mJobLoads[i].isOver ())
@@ -229,82 +222,244 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
     m_workers.setNumberOfThreads (c);
 }

-bool JobQueue::getJob(Job& job)
+//------------------------------------------------------------------------------
+//
+// Determines the number of free task slots for the given JobType.
+//
+// This can return a negative number.
+//
+// Pre-conditions:
+//
+// Post-conditions:
+//
+// Invariants:
+//     The calling thread owns the JobLock
+//
+int JobQueue::freeTaskSlots (JobType type, ScopedLockType const&)
 {
-    bool gotJob = false;
-
-    if (! mJobSet.empty ())
+    int const limit = getJobLimit (type);
+
+    if (limit != 0)
     {
-        std::set <Job>::iterator it = mJobSet.begin ();
+        int waiting = mJobCounts [type].first;
+        int running = mJobCounts [type].second;

-        for (;;)
-        {
-            // Are we out of jobs?
-            if (it == mJobSet.end())
-                return false; // VFALCO TODO get rid of this return from the middle
-
-            // Does this job have no limit?
-            if (it->getLimit() == 0)
-                break;
-
-            // Is this job category below the limit?
-            if (mJobCounts[it->getType()].second < it->getLimit())
-                break;
-
-            // Try the next job, if any
-            ++it;
-        }
-
-        job = *it;
-        mJobSet.erase (it);
-
-        gotJob = true;
+        return (limit - running) - waiting;
     }

-    return gotJob;
+    // The actual number doesn't matter as long as it's positive
+    return 1;
 }

+//------------------------------------------------------------------------------
+//
+// Signals an added Job for processing.
+//
+// Pre-conditions:
+//     The JobType must be valid.
+//     The Job must exist in mJobSet.
+//     The Job must not have previously been queued.
+//
+// Post-conditions:
+//     Count of waiting jobs of that type will be incremented.
+//     If JobQueue exists, and has at least one thread, Job will eventually run.
+//
+// Invariants:
+//     The calling thread owns the JobLock
+//
+void JobQueue::queueJobForRunning (Job const& job, ScopedLockType const& lock)
+{
+    JobType const type (job.getType ());
+
+    check_precondition (type != jtINVALID);
+    check_precondition (mJobSet.find (job) != mJobSet.end ());
+
+    if (freeTaskSlots (type, lock) > 0)
+    {
+        m_workers.addTask ();
+    }
+    else
+    {
+        // Do nothing.
+        // When the next job of this type finishes, it will
+        // call addTask if there are more waiting than the limit
+    }
+
+    // This has to happen after the call to freeTaskSlots
+    ++mJobCounts [type].first;
+}
+
+//------------------------------------------------------------------------------
+//
+// Returns the next Job we should run now.
+//
+// RunnableJob:
+//     A Job whose JobType has no limit, or
+//     whose number of running Jobs is below its JobType's limit.
+//
+// Pre-conditions:
+//     mJobSet must not be empty.
+//     mJobSet holds at least one RunnableJob
+//
+// Post-conditions:
+//     job is a valid Job object.
+//     job is removed from mJobSet.
+//     Waiting job count of its type is decremented
+//     Running job count of its type is incremented
+//
+// Invariants:
+//     The calling thread owns the JobLock
+//
+void JobQueue::getNextJobToRun (Job& job, ScopedLockType const&)
+{
+    check_precondition (! mJobSet.empty ());
+
+    JobSet::const_iterator iter;
+    for (iter = mJobSet.begin (); iter != mJobSet.end (); ++iter)
+    {
+        // Check the requirements for RunnableJob
+        if (getJobLimit (iter->getType ()) <= 0 ||
+            (mJobCounts [iter->getType ()].second < getJobLimit (iter->getType ())))
+        {
+            break;
+        }
+    }
+
+    check_precondition (iter != mJobSet.end ());
+
+    JobType const type = iter->getType ();
+
+    check_postcondition (type != jtINVALID);
+
+    job = *iter;
+
+    mJobSet.erase (iter);
+
+    --(mJobCounts [type].first);
+    ++(mJobCounts [type].second);
+}
+
+//------------------------------------------------------------------------------
+//
+// Indicates that a running Job has completed its task.
+//
+// Pre-conditions:
+//     Job must not exist in mJobSet.
+//     The JobType must not be invalid.
+//
+// Post-conditions:
+//     The running count of that JobType is decremented
+//     A new task is signaled if there are more waiting Jobs than the limit, if any.
+//
+// Invariants:
+//
+void JobQueue::setRunningJobFinished (Job const& job)
+{
+    JobType const type = job.getType ();
+
+    ScopedLockType lock (mJobLock);
+
+    check_precondition (mJobSet.find (job) == mJobSet.end ());
+    check_precondition (type != jtINVALID);
+
+    // If there were previously no free slots and we would
+    // free one up, and there are any other jobs of this type
+    // waiting, then go ahead and signal a task
+    //
+    if (freeTaskSlots (type, lock) == 0 && mJobCounts [type].first > 0)
+    {
+        m_workers.addTask ();
+    }
+
+    --(mJobCounts [type].second);
+}
+
+//------------------------------------------------------------------------------
+//
+// Runs the next appropriate waiting Job.
+//
+// Pre-conditions:
+//     A RunnableJob must exist in the JobSet
+//
+// Post-conditions:
+//     The chosen RunnableJob will have Job::doJob() called.
+// +// Invariants: +// +// void JobQueue::processTask () { - boost::mutex::scoped_lock lock (mJobLock); + Job job; - while (1) - { - // This lock shouldn't be needed + getNextJobToRun (job, ScopedLockType (mJobLock)); - JobType type (jtINVALID); + JobType const type (job.getType ()); - { - Job job; + String const name (Job::toString (type)); - bool const haveJob = getJob (job); + Thread::setCurrentThreadName (name); - if (haveJob) - { - type = job.getType (); + WriteLog (lsTRACE, JobQueue) << "Doing " << name << " job"; - // VFALCO TODO Replace with Atomic <> - --(mJobCounts[type].first); - ++(mJobCounts[type].second); + job.doJob (); - lock.unlock (); + setRunningJobFinished (job); - Thread::setCurrentThreadName (Job::toString (type)); - - WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job"; - - job.doJob (); - } - else - return; - - // must destroy job, here, without holding lock - } - - if (type != jtINVALID) - { - lock.lock (); - -- (mJobCounts[type].second); - } - } + // Note that when Job::~Job is called, the last reference + // to the associated LoadEvent object (in the Job) may be destroyed. +} + +//------------------------------------------------------------------------------ + +// Returns the limit of running jobs for the given job type. +// A value of zero means no limit. +// +int JobQueue::getJobLimit (JobType type) +{ + int limit = 0; + + switch (type) + { + // These are not dispatched by JobQueue + case jtPEER: + case jtDISK: + case jtACCEPTLEDGER: + case jtTXN_PROC: + case jtOB_SETUP: + case jtPATH_FIND: + case jtHO_READ: + case jtHO_WRITE: + case jtGENERIC: + default: + bassertfalse; + limit = 0; + break; + + case jtVALIDATION_ut: + case jtPROOFWORK: + case jtTRANSACTION_l: + case jtPROPOSAL_ut: + case jtUPDATE_PF: + case jtCLIENT: + case jtTRANSACTION: + case jtPUBLEDGER: + case jtWAL: + case jtVALIDATION_t: + case jtWRITE: + case jtPROPOSAL_t: + case jtSWEEP: + case jtADMIN: + limit = 0; + break; + + case jtLEDGER_DATA: limit = 2; break; + case jtPACK: limit = 1; break; + case jtPUBOLDLEDGER: limit = 2; break; + case jtTXN_DATA: limit = 1; break; + }; + + return limit; } diff --git a/modules/ripple_core/functional/ripple_JobQueue.h b/modules/ripple_core/functional/ripple_JobQueue.h index eb258d3308..5fabc21e15 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.h +++ b/modules/ripple_core/functional/ripple_JobQueue.h @@ -20,7 +20,6 @@ public: // have to call bind. // void addJob (JobType type, const std::string& name, const FUNCTION_TYPE& job); - void addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE& job); int getJobCount (JobType t); // Jobs waiting at this priority int getJobCountTotal (JobType t); // Jobs waiting plus running at this priority @@ -50,15 +49,24 @@ public: Json::Value getJson (int c = 0); private: - bool getJob (Job& job); + typedef boost::mutex JobLockType; + typedef JobLockType::scoped_lock ScopedLockType; + typedef std::set JobSet; + + int freeTaskSlots (JobType type, ScopedLockType const&); + void queueJobForRunning (Job const& job, ScopedLockType const&); + void getNextJobToRun (Job& job, ScopedLockType const&); + void setRunningJobFinished (Job const& job); + void processTask (); + static int getJobLimit (JobType type); + private: Workers m_workers; - - boost::mutex mJobLock; // VFALCO TODO Replace with CriticalSection + JobLockType mJobLock; uint64 mLastJob; - std::set mJobSet; + JobSet mJobSet; LoadMonitor mJobLoads [NUM_JOB_TYPES]; JobCounts mJobCounts; };