Refactor Job tracking and statistics gathering

Nik Bougalis
2014-02-21 11:33:59 -08:00
committed by Vinnie Falco
parent 616a53888e
commit 7cd63489f4
7 changed files with 633 additions and 439 deletions

View File

@@ -31,19 +31,6 @@ Job::Job (JobType type, uint64 index)
{
}
#if 0
Job::Job (Job const& other)
: m_cancelCallback (other.m_cancelCallback)
, mType (other.mType)
, mJobIndex (other.mJobIndex)
, mJob (other.mJob)
, m_loadEvent (other.m_loadEvent)
, mName (other.mName)
, m_queue_time (other.m_queue_time)
{
}
#endif
Job::Job (JobType type,
std::string const& name,
uint64 index,
@@ -60,19 +47,6 @@ Job::Job (JobType type,
m_loadEvent = boost::make_shared <LoadEvent> (boost::ref (lm), name, false);
}
/*
Job& Job::operator= (Job const& other)
{
mType = other.mType;
mJobIndex = other.mJobIndex;
mJob = other.mJob;
m_loadEvent = other.m_loadEvent;
mName = other.mName;
m_cancelCallback = other.m_cancelCallback;
return *this;
}
*/
JobType Job::getType () const
{
return mType;
@@ -109,52 +83,6 @@ void Job::rename (std::string const& newName)
mName = newName;
}
const char* Job::toString (JobType t)
{
switch (t)
{
case jtINVALID: return "invalid";
case jtPACK: return "peerLedgerReq";
case jtPUBOLDLEDGER: return "publishAcqLedger";
case jtVALIDATION_ut: return "untrustedValidation";
case jtPROOFWORK: return "proofOfWork";
case jtTRANSACTION_l: return "localTransaction";
case jtPROPOSAL_ut: return "untrustedProposal";
case jtLEDGER_DATA: return "ledgerData";
case jtUPDATE_PF: return "updatePaths";
case jtCLIENT: return "clientCommand";
case jtRPC: return "RPC";
case jtTRANSACTION: return "transaction";
case jtUNL: return "unl";
case jtADVANCE: return "advanceLedger";
case jtPUBLEDGER: return "publishNewLedger";
case jtTXN_DATA: return "fetchTxnData";
case jtWAL: return "writeAhead";
case jtVALIDATION_t: return "trustedValidation";
case jtWRITE: return "writeObjects";
case jtACCEPT: return "acceptLedger";
case jtPROPOSAL_t: return "trustedProposal";
case jtSWEEP: return "sweep";
case jtNETOP_CLUSTER: return "clusterReport";
case jtNETOP_TIMER: return "heartbeat";
case jtADMIN: return "administration";
// special types not dispatched by the job pool
case jtPEER: return "peerCommand";
case jtDISK: return "diskAccess";
case jtTXN_PROC: return "processTransaction";
case jtOB_SETUP: return "orderBookSetup";
case jtPATH_FIND: return "pathFind";
case jtHO_READ: return "nodeRead";
case jtHO_WRITE: return "nodeWrite";
case jtGENERIC: return "generic";
default:
assert (false);
return "unknown";
}
}
bool Job::operator> (const Job& j) const
{
if (mType < j.mType)
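For context on the comparison operators: the JobQueue keeps its pending jobs in a std::set<Job>, and these operators make that set behave like a priority queue with FIFO ordering among jobs of equal priority. A minimal, self-contained sketch of that ordering idea (illustrative only; the member names mirror the ones above, but the exact comparison and tie-breaking in rippled may differ):

    #include <cstdint>
    #include <set>

    // Stand-in for Job: ordered so that higher-priority types sort first,
    // and jobs of equal priority run in the order they were queued.
    struct ToyJob
    {
        int type;             // stands in for JobType (larger = higher priority)
        std::uint64_t index;  // stands in for mJobIndex, assigned at enqueue time

        bool operator< (ToyJob const& other) const
        {
            if (type != other.type)
                return type > other.type;   // higher priority sorts first
            return index < other.index;     // FIFO within one priority level
        }
    };

    int main ()
    {
        std::set<ToyJob> jobs { {2, 1}, {5, 2}, {5, 3} };
        // *jobs.begin() is {5, 2}: the highest priority, earliest index.
    }

With an ordering like this, getNextJob() can simply walk from the beginning of the set and take the first job whose type is still below its concurrency limit.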

View File

@@ -24,48 +24,51 @@ namespace ripple {
// Note that this queue should only be used for CPU-bound jobs
// It is primarily intended for signature checking
enum JobType
{
// must be in priority order, low to high
jtINVALID = -1,
jtPACK = 1, // Make a fetch pack for a peer
jtPUBOLDLEDGER = 2, // An old ledger has been accepted
jtVALIDATION_ut = 3, // A validation from an untrusted source
jtPROOFWORK = 4, // A proof of work demand from another server
jtTRANSACTION_l = 5, // A local transaction
jtPROPOSAL_ut = 6, // A proposal from an untrusted source
jtLEDGER_DATA = 7, // Received data for a ledger we're acquiring
jtUPDATE_PF = 8, // Update pathfinding requests
jtCLIENT = 9, // A websocket command from the client
jtRPC = 10, // A websocket command from the client
jtTRANSACTION = 11, // A transaction received from the network
jtUNL = 12, // A Score or Fetch of the UNL (DEPRECATED)
jtADVANCE = 13, // Advance validated/acquired ledgers
jtPUBLEDGER = 14, // Publish a fully-accepted ledger
jtTXN_DATA = 15, // Fetch a proposed set
jtWAL = 16, // Write-ahead logging
jtVALIDATION_t = 17, // A validation from a trusted source
jtWRITE = 18, // Write out hashed objects
jtACCEPT = 19, // Accept a consensus ledger
jtPROPOSAL_t = 20, // A proposal from a trusted source
jtSWEEP = 21, // Sweep for stale structures
jtNETOP_CLUSTER = 22, // NetworkOPs cluster peer report
jtNETOP_TIMER = 23, // NetworkOPs net timer processing
jtADMIN = 24, // An administrative operation
// Special type indicating an invalid job - will go away soon.
jtINVALID = -1,
// special types not dispatched by the job pool
jtPEER = 30,
jtDISK = 31,
jtTXN_PROC = 32,
jtOB_SETUP = 33,
jtPATH_FIND = 34,
jtHO_READ = 35,
jtHO_WRITE = 36,
jtGENERIC = 37, // Used just to measure time
}; // CAUTION: If you add new types, add them to Job.cpp too
// Job types - the position in this enum indicates the job priority with
// earlier jobs having lower priority than later jobs. If you wish to
// insert a job at a specific priority, simply add it at the right location.
jtPACK, // Make a fetch pack for a peer
jtPUBOLDLEDGER, // An old ledger has been accepted
jtVALIDATION_ut, // A validation from an untrusted source
jtPROOFWORK, // A proof of work demand from another server
jtTRANSACTION_l, // A local transaction
jtPROPOSAL_ut, // A proposal from an untrusted source
jtLEDGER_DATA, // Received data for a ledger we're acquiring
jtUPDATE_PF, // Update pathfinding requests
jtCLIENT, // A websocket command from the client
jtRPC, // A websocket command from the client
jtTRANSACTION, // A transaction received from the network
jtUNL, // A Score or Fetch of the UNL (DEPRECATED)
jtADVANCE, // Advance validated/acquired ledgers
jtPUBLEDGER, // Publish a fully-accepted ledger
jtTXN_DATA, // Fetch a proposed set
jtWAL, // Write-ahead logging
jtVALIDATION_t, // A validation from a trusted source
jtWRITE, // Write out hashed objects
jtACCEPT, // Accept a consensus ledger
jtPROPOSAL_t, // A proposal from a trusted source
jtSWEEP, // Sweep for stale structures
jtNETOP_CLUSTER, // NetworkOPs cluster peer report
jtNETOP_TIMER, // NetworkOPs net timer processing
jtADMIN, // An administrative operation
// VFALCO TODO move this into the enum so it calculates itself?
#define NUM_JOB_TYPES 48 // why 48 and not 38?
// Special job types which are not dispatched by the job pool
jtPEER ,
jtDISK ,
jtTXN_PROC ,
jtOB_SETUP ,
jtPATH_FIND ,
jtHO_READ ,
jtHO_WRITE ,
jtGENERIC , // Used just to measure time
};
class Job
{
@@ -113,14 +116,13 @@ public:
void rename (const std::string& n);
// These comparison operators make the jobs sort in priority order in the job set
// These comparison operators make the jobs sort in priority order
// in the job set
bool operator< (const Job& j) const;
bool operator> (const Job& j) const;
bool operator<= (const Job& j) const;
bool operator>= (const Job& j) const;
static const char* toString (JobType);
private:
CancelCallback m_cancelCallback;
JobType mType;
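A side effect of dropping the explicit enumerator values: priority now comes purely from declaration order, which is exactly what relational tests such as the x.first >= t check in getJobCountGE() rely on. A small hedged illustration (hypothetical snippet, not part of the commit):

    // With the unnumbered enum, relational comparisons still work because the
    // enumerators receive increasing values in declaration order:
    static_assert (jtPROPOSAL_t > jtTRANSACTION,
        "trusted proposals must rank above network transactions");

    // Adding a job type at a specific priority is now just a matter of placing
    // it between two existing enumerators; nothing needs renumbering and there
    // is no NUM_JOB_TYPES constant left to keep in sync.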

View File

@@ -18,6 +18,9 @@
//==============================================================================
#include "JobQueue.h"
#include "JobTypes.h"
#include "JobTypeInfo.h"
#include "JobTypeData.h"
#include "beast/beast/make_unique.h"
#include "beast/beast/chrono/chrono_util.h"
@@ -31,186 +34,96 @@ class JobQueueImp
, private Workers::Callback
{
public:
struct Stats
{
insight::Hook hook;
insight::Gauge job_count;
// VFALCO TODO should enumerate the map of jobtypes instead
explicit Stats (insight::Collector::ptr const& collector)
: m_collector (collector)
{
add (jtPACK , "make_pack");
add (jtPUBOLDLEDGER , "pub_oldledgx");
add (jtVALIDATION_ut, "ut_validx");
add (jtPROOFWORK , "proof_of_work");
add (jtTRANSACTION_l, "local_tx");
add (jtPROPOSAL_ut , "ut_proposal");
add (jtLEDGER_DATA , "ledgx_data");
add (jtUPDATE_PF , "upd_paths");
add (jtCLIENT , "client_cmd");
add (jtRPC , "rpc_cmd");
add (jtTRANSACTION , "recv_tx");
add (jtUNL , "unl_op");
add (jtADVANCE , "next_ledgx");
add (jtPUBLEDGER , "pub_ledgx");
add (jtTXN_DATA , "tx_data");
add (jtWAL , "wal");
add (jtVALIDATION_t , "t_validx");
add (jtWRITE , "write");
add (jtACCEPT , "accept_ledgx");
add (jtPROPOSAL_t , "t_prop");
add (jtSWEEP , "sweep");
add (jtNETOP_CLUSTER, "netop_clust");
add (jtNETOP_TIMER , "netop_heart");
add (jtADMIN , "admin");
}
template <class Rep, class Period>
void on_dequeue (JobType type,
std::chrono::duration <Rep, Period> const& value) const
{
auto const ms (ceil <std::chrono::milliseconds> (value));
if (ms.count() >= 10)
m_dequeue.find (type)->second.notify (ms);
}
template <class Rep, class Period>
void on_execute (JobType type,
std::chrono::duration <Rep, Period> const& value) const
{
auto const ms (ceil <std::chrono::milliseconds> (value));
if (ms.count() >= 10)
m_execute.find (type)->second.notify (ms);
}
private:
void add (JobType type, std::string const& label)
{
m_dequeue.emplace (type, m_collector->make_event (label + "_q"));
m_execute.emplace (type, m_collector->make_event (label));
}
typedef std::unordered_map <JobType, insight::Event,
std::hash <int>> JobEvents;
insight::Collector::ptr m_collector;
JobEvents m_dequeue;
JobEvents m_execute;
};
//--------------------------------------------------------------------------
struct Count
{
Count () noexcept
: type (jtINVALID)
, waiting (0)
, running (0)
, deferred (0)
{
}
Count (JobType type_) noexcept
: type (type_)
, waiting (0)
, running (0)
, deferred (0)
{
}
JobType type; // The type of Job these counts reflect
int waiting; // The number waiting
int running; // How many are running
int deferred; // Number of jobs we didn't signal due to limits
};
typedef std::set <Job> JobSet;
typedef std::map <JobType, Count> MapType;
typedef std::map <JobType, JobTypeData> JobDataMap;
typedef CriticalSection::ScopedLockType ScopedLock;
Journal m_journal;
Stats m_stats;
CriticalSection m_mutex;
uint64 m_lastJob;
JobSet m_jobSet;
MapType m_jobCounts;
JobDataMap m_jobData;
JobTypeData m_invalidJobData;
// The number of jobs running through processTask()
// The number of jobs currently in processTask()
int m_processCount;
Workers m_workers;
LoadMonitor m_loads [NUM_JOB_TYPES];
CancelCallback m_cancelCallback;
//--------------------------------------------------------------------------
// statistics tracking
insight::Collector::ptr m_collector;
insight::Gauge job_count;
insight::Hook hook;
//--------------------------------------------------------------------------
static JobTypes const& getJobTypes ()
{
static JobTypes types;
return types;
}
//--------------------------------------------------------------------------
JobQueueImp (insight::Collector::ptr const& collector,
Stoppable& parent, Journal journal)
: JobQueue ("JobQueue", parent)
, m_journal (journal)
, m_stats (collector)
, m_lastJob (0)
, m_invalidJobData (getJobTypes ().getInvalid (), collector)
, m_processCount (0)
, m_workers (*this, "JobQueue", 0)
, m_cancelCallback (boost::bind (&Stoppable::isStopping, this))
, m_collector (collector)
{
m_stats.hook = collector->make_hook (std::bind (
hook = m_collector->make_hook (std::bind (
&JobQueueImp::collect, this));
m_stats.job_count = collector->make_gauge ("job_count");
job_count = m_collector->make_gauge ("job_count");
{
ScopedLock lock (m_mutex);
// Initialize the job counts.
// The 'limit' field in particular will be set based on the limit
for (int i = 0; i < NUM_JOB_TYPES; ++i)
for (auto const& x : getJobTypes ())
{
JobType const type (static_cast <JobType> (i));
m_jobCounts [type] = Count (type);
JobTypeInfo const& jt = x.second;
// And create dynamic information for all jobs
auto const result (m_jobData.emplace (std::piecewise_construct,
std::forward_as_tuple (jt.type ()),
std::forward_as_tuple (jt, m_collector)));
assert (result.second == true);
}
}
m_loads [ jtPUBOLDLEDGER ].setTargetLatency (10000, 15000);
m_loads [ jtVALIDATION_ut ].setTargetLatency (2000, 5000);
m_loads [ jtPROOFWORK ].setTargetLatency (2000, 5000);
m_loads [ jtTRANSACTION ].setTargetLatency (250, 1000);
m_loads [ jtPROPOSAL_ut ].setTargetLatency (500, 1250);
m_loads [ jtPUBLEDGER ].setTargetLatency (3000, 4500);
m_loads [ jtWAL ].setTargetLatency (1000, 2500);
m_loads [ jtVALIDATION_t ].setTargetLatency (500, 1500);
m_loads [ jtWRITE ].setTargetLatency (1750, 2500);
m_loads [ jtTRANSACTION_l ].setTargetLatency (100, 500);
m_loads [ jtPROPOSAL_t ].setTargetLatency (100, 500);
m_loads [ jtCLIENT ].setTargetLatency (2000, 5000);
m_loads [ jtPEER ].setTargetLatency (200, 2500);
m_loads [ jtDISK ].setTargetLatency (500, 1000);
m_loads [ jtNETOP_CLUSTER ].setTargetLatency (9999, 9999); // once per 10 seconds
m_loads [ jtNETOP_TIMER ].setTargetLatency (999, 999); // once per second
}
~JobQueueImp ()
{
// Must unhook before destroying
m_stats.hook = insight::Hook ();
hook = insight::Hook ();
}
void collect ()
{
ScopedLock lock (m_mutex);
m_stats.job_count = m_jobSet.size ();
job_count = m_jobSet.size ();
}
void addJob (JobType type, std::string const& name,
boost::function <void (Job&)> const& jobFunc)
{
bassert (type != jtINVALID);
assert (type != jtINVALID);
JobDataMap::iterator iter (m_jobData.find (type));
assert (iter != m_jobData.end ());
if (iter == m_jobData.end ())
return;
JobTypeData& data (iter->second);
// FIXME: Workaround incorrect client shutdown ordering
// do not add jobs to a queue with no threads
bassert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
assert (type == jtCLIENT || m_workers.getNumberOfThreads () > 0);
{
// If this goes off it means that a child didn't follow
@@ -225,7 +138,7 @@ public:
// * Not all children are stopped
//
ScopedLock lock (m_mutex);
bassert (! isStopped() && (
assert (! isStopped() && (
m_processCount>0 ||
! m_jobSet.empty () ||
! areChildrenStopped()));
@@ -246,7 +159,7 @@ public:
std::pair <std::set <Job>::iterator, bool> result (
m_jobSet.insert (Job (type, name, ++m_lastJob,
m_loads[type], jobFunc, m_cancelCallback)));
data.load (), jobFunc, m_cancelCallback)));
queueJob (*result.first, lock);
}
}
@@ -255,18 +168,22 @@ public:
{
ScopedLock lock (m_mutex);
MapType::const_iterator c = m_jobCounts.find (t);
JobDataMap::const_iterator c = m_jobData.find (t);
return (c == m_jobCounts.end ()) ? 0 : c->second.waiting;
return (c == m_jobData.end ())
? 0
: c->second.waiting;
}
int getJobCountTotal (JobType t)
{
ScopedLock lock (m_mutex);
MapType::const_iterator c = m_jobCounts.find (t);
JobDataMap::const_iterator c = m_jobData.find (t);
return (c == m_jobCounts.end ()) ? 0 : (c->second.waiting + c->second.running);
return (c == m_jobData.end ())
? 0
: (c->second.waiting + c->second.running);
}
int getJobCountGE (JobType t)
@@ -276,32 +193,10 @@ public:
ScopedLock lock (m_mutex);
typedef MapType::value_type jt_int_pair;
BOOST_FOREACH (jt_int_pair const& it, m_jobCounts)
for (auto const& x : m_jobData)
{
if (it.first >= t)
ret += it.second.waiting;
}
return ret;
}
std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts ()
{
// return all jobs at all priority levels
std::vector< std::pair<JobType, std::pair<int, int> > > ret;
ScopedLock lock (m_mutex);
ret.reserve (m_jobCounts.size ());
typedef MapType::value_type jt_int_pair;
BOOST_FOREACH (const jt_int_pair & it, m_jobCounts)
{
ret.push_back (std::make_pair (it.second.type,
std::make_pair (it.second.waiting, it.second.running)));
if (x.first >= t)
ret += x.second.waiting;
}
return ret;
@@ -338,7 +233,8 @@ public:
c += 2;
m_journal.info << "Auto-tuning to " << c << " validation/transaction/proposal threads";
m_journal.info << "Auto-tuning to " << c <<
" validation/transaction/proposal threads";
}
m_workers.setNumberOfThreads (c);
@@ -347,21 +243,37 @@ public:
LoadEvent::pointer getLoadEvent (JobType t, const std::string& name)
{
return boost::make_shared<LoadEvent> (boost::ref (m_loads[t]), name, true);
JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ());
if (iter == m_jobData.end ())
return boost::shared_ptr<LoadEvent> ();
return boost::make_shared<LoadEvent> (
boost::ref (iter-> second.load ()), name, true);
}
LoadEvent::autoptr getLoadEventAP (JobType t, const std::string& name)
{
return LoadEvent::autoptr (new LoadEvent (m_loads[t], name, true));
JobDataMap::iterator iter (m_jobData.find (t));
assert (iter != m_jobData.end ());
if (iter == m_jobData.end ())
return LoadEvent::autoptr ();
return LoadEvent::autoptr (
new LoadEvent (iter-> second.load (), name, true));
}
bool isOverloaded ()
{
int count = 0;
for (int i = 0; i < NUM_JOB_TYPES; ++i)
if (m_loads[i].isOver ())
for (auto& x : m_jobData)
{
if (x.second.load ().isOver ())
++count;
}
return count > 0;
}
@@ -377,42 +289,32 @@ public:
ScopedLock lock (m_mutex);
for (int i = 0; i < NUM_JOB_TYPES; ++i)
for (auto& x : m_jobData)
{
JobType const type (static_cast <JobType> (i));
assert (x.first != jtINVALID);
if (type == jtGENERIC)
if (x.first == jtGENERIC)
continue;
LoadMonitor::Stats stats = m_loads [i].getStats ();
int jobCount;
int threadCount;
JobTypeData& data (x.second);
MapType::const_iterator it = m_jobCounts.find (type);
LoadMonitor::Stats stats (data.stats ());
int waiting (data.waiting);
int running (data.running);
if (it == m_jobCounts.end ())
{
jobCount = 0;
threadCount = 0;
}
else
{
jobCount = it->second.waiting;
threadCount = it->second.running;
}
if ((stats.count != 0) || (jobCount != 0) ||
(stats.latencyPeak != 0) || (threadCount != 0))
if ((stats.count != 0) || (waiting != 0) ||
(stats.latencyPeak != 0) || (running != 0))
{
Json::Value& pri = priorities.append (Json::objectValue);
pri["job_type"] = Job::toString (type);
pri["job_type"] = data.name ();
if (stats.isOverloaded)
pri["over_target"] = true;
if (jobCount != 0)
pri["waiting"] = jobCount;
if (waiting != 0)
pri["waiting"] = waiting;
if (stats.count != 0)
pri["per_second"] = static_cast<int> (stats.count);
@@ -423,8 +325,8 @@ public:
if (stats.latencyAvg != 0)
pri["avg_time"] = static_cast<int> (stats.latencyAvg);
if (threadCount != 0)
pri["in_progress"] = threadCount;
if (running != 0)
pri["in_progress"] = running;
}
}
@@ -434,7 +336,21 @@ public:
}
private:
//------------------------------------------------------------------------------
//--------------------------------------------------------------------------
JobTypeData& getJobTypeData (JobType type)
{
JobDataMap::iterator c (m_jobData.find (type));
assert (c != m_jobData.end ());
// NIKB: This is ugly and I hate it. We must remove jtINVALID completely
// and use something sane.
if (c == m_jobData.end ())
return m_invalidJobData;
return c->second;
}
//--------------------------------------------------------------------------
// Signals the service stopped if the stopped condition is met.
//
@@ -456,7 +372,7 @@ private:
}
}
//------------------------------------------------------------------------------
//--------------------------------------------------------------------------
//
// Signals an added Job for processing.
//
@@ -478,9 +394,9 @@ private:
assert (type != jtINVALID);
assert (m_jobSet.find (job) != m_jobSet.end ());
Count& count (m_jobCounts [type]);
JobTypeData& data (getJobTypeData (type));
if (count.waiting + count.running < getJobLimit (type))
if (data.waiting + data.running < getJobLimit (type))
{
m_workers.addTask ();
}
@@ -488,9 +404,9 @@ private:
{
// defer the task until we go below the limit
//
++count.deferred;
++data.deferred;
}
++count.waiting;
++data.waiting;
}
//------------------------------------------------------------------------------
@@ -515,35 +431,35 @@ private:
//
void getNextJob (Job& job, ScopedLock const& lock)
{
bassert (! m_jobSet.empty ());
assert (! m_jobSet.empty ());
JobSet::const_iterator iter;
for (iter = m_jobSet.begin (); iter != m_jobSet.end (); ++iter)
{
Count& count (m_jobCounts [iter->getType ()]);
JobTypeData& data (getJobTypeData (iter->getType ()));
bassert (count.running <= getJobLimit (count.type));
assert (data.running <= getJobLimit (data.type ()));
// Run this job if we're running below the limit.
if (count.running < getJobLimit (count.type))
if (data.running < getJobLimit (data.type ()))
{
bassert (count.waiting > 0);
assert (data.waiting > 0);
break;
}
}
bassert (iter != m_jobSet.end ());
assert (iter != m_jobSet.end ());
JobType const type = iter->getType ();
Count& count (m_jobCounts [type]);
JobTypeData& data (getJobTypeData (type));
bassert (type != jtINVALID);
assert (type != jtINVALID);
job = *iter;
m_jobSet.erase (iter);
--count.waiting;
++count.running;
--data.waiting;
++data.running;
}
//------------------------------------------------------------------------------
@@ -565,24 +481,45 @@ private:
{
JobType const type = job.getType ();
bassert (m_jobSet.find (job) == m_jobSet.end ());
bassert (type != jtINVALID);
assert (m_jobSet.find (job) == m_jobSet.end ());
assert (type != jtINVALID);
Count& count (m_jobCounts [type]);
JobTypeData& data (getJobTypeData (type));
// Queue a deferred task if possible
if (count.deferred > 0)
if (data.deferred > 0)
{
bassert (count.running + count.waiting >= getJobLimit (type));
assert (data.running + data.waiting >= getJobLimit (type));
--count.deferred;
--data.deferred;
m_workers.addTask ();
}
--count.running;
--data.running;
}
//------------------------------------------------------------------------------
//--------------------------------------------------------------------------
template <class Rep, class Period>
void on_dequeue (JobType type,
std::chrono::duration <Rep, Period> const& value)
{
auto const ms (ceil <std::chrono::milliseconds> (value));
if (ms.count() >= 10)
getJobTypeData (type).dequeue.notify (ms);
}
template <class Rep, class Period>
void on_execute (JobType type,
std::chrono::duration <Rep, Period> const& value)
{
auto const ms (ceil <std::chrono::milliseconds> (value));
if (ms.count() >= 10)
getJobTypeData (type).execute.notify (ms);
}
//--------------------------------------------------------------------------
//
// Runs the next appropriate waiting Job.
//
@@ -605,29 +542,26 @@ private:
++m_processCount;
}
JobType const type (job.getType ());
String const name (Job::toString (type));
JobTypeData& data (getJobTypeData (job.getType ()));
// Skip the job if we are stopping and the
// skipOnStop flag is set for the job type
//
if (!isStopping() || !skipOnStop (type))
if (!isStopping() || !data.info.skip ())
{
Thread::setCurrentThreadName (name);
m_journal.trace << "Doing " << name << " job";
Thread::setCurrentThreadName (data.name ());
m_journal.trace << "Doing " << data.name () << " job";
Job::clock_type::time_point const start_time (
Job::clock_type::now());
m_stats.on_dequeue (type, start_time - job.queue_time ());
on_dequeue (job.getType (), start_time - job.queue_time ());
job.doJob ();
m_stats.on_execute (type, Job::clock_type::now() - start_time);
on_execute (job.getType (), Job::clock_type::now() - start_time);
}
else
{
m_journal.trace << "Skipping processTask ('" << name << "')";
m_journal.trace << "Skipping processTask ('" << data.name () << "')";
}
{
@@ -647,109 +581,24 @@ private:
// the JobQueue receives a stop notification. If the job type isn't
// skipped, the Job will be called and the job must call Job::shouldCancel
// to determine if a long running or non-mandatory operation should be canceled.
static bool skipOnStop (JobType type)
bool skipOnStop (JobType type)
{
switch (type)
{
// These are skipped when a stop notification is received
case jtPACK:
case jtPUBOLDLEDGER:
case jtVALIDATION_ut:
case jtPROOFWORK:
case jtTRANSACTION_l:
case jtPROPOSAL_ut:
case jtLEDGER_DATA:
case jtUPDATE_PF:
case jtCLIENT:
case jtTRANSACTION:
case jtUNL:
case jtADVANCE:
case jtPUBLEDGER:
case jtTXN_DATA:
case jtVALIDATION_t:
case jtPROPOSAL_t:
case jtSWEEP:
case jtNETOP_CLUSTER:
case jtNETOP_TIMER:
case jtADMIN:
//case jtACCEPT:
return true;
JobTypeInfo const& j (getJobTypes ().get (type));
assert (j.type () != jtINVALID);
default:
bassertfalse;
case jtWAL:
case jtWRITE:
break;
}
return false;
return j.skip ();
}
// Returns the limit of running jobs for the given job type.
// For jobs with no limit, we return the largest int. Hopefully that
// will be enough.
//
static int getJobLimit (JobType type)
int getJobLimit (JobType type)
{
int limit = std::numeric_limits <int>::max ();
JobTypeInfo const& j (getJobTypes ().get (type));
assert (j.type () != jtINVALID);
switch (type)
{
// These are not dispatched by JobQueue
case jtPEER:
case jtDISK:
case jtTXN_PROC:
case jtOB_SETUP:
case jtPATH_FIND:
case jtHO_READ:
case jtHO_WRITE:
case jtGENERIC:
limit = 0;
break;
default:
// Someone added a JobType but forgot to set a limit.
// Did they also forget to add it to Job.cpp?
bassertfalse;
break;
case jtVALIDATION_ut:
case jtPROOFWORK:
case jtTRANSACTION_l:
case jtPROPOSAL_ut:
case jtUPDATE_PF:
case jtCLIENT:
case jtRPC:
case jtTRANSACTION:
case jtPUBLEDGER:
case jtADVANCE:
case jtWAL:
case jtVALIDATION_t:
case jtWRITE:
case jtPROPOSAL_t:
case jtSWEEP:
case jtADMIN:
case jtACCEPT:
limit = std::numeric_limits <int>::max ();
break;
case jtLEDGER_DATA: limit = 2; break;
case jtPACK: limit = 1; break;
case jtPUBOLDLEDGER: limit = 2; break;
case jtTXN_DATA: limit = 1; break;
case jtUNL: limit = 1; break;
// If either of the next two are processing so slowly
// or we are so busy we have two of them at once, it
// indicates a serious problem!
//
case jtNETOP_TIMER:
case jtNETOP_CLUSTER:
limit = 1;
break;
};
return limit;
return j.limit ();
}
//--------------------------------------------------------------------------
@@ -765,8 +614,8 @@ private:
ScopedLock lock (m_mutex);
// Remove all jobs whose type is skipOnStop
typedef boost::unordered_map <JobType, std::size_t> MapType;
MapType counts;
typedef boost::unordered_map <JobType, std::size_t> JobDataMap;
JobDataMap counts;
bool const report (m_journal.debug.active());
for (JobSet::const_iterator iter (m_jobSet.begin());
@@ -776,7 +625,7 @@ private:
{
if (report)
{
std::pair <MapType::iterator, bool> result (
std::pair <JobDataMap::iterator, bool> result (
counts.insert (std::make_pair (iter->getType(), 1)));
if (! result.second)
++(result.first->second);
@@ -794,7 +643,7 @@ private:
{
Journal::ScopedStream s (m_journal.debug);
for (MapType::const_iterator iter (counts.begin());
for (JobDataMap::const_iterator iter (counts.begin());
iter != counts.end(); ++iter)
{
s << std::endl <<
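Stepping back from the diff: the on_dequeue()/on_execute() members added above split a job's lifetime into queue-wait latency and execution latency, and only report samples of 10ms or more. A self-contained sketch of that measurement pattern using plain std::chrono (illustrative; the real code feeds an insight::Event rather than printing):

    #include <chrono>
    #include <iostream>
    #include <thread>

    int main ()
    {
        using clock = std::chrono::steady_clock;

        // Analogue of Job::queue_time(): recorded when the job is queued.
        auto const queue_time = clock::now ();
        std::this_thread::sleep_for (std::chrono::milliseconds (15));  // waiting in the set

        // Analogue of start_time in processTask(): a worker picks the job up.
        auto const start_time = clock::now ();
        std::this_thread::sleep_for (std::chrono::milliseconds (25));  // doJob() runs

        auto const end_time = clock::now ();

        auto const queued = std::chrono::duration_cast<std::chrono::milliseconds> (
            start_time - queue_time);
        auto const ran = std::chrono::duration_cast<std::chrono::milliseconds> (
            end_time - start_time);

        // Mirror the >= 10ms reporting threshold used by on_dequeue()/on_execute().
        if (queued.count () >= 10)
            std::cout << "dequeue latency: " << queued.count () << "ms\n";
        if (ran.count () >= 10)
            std::cout << "execute latency: " << ran.count () << "ms\n";
    }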

View File

@@ -47,9 +47,6 @@ public:
// All waiting jobs at or greater than this priority
virtual int getJobCountGE (JobType t) = 0;
// jobs waiting, threads doing
virtual std::vector< std::pair<JobType, std::pair<int, int> > > getJobCounts () = 0;
virtual void shutdown () = 0;
virtual void setThreadCount (int c, bool const standaloneMode) = 0;

View File

@@ -0,0 +1,99 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_CORE_JOBTYPEDATA_H_INCLUDED
#define RIPPLE_CORE_JOBTYPEDATA_H_INCLUDED
#include "JobTypeInfo.h"
namespace ripple
{
struct JobTypeData
{
private:
LoadMonitor m_load;
/* Support for insight */
insight::Collector::ptr m_collector;
public:
/* The job category which we represent */
JobTypeInfo const& info;
/* The number of jobs waiting */
int waiting;
/* The number presently running */
int running;
/* And the number we deferred executing because of job limits */
int deferred;
/* Notification callbacks */
insight::Event dequeue;
insight::Event execute;
explicit JobTypeData (JobTypeInfo const& info_,
insight::Collector::ptr const& collector) noexcept
: m_collector (collector)
, info (info_)
, waiting (0)
, running (0)
, deferred (0)
{
m_load.setTargetLatency (
info.getAverageLatency (),
info.getPeakLatency());
if (!info.special ())
{
dequeue = m_collector->make_event (info.name () + "_q");
execute = m_collector->make_event (info.name ());
}
}
/* Not copy-constructible or assignable */
JobTypeData (JobTypeData const& other) = delete;
JobTypeData& operator= (JobTypeData const& other) = delete;
std::string name () const
{
return info.name ();
}
JobType type () const
{
return info.type ();
}
LoadMonitor& load ()
{
return m_load;
}
LoadMonitor::Stats stats ()
{
return m_load.getStats ();
}
};
}
#endif
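Because JobTypeData deletes its copy constructor and assignment operator, JobQueueImp has to build each map entry in place, which is what the piecewise_construct / forward_as_tuple emplace in its constructor does. A reduced, self-contained sketch of that idiom with a stand-in type (hypothetical names, not code from the commit):

    #include <cassert>
    #include <map>
    #include <string>
    #include <tuple>
    #include <utility>

    // Stand-in for JobTypeData: copying is deleted, so map::emplace with
    // piecewise_construct is the only way to build it inside the container.
    struct PerTypeData
    {
        std::string name;
        int waiting = 0;

        explicit PerTypeData (std::string n) : name (std::move (n)) {}
        PerTypeData (PerTypeData const&) = delete;
        PerTypeData& operator= (PerTypeData const&) = delete;
    };

    int main ()
    {
        std::map<int, PerTypeData> data;

        // Key and value are each constructed from their own argument tuple,
        // so no temporary PerTypeData (which would need a copy) is created.
        auto const result = data.emplace (
            std::piecewise_construct,
            std::forward_as_tuple (1),
            std::forward_as_tuple ("ledgerData"));

        assert (result.second);
        result.first->second.waiting++;
    }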

View File

@@ -0,0 +1,101 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_CORE_JOBTYPEINFO_H_INCLUDED
#define RIPPLE_CORE_JOBTYPEINFO_H_INCLUDED
namespace ripple
{
/** Holds all the 'static' information about a job, which does not change */
class JobTypeInfo
{
private:
JobType const m_type;
std::string const m_name;
/** The limit on the number of running jobs for this job type. */
int const m_limit;
/** Can be skipped */
bool const m_skip;
/** Special jobs are not dispatched via the job queue */
bool const m_special;
/** Average and peak latencies for this job type. 0 is none specified */
uint64 const m_avgLatency;
uint64 const m_peakLatency;
public:
// Not default constructible
JobTypeInfo () = delete;
JobTypeInfo (JobType type, std::string name, int limit,
bool skip, bool special, uint64 avgLatency, uint64 peakLatency)
: m_type (type)
, m_name (name)
, m_limit (limit)
, m_skip (skip)
, m_special (special)
, m_avgLatency (avgLatency)
, m_peakLatency (peakLatency)
{
}
JobType type () const
{
return m_type;
}
std::string name () const
{
return m_name;
}
int limit () const
{
return m_limit;
}
bool skip () const
{
return m_skip;
}
bool special () const
{
return m_special;
}
uint64 getAverageLatency () const
{
return m_avgLatency;
}
uint64 getPeakLatency () const
{
return m_peakLatency;
}
};
}
#endif

View File

@@ -0,0 +1,218 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include "Job.h"
#include "JobTypeInfo.h"
namespace ripple
{
class JobTypes
{
public:
typedef std::map <JobType, JobTypeInfo> Map;
typedef Map::const_iterator const_iterator;
JobTypes ()
: m_unknown (jtINVALID, "invalid", 0, true, true, 0, 0)
{
int maxLimit = std::numeric_limits <int>::max ();
// Make a fetch pack for a peer
add (jtPACK, "makeFetchPack",
1, true, false, 0, 0);
// An old ledger has been accepted
add (jtPUBOLDLEDGER, "publishAcqLedger",
2, true, false, 10000, 15000);
// A validation from an untrusted source
add (jtVALIDATION_ut, "untrustedValidation",
maxLimit, true, false, 2000, 5000);
// A proof of work demand from another server
add (jtPROOFWORK, "proofOfWork",
maxLimit, true, false, 2000, 5000);
// A local transaction
add (jtTRANSACTION_l, "localTransaction",
maxLimit, true, false, 100, 500);
// A proposal from an untrusted source
add (jtPROPOSAL_ut, "untrustedProposal",
maxLimit, true, false, 500, 1250);
// Received data for a ledger we're acquiring
add (jtLEDGER_DATA, "ledgerData",
2, true, false, 0, 0);
// Update pathfinding requests
add (jtUPDATE_PF, "updatePaths",
maxLimit, true, false, 0, 0);
// A websocket command from the client
add (jtCLIENT, "clientCommand",
maxLimit, true, false, 2000, 5000);
// A websocket command from the client
add (jtRPC, "RPC",
maxLimit, false, false, 0, 0);
// A transaction received from the network
add (jtTRANSACTION, "transaction",
maxLimit, true, false, 250, 1000);
// A Score or Fetch of the UNL (DEPRECATED)
add (jtUNL, "unl",
1, true, false, 0, 0);
// Advance validated/acquired ledgers
add (jtADVANCE, "advanceLedger",
maxLimit, true, false, 0, 0);
// Publish a fully-accepted ledger
add (jtPUBLEDGER, "publishNewLedger",
maxLimit, true, false, 3000, 4500);
// Fetch a proposed set
add (jtTXN_DATA, "fetchTxnData",
1, true, false, 0, 0);
// Write-ahead logging
add (jtWAL, "writeAhead",
maxLimit, false, false, 1000, 2500);
// A validation from a trusted source
add (jtVALIDATION_t, "trustedValidation",
maxLimit, true, false, 500, 1500);
// Write out hashed objects
add (jtWRITE, "writeObjects",
maxLimit, false, false, 1750, 2500);
// Accept a consensus ledger
add (jtACCEPT, "acceptLedger",
maxLimit, false, false, 0, 0);
// A proposal from a trusted source
add (jtPROPOSAL_t, "trustedProposal",
maxLimit, false, false, 100, 500);
// Sweep for stale structures
add (jtSWEEP, "sweep",
maxLimit, true, false, 0, 0);
// NetworkOPs cluster peer report
add (jtNETOP_CLUSTER, "clusterReport",
1, true, false, 9999, 9999);
// NetworkOPs net timer processing
add (jtNETOP_TIMER, "heartbeat",
1, true, false, 999, 999);
// An administrative operation
add (jtADMIN, "administration",
maxLimit, true, false, 0, 0);
// The rest are special job types that are not dispatched
// by the job pool. The "limit" and "skip" attributes are
// not applicable to these types of jobs.
add (jtPEER, "peerCommand",
0, false, true, 200, 2500);
add (jtDISK, "diskAccess",
0, false, true, 500, 1000);
add (jtTXN_PROC, "processTransaction",
0, false, true, 0, 0);
add (jtOB_SETUP, "orderBookSetup",
0, false, true, 0, 0);
add (jtPATH_FIND, "pathFind",
0, false, true, 0, 0);
add (jtHO_READ, "nodeRead",
0, false, true, 0, 0);
add (jtHO_WRITE, "nodeWrite",
0, false, true, 0, 0);
add (jtGENERIC, "generic",
0, false, true, 0, 0);
}
JobTypeInfo const& get (JobType jt) const
{
Map::const_iterator const iter (m_map.find (jt));
assert (iter != m_map.end ());
if (iter != m_map.end())
return iter->second;
return m_unknown;
}
JobTypeInfo const& getInvalid () const
{
return m_unknown;
}
const_iterator begin () const
{
return m_map.cbegin ();
}
const_iterator cbegin () const
{
return m_map.cbegin ();
}
const_iterator end () const
{
return m_map.cend ();
}
const_iterator cend () const
{
return m_map.cend ();
}
private:
void add(JobType jt, std::string name, int limit,
bool skip, bool special, uint64 avgLatency, uint64 peakLatency)
{
assert (m_map.find (jt) == m_map.end ());
std::pair<Map::iterator,bool> result (m_map.emplace (
std::piecewise_construct,
std::forward_as_tuple (jt),
std::forward_as_tuple (jt, name, limit, skip, special,
avgLatency, peakLatency)));
assert (result.second == true);
}
JobTypeInfo m_unknown;
Map m_map;
};
}
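Taken together with the JobQueue changes, lookups into this table replace the hand-written switch statements that getJobLimit() and skipOnStop() used to contain. A hedged usage sketch, assuming an accessor like the static JobQueueImp::getJobTypes() shown earlier (illustrative only, not code from the commit):

    // Hypothetical accessor mirroring JobQueueImp::getJobTypes().
    static JobTypes const& jobTypes ()
    {
        static JobTypes types;
        return types;
    }

    void example ()
    {
        // What used to be switch cases in getJobLimit()/skipOnStop() is now data:
        int const ledgerDataLimit = jobTypes ().get (jtLEDGER_DATA).limit ();  // 2
        bool const skipWal = jobTypes ().get (jtWAL).skip ();                  // false

        // And the old 0..NUM_JOB_TYPES loops become iteration over the map:
        for (auto const& entry : jobTypes ())
        {
            JobTypeInfo const& info = entry.second;
            if (! info.special ())
            {
                // Queued job types: these get counters and latency targets.
            }
        }

        (void) ledgerDataLimit;
        (void) skipWal;
    }

Adding a new job type now means touching two places instead of four: the JobType enum (to pick its priority) and the JobTypes constructor (to give it a name, limit, skip flag, and latency targets).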