From 4d5df92cbc1129453d7c3176b870a4f3e1f9e278 Mon Sep 17 00:00:00 2001 From: Vinnie Falco Date: Fri, 10 Jan 2014 22:09:25 -0800 Subject: [PATCH] Add io_latency_probe --- .../Builds/VisualStudio2012/beast.vcxproj | 1 + .../VisualStudio2012/beast.vcxproj.filters | 3 + src/beast/beast/asio/io_latency_probe.h | 245 ++++++++++++++++++ src/ripple_app/main/Application.cpp | 176 ++++++++----- src/ripple_app/ripple_app.cpp | 1 + 5 files changed, 363 insertions(+), 63 deletions(-) create mode 100644 src/beast/beast/asio/io_latency_probe.h diff --git a/src/beast/Builds/VisualStudio2012/beast.vcxproj b/src/beast/Builds/VisualStudio2012/beast.vcxproj index e9a17f387f..ab81dafc4d 100644 --- a/src/beast/Builds/VisualStudio2012/beast.vcxproj +++ b/src/beast/Builds/VisualStudio2012/beast.vcxproj @@ -83,6 +83,7 @@ + diff --git a/src/beast/Builds/VisualStudio2012/beast.vcxproj.filters b/src/beast/Builds/VisualStudio2012/beast.vcxproj.filters index c1717f4456..97808dbe02 100644 --- a/src/beast/Builds/VisualStudio2012/beast.vcxproj.filters +++ b/src/beast/Builds/VisualStudio2012/beast.vcxproj.filters @@ -1242,6 +1242,9 @@ beast\utility + + beast\asio + diff --git a/src/beast/beast/asio/io_latency_probe.h b/src/beast/beast/asio/io_latency_probe.h new file mode 100644 index 0000000000..419b60ebde --- /dev/null +++ b/src/beast/beast/asio/io_latency_probe.h @@ -0,0 +1,245 @@ +//------------------------------------------------------------------------------ +/* + This file is part of Beast: https://github.com/vinniefalco/Beast + Copyright 2013, Vinnie Falco + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef BEAST_ASIO_IO_LATENCY_PROBE_H_INCLUDED +#define BEAST_ASIO_IO_LATENCY_PROBE_H_INCLUDED + +#include +#include +#include +#include + +#include +#include +#include + +namespace beast { + +/** Measures handler latency on an io_service queue. */ +template +class io_latency_probe +{ +private: + typedef typename Clock::duration duration; + typedef typename Clock::time_point time_point; + + std::recursive_mutex m_mutex; + std::condition_variable_any m_cond; + std::size_t m_count; + duration const m_period; + boost::asio::io_service& m_ios; + boost::asio::deadline_timer m_timer; + bool m_cancel; + +public: + io_latency_probe (duration const& period, + boost::asio::io_service& ios) + : m_count (1) + , m_period (period) + , m_ios (ios) + , m_timer (m_ios) + , m_cancel (false) + { + } + + ~io_latency_probe () + { + std::unique_lock lock (m_mutex); + cancel (lock, true); + } + + /** Return the io_service associated with the latency probe. */ + /** @{ */ + boost::asio::io_service& get_io_service () + { + return m_ios; + } + + boost::asio::io_service const& get_io_service () const + { + return m_ios; + } + /** @} */ + + /** Cancel all pending i/o. + Any handlers which have already been queued will still be called. + */ + /** @{ */ + void cancel () + { + std::unique_lock lock (m_mutex); + cancel (lock, true); + } + + void cancel_async () + { + std::unique_lock lock (m_mutex); + cancel (lock, false); + } + /** @} */ + + /** Measure one sample of i/o latency. + Handler will be called with this signature: + void Handler (Duration d); + */ + template + void sample_one (Handler&& handler) + { + std::lock_guard lock (m_mutex); + if (m_cancel) + throw std::logic_error ("io_latency_probe is canceled"); + m_ios.post (sample_op ( + std::forward (handler), + Clock::now(), false, this)); + } + + /** Initiate continuous i/o latency sampling. + Handler will be called with this signature: + void Handler (std::chrono::milliseconds); + */ + template + void sample (Handler&& handler) + { + std::lock_guard lock (m_mutex); + if (m_cancel) + throw std::logic_error ("io_latency_probe is canceled"); + m_ios.post (sample_op ( + std::forward (handler), + Clock::now(), true, this)); + } + +private: + void cancel (std::unique_lock & lock, + bool wait) + { + if (! m_cancel) + { + --m_count; + m_cancel = true; + } + + if (wait) +#ifdef BOOST_NO_CXX11_LAMBDAS + while (m_count != 0) + m_cond.wait (lock); +#else + m_cond.wait (lock, [this] { + return this->m_count == 0; }); +#endif + } + + void addref () + { + std::lock_guard lock (m_mutex); + ++m_count; + } + + void release () + { + std::lock_guard lock (m_mutex); + if (--m_count == 0) + m_cond.notify_all (); + } + + template + struct sample_op + { + Handler m_handler; + time_point m_start; + bool m_repeat; + io_latency_probe* m_probe; + + sample_op (Handler const& handler, time_point const& start, + bool repeat, io_latency_probe* probe) + : m_handler (handler) + , m_start (start) + , m_repeat (repeat) + , m_probe (probe) + { + m_probe->addref(); + } + + sample_op (sample_op const& other) + : m_handler (other.m_handler) + , m_start (other.m_start) + , m_probe (other.m_probe) + { + m_probe->addref(); + } + + ~sample_op () + { + m_probe->release(); + } + + void operator() () const + { + typename Clock::time_point const now (Clock::now()); + typename Clock::duration const elapsed (now - m_start); + + m_handler (elapsed); + + { + std::lock_guard m_mutex) + > lock (m_probe->m_mutex); + if (m_probe->m_cancel) + return; + } + + if (m_repeat) + { + // Calculate when we want to sample again, and + // adjust for the expected latency. + // + typename Clock::time_point const when ( + now + m_probe->m_period - 2 * elapsed); + + if (when <= now) + { + // The latency is too high to maintain the desired + // period so don't bother with a timer. + // + m_probe->m_ios.post (sample_op ( + m_handler, now, m_repeat, m_probe)); + } + else + { + boost::posix_time::microseconds mms ( + std::chrono::duration_cast < + std::chrono::microseconds> ( + when - now).count ()); + m_probe->m_timer.expires_from_now (mms); + m_probe->m_timer.async_wait (sample_op ( + m_handler, now, m_repeat, m_probe)); + } + } + } + + void operator () (boost::system::error_code const& ec) + { + typename Clock::time_point const now (Clock::now()); + m_probe->m_ios.post (sample_op ( + m_handler, now, m_repeat, m_probe)); + } + }; +}; + +} + +#endif diff --git a/src/ripple_app/main/Application.cpp b/src/ripple_app/main/Application.cpp index fa7d0cded9..53b78a457e 100644 --- a/src/ripple_app/main/Application.cpp +++ b/src/ripple_app/main/Application.cpp @@ -63,10 +63,70 @@ class ApplicationImp , public DeadlineTimer::Listener , public LeakChecked { -private: +public: + Journal m_journal; + Application::LockType m_masterMutex; + + // These are not Stoppable-derived + NodeCache m_tempNodeCache; + SLECache m_sleCache; + LocalCredentials m_localCredentials; + TransactionMaster m_txMaster; + + std::unique_ptr m_collectorManager; + std::unique_ptr m_resourceManager; + std::unique_ptr m_rpcServiceManager; + + // These are Stoppable-related + NodeStoreScheduler m_nodeStoreScheduler; + std::unique_ptr m_jobQueue; + IoServicePool m_mainIoPool; + std::unique_ptr m_siteFiles; + // VFALCO TODO Make OrderBookDB abstract + OrderBookDB m_orderBookDB; + std::unique_ptr m_pathRequests; + std::unique_ptr m_ledgerMaster; + std::unique_ptr m_networkOPs; + std::unique_ptr m_deprecatedUNL; + std::unique_ptr m_rpcHTTPServer; +#if ! RIPPLE_USE_RPC_SERVICE_MANAGER + RPCServerHandler m_rpcServerHandler; +#endif + std::unique_ptr m_nodeStore; + std::unique_ptr m_sntpClient; + std::unique_ptr m_inboundLedgers; + std::unique_ptr m_txQueue; + std::unique_ptr m_validators; + std::unique_ptr mFeatures; + std::unique_ptr mFeeVote; + std::unique_ptr mFeeTrack; + std::unique_ptr mHashRouter; + std::unique_ptr mValidations; + std::unique_ptr mProofOfWorkFactory; + std::unique_ptr m_loadManager; + DeadlineTimer m_sweepTimer; + bool volatile mShutdown; + + std::unique_ptr mRpcDB; + std::unique_ptr mTxnDB; + std::unique_ptr mLedgerDB; + std::unique_ptr mWalletDB; + + std::unique_ptr m_peerSSLContext; + std::unique_ptr m_wsSSLContext; + std::unique_ptr m_peers; + OwnedArray m_peerDoors; + std::unique_ptr m_rpcDoor; + std::unique_ptr m_wsPublicDoor; + std::unique_ptr m_wsPrivateDoor; + std::unique_ptr m_wsProxyDoor; + + WaitableEvent m_stop; + + io_latency_probe m_probe; + static ApplicationImp* s_instance; -public: static Application& getInstance () { bassert (s_instance != nullptr); @@ -75,6 +135,39 @@ public: //-------------------------------------------------------------------------- + class sample_io_service_latency + { + public: + insight::Event latency; + Journal journal; + + sample_io_service_latency (insight::Event latency_, + Journal journal_) + : latency (latency_) + , journal (journal_) + { + } + + template + void operator() (Duration const& elapsed) const + { + auto ms (std::chrono::duration_cast < + std::chrono::milliseconds> (elapsed)); + latency.notify (ms.count()); + if (ms.count() >= 500) + journal.warning << + "io_service latency = " << ms; + +#if 0 + std::stringstream ss; + ss << "io_service latency = " << ms; + Logger::outputDebugString (ss.str()); +#endif + } + }; + + //-------------------------------------------------------------------------- + ApplicationImp () : RootStoppable ("Application") , m_journal (LogPartition::getJournal ()) @@ -169,6 +262,8 @@ public: , m_sweepTimer (this) , mShutdown (false) + + , m_probe (std::chrono::milliseconds (100), m_mainIoPool.getService()) { // VFALCO HACK m_nodeStoreScheduler.setJobQueue (*m_jobQueue); @@ -180,6 +275,11 @@ public: // VFALCO TODO remove these once the call is thread safe. HashMaps::getInstance ().initializeNonce (); + + m_probe.sample (sample_io_service_latency ( + m_collectorManager->collector()->make_event ( + "ios_latency"), LogPartition::getJournal ())); + } ~ApplicationImp () @@ -189,7 +289,7 @@ public: } //-------------------------------------------------------------------------- - + CollectorManager& getCollectorManager () { return *m_collectorManager; @@ -734,6 +834,16 @@ public: { m_journal.debug << "Application stopping"; + m_probe.cancel_async (); + + // VFALCO Enormous hack, we have to force the probe to cancel + // before we stop the io_service queue or else it never + // unblocks in its destructor. The fix is to make all + // io_objects gracefully handle exit so that we can + // naturally return from io_service::run() instead of + // forcing a call to io_service::stop() + m_probe.cancel (); + m_sweepTimer.cancel(); // VFALCO TODO get rid of this flag @@ -906,66 +1016,6 @@ private: bool loadOldLedger (const std::string&, bool); void onAnnounceAddress (); - -private: - Journal m_journal; - Application::LockType m_masterMutex; - - // These are not Stoppable-derived - NodeCache m_tempNodeCache; - SLECache m_sleCache; - LocalCredentials m_localCredentials; - TransactionMaster m_txMaster; - - std::unique_ptr m_collectorManager; - std::unique_ptr m_resourceManager; - std::unique_ptr m_rpcServiceManager; - - // These are Stoppable-related - NodeStoreScheduler m_nodeStoreScheduler; - std::unique_ptr m_jobQueue; - IoServicePool m_mainIoPool; - std::unique_ptr m_siteFiles; - // VFALCO TODO Make OrderBookDB abstract - OrderBookDB m_orderBookDB; - std::unique_ptr m_pathRequests; - std::unique_ptr m_ledgerMaster; - std::unique_ptr m_networkOPs; - std::unique_ptr m_deprecatedUNL; - std::unique_ptr m_rpcHTTPServer; -#if ! RIPPLE_USE_RPC_SERVICE_MANAGER - RPCServerHandler m_rpcServerHandler; -#endif - std::unique_ptr m_nodeStore; - std::unique_ptr m_sntpClient; - std::unique_ptr m_inboundLedgers; - std::unique_ptr m_txQueue; - std::unique_ptr m_validators; - std::unique_ptr mFeatures; - std::unique_ptr mFeeVote; - std::unique_ptr mFeeTrack; - std::unique_ptr mHashRouter; - std::unique_ptr mValidations; - std::unique_ptr mProofOfWorkFactory; - std::unique_ptr m_loadManager; - DeadlineTimer m_sweepTimer; - bool volatile mShutdown; - - std::unique_ptr mRpcDB; - std::unique_ptr mTxnDB; - std::unique_ptr mLedgerDB; - std::unique_ptr mWalletDB; - - std::unique_ptr m_peerSSLContext; - std::unique_ptr m_wsSSLContext; - std::unique_ptr m_peers; - OwnedArray m_peerDoors; - std::unique_ptr m_rpcDoor; - std::unique_ptr m_wsPublicDoor; - std::unique_ptr m_wsPrivateDoor; - std::unique_ptr m_wsProxyDoor; - - WaitableEvent m_stop; }; //------------------------------------------------------------------------------ diff --git a/src/ripple_app/ripple_app.cpp b/src/ripple_app/ripple_app.cpp index 9fa3d7293f..881db08bc2 100644 --- a/src/ripple_app/ripple_app.cpp +++ b/src/ripple_app/ripple_app.cpp @@ -41,6 +41,7 @@ #include "../ripple/validators/ripple_validators.h" #include "beast/beast/Asio.h" +#include "beast/beast/asio/io_latency_probe.h" # include "main/CollectorManager.h" #include "main/CollectorManager.cpp"