From b2b037771768cee3755c52414cf5f9cd0047565a Mon Sep 17 00:00:00 2001 From: Tom Ritchford Date: Mon, 1 Jun 2015 18:35:57 -0400 Subject: [PATCH] New method JobQueue::getJobForThread(). * Clean up mutability. * Add override specifiers. * Get rid of unused parameters. --- src/ripple/core/JobQueue.h | 22 ++++++++---- src/ripple/core/impl/JobQueue.cpp | 56 +++++++++++++++++++++---------- 2 files changed, 53 insertions(+), 25 deletions(-) diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h index 648b8fd028..9cb21db60b 100644 --- a/src/ripple/core/JobQueue.h +++ b/src/ripple/core/JobQueue.h @@ -25,6 +25,8 @@ #include #include #include +#include +#include namespace ripple { @@ -45,13 +47,13 @@ public: std::string const& name, boost::function const& job) = 0; // Jobs waiting at this priority - virtual int getJobCount (JobType t) = 0; + virtual int getJobCount (JobType t) const = 0; // Jobs waiting plus running at this priority - virtual int getJobCountTotal (JobType t) = 0; + virtual int getJobCountTotal (JobType t) const = 0; // All waiting jobs at or greater than this priority - virtual int getJobCountGE (JobType t) = 0; + virtual int getJobCountGE (JobType t) const = 0; virtual void shutdown () = 0; @@ -60,19 +62,25 @@ public: // VFALCO TODO Rename these to newLoadEventMeasurement or something similar // since they create the object. // - virtual LoadEvent::pointer getLoadEvent (JobType t, std::string const& name) = 0; + virtual LoadEvent::pointer getLoadEvent ( + JobType t, std::string const& name) = 0; // VFALCO TODO Why do we need two versions, one which returns a shared // pointer and the other which returns an autoptr? // - virtual LoadEvent::autoptr getLoadEventAP (JobType t, std::string const& name) = 0; + virtual LoadEvent::autoptr getLoadEventAP ( + JobType t, std::string const& name) = 0; // Add multiple load events - virtual void addLoadEvents (JobType t, - int count, std::chrono::milliseconds elapsed) = 0; + virtual void addLoadEvents ( + JobType t, int count, std::chrono::milliseconds elapsed) = 0; virtual bool isOverloaded () = 0; + /** Get the Job corresponding to a thread. If no thread, use the current + thread. */ + virtual Job* getJobForThread (std::thread::id const& id = {}) const = 0; + virtual Json::Value getJson (int c = 0) = 0; }; diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index 7b9d6b135d..9cdefc77e1 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -40,14 +40,17 @@ public: using JobSet = std::set ; using JobDataMap = std::map ; using ScopedLock = std::lock_guard ; + using ThreadIdMap = std::map ; beast::Journal m_journal; - std::mutex m_mutex; + mutable std::mutex m_mutex; std::uint64_t m_lastJob; JobSet m_jobSet; JobDataMap m_jobData; JobTypeData m_invalidJobData; + ThreadIdMap m_threadIds; + // The number of jobs currently in processTask() int m_processCount; @@ -100,7 +103,7 @@ public: } } - ~JobQueueImp () + ~JobQueueImp () override { // Must unhook before destroying hook = beast::insight::Hook (); @@ -113,7 +116,7 @@ public: } void addJob (JobType type, std::string const& name, - boost::function const& jobFunc) + boost::function const& jobFunc) override { assert (type != jtINVALID); @@ -168,7 +171,7 @@ public: } } - int getJobCount (JobType t) + int getJobCount (JobType t) const override { ScopedLock lock (m_mutex); @@ -179,7 +182,7 @@ public: : c->second.waiting; } - int getJobCountTotal (JobType t) + int getJobCountTotal (JobType t) const override { ScopedLock lock (m_mutex); @@ -190,7 +193,7 @@ public: : (c->second.waiting + c->second.running); } - int getJobCountGE (JobType t) + int getJobCountGE (JobType t) const override { // return the number of jobs at this priority level or greater int ret = 0; @@ -208,7 +211,7 @@ public: // shut down the job queue without completing pending jobs // - void shutdown () + void shutdown () override { m_journal.info << "Job queue shutting down"; @@ -216,7 +219,7 @@ public: } // set the number of thread serving the job queue to precisely this number - void setThreadCount (int c, bool const standaloneMode) + void setThreadCount (int c, bool const standaloneMode) override { if (standaloneMode) { @@ -234,8 +237,8 @@ public: m_workers.setNumberOfThreads (c); } - - LoadEvent::pointer getLoadEvent (JobType t, std::string const& name) + LoadEvent::pointer getLoadEvent ( + JobType t, std::string const& name) override { JobDataMap::iterator iter (m_jobData.find (t)); assert (iter != m_jobData.end ()); @@ -247,7 +250,8 @@ public: std::ref (iter-> second.load ()), name, true); } - LoadEvent::autoptr getLoadEventAP (JobType t, std::string const& name) + LoadEvent::autoptr getLoadEventAP ( + JobType t, std::string const& name) override { JobDataMap::iterator iter (m_jobData.find (t)); assert (iter != m_jobData.end ()); @@ -260,14 +264,15 @@ public: } void addLoadEvents (JobType t, - int count, std::chrono::milliseconds elapsed) + int count, std::chrono::milliseconds elapsed) override { JobDataMap::iterator iter (m_jobData.find (t)); assert (iter != m_jobData.end ()); iter->second.load().addSamples (count, elapsed); } - bool isOverloaded () + // Cannot be const because LoadMonitor has no const methods. + bool isOverloaded () override { int count = 0; @@ -280,7 +285,8 @@ public: return count > 0; } - Json::Value getJson (int) + // Cannot be const because LoadMonitor has no const methods. + Json::Value getJson (int) override { Json::Value ret (Json::objectValue); @@ -336,6 +342,14 @@ public: return ret; } + Job* getJobForThread (std::thread::id const& id) const override + { + auto tid = (id == std::thread::id()) ? std::this_thread::get_id() : id; + + auto i = m_threadIds.find (tid); + return (i == m_threadIds.end()) ? nullptr : i->second; + } + private: //-------------------------------------------------------------------------- JobTypeData& getJobTypeData (JobType type) @@ -430,7 +444,7 @@ private: // Invariants: // The calling thread owns the JobLock // - void getNextJob (Job& job, ScopedLock const& lock) + void getNextJob (Job& job) { assert (! m_jobSet.empty ()); @@ -459,6 +473,8 @@ private: job = *iter; m_jobSet.erase (iter); + m_threadIds[std::this_thread::get_id()] = &job; + --data.waiting; ++data.running; } @@ -478,7 +494,7 @@ private: // Invariants: // // - void finishJob (Job const& job, ScopedLock const& lock) + void finishJob (Job const& job) { JobType const type = job.getType (); @@ -496,6 +512,10 @@ private: m_workers.addTask (); } + if (! m_threadIds.erase (std::this_thread::get_id())) + { + assert (false); + } --data.running; } @@ -539,7 +559,7 @@ private: { ScopedLock lock (m_mutex); - getNextJob (job, lock); + getNextJob (job); ++m_processCount; } @@ -567,7 +587,7 @@ private: { ScopedLock lock (m_mutex); - finishJob (job, lock); + finishJob (job); --m_processCount; checkStopped (lock); }