diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index 40d1f93eb..2ca890cfe 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -184,7 +184,8 @@ public: void operator() (Duration const& elapsed) const { auto ms (ceil (elapsed)); - latency.notify (ms); + if (ms.count() >= 10) + latency.notify (ms); if (ms.count() >= 500) journal.warning << "io_service latency = " << ms; @@ -235,9 +236,8 @@ public: // The JobQueue has to come pretty early since // almost everything is a Stoppable child of the JobQueue. // - , m_jobQueue (JobQueue::New ( - m_collectorManager->collector (), - m_nodeStoreScheduler, LogPartition::getJournal ())) + , m_jobQueue (make_JobQueue (m_collectorManager->group ("jobq"), + m_nodeStoreScheduler, LogPartition::getJournal ())) // The io_service must be a child of the JobQueue since we call addJob // in response to newtwork data from peers and also client requests. diff --git a/src/ripple_app/main/CollectorManager.cpp b/src/ripple_app/main/CollectorManager.cpp index 0d3919b3a..611ee6cb5 100644 --- a/src/ripple_app/main/CollectorManager.cpp +++ b/src/ripple_app/main/CollectorManager.cpp @@ -25,6 +25,7 @@ class CollectorManagerImp public: Journal m_journal; insight::Collector::ptr m_collector; + std::unique_ptr m_groups; CollectorManagerImp (StringPairArray const& params, Journal journal) @@ -44,6 +45,8 @@ public: { m_collector = insight::NullCollector::New (); } + + m_groups = insight::make_Groups (m_collector); } ~CollectorManagerImp () @@ -54,6 +57,11 @@ public: { return m_collector; } + + insight::Group::ptr const& group (std::string const& name) + { + return m_groups->get (name); + } }; //------------------------------------------------------------------------------ diff --git a/src/ripple_app/main/CollectorManager.h b/src/ripple_app/main/CollectorManager.h index 5e215185d..5afb3ab32 100644 --- a/src/ripple_app/main/CollectorManager.h +++ b/src/ripple_app/main/CollectorManager.h @@ -32,6 +32,7 @@ public: Journal journal); virtual ~CollectorManager () = 0; virtual insight::Collector::ptr const& collector () = 0; + virtual insight::Group::ptr const& group (std::string const& name) = 0; }; } diff --git a/src/ripple_core/functional/Job.cpp b/src/ripple_core/functional/Job.cpp index 1465e5c80..47e8c3bb4 100644 --- a/src/ripple_core/functional/Job.cpp +++ b/src/ripple_core/functional/Job.cpp @@ -17,6 +17,8 @@ */ //============================================================================== +namespace ripple { + Job::Job () : mType (jtINVALID) , mJobIndex (0) @@ -29,6 +31,7 @@ Job::Job (JobType type, uint64 index) { } +#if 0 Job::Job (Job const& other) : m_cancelCallback (other.m_cancelCallback) , mType (other.mType) @@ -36,8 +39,10 @@ Job::Job (Job const& other) , mJob (other.mJob) , m_loadEvent (other.m_loadEvent) , mName (other.mName) + , m_queue_time (other.m_queue_time) { } +#endif Job::Job (JobType type, std::string const& name, @@ -50,10 +55,12 @@ Job::Job (JobType type, , mJobIndex (index) , mJob (job) , mName (name) + , m_queue_time (clock_type::now ()) { m_loadEvent = boost::make_shared (boost::ref (lm), name, false); } +/* Job& Job::operator= (Job const& other) { mType = other.mType; @@ -64,6 +71,7 @@ Job& Job::operator= (Job const& other) m_cancelCallback = other.m_cancelCallback; return *this; } +*/ JobType Job::getType () const { @@ -76,6 +84,11 @@ CancelCallback Job::getCancelCallback () const return m_cancelCallback; } +Job::clock_type::time_point const& Job::queue_time () const +{ + return m_queue_time; +} + bool Job::shouldCancel () const { if (! m_cancelCallback.empty ()) @@ -185,3 +198,5 @@ bool Job::operator<= (const Job& j) const return mJobIndex <= j.mJobIndex; } + +} diff --git a/src/ripple_core/functional/Job.h b/src/ripple_core/functional/Job.h index 3a9041602..0c817a8e7 100644 --- a/src/ripple_core/functional/Job.h +++ b/src/ripple_core/functional/Job.h @@ -20,6 +20,8 @@ #ifndef RIPPLE_JOB_H #define RIPPLE_JOB_H +namespace ripple { + // Note that this queue should only be used for CPU-bound jobs // It is primarily intended for signature checking enum JobType @@ -68,6 +70,8 @@ enum JobType class Job { public: + typedef std::chrono::steady_clock clock_type; + /** Default constructor. Allows Job to be used as a container type. @@ -81,7 +85,7 @@ public: // Job (); - Job (Job const& other); + //Job (Job const& other); Job (JobType type, uint64 index); @@ -93,12 +97,15 @@ public: std::function const& job, CancelCallback cancelCallback); - Job& operator= (Job const& other); + //Job& operator= (Job const& other); JobType getType () const; CancelCallback getCancelCallback () const; + /** Returns the time when the job was queued. */ + clock_type::time_point const& queue_time () const; + /** Returns `true` if the running job should make a best-effort cancel. */ bool shouldCancel () const; @@ -121,6 +128,9 @@ private: std::function mJob; LoadEvent::pointer m_loadEvent; std::string mName; + clock_type::time_point m_queue_time; }; +} + #endif diff --git a/src/ripple_core/functional/JobQueue.cpp b/src/ripple_core/functional/JobQueue.cpp index 1be4a6fcf..7f2e44643 100644 --- a/src/ripple_core/functional/JobQueue.cpp +++ b/src/ripple_core/functional/JobQueue.cpp @@ -17,19 +17,90 @@ */ //============================================================================== +#include "JobQueue.h" + +#include "beast/beast/make_unique.h" +#include "beast/beast/chrono/chrono_util.h" + +#include + +namespace ripple { + class JobQueueImp : public JobQueue , private Workers::Callback { public: - struct Metrics + struct Stats { insight::Hook hook; insight::Gauge job_count; + + // VFALCO TODO should enumerate the map of jobtypes instead + explicit Stats (insight::Collector::ptr const& collector) + : m_collector (collector) + { + add (jtPACK , "make_pack"); + add (jtPUBOLDLEDGER , "pub_oldledgx"); + add (jtVALIDATION_ut, "ut_validx"); + add (jtPROOFWORK , "proof_of_work"); + add (jtTRANSACTION_l, "local_tx"); + add (jtPROPOSAL_ut , "ut_proposal"); + add (jtLEDGER_DATA , "ledgx_data"); + add (jtUPDATE_PF , "upd_paths"); + add (jtCLIENT , "client_cmd"); + add (jtRPC , "rpc_cmd"); + add (jtTRANSACTION , "recv_tx"); + add (jtUNL , "unl_op"); + add (jtADVANCE , "next_ledgx"); + add (jtPUBLEDGER , "pub_ledgx"); + add (jtTXN_DATA , "tx_data"); + add (jtWAL , "wal"); + add (jtVALIDATION_t , "t_validx"); + add (jtWRITE , "write"); + add (jtACCEPT , "accept_ledgx"); + add (jtPROPOSAL_t , "t_prop"); + add (jtSWEEP , "sweep"); + add (jtNETOP_CLUSTER, "netop_clust"); + add (jtNETOP_TIMER , "netop_heart"); + add (jtADMIN , "admin"); + } + + template + void on_dequeue (JobType type, + std::chrono::duration const& value) const + { + auto const ms (ceil (value)); + if (ms.count() >= 10) + m_dequeue.find (type)->second.notify (ms); + } + + template + void on_execute (JobType type, + std::chrono::duration const& value) const + { + auto const ms (ceil (value)); + if (ms.count() >= 10) + m_execute.find (type)->second.notify (ms); + } + + private: + void add (JobType type, std::string const& label) + { + m_dequeue.emplace (type, m_collector->make_event (label + "_q")); + m_execute.emplace (type, m_collector->make_event (label)); + } + + typedef std::unordered_map > JobEvents; + + insight::Collector::ptr m_collector; + JobEvents m_dequeue; + JobEvents m_execute; }; - // Statistics for each JobType - // + //-------------------------------------------------------------------------- + struct Count { Count () noexcept @@ -59,7 +130,7 @@ public: typedef CriticalSection::ScopedLockType ScopedLock; Journal m_journal; - Metrics m_metrics; + Stats m_stats; CriticalSection m_mutex; uint64 m_lastJob; JobSet m_jobSet; @@ -78,14 +149,15 @@ public: Stoppable& parent, Journal journal) : JobQueue ("JobQueue", parent) , m_journal (journal) + , m_stats (collector) , m_lastJob (0) , m_processCount (0) , m_workers (*this, "JobQueue", 0) , m_cancelCallback (boost::bind (&Stoppable::isStopping, this)) { - m_metrics.hook = collector->make_hook (std::bind ( + m_stats.hook = collector->make_hook (std::bind ( &JobQueueImp::collect, this)); - m_metrics.job_count = collector->make_gauge ("job_count"); + m_stats.job_count = collector->make_gauge ("job_count"); { ScopedLock lock (m_mutex); @@ -122,13 +194,13 @@ public: ~JobQueueImp () { // Must unhook before destroying - m_metrics.hook = insight::Hook (); + m_stats.hook = insight::Hook (); } void collect () { ScopedLock lock (m_mutex); - m_metrics.job_count = m_jobSet.size (); + m_stats.job_count = m_jobSet.size (); } void addJob (JobType type, std::string const& name, @@ -172,11 +244,10 @@ public: { ScopedLock lock (m_mutex); - std::pair< std::set ::iterator, bool > it = - m_jobSet.insert (Job ( - type, name, ++m_lastJob, m_loads[type], jobFunc, m_cancelCallback)); - - queueJob (*it.first, lock); + std::pair ::iterator, bool> result ( + m_jobSet.insert (Job (type, name, ++m_lastJob, + m_loads[type], jobFunc, m_cancelCallback))); + queueJob (*result.first, lock); } } @@ -404,9 +475,8 @@ private: void queueJob (Job const& job, ScopedLock const& lock) { JobType const type (job.getType ()); - - bassert (type != jtINVALID); - bassert (m_jobSet.find (job) != m_jobSet.end ()); + assert (type != jtINVALID); + assert (m_jobSet.find (job) != m_jobSet.end ()); Count& count (m_jobCounts [type]); @@ -545,7 +615,15 @@ private: { Thread::setCurrentThreadName (name); m_journal.trace << "Doing " << name << " job"; + + Job::clock_type::time_point const start_time ( + Job::clock_type::now()); + + m_stats.on_dequeue (type, start_time - job.queue_time ()); + job.doJob (); + + m_stats.on_execute (type, Job::clock_type::now() - start_time); } else { @@ -745,8 +823,11 @@ JobQueue::JobQueue (char const* name, Stoppable& parent) //------------------------------------------------------------------------------ -JobQueue* JobQueue::New (insight::Collector::ptr const& collector, - Stoppable& parent, Journal journal) +std::unique_ptr make_JobQueue ( + insight::Collector::ptr const& collector, + Stoppable& parent, Journal journal) { - return new JobQueueImp (collector, parent, journal); + return std::make_unique (collector, parent, journal); +} + } diff --git a/src/ripple_core/functional/JobQueue.h b/src/ripple_core/functional/JobQueue.h index c6a6a5b3b..0fc64b19d 100644 --- a/src/ripple_core/functional/JobQueue.h +++ b/src/ripple_core/functional/JobQueue.h @@ -20,15 +20,14 @@ #ifndef RIPPLE_CORE_JOBQUEUE_H_INCLUDED #define RIPPLE_CORE_JOBQUEUE_H_INCLUDED +namespace ripple { + class JobQueue : public Stoppable { protected: JobQueue (char const* name, Stoppable& parent); public: - static JobQueue* New (insight::Collector::ptr const& collector, - Stoppable& parent, Journal journal); - virtual ~JobQueue () { } // VFALCO NOTE Using boost::function here because Visual Studio 2012 @@ -70,4 +69,9 @@ public: virtual Json::Value getJson (int c = 0) = 0; }; +std::unique_ptr make_JobQueue (insight::Collector::ptr const& collector, + Stoppable& parent, Journal journal); + +} + #endif diff --git a/src/ripple_core/ripple_core.cpp b/src/ripple_core/ripple_core.cpp index d93255385..a6df5b06c 100644 --- a/src/ripple_core/ripple_core.cpp +++ b/src/ripple_core/ripple_core.cpp @@ -33,15 +33,13 @@ #include "../ripple_net/ripple_net.h" // for HTTPClient -namespace ripple -{ - +namespace ripple { #include "functional/Config.cpp" # include "functional/LoadFeeTrackImp.h" // private #include "functional/LoadFeeTrackImp.cpp" -#include "functional/Job.cpp" -#include "functional/JobQueue.cpp" #include "functional/LoadEvent.cpp" #include "functional/LoadMonitor.cpp" - } + +#include "functional/Job.cpp" +#include "functional/JobQueue.cpp" diff --git a/src/ripple_core/ripple_core.h b/src/ripple_core/ripple_core.h index d6c8109d0..5606bbf8b 100644 --- a/src/ripple_core/ripple_core.h +++ b/src/ripple_core/ripple_core.h @@ -30,19 +30,16 @@ #include "nodestore/NodeStore.h" -namespace ripple -{ - +namespace ripple { // Order matters - # include "functional/ConfigSections.h" #include "functional/Config.h" #include "functional/LoadFeeTrack.h" # include "functional/LoadEvent.h" # include "functional/LoadMonitor.h" +} + # include "functional/Job.h" #include "functional/JobQueue.h" -} - #endif