From 6e70a6c6f5a44ea68cc97a92e397da22a28d7db7 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Fri, 26 Feb 2016 15:59:07 -0500 Subject: [PATCH] Remove 'skip on stop' job attribute --- src/ripple/core/JobQueue.h | 17 +-- src/ripple/core/JobTypeInfo.h | 11 +- src/ripple/core/JobTypes.h | 175 +++++++----------------------- src/ripple/core/impl/JobQueue.cpp | 115 +++----------------- 4 files changed, 68 insertions(+), 250 deletions(-) diff --git a/src/ripple/core/JobQueue.h b/src/ripple/core/JobQueue.h index 608d2f740..84d38c9d1 100644 --- a/src/ripple/core/JobQueue.h +++ b/src/ripple/core/JobQueue.h @@ -37,6 +37,14 @@ namespace ripple { class Logs; /** A pool of threads to perform work. + + A job posted will always run to completion. + + Coroutines that are suspended must be resumed, + and run to completion. + + When the JobQueue stops, it waits for all jobs + and coroutines to finish. */ class JobQueue : public beast::Stoppable @@ -139,6 +147,8 @@ private: void collect(); JobTypeData& getJobTypeData (JobType type); + void onStop() override; + // Signals the service stopped if the stopped condition is met. void checkStopped (std::lock_guard const& lock); @@ -210,18 +220,11 @@ private: // void processTask () override; - // Returns `true` if all jobs of this type should be skipped when - // the JobQueue receives a stop notification. If the job type isn't - // skipped, the Job will be called and the job must call Job::shouldCancel - // to determine if a long running or non-mandatory operation should be canceled. - bool skipOnStop (JobType type); - // Returns the limit of running jobs for the given job type. // For jobs with no limit, we return the largest int. Hopefully that // will be enough. int getJobLimit (JobType type); - void onStop () override; void onChildrenStopped () override; }; diff --git a/src/ripple/core/JobTypeInfo.h b/src/ripple/core/JobTypeInfo.h index 66a9f3b1d..cc251e384 100644 --- a/src/ripple/core/JobTypeInfo.h +++ b/src/ripple/core/JobTypeInfo.h @@ -33,9 +33,6 @@ private: /** The limit on the number of running jobs for this job type. */ int const m_limit; - /** Can be skipped */ - bool const m_skip; - /** Special jobs are not dispatched via the job queue */ bool const m_special; @@ -48,11 +45,10 @@ public: JobTypeInfo () = delete; JobTypeInfo (JobType type, std::string name, int limit, - bool skip, bool special, std::uint64_t avgLatency, std::uint64_t peakLatency) + bool special, std::uint64_t avgLatency, std::uint64_t peakLatency) : m_type (type) , m_name (name) , m_limit (limit) - , m_skip (skip) , m_special (special) , m_avgLatency (avgLatency) , m_peakLatency (peakLatency) @@ -75,11 +71,6 @@ public: return m_limit; } - bool skip () const - { - return m_skip; - } - bool special () const { return m_special; diff --git a/src/ripple/core/JobTypes.h b/src/ripple/core/JobTypes.h index 494a66280..97de34e8f 100644 --- a/src/ripple/core/JobTypes.h +++ b/src/ripple/core/JobTypes.h @@ -34,144 +34,47 @@ public: using const_iterator = Map::const_iterator; JobTypes () - : m_unknown (jtINVALID, "invalid", 0, true, true, 0, 0) + : m_unknown (jtINVALID, "invalid", 0, true, 0, 0) { int maxLimit = std::numeric_limits ::max (); - // Make a fetch pack for a peer - add (jtPACK, "makeFetchPack", - 1, true, false, 0, 0); +add( jtPACK, "makeFetchPack", 1, false, 0, 0); +add( jtPUBOLDLEDGER, "publishAcqLedger", 2, false, 10000, 15000); +add( jtVALIDATION_ut, "untrustedValidation", maxLimit, false, 2000, 5000); +add( jtTRANSACTION_l, "localTransaction", maxLimit, false, 100, 500); +add( jtLEDGER_REQ, "ledgerRequest", 2, false, 0, 0); +add( jtPROPOSAL_ut, "untrustedProposal", maxLimit, false, 500, 1250); +add( jtLEDGER_DATA, "ledgerData", 2, false, 0, 0); +add( jtCLIENT, "clientCommand", maxLimit, false, 2000, 5000); +add( jtRPC, "RPC", maxLimit, false, 0, 0); +add( jtUPDATE_PF, "updatePaths", maxLimit, false, 0, 0); +add( jtTRANSACTION, "transaction", maxLimit, false, 250, 1000); +add( jtBATCH, "batch", maxLimit, false, 250, 1000); +add( jtUNL, "unl", 1, false, 0, 0); +add( jtADVANCE, "advanceLedger", maxLimit, false, 0, 0); +add( jtPUBLEDGER, "publishNewLedger", maxLimit, false, 3000, 4500); +add( jtTXN_DATA, "fetchTxnData", 1, false, 0, 0); +add( jtWAL, "writeAhead", maxLimit, false, 1000, 2500); +add( jtVALIDATION_t, "trustedValidation", maxLimit, false, 500, 1500); +add( jtWRITE, "writeObjects", maxLimit, false, 1750, 2500); +add( jtACCEPT, "acceptLedger", maxLimit, false, 0, 0); +add( jtPROPOSAL_t, "trustedProposal", maxLimit, false, 100, 500); +add( jtSWEEP, "sweep", maxLimit, false, 0, 0); +add( jtNETOP_CLUSTER, "clusterReport", 1, false, 9999, 9999); +add( jtNETOP_TIMER, "heartbeat", 1, false, 999, 999); +add( jtADMIN, "administration", maxLimit, false, 0, 0); - // An old ledger has been accepted - add (jtPUBOLDLEDGER, "publishAcqLedger", - 2, true, false, 10000, 15000); - - // A validation from an untrusted source - add (jtVALIDATION_ut, "untrustedValidation", - maxLimit, true, false, 2000, 5000); - - // A local transaction - add (jtTRANSACTION_l, "localTransaction", - maxLimit, true, false, 100, 500); - - // A request for ledger/txnset data from another server - add (jtLEDGER_REQ, "ledgerRequest", - 2, true, false, 0, 0); - - // A proposal from an untrusted source - add (jtPROPOSAL_ut, "untrustedProposal", - maxLimit, true, false, 500, 1250); - - // Received data for a ledger we're acquiring - add (jtLEDGER_DATA, "ledgerData", - 2, true, false, 0, 0); - - // A websocket command from the client - add (jtCLIENT, "clientCommand", - maxLimit, true, false, 2000, 5000); - - // A websocket command from the client - add (jtRPC, "RPC", - maxLimit, false, false, 0, 0); - - // Update pathfinding requests - add (jtUPDATE_PF, "updatePaths", - maxLimit, true, false, 0, 0); - - // A transaction received from the network - add (jtTRANSACTION, "transaction", - maxLimit, true, false, 250, 1000); - - // Apply batched transactions - add (jtBATCH, "batch", - maxLimit, true, false, 250, 1000); - - // A Score or Fetch of the UNL (DEPRECATED) - add (jtUNL, "unl", - 1, true, false, 0, 0); - - // Advance validated/acquired ledgers - add (jtADVANCE, "advanceLedger", - maxLimit, true, false, 0, 0); - - // Publish a fully-accepted ledger - add (jtPUBLEDGER, "publishNewLedger", - maxLimit, true, false, 3000, 4500); - - // Fetch a proposed set - add (jtTXN_DATA, "fetchTxnData", - 1, true, false, 0, 0); - - // Write-ahead logging - add (jtWAL, "writeAhead", - maxLimit, false, false, 1000, 2500); - - // A validation from a trusted source - add (jtVALIDATION_t, "trustedValidation", - maxLimit, true, false, 500, 1500); - - // Write out hashed objects - add (jtWRITE, "writeObjects", - maxLimit, false, false, 1750, 2500); - - // Accept a consensus ledger - add (jtACCEPT, "acceptLedger", - maxLimit, false, false, 0, 0); - - // A proposal from a trusted source - add (jtPROPOSAL_t, "trustedProposal", - maxLimit, false, false, 100, 500); - - // Sweep for stale structures - add (jtSWEEP, "sweep", - maxLimit, true, false, 0, 0); - - // NetworkOPs cluster peer report - add (jtNETOP_CLUSTER, "clusterReport", - 1, true, false, 9999, 9999); - - // NetworkOPs net timer processing - add (jtNETOP_TIMER, "heartbeat", - 1, true, false, 999, 999); - - // An administrative operation - add (jtADMIN, "administration", - maxLimit, true, false, 0, 0); - - // The rest are special job types that are not dispatched - // by the job pool. The "limit" and "skip" attributes are - // not applicable to these types of jobs. - - add (jtPEER, "peerCommand", - 0, false, true, 200, 2500); - - add (jtDISK, "diskAccess", - 0, false, true, 500, 1000); - - add (jtTXN_PROC, "processTransaction", - 0, false, true, 0, 0); - - add (jtOB_SETUP, "orderBookSetup", - 0, false, true, 0, 0); - - add (jtPATH_FIND, "pathFind", - 0, false, true, 0, 0); - - add (jtHO_READ, "nodeRead", - 0, false, true, 0, 0); - - add (jtHO_WRITE, "nodeWrite", - 0, false, true, 0, 0); - - add (jtGENERIC, "generic", - 0, false, true, 0, 0); - - add (jtNS_SYNC_READ, "SyncReadNode", - 0, false, true, 0, 0); - add (jtNS_ASYNC_READ, "AsyncReadNode", - 0, false, true, 0, 0); - add (jtNS_WRITE, "WriteNode", - 0, false, true, 0, 0); +add( jtPEER, "peerCommand", 0, true, 200, 2500); +add( jtDISK, "diskAccess", 0, true, 500, 1000); +add( jtTXN_PROC, "processTransaction", 0, true, 0, 0); +add( jtOB_SETUP, "orderBookSetup", 0, true, 0, 0); +add( jtPATH_FIND, "pathFind", 0, true, 0, 0); +add( jtHO_READ, "nodeRead", 0, true, 0, 0); +add( jtHO_WRITE, "nodeWrite", 0, true, 0, 0); +add( jtGENERIC, "generic", 0, true, 0, 0); +add( jtNS_SYNC_READ, "SyncReadNode", 0, true, 0, 0); +add( jtNS_ASYNC_READ, "AsyncReadNode", 0, true, 0, 0); +add( jtNS_WRITE, "WriteNode", 0, true, 0, 0); } @@ -213,14 +116,14 @@ public: private: void add(JobType jt, std::string name, int limit, - bool skip, bool special, std::uint64_t avgLatency, std::uint64_t peakLatency) + bool special, std::uint64_t avgLatency, std::uint64_t peakLatency) { assert (m_map.find (jt) == m_map.end ()); std::pair result (m_map.emplace ( std::piecewise_construct, std::forward_as_tuple (jt), - std::forward_as_tuple (jt, name, limit, skip, special, + std::forward_as_tuple (jt, name, limit, special, avgLatency, peakLatency))); assert (result.second == true); diff --git a/src/ripple/core/impl/JobQueue.cpp b/src/ripple/core/impl/JobQueue.cpp index c10c8c78b..b90bab82b 100644 --- a/src/ripple/core/impl/JobQueue.cpp +++ b/src/ripple/core/impl/JobQueue.cpp @@ -112,16 +112,6 @@ JobQueue::addJob (JobType type, std::string const& name, ! areChildrenStopped())); } - // Don't even add it to the queue if we're stopping - // and the job type is marked for skipOnStop. - // - if (isStopping() && skipOnStop (type)) - { - JLOG(m_journal.debug) << - "Skipping addJob ('" << name << "')"; - return; - } - { std::lock_guard lock (m_mutex); @@ -330,6 +320,13 @@ JobQueue::getJobTypeData (JobType type) return c->second; } +void +JobQueue::onStop() +{ + // onStop must be defined and empty here, + // otherwise the base class will do the wrong thing. +} + void JobQueue::checkStopped (std::lock_guard const& lock) { @@ -454,37 +451,23 @@ JobQueue::processTask () JobType type; { - Job job; - - { - std::lock_guard lock (m_mutex); - getNextJob (job); - ++m_processCount; - } - - JobTypeData& data (getJobTypeData (job.getType ())); - - // Skip the job if we are stopping and the - // skipOnStop flag is set for the job type - // - if (!isStopping() || !data.info.skip ()) + Job::clock_type::time_point const start_time ( + Job::clock_type::now()); { + Job job; + { + std::lock_guard lock (m_mutex); + getNextJob (job); + ++m_processCount; + } + type = job.getType(); + JobTypeData& data(getJobTypeData(type)); beast::Thread::setCurrentThreadName (data.name ()); JLOG(m_journal.trace) << "Doing " << data.name () << " job"; - - Job::clock_type::time_point const start_time ( - Job::clock_type::now()); - on_dequeue (job.getType (), start_time - job.queue_time ()); job.doJob (); - on_execute (job.getType (), Job::clock_type::now() - start_time); } - else - { - JLOG(m_journal.trace) << "Skipping processTask ('" << data.name () << "')"; - } - - type = job.getType(); + on_execute(type, Job::clock_type::now() - start_time); } { @@ -502,15 +485,6 @@ JobQueue::processTask () // to the associated LoadEvent object (in the Job) may be destroyed. } -bool -JobQueue::skipOnStop (JobType type) -{ - JobTypeInfo const& j (getJobTypes ().get (type)); - assert (j.type () != jtINVALID); - - return j.skip (); -} - int JobQueue::getJobLimit (JobType type) { @@ -520,59 +494,6 @@ JobQueue::getJobLimit (JobType type) return j.limit (); } -void -JobQueue::onStop () -{ - // VFALCO NOTE I wanted to remove all the jobs that are skippable - // but then the Workers count of tasks to process - // goes wrong. - - /* - { - std::lock_guard lock (m_mutex); - - // Remove all jobs whose type is skipOnStop - using JobDataMap = hash_map ; - JobDataMap counts; - bool const report (m_journal.debug.active()); - - for (std::set ::const_iterator iter (m_jobSet.begin()); - iter != m_jobSet.end();) - { - if (skipOnStop (iter->getType())) - { - if (report) - { - std::pair result ( - counts.insert (std::make_pair (iter->getType(), 1))); - if (! result.second) - ++(result.first->second); - } - - iter = m_jobSet.erase (iter); - } - else - { - ++iter; - } - } - - if (report) - { - beast::Journal::ScopedStream s (m_journal.debug); - - for (JobDataMap::const_iterator iter (counts.begin()); - iter != counts.end(); ++iter) - { - s << std::endl << - "Removed " << iter->second << - " skiponStop jobs of type " << Job::toString (iter->first); - } - } - } - */ -} - void JobQueue::onChildrenStopped () {