mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-20 02:55:50 +00:00
Clean up JobQueue for new Workers
This commit is contained in:
3
TODO.txt
3
TODO.txt
@@ -5,9 +5,10 @@ RIPPLE TODO
|
||||
Vinnie's List: Changes day to day, descending priority
|
||||
(Items marked '*' can be handled by others.)
|
||||
|
||||
- Allow skipped/disabled unit tests and reporting.
|
||||
- Show summary for text output of unit test results
|
||||
* Make everyone check GitHub Issues every day
|
||||
- Make ProofOfWorkTests manual since they aren't used
|
||||
|
||||
- Do something about the throw() reporting weaknesses:
|
||||
* Make sure all Sconstruct and .pro builds have debug symbols in release
|
||||
* Replace all throw with beast::Throw()
|
||||
|
||||
@@ -9,8 +9,6 @@ SETUP_LOG (JobQueue)
|
||||
JobQueue::JobQueue ()
|
||||
: m_workers (*this, 0)
|
||||
, mLastJob (0)
|
||||
, mThreadCount (0)
|
||||
, mShuttingDown (false)
|
||||
{
|
||||
mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
|
||||
mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000);
|
||||
@@ -36,7 +34,7 @@ JobQueue::~JobQueue ()
|
||||
|
||||
void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
|
||||
{
|
||||
addLimitJob(type, name, 0, jobFunc);
|
||||
addLimitJob (type, name, 0, jobFunc);
|
||||
}
|
||||
|
||||
void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE<void (Job&)>& jobFunc)
|
||||
@@ -45,15 +43,16 @@ void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, co
|
||||
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
|
||||
if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering
|
||||
assert (mThreadCount != 0); // do not add jobs to a queue with no threads
|
||||
// FIXME: Workaround incorrect client shutdown ordering
|
||||
// do not add jobs to a queue with no threads
|
||||
bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
|
||||
|
||||
std::pair< std::set <Job>::iterator, bool > it =
|
||||
mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc));
|
||||
it.first->peekEvent().start(); // start timing how long it stays in the queue
|
||||
++mJobCounts[type].first;
|
||||
|
||||
//mJobCond.notify_one ();
|
||||
it.first->peekEvent().start(); // start timing how long it stays in the queue
|
||||
|
||||
++mJobCounts[type].first;
|
||||
|
||||
m_workers.addTask ();
|
||||
}
|
||||
@@ -62,7 +61,8 @@ int JobQueue::getJobCount (JobType t)
|
||||
{
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
|
||||
std::map< JobType, std::pair<int, int> >::iterator c = mJobCounts.find (t);
|
||||
JobCounts::iterator c = mJobCounts.find (t);
|
||||
|
||||
return (c == mJobCounts.end ()) ? 0 : c->second.first;
|
||||
}
|
||||
|
||||
@@ -70,7 +70,8 @@ int JobQueue::getJobCountTotal (JobType t)
|
||||
{
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
|
||||
std::map< JobType, std::pair<int, int> >::iterator c = mJobCounts.find (t);
|
||||
JobCounts::iterator c = mJobCounts.find (t);
|
||||
|
||||
return (c == mJobCounts.end ()) ? 0 : (c->second.first + c->second.second);
|
||||
}
|
||||
|
||||
@@ -81,11 +82,13 @@ int JobQueue::getJobCountGE (JobType t)
|
||||
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
|
||||
typedef std::map< JobType, std::pair<int, int> >::value_type jt_int_pair;
|
||||
BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
|
||||
typedef JobCounts::value_type jt_int_pair;
|
||||
|
||||
if (it.first >= t)
|
||||
ret += it.second.first;
|
||||
BOOST_FOREACH (jt_int_pair const& it, mJobCounts)
|
||||
{
|
||||
if (it.first >= t)
|
||||
ret += it.second.first;
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
@@ -96,11 +99,15 @@ std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts (
|
||||
std::vector< std::pair<JobType, std::pair<int, int> > > ret;
|
||||
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
|
||||
ret.reserve (mJobCounts.size ());
|
||||
|
||||
typedef std::map< JobType, std::pair<int, int> >::value_type jt_int_pair;
|
||||
typedef JobCounts::value_type jt_int_pair;
|
||||
|
||||
BOOST_FOREACH (const jt_int_pair & it, mJobCounts)
|
||||
ret.push_back (it);
|
||||
{
|
||||
ret.push_back (it);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
@@ -108,9 +115,10 @@ std::vector< std::pair<JobType, std::pair<int, int> > > JobQueue::getJobCounts (
|
||||
Json::Value JobQueue::getJson (int)
|
||||
{
|
||||
Json::Value ret (Json::objectValue);
|
||||
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
|
||||
ret["threads"] = mThreadCount;
|
||||
ret["threads"] = m_workers.getNumberOfThreads ();
|
||||
|
||||
Json::Value priorities = Json::arrayValue;
|
||||
|
||||
@@ -123,7 +131,7 @@ Json::Value JobQueue::getJson (int)
|
||||
int jobCount, threadCount;
|
||||
bool isOver;
|
||||
mJobLoads[i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver);
|
||||
std::map< JobType, std::pair<int, int> >::iterator it = mJobCounts.find (static_cast<JobType> (i));
|
||||
JobCounts::iterator it = mJobCounts.find (static_cast<JobType> (i));
|
||||
|
||||
if (it == mJobCounts.end ())
|
||||
{
|
||||
@@ -172,6 +180,7 @@ Json::Value JobQueue::getJson (int)
|
||||
bool JobQueue::isOverloaded ()
|
||||
{
|
||||
int count = 0;
|
||||
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
|
||||
for (int i = 0; i < NUM_JOB_TYPES; ++i)
|
||||
@@ -181,16 +190,13 @@ bool JobQueue::isOverloaded ()
|
||||
return count > 0;
|
||||
}
|
||||
|
||||
// shut down the job queue without completing pending jobs
|
||||
//
|
||||
void JobQueue::shutdown ()
|
||||
{
|
||||
// shut down the job queue without completing pending jobs
|
||||
WriteLog (lsINFO, JobQueue) << "Job queue shutting down";
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
mShuttingDown = true;
|
||||
mJobCond.notify_all ();
|
||||
|
||||
while (mThreadCount != 0)
|
||||
mJobCond.wait (sl);
|
||||
m_workers.pauseAllThreadsAndWait ();
|
||||
}
|
||||
|
||||
// set the number of thread serving the job queue to precisely this number
|
||||
@@ -217,117 +223,44 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
|
||||
WriteLog (lsINFO, JobQueue) << "Auto-tuning to " << c << " validation/transaction/proposal threads";
|
||||
}
|
||||
|
||||
// VFALCO TODO Split the function up. The lower part actually does the "do",
|
||||
// The part above this comment figures out the value for numThreads
|
||||
//
|
||||
|
||||
m_workers.setNumberOfThreads (c);
|
||||
|
||||
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
|
||||
while (mJobCounts[jtDEATH].first != 0)
|
||||
{
|
||||
mJobCond.wait (sl);
|
||||
}
|
||||
|
||||
while (mThreadCount < c)
|
||||
{
|
||||
++mThreadCount;
|
||||
boost::thread (BIND_TYPE (&JobQueue::threadEntry, this)).detach ();
|
||||
}
|
||||
|
||||
while (mThreadCount > c)
|
||||
{
|
||||
if (mJobCounts[jtDEATH].first != 0)
|
||||
{
|
||||
mJobCond.wait (sl);
|
||||
}
|
||||
else
|
||||
{
|
||||
mJobSet.insert (Job (jtDEATH, 0));
|
||||
++ (mJobCounts[jtDEATH].first);
|
||||
}
|
||||
}
|
||||
|
||||
mJobCond.notify_one (); // in case we sucked up someone else's signal
|
||||
}
|
||||
|
||||
bool JobQueue::getJob(Job& job)
|
||||
{
|
||||
if (mJobSet.empty() || mShuttingDown)
|
||||
return false;
|
||||
bool gotJob = false;
|
||||
|
||||
std::set<Job>::iterator it = mJobSet.begin ();
|
||||
|
||||
while (1)
|
||||
if (! mJobSet.empty ())
|
||||
{
|
||||
// Are we out of jobs?
|
||||
if (it == mJobSet.end())
|
||||
return false;
|
||||
|
||||
// Does this job have no limit?
|
||||
if (it->getLimit() == 0)
|
||||
break;
|
||||
|
||||
// Is this job category below the limit?
|
||||
if (mJobCounts[it->getType()].second < it->getLimit())
|
||||
break;
|
||||
|
||||
// Try the next job, if any
|
||||
++it;
|
||||
}
|
||||
|
||||
job = *it;
|
||||
mJobSet.erase (it);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// do jobs until asked to stop
|
||||
void JobQueue::threadEntry ()
|
||||
{
|
||||
boost::mutex::scoped_lock sl (mJobLock);
|
||||
|
||||
while (1)
|
||||
{
|
||||
JobType type;
|
||||
|
||||
setCallingThreadName ("waiting");
|
||||
std::set<Job>::iterator it = mJobSet.begin ();
|
||||
|
||||
for (;;)
|
||||
{
|
||||
Job job;
|
||||
while (!getJob(job))
|
||||
{
|
||||
if (mShuttingDown)
|
||||
{
|
||||
--mThreadCount;
|
||||
mJobCond.notify_all();
|
||||
return;
|
||||
}
|
||||
mJobCond.wait (sl);
|
||||
}
|
||||
// VFALCO NOTE how can we be out of jobs if we just checked mJobSet.empty ()?
|
||||
//
|
||||
// Are we out of jobs?
|
||||
if (it == mJobSet.end())
|
||||
return false; // VFALCO TODO get rid of this return from the middle
|
||||
|
||||
type = job.getType ();
|
||||
-- (mJobCounts[type].first);
|
||||
// Does this job have no limit?
|
||||
if (it->getLimit() == 0)
|
||||
break;
|
||||
|
||||
if (type == jtDEATH)
|
||||
{
|
||||
--mThreadCount;
|
||||
mJobCond.notify_all();
|
||||
return;
|
||||
}
|
||||
// Is this job category below the limit?
|
||||
if (mJobCounts[it->getType()].second < it->getLimit())
|
||||
break;
|
||||
|
||||
++ (mJobCounts[type].second);
|
||||
sl.unlock ();
|
||||
setCallingThreadName (Job::toString (type));
|
||||
WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
|
||||
job.doJob ();
|
||||
} // must destroy job without holding lock
|
||||
// Try the next job, if any
|
||||
++it;
|
||||
}
|
||||
|
||||
sl.lock ();
|
||||
-- (mJobCounts[type].second);
|
||||
job = *it;
|
||||
mJobSet.erase (it);
|
||||
|
||||
gotJob = true;
|
||||
}
|
||||
|
||||
return gotJob;
|
||||
}
|
||||
|
||||
void JobQueue::processTask ()
|
||||
@@ -335,7 +268,7 @@ void JobQueue::processTask ()
|
||||
{
|
||||
// This lock shouldn't be needed
|
||||
boost::mutex::scoped_lock lock (mJobLock);
|
||||
|
||||
|
||||
JobType type (jtINVALID);
|
||||
|
||||
{
|
||||
|
||||
@@ -10,6 +10,8 @@
|
||||
class JobQueue : private Workers::Callback
|
||||
{
|
||||
public:
|
||||
typedef std::map<JobType, std::pair<int, int > > JobCounts;
|
||||
|
||||
JobQueue ();
|
||||
|
||||
~JobQueue ();
|
||||
@@ -48,25 +50,17 @@ public:
|
||||
Json::Value getJson (int c = 0);
|
||||
|
||||
private:
|
||||
void threadEntry ();
|
||||
|
||||
bool getJob (Job& job);
|
||||
void processTask ();
|
||||
|
||||
private:
|
||||
Workers m_workers;
|
||||
|
||||
boost::mutex mJobLock;
|
||||
boost::condition_variable mJobCond;
|
||||
|
||||
uint64 mLastJob;
|
||||
std::set <Job> mJobSet;
|
||||
LoadMonitor mJobLoads [NUM_JOB_TYPES];
|
||||
int mThreadCount;
|
||||
bool mShuttingDown;
|
||||
|
||||
std::map<JobType, std::pair<int, int > > mJobCounts;
|
||||
|
||||
bool getJob (Job& job);
|
||||
|
||||
boost::mutex mJobLock; // VFALCO TODO Replace with CriticalSection
|
||||
uint64 mLastJob;
|
||||
std::set <Job> mJobSet;
|
||||
LoadMonitor mJobLoads [NUM_JOB_TYPES];
|
||||
JobCounts mJobCounts;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
Reference in New Issue
Block a user