Insight support for jobs:

* Add insight Groups to Application singleton
* Put JobQueue metrics into "jobq" Group
* Add queued time to Job
* Add per-type Job queue time metrics
* Add per-type Job execution time metrics
* Break JobQueue sources out of the namespace
* Use free function to create the JobQueue
This commit is contained in:
Vinnie Falco
2014-02-08 09:33:30 -08:00
parent a681a4fcd4
commit f469e3853d
9 changed files with 154 additions and 40 deletions

View File

@@ -184,7 +184,8 @@ public:
void operator() (Duration const& elapsed) const
{
auto ms (ceil <std::chrono::milliseconds> (elapsed));
latency.notify (ms);
if (ms.count() >= 10)
latency.notify (ms);
if (ms.count() >= 500)
journal.warning <<
"io_service latency = " << ms;
@@ -235,9 +236,8 @@ public:
// The JobQueue has to come pretty early since
// almost everything is a Stoppable child of the JobQueue.
//
, m_jobQueue (JobQueue::New (
m_collectorManager->collector (),
m_nodeStoreScheduler, LogPartition::getJournal <JobQueueLog> ()))
, m_jobQueue (make_JobQueue (m_collectorManager->group ("jobq"),
m_nodeStoreScheduler, LogPartition::getJournal <JobQueueLog> ()))
// The io_service must be a child of the JobQueue since we call addJob
// in response to network data from peers and also client requests.

View File

@@ -25,6 +25,7 @@ class CollectorManagerImp
public:
Journal m_journal;
insight::Collector::ptr m_collector;
std::unique_ptr <insight::Groups> m_groups;
CollectorManagerImp (StringPairArray const& params,
Journal journal)
@@ -44,6 +45,8 @@ public:
{
m_collector = insight::NullCollector::New ();
}
m_groups = insight::make_Groups (m_collector);
}
~CollectorManagerImp ()
@@ -54,6 +57,11 @@ public:
{
return m_collector;
}
// Returns the insight Group looked up by `name` in the Groups object
// that the constructor builds from this manager's collector.
insight::Group::ptr const& group (std::string const& name)
{
return m_groups->get (name);
}
};
//------------------------------------------------------------------------------

View File

@@ -32,6 +32,7 @@ public:
Journal journal);
virtual ~CollectorManager () = 0;
virtual insight::Collector::ptr const& collector () = 0;
// Returns the insight Group identified by `name`.
virtual insight::Group::ptr const& group (std::string const& name) = 0;
};
}

View File

@@ -17,6 +17,8 @@
*/
//==============================================================================
namespace ripple {
Job::Job ()
: mType (jtINVALID)
, mJobIndex (0)
@@ -29,6 +31,7 @@ Job::Job (JobType type, uint64 index)
{
}
#if 0
Job::Job (Job const& other)
: m_cancelCallback (other.m_cancelCallback)
, mType (other.mType)
@@ -36,8 +39,10 @@ Job::Job (Job const& other)
, mJob (other.mJob)
, m_loadEvent (other.m_loadEvent)
, mName (other.mName)
, m_queue_time (other.m_queue_time)
{
}
#endif
Job::Job (JobType type,
std::string const& name,
@@ -50,10 +55,12 @@ Job::Job (JobType type,
, mJobIndex (index)
, mJob (job)
, mName (name)
, m_queue_time (clock_type::now ())
{
m_loadEvent = boost::make_shared <LoadEvent> (boost::ref (lm), name, false);
}
/*
Job& Job::operator= (Job const& other)
{
mType = other.mType;
@@ -64,6 +71,7 @@ Job& Job::operator= (Job const& other)
m_cancelCallback = other.m_cancelCallback;
return *this;
}
*/
JobType Job::getType () const
{
@@ -76,6 +84,11 @@ CancelCallback Job::getCancelCallback () const
return m_cancelCallback;
}
// Returns the stored queue time of this job (m_queue_time).
// The returned reference is to a member and is only valid while
// this Job object is alive.
Job::clock_type::time_point const& Job::queue_time () const
{
return m_queue_time;
}
bool Job::shouldCancel () const
{
if (! m_cancelCallback.empty ())
@@ -185,3 +198,5 @@ bool Job::operator<= (const Job& j) const
return mJobIndex <= j.mJobIndex;
}
}

View File

@@ -20,6 +20,8 @@
#ifndef RIPPLE_JOB_H
#define RIPPLE_JOB_H
namespace ripple {
// Note that this queue should only be used for CPU-bound jobs
// It is primarily intended for signature checking
enum JobType
@@ -68,6 +70,8 @@ enum JobType
class Job
{
public:
typedef std::chrono::steady_clock clock_type;
/** Default constructor.
Allows Job to be used as a container type.
@@ -81,7 +85,7 @@ public:
//
Job ();
Job (Job const& other);
//Job (Job const& other);
Job (JobType type, uint64 index);
@@ -93,12 +97,15 @@ public:
std::function <void (Job&)> const& job,
CancelCallback cancelCallback);
Job& operator= (Job const& other);
//Job& operator= (Job const& other);
JobType getType () const;
CancelCallback getCancelCallback () const;
/** Returns the time when the job was queued. */
clock_type::time_point const& queue_time () const;
/** Returns `true` if the running job should make a best-effort cancel. */
bool shouldCancel () const;
@@ -121,6 +128,9 @@ private:
std::function <void (Job&)> mJob;
LoadEvent::pointer m_loadEvent;
std::string mName;
clock_type::time_point m_queue_time;
};
}
#endif

View File

@@ -17,19 +17,90 @@
*/
//==============================================================================
#include "JobQueue.h"
#include "beast/beast/make_unique.h"
#include "beast/beast/chrono/chrono_util.h"
#include <chrono>
namespace ripple {
class JobQueueImp
: public JobQueue
, private Workers::Callback
{
public:
struct Metrics
struct Stats
{
insight::Hook hook;
insight::Gauge job_count;
// VFALCO TODO should enumerate the map of jobtypes instead
explicit Stats (insight::Collector::ptr const& collector)
: m_collector (collector)
{
add (jtPACK , "make_pack");
add (jtPUBOLDLEDGER , "pub_oldledgx");
add (jtVALIDATION_ut, "ut_validx");
add (jtPROOFWORK , "proof_of_work");
add (jtTRANSACTION_l, "local_tx");
add (jtPROPOSAL_ut , "ut_proposal");
add (jtLEDGER_DATA , "ledgx_data");
add (jtUPDATE_PF , "upd_paths");
add (jtCLIENT , "client_cmd");
add (jtRPC , "rpc_cmd");
add (jtTRANSACTION , "recv_tx");
add (jtUNL , "unl_op");
add (jtADVANCE , "next_ledgx");
add (jtPUBLEDGER , "pub_ledgx");
add (jtTXN_DATA , "tx_data");
add (jtWAL , "wal");
add (jtVALIDATION_t , "t_validx");
add (jtWRITE , "write");
add (jtACCEPT , "accept_ledgx");
add (jtPROPOSAL_t , "t_prop");
add (jtSWEEP , "sweep");
add (jtNETOP_CLUSTER, "netop_clust");
add (jtNETOP_TIMER , "netop_heart");
add (jtADMIN , "admin");
}
template <class Rep, class Period>
void on_dequeue (JobType type,
std::chrono::duration <Rep, Period> const& value) const
{
auto const ms (ceil <std::chrono::milliseconds> (value));
if (ms.count() >= 10)
m_dequeue.find (type)->second.notify (ms);
}
template <class Rep, class Period>
void on_execute (JobType type,
std::chrono::duration <Rep, Period> const& value) const
{
auto const ms (ceil <std::chrono::milliseconds> (value));
if (ms.count() >= 10)
m_execute.find (type)->second.notify (ms);
}
private:
void add (JobType type, std::string const& label)
{
m_dequeue.emplace (type, m_collector->make_event (label + "_q"));
m_execute.emplace (type, m_collector->make_event (label));
}
typedef std::unordered_map <JobType, insight::Event,
std::hash <int>> JobEvents;
insight::Collector::ptr m_collector;
JobEvents m_dequeue;
JobEvents m_execute;
};
// Statistics for each JobType
//
//--------------------------------------------------------------------------
struct Count
{
Count () noexcept
@@ -59,7 +130,7 @@ public:
typedef CriticalSection::ScopedLockType ScopedLock;
Journal m_journal;
Metrics m_metrics;
Stats m_stats;
CriticalSection m_mutex;
uint64 m_lastJob;
JobSet m_jobSet;
@@ -78,14 +149,15 @@ public:
Stoppable& parent, Journal journal)
: JobQueue ("JobQueue", parent)
, m_journal (journal)
, m_stats (collector)
, m_lastJob (0)
, m_processCount (0)
, m_workers (*this, "JobQueue", 0)
, m_cancelCallback (boost::bind (&Stoppable::isStopping, this))
{
m_metrics.hook = collector->make_hook (std::bind (
m_stats.hook = collector->make_hook (std::bind (
&JobQueueImp::collect, this));
m_metrics.job_count = collector->make_gauge ("job_count");
m_stats.job_count = collector->make_gauge ("job_count");
{
ScopedLock lock (m_mutex);
@@ -122,13 +194,13 @@ public:
~JobQueueImp ()
{
// Must unhook before destroying
m_metrics.hook = insight::Hook ();
m_stats.hook = insight::Hook ();
}
void collect ()
{
ScopedLock lock (m_mutex);
m_metrics.job_count = m_jobSet.size ();
m_stats.job_count = m_jobSet.size ();
}
void addJob (JobType type, std::string const& name,
@@ -172,11 +244,10 @@ public:
{
ScopedLock lock (m_mutex);
std::pair< std::set <Job>::iterator, bool > it =
m_jobSet.insert (Job (
type, name, ++m_lastJob, m_loads[type], jobFunc, m_cancelCallback));
queueJob (*it.first, lock);
std::pair <std::set <Job>::iterator, bool> result (
m_jobSet.insert (Job (type, name, ++m_lastJob,
m_loads[type], jobFunc, m_cancelCallback)));
queueJob (*result.first, lock);
}
}
@@ -404,9 +475,8 @@ private:
void queueJob (Job const& job, ScopedLock const& lock)
{
JobType const type (job.getType ());
bassert (type != jtINVALID);
bassert (m_jobSet.find (job) != m_jobSet.end ());
assert (type != jtINVALID);
assert (m_jobSet.find (job) != m_jobSet.end ());
Count& count (m_jobCounts [type]);
@@ -545,7 +615,15 @@ private:
{
Thread::setCurrentThreadName (name);
m_journal.trace << "Doing " << name << " job";
Job::clock_type::time_point const start_time (
Job::clock_type::now());
m_stats.on_dequeue (type, start_time - job.queue_time ());
job.doJob ();
m_stats.on_execute (type, Job::clock_type::now() - start_time);
}
else
{
@@ -745,8 +823,11 @@ JobQueue::JobQueue (char const* name, Stoppable& parent)
//------------------------------------------------------------------------------
JobQueue* JobQueue::New (insight::Collector::ptr const& collector,
Stoppable& parent, Journal journal)
std::unique_ptr <JobQueue> make_JobQueue (
insight::Collector::ptr const& collector,
Stoppable& parent, Journal journal)
{
return new JobQueueImp (collector, parent, journal);
return std::make_unique <JobQueueImp> (collector, parent, journal);
}
}

View File

@@ -20,15 +20,14 @@
#ifndef RIPPLE_CORE_JOBQUEUE_H_INCLUDED
#define RIPPLE_CORE_JOBQUEUE_H_INCLUDED
namespace ripple {
class JobQueue : public Stoppable
{
protected:
JobQueue (char const* name, Stoppable& parent);
public:
static JobQueue* New (insight::Collector::ptr const& collector,
Stoppable& parent, Journal journal);
virtual ~JobQueue () { }
// VFALCO NOTE Using boost::function here because Visual Studio 2012
@@ -70,4 +69,9 @@ public:
virtual Json::Value getJson (int c = 0) = 0;
};
/** Create a new JobQueue.

    @param collector The insight collector used to publish the queue's metrics.
    @param parent    The Stoppable that becomes the parent of the JobQueue.
    @param journal   The journal passed to the JobQueue for logging.
    @return A unique_ptr that owns the newly created JobQueue.
*/
std::unique_ptr <JobQueue> make_JobQueue (insight::Collector::ptr const& collector,
Stoppable& parent, Journal journal);
}
#endif

View File

@@ -33,15 +33,13 @@
#include "../ripple_net/ripple_net.h" // for HTTPClient
namespace ripple
{
namespace ripple {
#include "functional/Config.cpp"
# include "functional/LoadFeeTrackImp.h" // private
#include "functional/LoadFeeTrackImp.cpp"
#include "functional/Job.cpp"
#include "functional/JobQueue.cpp"
#include "functional/LoadEvent.cpp"
#include "functional/LoadMonitor.cpp"
}
#include "functional/Job.cpp"
#include "functional/JobQueue.cpp"

View File

@@ -30,19 +30,16 @@
#include "nodestore/NodeStore.h"
namespace ripple
{
namespace ripple {
// Order matters
# include "functional/ConfigSections.h"
#include "functional/Config.h"
#include "functional/LoadFeeTrack.h"
# include "functional/LoadEvent.h"
# include "functional/LoadMonitor.h"
}
# include "functional/Job.h"
#include "functional/JobQueue.h"
}
#endif