New method JobQueue::getJobForThread().

* Clean up mutability.
* Add override specifiers.
* Get rid of unused parameters.
This commit is contained in:
Tom Ritchford
2015-06-01 18:35:57 -04:00
committed by Nik Bougalis
parent 24ea1ab035
commit b2b0377717
2 changed files with 53 additions and 25 deletions

View File

@@ -25,6 +25,8 @@
#include <beast/insight/Collector.h> #include <beast/insight/Collector.h>
#include <beast/threads/Stoppable.h> #include <beast/threads/Stoppable.h>
#include <boost/function.hpp> #include <boost/function.hpp>
#include <boost/optional.hpp>
#include <thread>
namespace ripple { namespace ripple {
@@ -45,13 +47,13 @@ public:
std::string const& name, boost::function <void (Job&)> const& job) = 0; std::string const& name, boost::function <void (Job&)> const& job) = 0;
// Jobs waiting at this priority // Jobs waiting at this priority
virtual int getJobCount (JobType t) = 0; virtual int getJobCount (JobType t) const = 0;
// Jobs waiting plus running at this priority // Jobs waiting plus running at this priority
virtual int getJobCountTotal (JobType t) = 0; virtual int getJobCountTotal (JobType t) const = 0;
// All waiting jobs at or greater than this priority // All waiting jobs at or greater than this priority
virtual int getJobCountGE (JobType t) = 0; virtual int getJobCountGE (JobType t) const = 0;
virtual void shutdown () = 0; virtual void shutdown () = 0;
@@ -60,19 +62,25 @@ public:
// VFALCO TODO Rename these to newLoadEventMeasurement or something similar // VFALCO TODO Rename these to newLoadEventMeasurement or something similar
// since they create the object. // since they create the object.
// //
virtual LoadEvent::pointer getLoadEvent (JobType t, std::string const& name) = 0; virtual LoadEvent::pointer getLoadEvent (
JobType t, std::string const& name) = 0;
// VFALCO TODO Why do we need two versions, one which returns a shared // VFALCO TODO Why do we need two versions, one which returns a shared
// pointer and the other which returns an autoptr? // pointer and the other which returns an autoptr?
// //
virtual LoadEvent::autoptr getLoadEventAP (JobType t, std::string const& name) = 0; virtual LoadEvent::autoptr getLoadEventAP (
JobType t, std::string const& name) = 0;
// Add multiple load events // Add multiple load events
virtual void addLoadEvents (JobType t, virtual void addLoadEvents (
int count, std::chrono::milliseconds elapsed) = 0; JobType t, int count, std::chrono::milliseconds elapsed) = 0;
virtual bool isOverloaded () = 0; virtual bool isOverloaded () = 0;
/** Get the Job corresponding to a thread. If no thread, use the current
thread. */
virtual Job* getJobForThread (std::thread::id const& id = {}) const = 0;
virtual Json::Value getJson (int c = 0) = 0; virtual Json::Value getJson (int c = 0) = 0;
}; };

View File

@@ -40,14 +40,17 @@ public:
using JobSet = std::set <Job>; using JobSet = std::set <Job>;
using JobDataMap = std::map <JobType, JobTypeData>; using JobDataMap = std::map <JobType, JobTypeData>;
using ScopedLock = std::lock_guard <std::mutex>; using ScopedLock = std::lock_guard <std::mutex>;
using ThreadIdMap = std::map <std::thread::id, Job*>;
beast::Journal m_journal; beast::Journal m_journal;
std::mutex m_mutex; mutable std::mutex m_mutex;
std::uint64_t m_lastJob; std::uint64_t m_lastJob;
JobSet m_jobSet; JobSet m_jobSet;
JobDataMap m_jobData; JobDataMap m_jobData;
JobTypeData m_invalidJobData; JobTypeData m_invalidJobData;
ThreadIdMap m_threadIds;
// The number of jobs currently in processTask() // The number of jobs currently in processTask()
int m_processCount; int m_processCount;
@@ -100,7 +103,7 @@ public:
} }
} }
~JobQueueImp () ~JobQueueImp () override
{ {
// Must unhook before destroying // Must unhook before destroying
hook = beast::insight::Hook (); hook = beast::insight::Hook ();
@@ -113,7 +116,7 @@ public:
} }
void addJob (JobType type, std::string const& name, void addJob (JobType type, std::string const& name,
boost::function <void (Job&)> const& jobFunc) boost::function <void (Job&)> const& jobFunc) override
{ {
assert (type != jtINVALID); assert (type != jtINVALID);
@@ -168,7 +171,7 @@ public:
} }
} }
int getJobCount (JobType t) int getJobCount (JobType t) const override
{ {
ScopedLock lock (m_mutex); ScopedLock lock (m_mutex);
@@ -179,7 +182,7 @@ public:
: c->second.waiting; : c->second.waiting;
} }
int getJobCountTotal (JobType t) int getJobCountTotal (JobType t) const override
{ {
ScopedLock lock (m_mutex); ScopedLock lock (m_mutex);
@@ -190,7 +193,7 @@ public:
: (c->second.waiting + c->second.running); : (c->second.waiting + c->second.running);
} }
int getJobCountGE (JobType t) int getJobCountGE (JobType t) const override
{ {
// return the number of jobs at this priority level or greater // return the number of jobs at this priority level or greater
int ret = 0; int ret = 0;
@@ -208,7 +211,7 @@ public:
// shut down the job queue without completing pending jobs // shut down the job queue without completing pending jobs
// //
void shutdown () void shutdown () override
{ {
m_journal.info << "Job queue shutting down"; m_journal.info << "Job queue shutting down";
@@ -216,7 +219,7 @@ public:
} }
// set the number of thread serving the job queue to precisely this number // set the number of thread serving the job queue to precisely this number
void setThreadCount (int c, bool const standaloneMode) void setThreadCount (int c, bool const standaloneMode) override
{ {
if (standaloneMode) if (standaloneMode)
{ {
@@ -234,8 +237,8 @@ public:
m_workers.setNumberOfThreads (c); m_workers.setNumberOfThreads (c);
} }
LoadEvent::pointer getLoadEvent (
LoadEvent::pointer getLoadEvent (JobType t, std::string const& name) JobType t, std::string const& name) override
{ {
JobDataMap::iterator iter (m_jobData.find (t)); JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ()); assert (iter != m_jobData.end ());
@@ -247,7 +250,8 @@ public:
std::ref (iter-> second.load ()), name, true); std::ref (iter-> second.load ()), name, true);
} }
LoadEvent::autoptr getLoadEventAP (JobType t, std::string const& name) LoadEvent::autoptr getLoadEventAP (
JobType t, std::string const& name) override
{ {
JobDataMap::iterator iter (m_jobData.find (t)); JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ()); assert (iter != m_jobData.end ());
@@ -260,14 +264,15 @@ public:
} }
void addLoadEvents (JobType t, void addLoadEvents (JobType t,
int count, std::chrono::milliseconds elapsed) int count, std::chrono::milliseconds elapsed) override
{ {
JobDataMap::iterator iter (m_jobData.find (t)); JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ()); assert (iter != m_jobData.end ());
iter->second.load().addSamples (count, elapsed); iter->second.load().addSamples (count, elapsed);
} }
bool isOverloaded () // Cannot be const because LoadMonitor has no const methods.
bool isOverloaded () override
{ {
int count = 0; int count = 0;
@@ -280,7 +285,8 @@ public:
return count > 0; return count > 0;
} }
Json::Value getJson (int) // Cannot be const because LoadMonitor has no const methods.
Json::Value getJson (int) override
{ {
Json::Value ret (Json::objectValue); Json::Value ret (Json::objectValue);
@@ -336,6 +342,14 @@ public:
return ret; return ret;
} }
Job* getJobForThread (std::thread::id const& id) const override
{
auto tid = (id == std::thread::id()) ? std::this_thread::get_id() : id;
auto i = m_threadIds.find (tid);
return (i == m_threadIds.end()) ? nullptr : i->second;
}
private: private:
//-------------------------------------------------------------------------- //--------------------------------------------------------------------------
JobTypeData& getJobTypeData (JobType type) JobTypeData& getJobTypeData (JobType type)
@@ -430,7 +444,7 @@ private:
// Invariants: // Invariants:
// The calling thread owns the JobLock // The calling thread owns the JobLock
// //
void getNextJob (Job& job, ScopedLock const& lock) void getNextJob (Job& job)
{ {
assert (! m_jobSet.empty ()); assert (! m_jobSet.empty ());
@@ -459,6 +473,8 @@ private:
job = *iter; job = *iter;
m_jobSet.erase (iter); m_jobSet.erase (iter);
m_threadIds[std::this_thread::get_id()] = &job;
--data.waiting; --data.waiting;
++data.running; ++data.running;
} }
@@ -478,7 +494,7 @@ private:
// Invariants: // Invariants:
// <none> // <none>
// //
void finishJob (Job const& job, ScopedLock const& lock) void finishJob (Job const& job)
{ {
JobType const type = job.getType (); JobType const type = job.getType ();
@@ -496,6 +512,10 @@ private:
m_workers.addTask (); m_workers.addTask ();
} }
if (! m_threadIds.erase (std::this_thread::get_id()))
{
assert (false);
}
--data.running; --data.running;
} }
@@ -539,7 +559,7 @@ private:
{ {
ScopedLock lock (m_mutex); ScopedLock lock (m_mutex);
getNextJob (job, lock); getNextJob (job);
++m_processCount; ++m_processCount;
} }
@@ -567,7 +587,7 @@ private:
{ {
ScopedLock lock (m_mutex); ScopedLock lock (m_mutex);
finishJob (job, lock); finishJob (job);
--m_processCount; --m_processCount;
checkStopped (lock); checkStopped (lock);
} }