diff --git a/TODO.txt b/TODO.txt index 8d7c502fda..35b260a81b 100644 --- a/TODO.txt +++ b/TODO.txt @@ -5,9 +5,10 @@ RIPPLE TODO Vinnie's List: Changes day to day, descending priority (Items marked '*' can be handled by others.) +- Allow skipped/disabled unit tests and reporting. - Show summary for text output of unit test results -* Make everyone check GitHub Issues every day - Make ProofOfWorkTests manual since they aren't used + - Do something about the throw() reporting weaknesses: * Make sure all Sconstruct and .pro builds have debug symbols in release * Replace all throw with beast::Throw() diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp index e7d0026a8d..c3cd05168b 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.cpp +++ b/modules/ripple_core/functional/ripple_JobQueue.cpp @@ -9,8 +9,6 @@ SETUP_LOG (JobQueue) JobQueue::JobQueue () : m_workers (*this, 0) , mLastJob (0) - , mThreadCount (0) - , mShuttingDown (false) { mJobLoads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000); mJobLoads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000); @@ -36,7 +34,7 @@ JobQueue::~JobQueue () void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE& jobFunc) { - addLimitJob(type, name, 0, jobFunc); + addLimitJob (type, name, 0, jobFunc); } void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE& jobFunc) @@ -45,15 +43,16 @@ void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, co boost::mutex::scoped_lock sl (mJobLock); - if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering - assert (mThreadCount != 0); // do not add jobs to a queue with no threads + // FIXME: Workaround incorrect client shutdown ordering + // do not add jobs to a queue with no threads + bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0); std::pair< std::set ::iterator, bool > it = mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc)); - it.first->peekEvent().start(); // start timing how long it stays in the queue - ++mJobCounts[type].first; - //mJobCond.notify_one (); + it.first->peekEvent().start(); // start timing how long it stays in the queue + + ++mJobCounts[type].first; m_workers.addTask (); } @@ -62,7 +61,8 @@ int JobQueue::getJobCount (JobType t) { boost::mutex::scoped_lock sl (mJobLock); - std::map< JobType, std::pair >::iterator c = mJobCounts.find (t); + JobCounts::iterator c = mJobCounts.find (t); + return (c == mJobCounts.end ()) ? 0 : c->second.first; } @@ -70,7 +70,8 @@ int JobQueue::getJobCountTotal (JobType t) { boost::mutex::scoped_lock sl (mJobLock); - std::map< JobType, std::pair >::iterator c = mJobCounts.find (t); + JobCounts::iterator c = mJobCounts.find (t); + return (c == mJobCounts.end ()) ? 0 : (c->second.first + c->second.second); } @@ -81,11 +82,13 @@ int JobQueue::getJobCountGE (JobType t) boost::mutex::scoped_lock sl (mJobLock); - typedef std::map< JobType, std::pair >::value_type jt_int_pair; - BOOST_FOREACH (const jt_int_pair & it, mJobCounts) + typedef JobCounts::value_type jt_int_pair; - if (it.first >= t) - ret += it.second.first; + BOOST_FOREACH (jt_int_pair const& it, mJobCounts) + { + if (it.first >= t) + ret += it.second.first; + } return ret; } @@ -96,11 +99,15 @@ std::vector< std::pair > > JobQueue::getJobCounts ( std::vector< std::pair > > ret; boost::mutex::scoped_lock sl (mJobLock); + ret.reserve (mJobCounts.size ()); - typedef std::map< JobType, std::pair >::value_type jt_int_pair; + typedef JobCounts::value_type jt_int_pair; + BOOST_FOREACH (const jt_int_pair & it, mJobCounts) - ret.push_back (it); + { + ret.push_back (it); + } return ret; } @@ -108,9 +115,10 @@ std::vector< std::pair > > JobQueue::getJobCounts ( Json::Value JobQueue::getJson (int) { Json::Value ret (Json::objectValue); + boost::mutex::scoped_lock sl (mJobLock); - ret["threads"] = mThreadCount; + ret["threads"] = m_workers.getNumberOfThreads (); Json::Value priorities = Json::arrayValue; @@ -123,7 +131,7 @@ Json::Value JobQueue::getJson (int) int jobCount, threadCount; bool isOver; mJobLoads[i].getCountAndLatency (count, latencyAvg, latencyPeak, isOver); - std::map< JobType, std::pair >::iterator it = mJobCounts.find (static_cast (i)); + JobCounts::iterator it = mJobCounts.find (static_cast (i)); if (it == mJobCounts.end ()) { @@ -172,6 +180,7 @@ Json::Value JobQueue::getJson (int) bool JobQueue::isOverloaded () { int count = 0; + boost::mutex::scoped_lock sl (mJobLock); for (int i = 0; i < NUM_JOB_TYPES; ++i) @@ -181,16 +190,13 @@ bool JobQueue::isOverloaded () return count > 0; } +// shut down the job queue without completing pending jobs +// void JobQueue::shutdown () { - // shut down the job queue without completing pending jobs WriteLog (lsINFO, JobQueue) << "Job queue shutting down"; - boost::mutex::scoped_lock sl (mJobLock); - mShuttingDown = true; - mJobCond.notify_all (); - while (mThreadCount != 0) - mJobCond.wait (sl); + m_workers.pauseAllThreadsAndWait (); } // set the number of thread serving the job queue to precisely this number @@ -217,117 +223,44 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode) WriteLog (lsINFO, JobQueue) << "Auto-tuning to " << c << " validation/transaction/proposal threads"; } - // VFALCO TODO Split the function up. The lower part actually does the "do", - // The part above this comment figures out the value for numThreads - // - m_workers.setNumberOfThreads (c); - - - boost::mutex::scoped_lock sl (mJobLock); - - while (mJobCounts[jtDEATH].first != 0) - { - mJobCond.wait (sl); - } - - while (mThreadCount < c) - { - ++mThreadCount; - boost::thread (BIND_TYPE (&JobQueue::threadEntry, this)).detach (); - } - - while (mThreadCount > c) - { - if (mJobCounts[jtDEATH].first != 0) - { - mJobCond.wait (sl); - } - else - { - mJobSet.insert (Job (jtDEATH, 0)); - ++ (mJobCounts[jtDEATH].first); - } - } - - mJobCond.notify_one (); // in case we sucked up someone else's signal } bool JobQueue::getJob(Job& job) { - if (mJobSet.empty() || mShuttingDown) - return false; + bool gotJob = false; - std::set::iterator it = mJobSet.begin (); - - while (1) + if (! mJobSet.empty ()) { - // Are we out of jobs? - if (it == mJobSet.end()) - return false; - - // Does this job have no limit? - if (it->getLimit() == 0) - break; - - // Is this job category below the limit? - if (mJobCounts[it->getType()].second < it->getLimit()) - break; - - // Try the next job, if any - ++it; - } - - job = *it; - mJobSet.erase (it); - - return true; -} - -// do jobs until asked to stop -void JobQueue::threadEntry () -{ - boost::mutex::scoped_lock sl (mJobLock); - - while (1) - { - JobType type; - - setCallingThreadName ("waiting"); + std::set::iterator it = mJobSet.begin (); + for (;;) { - Job job; - while (!getJob(job)) - { - if (mShuttingDown) - { - --mThreadCount; - mJobCond.notify_all(); - return; - } - mJobCond.wait (sl); - } + // VFALCO NOTE how can we be out of jobs if we just checked mJobSet.empty ()? + // + // Are we out of jobs? + if (it == mJobSet.end()) + return false; // VFALCO TODO get rid of this return from the middle - type = job.getType (); - -- (mJobCounts[type].first); + // Does this job have no limit? + if (it->getLimit() == 0) + break; - if (type == jtDEATH) - { - --mThreadCount; - mJobCond.notify_all(); - return; - } + // Is this job category below the limit? + if (mJobCounts[it->getType()].second < it->getLimit()) + break; - ++ (mJobCounts[type].second); - sl.unlock (); - setCallingThreadName (Job::toString (type)); - WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job"; - job.doJob (); - } // must destroy job without holding lock + // Try the next job, if any + ++it; + } - sl.lock (); - -- (mJobCounts[type].second); + job = *it; + mJobSet.erase (it); + + gotJob = true; } + + return gotJob; } void JobQueue::processTask () @@ -335,7 +268,7 @@ void JobQueue::processTask () { // This lock shouldn't be needed boost::mutex::scoped_lock lock (mJobLock); - + JobType type (jtINVALID); { diff --git a/modules/ripple_core/functional/ripple_JobQueue.h b/modules/ripple_core/functional/ripple_JobQueue.h index 0663604c89..eb258d3308 100644 --- a/modules/ripple_core/functional/ripple_JobQueue.h +++ b/modules/ripple_core/functional/ripple_JobQueue.h @@ -10,6 +10,8 @@ class JobQueue : private Workers::Callback { public: + typedef std::map > JobCounts; + JobQueue (); ~JobQueue (); @@ -48,25 +50,17 @@ public: Json::Value getJson (int c = 0); private: - void threadEntry (); - + bool getJob (Job& job); void processTask (); private: Workers m_workers; - - boost::mutex mJobLock; - boost::condition_variable mJobCond; - - uint64 mLastJob; - std::set mJobSet; - LoadMonitor mJobLoads [NUM_JOB_TYPES]; - int mThreadCount; - bool mShuttingDown; - - std::map > mJobCounts; - - bool getJob (Job& job); + + boost::mutex mJobLock; // VFALCO TODO Replace with CriticalSection + uint64 mLastJob; + std::set mJobSet; + LoadMonitor mJobLoads [NUM_JOB_TYPES]; + JobCounts mJobCounts; }; #endif