From 5b2d5e84280249fd380142d624667dd1fe9893b7 Mon Sep 17 00:00:00 2001
From: JoelKatz
Date: Sat, 13 Jul 2013 19:02:35 -0700
Subject: [PATCH] Add a way for us to place jobs with a concurrency limit.

The main use case is having all threads stuck in ledgerData, fighting each
other.
---
 modules/ripple_core/functional/ripple_Job.cpp |  9 +++
 modules/ripple_core/functional/ripple_Job.h   |  4 +
 .../functional/ripple_JobQueue.cpp            | 73 ++++++++++++++-----
 .../ripple_core/functional/ripple_JobQueue.h  |  3 +
 src/cpp/ripple/ripple_Peer.cpp                |  4 +-
 src/cpp/ripple/ripple_PeerSet.cpp             |  3 +-
 6 files changed, 76 insertions(+), 20 deletions(-)

diff --git a/modules/ripple_core/functional/ripple_Job.cpp b/modules/ripple_core/functional/ripple_Job.cpp
index fe0075a27..2c4b435b8 100644
--- a/modules/ripple_core/functional/ripple_Job.cpp
+++ b/modules/ripple_core/functional/ripple_Job.cpp
@@ -7,17 +7,20 @@
 Job::Job ()
     : mType (jtINVALID)
     , mJobIndex (0)
+    , m_limit (0)
 {
 }
 
 Job::Job (JobType type, uint64 index)
     : mType (type)
     , mJobIndex (index)
+    , m_limit (0)
 {
 }
 
 Job::Job (JobType type,
           std::string const& name,
+          int limit,
           uint64 index,
           LoadMonitor& lm,
           FUNCTION_TYPE const& job)
@@ -25,6 +28,7 @@ Job::Job (JobType type,
     , mJobIndex (index)
     , mJob (job)
     , mName (name)
+    , m_limit(limit)
 {
     m_loadEvent = boost::make_shared <LoadEvent> (boost::ref (lm), name, false);
 }
@@ -52,6 +56,11 @@ void Job::rename (std::string const& newName)
     mName = newName;
 }
 
+int Job::getLimit () const
+{
+    return m_limit;
+}
+
 const char* Job::toString (JobType t)
 {
     switch (t)
diff --git a/modules/ripple_core/functional/ripple_Job.h b/modules/ripple_core/functional/ripple_Job.h
index a49589bf3..d1305d7ad 100644
--- a/modules/ripple_core/functional/ripple_Job.h
+++ b/modules/ripple_core/functional/ripple_Job.h
@@ -66,6 +66,7 @@ public:
     // VFALCO TODO try to remove the dependency on LoadMonitor.
     Job (JobType type,
          std::string const& name,
+         int limit,
          uint64 index,
          LoadMonitor& lm,
          FUNCTION_TYPE const& job);
@@ -76,6 +77,8 @@ public:
 
     void rename (const std::string& n);
 
+    int getLimit () const;
+
     // These comparison operators make the jobs sort in priority order in the job set
     bool operator< (const Job& j) const;
     bool operator> (const Job& j) const;
@@ -90,6 +93,7 @@ private:
     FUNCTION_TYPE mJob;
     LoadEvent::pointer m_loadEvent;
     std::string mName;
+    int m_limit;
 };
 
 #endif
diff --git a/modules/ripple_core/functional/ripple_JobQueue.cpp b/modules/ripple_core/functional/ripple_JobQueue.cpp
index e6741cd79..5d88673ea 100644
--- a/modules/ripple_core/functional/ripple_JobQueue.cpp
+++ b/modules/ripple_core/functional/ripple_JobQueue.cpp
@@ -31,6 +31,11 @@ JobQueue::JobQueue (boost::asio::io_service& svc)
 }
 
 void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE& jobFunc)
+{
+    addLimitJob(type, name, 0, jobFunc);
+}
+
+void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE& jobFunc)
 {
     assert (type != jtINVALID);
 
@@ -39,7 +44,7 @@ void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYP
     if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering
         assert (mThreadCount != 0); // do not add jobs to a queue with no threads
 
-    mJobSet.insert (Job (type, name, ++mLastJob, mJobLoads[type], jobFunc));
+    mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc));
     ++mJobCounts[type].first;
     mJobCond.notify_one ();
 }
@@ -232,6 +237,37 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
     mJobCond.notify_one (); // in case we sucked up someone else's signal
 }
 
+bool JobQueue::getJob(Job& job)
+{
+    if (mJobSet.empty() || mShuttingDown)
+        return false;
+
+    std::set <Job>::iterator it = mJobSet.begin ();
+
+    while (1)
+    {
+        // Are we out of jobs?
+        if (it == mJobSet.end())
+            return false;
+
+        // Does this job have no limit?
+        if (it->getLimit() == 0)
+            break;
+
+        // Is this job category below the limit?
+        if (mJobCounts[it->getType()].second < it->getLimit())
+            break;
+
+        // Try the next job, if any
+        ++it;
+    }
+
+    job = *it;
+    mJobSet.erase (it);
+
+    return true;
+}
+
 // do jobs until asked to stop
 void JobQueue::threadEntry ()
 {
@@ -239,27 +275,32 @@ void JobQueue::threadEntry ()
 
     while (1)
     {
+        JobType type;
+
         setCallingThreadName ("waiting");
 
-        while (mJobSet.empty () && !mShuttingDown)
         {
-            mJobCond.wait (sl);
-        }
-
-        if (mJobSet.empty ())
-            break;
-
-        JobType type;
-        std::set <Job>::iterator it = mJobSet.begin ();
-        {
-            Job job (*it);
-            mJobSet.erase (it);
+            Job job;
+            while (!getJob(job))
+            {
+                if (mShuttingDown)
+                {
+                    --mThreadCount;
+                    mJobCond.notify_all();
+                    return;
+                }
+                mJobCond.wait (sl);
+            }
 
             type = job.getType ();
             -- (mJobCounts[type].first);
 
             if (type == jtDEATH)
-                break;
+            {
+                --mThreadCount;
+                mJobCond.notify_all();
+                return;
+            }
 
             ++ (mJobCounts[type].second);
             sl.unlock ();
@@ -267,12 +308,10 @@ void JobQueue::threadEntry ()
             WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
             job.doJob ();
         } // must destroy job without holding lock
+
         sl.lock ();
         -- (mJobCounts[type].second);
     }
-
-    --mThreadCount;
-    mJobCond.notify_all ();
 }
 
 // vim:ts=4
diff --git a/modules/ripple_core/functional/ripple_JobQueue.h b/modules/ripple_core/functional/ripple_JobQueue.h
index 6526f90b3..30f877197 100644
--- a/modules/ripple_core/functional/ripple_JobQueue.h
+++ b/modules/ripple_core/functional/ripple_JobQueue.h
@@ -16,6 +16,7 @@ public:
     // have to call bind.
     //
     void addJob (JobType type, const std::string& name, const FUNCTION_TYPE& job);
+    void addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE& job);
 
     int getJobCount (JobType t);      // Jobs waiting at this priority
     int getJobCountTotal (JobType t); // Jobs waiting plus running at this priority
@@ -59,6 +60,8 @@ private:
     boost::asio::io_service& mIOService;
 
     std::map <JobType, std::pair <int, int> > mJobCounts;
+
+    bool getJob (Job& job);
 };
 
 #endif
diff --git a/src/cpp/ripple/ripple_Peer.cpp b/src/cpp/ripple/ripple_Peer.cpp
index ab50643e5..d7bfe8947 100644
--- a/src/cpp/ripple/ripple_Peer.cpp
+++ b/src/cpp/ripple/ripple_Peer.cpp
@@ -2134,7 +2134,7 @@ void PeerImp::recvLedger (const boost::shared_ptr& packe
     }
 
     if (getApp().getInboundLedgers ().awaitLedgerData (hash))
-        getApp().getJobQueue ().addJob (jtLEDGER_DATA, "gotLedgerData",
+        getApp().getJobQueue ().addLimitJob (jtLEDGER_DATA, "gotLedgerData", 2,
             BIND_TYPE (&InboundLedgers::gotLedgerData, &getApp().getInboundLedgers (), P_1, hash, packet_ptr, boost::weak_ptr (shared_from_this ())));
     else
@@ -2369,7 +2369,7 @@ void PeerImp::doFetchPack (const boost::shared_ptr&
         return;
     }
 
-    getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack",
+    getApp().getJobQueue ().addLimitJob (jtPACK, "MakeFetchPack", 1,
         BIND_TYPE (&NetworkOPs::makeFetchPack, &getApp().getOPs (), P_1, boost::weak_ptr (shared_from_this ()), packet, wantLedger, haveLedger, UptimeTimer::getInstance ().getElapsedSeconds ()));
 }
diff --git a/src/cpp/ripple/ripple_PeerSet.cpp b/src/cpp/ripple/ripple_PeerSet.cpp
index ef490aed2..9f9f02b95 100644
--- a/src/cpp/ripple/ripple_PeerSet.cpp
+++ b/src/cpp/ripple/ripple_PeerSet.cpp
@@ -82,7 +82,8 @@ void PeerSet::TimerEntry (boost::weak_ptr wptr, const boost::system::er
             ptr->setTimer ();
         }
         else
-            getApp().getJobQueue ().addJob (jtLEDGER_DATA, "timerEntry", BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr));
+            getApp().getJobQueue ().addLimitJob (jtLEDGER_DATA, "timerEntry", 2,
+                BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr));
     }
 }
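Note (illustrative sketch, not part of the patch): the selection rule the new JobQueue::getJob() implements is that a queued job whose limit is 0 is always eligible, while a job whose type already has as many running jobs as its limit is skipped so the next eligible job in priority order runs instead. The small standalone program below sketches that rule under stated assumptions; FakeJob, pickJob, and the enum values are hypothetical names, not rippled APIs.

    #include <cstdio>
    #include <map>
    #include <set>

    // Hypothetical stand-ins for the real job types; only their relative
    // order (priority) matters for this sketch.
    enum JobType { jtTRANSACTION = 1, jtPACK = 2, jtLEDGER_DATA = 3 };

    struct FakeJob
    {
        JobType   type;
        int       limit;   // 0 means "no concurrency limit"
        long long index;   // insertion order; lower = older

        // Higher-priority types sort first, older jobs of equal type next,
        // mirroring a job set kept in priority order.
        bool operator< (FakeJob const& other) const
        {
            if (type != other.type)
                return type > other.type;
            return index < other.index;
        }
    };

    // running[type] = number of jobs of that type currently executing.
    // Returns the first queued job that is either unlimited or whose type is
    // still below its limit; jobs at their limit are skipped but kept queued.
    bool pickJob (std::set <FakeJob>& queue, std::map <JobType, int>& running, FakeJob& out)
    {
        for (std::set <FakeJob>::iterator it = queue.begin (); it != queue.end (); ++it)
        {
            if (it->limit == 0 || running[it->type] < it->limit)
            {
                out = *it;
                queue.erase (it);
                return true;
            }
        }
        return false; // every queued job is at its limit
    }

    int main ()
    {
        std::set <FakeJob> queue;
        std::map <JobType, int> running;

        FakeJob a = { jtLEDGER_DATA, 2, 1 };   // limited to 2 concurrent
        FakeJob b = { jtLEDGER_DATA, 2, 2 };   // limited to 2 concurrent
        FakeJob c = { jtTRANSACTION, 0, 3 };   // no limit
        queue.insert (a);
        queue.insert (b);
        queue.insert (c);

        running[jtLEDGER_DATA] = 2;            // pretend two workers are already in ledgerData

        FakeJob picked = { jtTRANSACTION, 0, 0 };
        while (pickJob (queue, running, picked))
            std::printf ("picked type %d (limit %d)\n", (int) picked.type, picked.limit);

        return 0;
    }

With these inputs only the unlimited job is printed; the two ledgerData entries stay queued until their running count drops below 2, which mirrors what the patch requests at its call sites (limit 2 for jtLEDGER_DATA, limit 1 for jtPACK).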