Remove 'skip on stop' job attribute

This commit is contained in:
Vinnie Falco
2016-02-26 15:59:07 -05:00
parent b82be0a9b0
commit 6e70a6c6f5
4 changed files with 68 additions and 250 deletions

View File

@@ -37,6 +37,14 @@ namespace ripple {
class Logs; class Logs;
/** A pool of threads to perform work. /** A pool of threads to perform work.
A job posted will always run to completion.
Coroutines that are suspended must be resumed,
and run to completion.
When the JobQueue stops, it waits for all jobs
and coroutines to finish.
*/ */
class JobQueue class JobQueue
: public beast::Stoppable : public beast::Stoppable
@@ -139,6 +147,8 @@ private:
void collect(); void collect();
JobTypeData& getJobTypeData (JobType type); JobTypeData& getJobTypeData (JobType type);
void onStop() override;
// Signals the service stopped if the stopped condition is met. // Signals the service stopped if the stopped condition is met.
void checkStopped (std::lock_guard <std::mutex> const& lock); void checkStopped (std::lock_guard <std::mutex> const& lock);
@@ -210,18 +220,11 @@ private:
// <none> // <none>
void processTask () override; void processTask () override;
// Returns `true` if all jobs of this type should be skipped when
// the JobQueue receives a stop notification. If the job type isn't
// skipped, the Job will be called and the job must call Job::shouldCancel
// to determine if a long running or non-mandatory operation should be canceled.
bool skipOnStop (JobType type);
// Returns the limit of running jobs for the given job type. // Returns the limit of running jobs for the given job type.
// For jobs with no limit, we return the largest int. Hopefully that // For jobs with no limit, we return the largest int. Hopefully that
// will be enough. // will be enough.
int getJobLimit (JobType type); int getJobLimit (JobType type);
void onStop () override;
void onChildrenStopped () override; void onChildrenStopped () override;
}; };

View File

@@ -33,9 +33,6 @@ private:
/** The limit on the number of running jobs for this job type. */ /** The limit on the number of running jobs for this job type. */
int const m_limit; int const m_limit;
/** Can be skipped */
bool const m_skip;
/** Special jobs are not dispatched via the job queue */ /** Special jobs are not dispatched via the job queue */
bool const m_special; bool const m_special;
@@ -48,11 +45,10 @@ public:
JobTypeInfo () = delete; JobTypeInfo () = delete;
JobTypeInfo (JobType type, std::string name, int limit, JobTypeInfo (JobType type, std::string name, int limit,
bool skip, bool special, std::uint64_t avgLatency, std::uint64_t peakLatency) bool special, std::uint64_t avgLatency, std::uint64_t peakLatency)
: m_type (type) : m_type (type)
, m_name (name) , m_name (name)
, m_limit (limit) , m_limit (limit)
, m_skip (skip)
, m_special (special) , m_special (special)
, m_avgLatency (avgLatency) , m_avgLatency (avgLatency)
, m_peakLatency (peakLatency) , m_peakLatency (peakLatency)
@@ -75,11 +71,6 @@ public:
return m_limit; return m_limit;
} }
bool skip () const
{
return m_skip;
}
bool special () const bool special () const
{ {
return m_special; return m_special;

View File

@@ -34,144 +34,47 @@ public:
using const_iterator = Map::const_iterator; using const_iterator = Map::const_iterator;
JobTypes () JobTypes ()
: m_unknown (jtINVALID, "invalid", 0, true, true, 0, 0) : m_unknown (jtINVALID, "invalid", 0, true, 0, 0)
{ {
int maxLimit = std::numeric_limits <int>::max (); int maxLimit = std::numeric_limits <int>::max ();
// Make a fetch pack for a peer add( jtPACK, "makeFetchPack", 1, false, 0, 0);
add (jtPACK, "makeFetchPack", add( jtPUBOLDLEDGER, "publishAcqLedger", 2, false, 10000, 15000);
1, true, false, 0, 0); add( jtVALIDATION_ut, "untrustedValidation", maxLimit, false, 2000, 5000);
add( jtTRANSACTION_l, "localTransaction", maxLimit, false, 100, 500);
add( jtLEDGER_REQ, "ledgerRequest", 2, false, 0, 0);
add( jtPROPOSAL_ut, "untrustedProposal", maxLimit, false, 500, 1250);
add( jtLEDGER_DATA, "ledgerData", 2, false, 0, 0);
add( jtCLIENT, "clientCommand", maxLimit, false, 2000, 5000);
add( jtRPC, "RPC", maxLimit, false, 0, 0);
add( jtUPDATE_PF, "updatePaths", maxLimit, false, 0, 0);
add( jtTRANSACTION, "transaction", maxLimit, false, 250, 1000);
add( jtBATCH, "batch", maxLimit, false, 250, 1000);
add( jtUNL, "unl", 1, false, 0, 0);
add( jtADVANCE, "advanceLedger", maxLimit, false, 0, 0);
add( jtPUBLEDGER, "publishNewLedger", maxLimit, false, 3000, 4500);
add( jtTXN_DATA, "fetchTxnData", 1, false, 0, 0);
add( jtWAL, "writeAhead", maxLimit, false, 1000, 2500);
add( jtVALIDATION_t, "trustedValidation", maxLimit, false, 500, 1500);
add( jtWRITE, "writeObjects", maxLimit, false, 1750, 2500);
add( jtACCEPT, "acceptLedger", maxLimit, false, 0, 0);
add( jtPROPOSAL_t, "trustedProposal", maxLimit, false, 100, 500);
add( jtSWEEP, "sweep", maxLimit, false, 0, 0);
add( jtNETOP_CLUSTER, "clusterReport", 1, false, 9999, 9999);
add( jtNETOP_TIMER, "heartbeat", 1, false, 999, 999);
add( jtADMIN, "administration", maxLimit, false, 0, 0);
// An old ledger has been accepted add( jtPEER, "peerCommand", 0, true, 200, 2500);
add (jtPUBOLDLEDGER, "publishAcqLedger", add( jtDISK, "diskAccess", 0, true, 500, 1000);
2, true, false, 10000, 15000); add( jtTXN_PROC, "processTransaction", 0, true, 0, 0);
add( jtOB_SETUP, "orderBookSetup", 0, true, 0, 0);
// A validation from an untrusted source add( jtPATH_FIND, "pathFind", 0, true, 0, 0);
add (jtVALIDATION_ut, "untrustedValidation", add( jtHO_READ, "nodeRead", 0, true, 0, 0);
maxLimit, true, false, 2000, 5000); add( jtHO_WRITE, "nodeWrite", 0, true, 0, 0);
add( jtGENERIC, "generic", 0, true, 0, 0);
// A local transaction add( jtNS_SYNC_READ, "SyncReadNode", 0, true, 0, 0);
add (jtTRANSACTION_l, "localTransaction", add( jtNS_ASYNC_READ, "AsyncReadNode", 0, true, 0, 0);
maxLimit, true, false, 100, 500); add( jtNS_WRITE, "WriteNode", 0, true, 0, 0);
// A request for ledger/txnset data from another server
add (jtLEDGER_REQ, "ledgerRequest",
2, true, false, 0, 0);
// A proposal from an untrusted source
add (jtPROPOSAL_ut, "untrustedProposal",
maxLimit, true, false, 500, 1250);
// Received data for a ledger we're acquiring
add (jtLEDGER_DATA, "ledgerData",
2, true, false, 0, 0);
// A websocket command from the client
add (jtCLIENT, "clientCommand",
maxLimit, true, false, 2000, 5000);
// A websocket command from the client
add (jtRPC, "RPC",
maxLimit, false, false, 0, 0);
// Update pathfinding requests
add (jtUPDATE_PF, "updatePaths",
maxLimit, true, false, 0, 0);
// A transaction received from the network
add (jtTRANSACTION, "transaction",
maxLimit, true, false, 250, 1000);
// Apply batched transactions
add (jtBATCH, "batch",
maxLimit, true, false, 250, 1000);
// A Score or Fetch of the UNL (DEPRECATED)
add (jtUNL, "unl",
1, true, false, 0, 0);
// Advance validated/acquired ledgers
add (jtADVANCE, "advanceLedger",
maxLimit, true, false, 0, 0);
// Publish a fully-accepted ledger
add (jtPUBLEDGER, "publishNewLedger",
maxLimit, true, false, 3000, 4500);
// Fetch a proposed set
add (jtTXN_DATA, "fetchTxnData",
1, true, false, 0, 0);
// Write-ahead logging
add (jtWAL, "writeAhead",
maxLimit, false, false, 1000, 2500);
// A validation from a trusted source
add (jtVALIDATION_t, "trustedValidation",
maxLimit, true, false, 500, 1500);
// Write out hashed objects
add (jtWRITE, "writeObjects",
maxLimit, false, false, 1750, 2500);
// Accept a consensus ledger
add (jtACCEPT, "acceptLedger",
maxLimit, false, false, 0, 0);
// A proposal from a trusted source
add (jtPROPOSAL_t, "trustedProposal",
maxLimit, false, false, 100, 500);
// Sweep for stale structures
add (jtSWEEP, "sweep",
maxLimit, true, false, 0, 0);
// NetworkOPs cluster peer report
add (jtNETOP_CLUSTER, "clusterReport",
1, true, false, 9999, 9999);
// NetworkOPs net timer processing
add (jtNETOP_TIMER, "heartbeat",
1, true, false, 999, 999);
// An administrative operation
add (jtADMIN, "administration",
maxLimit, true, false, 0, 0);
// The rest are special job types that are not dispatched
// by the job pool. The "limit" and "skip" attributes are
// not applicable to these types of jobs.
add (jtPEER, "peerCommand",
0, false, true, 200, 2500);
add (jtDISK, "diskAccess",
0, false, true, 500, 1000);
add (jtTXN_PROC, "processTransaction",
0, false, true, 0, 0);
add (jtOB_SETUP, "orderBookSetup",
0, false, true, 0, 0);
add (jtPATH_FIND, "pathFind",
0, false, true, 0, 0);
add (jtHO_READ, "nodeRead",
0, false, true, 0, 0);
add (jtHO_WRITE, "nodeWrite",
0, false, true, 0, 0);
add (jtGENERIC, "generic",
0, false, true, 0, 0);
add (jtNS_SYNC_READ, "SyncReadNode",
0, false, true, 0, 0);
add (jtNS_ASYNC_READ, "AsyncReadNode",
0, false, true, 0, 0);
add (jtNS_WRITE, "WriteNode",
0, false, true, 0, 0);
} }
@@ -213,14 +116,14 @@ public:
private: private:
void add(JobType jt, std::string name, int limit, void add(JobType jt, std::string name, int limit,
bool skip, bool special, std::uint64_t avgLatency, std::uint64_t peakLatency) bool special, std::uint64_t avgLatency, std::uint64_t peakLatency)
{ {
assert (m_map.find (jt) == m_map.end ()); assert (m_map.find (jt) == m_map.end ());
std::pair<Map::iterator,bool> result (m_map.emplace ( std::pair<Map::iterator,bool> result (m_map.emplace (
std::piecewise_construct, std::piecewise_construct,
std::forward_as_tuple (jt), std::forward_as_tuple (jt),
std::forward_as_tuple (jt, name, limit, skip, special, std::forward_as_tuple (jt, name, limit, special,
avgLatency, peakLatency))); avgLatency, peakLatency)));
assert (result.second == true); assert (result.second == true);

View File

@@ -112,16 +112,6 @@ JobQueue::addJob (JobType type, std::string const& name,
! areChildrenStopped())); ! areChildrenStopped()));
} }
// Don't even add it to the queue if we're stopping
// and the job type is marked for skipOnStop.
//
if (isStopping() && skipOnStop (type))
{
JLOG(m_journal.debug) <<
"Skipping addJob ('" << name << "')";
return;
}
{ {
std::lock_guard <std::mutex> lock (m_mutex); std::lock_guard <std::mutex> lock (m_mutex);
@@ -330,6 +320,13 @@ JobQueue::getJobTypeData (JobType type)
return c->second; return c->second;
} }
void
JobQueue::onStop()
{
// onStop must be defined and empty here,
// otherwise the base class will do the wrong thing.
}
void void
JobQueue::checkStopped (std::lock_guard <std::mutex> const& lock) JobQueue::checkStopped (std::lock_guard <std::mutex> const& lock)
{ {
@@ -453,38 +450,24 @@ JobQueue::processTask ()
{ {
JobType type; JobType type;
{
Job::clock_type::time_point const start_time (
Job::clock_type::now());
{ {
Job job; Job job;
{ {
std::lock_guard <std::mutex> lock (m_mutex); std::lock_guard <std::mutex> lock (m_mutex);
getNextJob (job); getNextJob (job);
++m_processCount; ++m_processCount;
} }
type = job.getType();
JobTypeData& data (getJobTypeData (job.getType ())); JobTypeData& data(getJobTypeData(type));
// Skip the job if we are stopping and the
// skipOnStop flag is set for the job type
//
if (!isStopping() || !data.info.skip ())
{
beast::Thread::setCurrentThreadName (data.name ()); beast::Thread::setCurrentThreadName (data.name ());
JLOG(m_journal.trace) << "Doing " << data.name () << " job"; JLOG(m_journal.trace) << "Doing " << data.name () << " job";
Job::clock_type::time_point const start_time (
Job::clock_type::now());
on_dequeue (job.getType (), start_time - job.queue_time ()); on_dequeue (job.getType (), start_time - job.queue_time ());
job.doJob (); job.doJob ();
on_execute (job.getType (), Job::clock_type::now() - start_time);
} }
else on_execute(type, Job::clock_type::now() - start_time);
{
JLOG(m_journal.trace) << "Skipping processTask ('" << data.name () << "')";
}
type = job.getType();
} }
{ {
@@ -502,15 +485,6 @@ JobQueue::processTask ()
// to the associated LoadEvent object (in the Job) may be destroyed. // to the associated LoadEvent object (in the Job) may be destroyed.
} }
bool
JobQueue::skipOnStop (JobType type)
{
JobTypeInfo const& j (getJobTypes ().get (type));
assert (j.type () != jtINVALID);
return j.skip ();
}
int int
JobQueue::getJobLimit (JobType type) JobQueue::getJobLimit (JobType type)
{ {
@@ -520,59 +494,6 @@ JobQueue::getJobLimit (JobType type)
return j.limit (); return j.limit ();
} }
void
JobQueue::onStop ()
{
// VFALCO NOTE I wanted to remove all the jobs that are skippable
// but then the Workers count of tasks to process
// goes wrong.
/*
{
std::lock_guard <std::mutex> lock (m_mutex);
// Remove all jobs whose type is skipOnStop
using JobDataMap = hash_map <JobType, std::size_t>;
JobDataMap counts;
bool const report (m_journal.debug.active());
for (std::set <Job>::const_iterator iter (m_jobSet.begin());
iter != m_jobSet.end();)
{
if (skipOnStop (iter->getType()))
{
if (report)
{
std::pair <JobDataMap::iterator, bool> result (
counts.insert (std::make_pair (iter->getType(), 1)));
if (! result.second)
++(result.first->second);
}
iter = m_jobSet.erase (iter);
}
else
{
++iter;
}
}
if (report)
{
beast::Journal::ScopedStream s (m_journal.debug);
for (JobDataMap::const_iterator iter (counts.begin());
iter != counts.end(); ++iter)
{
s << std::endl <<
"Removed " << iter->second <<
" skiponStop jobs of type " << Job::toString (iter->first);
}
}
}
*/
}
void void
JobQueue::onChildrenStopped () JobQueue::onChildrenStopped ()
{ {