mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-01 16:35:53 +00:00
Add a way for us to place jobs with a concurrency limit.
The main use case is having all threads stuck in ledgerData, fighting each other.
This commit is contained in:
@@ -7,17 +7,20 @@
|
||||
Job::Job ()
|
||||
: mType (jtINVALID)
|
||||
, mJobIndex (0)
|
||||
, m_limit (0)
|
||||
{
|
||||
}
|
||||
|
||||
Job::Job (JobType type, uint64 index)
|
||||
: mType (type)
|
||||
, mJobIndex (index)
|
||||
, m_limit (0)
|
||||
{
|
||||
}
|
||||
|
||||
Job::Job (JobType type,
|
||||
std::string const& name,
|
||||
int limit,
|
||||
uint64 index,
|
||||
LoadMonitor& lm,
|
||||
FUNCTION_TYPE <void (Job&)> const& job)
|
||||
@@ -25,6 +28,7 @@ Job::Job (JobType type,
|
||||
, mJobIndex (index)
|
||||
, mJob (job)
|
||||
, mName (name)
|
||||
, m_limit(limit)
|
||||
{
|
||||
m_loadEvent = boost::make_shared <LoadEvent> (boost::ref (lm), name, false);
|
||||
}
|
||||
@@ -52,6 +56,11 @@ void Job::rename (std::string const& newName)
|
||||
mName = newName;
|
||||
}
|
||||
|
||||
int Job::getLimit () const
|
||||
{
|
||||
return m_limit;
|
||||
}
|
||||
|
||||
const char* Job::toString (JobType t)
|
||||
{
|
||||
switch (t)
|
||||
|
||||
@@ -66,6 +66,7 @@ public:
|
||||
// VFALCO TODO try to remove the dependency on LoadMonitor.
|
||||
Job (JobType type,
|
||||
std::string const& name,
|
||||
int limit,
|
||||
uint64 index,
|
||||
LoadMonitor& lm,
|
||||
FUNCTION_TYPE <void (Job&)> const& job);
|
||||
@@ -76,6 +77,8 @@ public:
|
||||
|
||||
void rename (const std::string& n);
|
||||
|
||||
int getLimit () const;
|
||||
|
||||
// These comparison operators make the jobs sort in priority order in the job set
|
||||
bool operator< (const Job& j) const;
|
||||
bool operator> (const Job& j) const;
|
||||
@@ -90,6 +93,7 @@ private:
|
||||
FUNCTION_TYPE <void (Job&)> mJob;
|
||||
LoadEvent::pointer m_loadEvent;
|
||||
std::string mName;
|
||||
int m_limit;
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
@@ -31,6 +31,11 @@ JobQueue::JobQueue (boost::asio::io_service& svc)
|
||||
}
|
||||
|
||||
void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& jobFunc)
|
||||
{
|
||||
addLimitJob(type, name, 0, jobFunc);
|
||||
}
|
||||
|
||||
void JobQueue::addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE<void (Job&)>& jobFunc)
|
||||
{
|
||||
assert (type != jtINVALID);
|
||||
|
||||
@@ -39,7 +44,7 @@ void JobQueue::addJob (JobType type, const std::string& name, const FUNCTION_TYP
|
||||
if (type != jtCLIENT) // FIXME: Workaround incorrect client shutdown ordering
|
||||
assert (mThreadCount != 0); // do not add jobs to a queue with no threads
|
||||
|
||||
mJobSet.insert (Job (type, name, ++mLastJob, mJobLoads[type], jobFunc));
|
||||
mJobSet.insert (Job (type, name, limit, ++mLastJob, mJobLoads[type], jobFunc));
|
||||
++mJobCounts[type].first;
|
||||
mJobCond.notify_one ();
|
||||
}
|
||||
@@ -232,6 +237,37 @@ void JobQueue::setThreadCount (int c, bool const standaloneMode)
|
||||
mJobCond.notify_one (); // in case we sucked up someone else's signal
|
||||
}
|
||||
|
||||
bool JobQueue::getJob(Job& job)
|
||||
{
|
||||
if (mJobSet.empty() || mShuttingDown)
|
||||
return false;
|
||||
|
||||
std::set<Job>::iterator it = mJobSet.begin ();
|
||||
|
||||
while (1)
|
||||
{
|
||||
// Are we out of jobs?
|
||||
if (it == mJobSet.end())
|
||||
return false;
|
||||
|
||||
// Does this job have no limit?
|
||||
if (it->getLimit() == 0)
|
||||
break;
|
||||
|
||||
// Is this job category below the limit?
|
||||
if (mJobCounts[it->getType()].second < it->getLimit())
|
||||
break;
|
||||
|
||||
// Try the next job, if any
|
||||
++it;
|
||||
}
|
||||
|
||||
job = *it;
|
||||
mJobSet.erase (it);
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
// do jobs until asked to stop
|
||||
void JobQueue::threadEntry ()
|
||||
{
|
||||
@@ -239,27 +275,32 @@ void JobQueue::threadEntry ()
|
||||
|
||||
while (1)
|
||||
{
|
||||
JobType type;
|
||||
|
||||
setCallingThreadName ("waiting");
|
||||
|
||||
while (mJobSet.empty () && !mShuttingDown)
|
||||
{
|
||||
mJobCond.wait (sl);
|
||||
}
|
||||
|
||||
if (mJobSet.empty ())
|
||||
break;
|
||||
|
||||
JobType type;
|
||||
std::set<Job>::iterator it = mJobSet.begin ();
|
||||
{
|
||||
Job job (*it);
|
||||
mJobSet.erase (it);
|
||||
Job job;
|
||||
while (!getJob(job))
|
||||
{
|
||||
if (mShuttingDown)
|
||||
{
|
||||
--mThreadCount;
|
||||
mJobCond.notify_all();
|
||||
return;
|
||||
}
|
||||
mJobCond.wait (sl);
|
||||
}
|
||||
|
||||
type = job.getType ();
|
||||
-- (mJobCounts[type].first);
|
||||
|
||||
if (type == jtDEATH)
|
||||
break;
|
||||
{
|
||||
--mThreadCount;
|
||||
mJobCond.notify_all();
|
||||
return;
|
||||
}
|
||||
|
||||
++ (mJobCounts[type].second);
|
||||
sl.unlock ();
|
||||
@@ -267,12 +308,10 @@ void JobQueue::threadEntry ()
|
||||
WriteLog (lsTRACE, JobQueue) << "Doing " << Job::toString (type) << " job";
|
||||
job.doJob ();
|
||||
} // must destroy job without holding lock
|
||||
|
||||
sl.lock ();
|
||||
-- (mJobCounts[type].second);
|
||||
}
|
||||
|
||||
--mThreadCount;
|
||||
mJobCond.notify_all ();
|
||||
}
|
||||
|
||||
// vim:ts=4
|
||||
|
||||
@@ -16,6 +16,7 @@ public:
|
||||
// have to call bind.
|
||||
//
|
||||
void addJob (JobType type, const std::string& name, const FUNCTION_TYPE<void (Job&)>& job);
|
||||
void addLimitJob (JobType type, const std::string& name, int limit, const FUNCTION_TYPE<void (Job&)>& job);
|
||||
|
||||
int getJobCount (JobType t); // Jobs waiting at this priority
|
||||
int getJobCountTotal (JobType t); // Jobs waiting plus running at this priority
|
||||
@@ -59,6 +60,8 @@ private:
|
||||
boost::asio::io_service& mIOService;
|
||||
|
||||
std::map<JobType, std::pair<int, int > > mJobCounts;
|
||||
|
||||
bool getJob (Job& job);
|
||||
};
|
||||
|
||||
#endif
|
||||
|
||||
@@ -2134,7 +2134,7 @@ void PeerImp::recvLedger (const boost::shared_ptr<protocol::TMLedgerData>& packe
|
||||
}
|
||||
|
||||
if (getApp().getInboundLedgers ().awaitLedgerData (hash))
|
||||
getApp().getJobQueue ().addJob (jtLEDGER_DATA, "gotLedgerData",
|
||||
getApp().getJobQueue ().addLimitJob (jtLEDGER_DATA, "gotLedgerData", 2,
|
||||
BIND_TYPE (&InboundLedgers::gotLedgerData, &getApp().getInboundLedgers (),
|
||||
P_1, hash, packet_ptr, boost::weak_ptr<Peer> (shared_from_this ())));
|
||||
else
|
||||
@@ -2369,7 +2369,7 @@ void PeerImp::doFetchPack (const boost::shared_ptr<protocol::TMGetObjectByHash>&
|
||||
return;
|
||||
}
|
||||
|
||||
getApp().getJobQueue ().addJob (jtPACK, "MakeFetchPack",
|
||||
getApp().getJobQueue ().addLimitJob (jtPACK, "MakeFetchPack", 1,
|
||||
BIND_TYPE (&NetworkOPs::makeFetchPack, &getApp().getOPs (), P_1,
|
||||
boost::weak_ptr<Peer> (shared_from_this ()), packet, wantLedger, haveLedger, UptimeTimer::getInstance ().getElapsedSeconds ()));
|
||||
}
|
||||
|
||||
@@ -82,7 +82,8 @@ void PeerSet::TimerEntry (boost::weak_ptr<PeerSet> wptr, const boost::system::er
|
||||
ptr->setTimer ();
|
||||
}
|
||||
else
|
||||
getApp().getJobQueue ().addJob (jtLEDGER_DATA, "timerEntry", BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr));
|
||||
getApp().getJobQueue ().addLimitJob (jtLEDGER_DATA, "timerEntry", 2,
|
||||
BIND_TYPE (&PeerSet::TimerJobEntry, P_1, ptr));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user