Add io_latency_probe

This commit is contained in:
Vinnie Falco
2014-01-10 22:09:25 -08:00
parent 1fcb2872b9
commit 4d5df92cbc
5 changed files with 363 additions and 63 deletions

View File

@@ -83,6 +83,7 @@
<ItemGroup>
<ClInclude Include="..\..\beast\Arithmetic.h" />
<ClInclude Include="..\..\beast\Asio.h" />
<ClInclude Include="..\..\beast\asio\io_latency_probe.h" />
<ClInclude Include="..\..\beast\asio\IPAddressConversion.h" />
<ClInclude Include="..\..\beast\Atomic.h" />
<ClInclude Include="..\..\beast\Boost.h" />

View File

@@ -1242,6 +1242,9 @@
<ClInclude Include="..\..\beast\utility\hash_pair.h">
<Filter>beast\utility</Filter>
</ClInclude>
<ClInclude Include="..\..\beast\asio\io_latency_probe.h">
<Filter>beast\asio</Filter>
</ClInclude>
</ItemGroup>
<ItemGroup>
<ClCompile Include="..\..\modules\beast_core\containers\DynamicObject.cpp">

View File

@@ -0,0 +1,245 @@
//------------------------------------------------------------------------------
/*
This file is part of Beast: https://github.com/vinniefalco/Beast
Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef BEAST_ASIO_IO_LATENCY_PROBE_H_INCLUDED
#define BEAST_ASIO_IO_LATENCY_PROBE_H_INCLUDED
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <stdexcept>
#include <boost/asio/deadline_timer.hpp>
#include <boost/asio/io_service.hpp>
#include <boost/config.hpp>
namespace beast {
/** Measures handler latency on an io_service queue. */
template <class Clock>
class io_latency_probe
{
private:
typedef typename Clock::duration duration;
typedef typename Clock::time_point time_point;
std::recursive_mutex m_mutex;
std::condition_variable_any m_cond;
std::size_t m_count;
duration const m_period;
boost::asio::io_service& m_ios;
boost::asio::deadline_timer m_timer;
bool m_cancel;
public:
io_latency_probe (duration const& period,
boost::asio::io_service& ios)
: m_count (1)
, m_period (period)
, m_ios (ios)
, m_timer (m_ios)
, m_cancel (false)
{
}
~io_latency_probe ()
{
std::unique_lock <decltype (m_mutex)> lock (m_mutex);
cancel (lock, true);
}
/** Return the io_service associated with the latency probe. */
/** @{ */
boost::asio::io_service& get_io_service ()
{
return m_ios;
}
boost::asio::io_service const& get_io_service () const
{
return m_ios;
}
/** @} */
/** Cancel all pending i/o.
Any handlers which have already been queued will still be called.
*/
/** @{ */
void cancel ()
{
std::unique_lock <decltype(m_mutex)> lock (m_mutex);
cancel (lock, true);
}
void cancel_async ()
{
std::unique_lock <decltype(m_mutex)> lock (m_mutex);
cancel (lock, false);
}
/** @} */
/** Measure one sample of i/o latency.
Handler will be called with this signature:
void Handler (Duration d);
*/
template <class Handler>
void sample_one (Handler&& handler)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
if (m_cancel)
throw std::logic_error ("io_latency_probe is canceled");
m_ios.post (sample_op <Handler> (
std::forward <Handler> (handler),
Clock::now(), false, this));
}
/** Initiate continuous i/o latency sampling.
Handler will be called with this signature:
void Handler (std::chrono::milliseconds);
*/
template <class Handler>
void sample (Handler&& handler)
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
if (m_cancel)
throw std::logic_error ("io_latency_probe is canceled");
m_ios.post (sample_op <Handler> (
std::forward <Handler> (handler),
Clock::now(), true, this));
}
private:
void cancel (std::unique_lock <decltype (m_mutex)>& lock,
bool wait)
{
if (! m_cancel)
{
--m_count;
m_cancel = true;
}
if (wait)
#ifdef BOOST_NO_CXX11_LAMBDAS
while (m_count != 0)
m_cond.wait (lock);
#else
m_cond.wait (lock, [this] {
return this->m_count == 0; });
#endif
}
void addref ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
++m_count;
}
void release ()
{
std::lock_guard <decltype(m_mutex)> lock (m_mutex);
if (--m_count == 0)
m_cond.notify_all ();
}
template <class Handler>
struct sample_op
{
Handler m_handler;
time_point m_start;
bool m_repeat;
io_latency_probe* m_probe;
sample_op (Handler const& handler, time_point const& start,
bool repeat, io_latency_probe* probe)
: m_handler (handler)
, m_start (start)
, m_repeat (repeat)
, m_probe (probe)
{
m_probe->addref();
}
sample_op (sample_op const& other)
: m_handler (other.m_handler)
, m_start (other.m_start)
, m_probe (other.m_probe)
{
m_probe->addref();
}
~sample_op ()
{
m_probe->release();
}
void operator() () const
{
typename Clock::time_point const now (Clock::now());
typename Clock::duration const elapsed (now - m_start);
m_handler (elapsed);
{
std::lock_guard <decltype (m_probe->m_mutex)
> lock (m_probe->m_mutex);
if (m_probe->m_cancel)
return;
}
if (m_repeat)
{
// Calculate when we want to sample again, and
// adjust for the expected latency.
//
typename Clock::time_point const when (
now + m_probe->m_period - 2 * elapsed);
if (when <= now)
{
// The latency is too high to maintain the desired
// period so don't bother with a timer.
//
m_probe->m_ios.post (sample_op <Handler> (
m_handler, now, m_repeat, m_probe));
}
else
{
boost::posix_time::microseconds mms (
std::chrono::duration_cast <
std::chrono::microseconds> (
when - now).count ());
m_probe->m_timer.expires_from_now (mms);
m_probe->m_timer.async_wait (sample_op <Handler> (
m_handler, now, m_repeat, m_probe));
}
}
}
void operator () (boost::system::error_code const& ec)
{
typename Clock::time_point const now (Clock::now());
m_probe->m_ios.post (sample_op <Handler> (
m_handler, now, m_repeat, m_probe));
}
};
};
}
#endif

View File

@@ -63,10 +63,70 @@ class ApplicationImp
, public DeadlineTimer::Listener
, public LeakChecked <ApplicationImp>
{
private:
public:
Journal m_journal;
Application::LockType m_masterMutex;
// These are not Stoppable-derived
NodeCache m_tempNodeCache;
SLECache m_sleCache;
LocalCredentials m_localCredentials;
TransactionMaster m_txMaster;
std::unique_ptr <CollectorManager> m_collectorManager;
std::unique_ptr <Resource::Manager> m_resourceManager;
std::unique_ptr <RPC::Manager> m_rpcServiceManager;
// These are Stoppable-related
NodeStoreScheduler m_nodeStoreScheduler;
std::unique_ptr <JobQueue> m_jobQueue;
IoServicePool m_mainIoPool;
std::unique_ptr <SiteFiles::Manager> m_siteFiles;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr <PathRequests> m_pathRequests;
std::unique_ptr <LedgerMaster> m_ledgerMaster;
std::unique_ptr <NetworkOPs> m_networkOPs;
std::unique_ptr <UniqueNodeList> m_deprecatedUNL;
std::unique_ptr <RPCHTTPServer> m_rpcHTTPServer;
#if ! RIPPLE_USE_RPC_SERVICE_MANAGER
RPCServerHandler m_rpcServerHandler;
#endif
std::unique_ptr <NodeStore::Database> m_nodeStore;
std::unique_ptr <SNTPClient> m_sntpClient;
std::unique_ptr <InboundLedgers> m_inboundLedgers;
std::unique_ptr <TxQueue> m_txQueue;
std::unique_ptr <Validators::Manager> m_validators;
std::unique_ptr <IFeatures> mFeatures;
std::unique_ptr <IFeeVote> mFeeVote;
std::unique_ptr <LoadFeeTrack> mFeeTrack;
std::unique_ptr <IHashRouter> mHashRouter;
std::unique_ptr <Validations> mValidations;
std::unique_ptr <ProofOfWorkFactory> mProofOfWorkFactory;
std::unique_ptr <LoadManager> m_loadManager;
DeadlineTimer m_sweepTimer;
bool volatile mShutdown;
std::unique_ptr <DatabaseCon> mRpcDB;
std::unique_ptr <DatabaseCon> mTxnDB;
std::unique_ptr <DatabaseCon> mLedgerDB;
std::unique_ptr <DatabaseCon> mWalletDB;
std::unique_ptr <SSLContext> m_peerSSLContext;
std::unique_ptr <SSLContext> m_wsSSLContext;
std::unique_ptr <Peers> m_peers;
OwnedArray <PeerDoor> m_peerDoors;
std::unique_ptr <RPCDoor> m_rpcDoor;
std::unique_ptr <WSDoor> m_wsPublicDoor;
std::unique_ptr <WSDoor> m_wsPrivateDoor;
std::unique_ptr <WSDoor> m_wsProxyDoor;
WaitableEvent m_stop;
io_latency_probe <std::chrono::steady_clock> m_probe;
static ApplicationImp* s_instance;
public:
static Application& getInstance ()
{
bassert (s_instance != nullptr);
@@ -75,6 +135,39 @@ public:
//--------------------------------------------------------------------------
class sample_io_service_latency
{
public:
insight::Event latency;
Journal journal;
sample_io_service_latency (insight::Event latency_,
Journal journal_)
: latency (latency_)
, journal (journal_)
{
}
template <class Duration>
void operator() (Duration const& elapsed) const
{
auto ms (std::chrono::duration_cast <
std::chrono::milliseconds> (elapsed));
latency.notify (ms.count());
if (ms.count() >= 500)
journal.warning <<
"io_service latency = " << ms;
#if 0
std::stringstream ss;
ss << "io_service latency = " << ms;
Logger::outputDebugString (ss.str());
#endif
}
};
//--------------------------------------------------------------------------
ApplicationImp ()
: RootStoppable ("Application")
, m_journal (LogPartition::getJournal <ApplicationLog> ())
@@ -169,6 +262,8 @@ public:
, m_sweepTimer (this)
, mShutdown (false)
, m_probe (std::chrono::milliseconds (100), m_mainIoPool.getService())
{
// VFALCO HACK
m_nodeStoreScheduler.setJobQueue (*m_jobQueue);
@@ -180,6 +275,11 @@ public:
// VFALCO TODO remove these once the call is thread safe.
HashMaps::getInstance ().initializeNonce <size_t> ();
m_probe.sample (sample_io_service_latency (
m_collectorManager->collector()->make_event (
"ios_latency"), LogPartition::getJournal <ApplicationLog> ()));
}
~ApplicationImp ()
@@ -734,6 +834,16 @@ public:
{
m_journal.debug << "Application stopping";
m_probe.cancel_async ();
// VFALCO Enormous hack, we have to force the probe to cancel
// before we stop the io_service queue or else it never
// unblocks in its destructor. The fix is to make all
// io_objects gracefully handle exit so that we can
// naturally return from io_service::run() instead of
// forcing a call to io_service::stop()
m_probe.cancel ();
m_sweepTimer.cancel();
// VFALCO TODO get rid of this flag
@@ -906,66 +1016,6 @@ private:
bool loadOldLedger (const std::string&, bool);
void onAnnounceAddress ();
private:
Journal m_journal;
Application::LockType m_masterMutex;
// These are not Stoppable-derived
NodeCache m_tempNodeCache;
SLECache m_sleCache;
LocalCredentials m_localCredentials;
TransactionMaster m_txMaster;
std::unique_ptr <CollectorManager> m_collectorManager;
std::unique_ptr <Resource::Manager> m_resourceManager;
std::unique_ptr <RPC::Manager> m_rpcServiceManager;
// These are Stoppable-related
NodeStoreScheduler m_nodeStoreScheduler;
std::unique_ptr <JobQueue> m_jobQueue;
IoServicePool m_mainIoPool;
std::unique_ptr <SiteFiles::Manager> m_siteFiles;
// VFALCO TODO Make OrderBookDB abstract
OrderBookDB m_orderBookDB;
std::unique_ptr <PathRequests> m_pathRequests;
std::unique_ptr <LedgerMaster> m_ledgerMaster;
std::unique_ptr <NetworkOPs> m_networkOPs;
std::unique_ptr <UniqueNodeList> m_deprecatedUNL;
std::unique_ptr <RPCHTTPServer> m_rpcHTTPServer;
#if ! RIPPLE_USE_RPC_SERVICE_MANAGER
RPCServerHandler m_rpcServerHandler;
#endif
std::unique_ptr <NodeStore::Database> m_nodeStore;
std::unique_ptr <SNTPClient> m_sntpClient;
std::unique_ptr <InboundLedgers> m_inboundLedgers;
std::unique_ptr <TxQueue> m_txQueue;
std::unique_ptr <Validators::Manager> m_validators;
std::unique_ptr <IFeatures> mFeatures;
std::unique_ptr <IFeeVote> mFeeVote;
std::unique_ptr <LoadFeeTrack> mFeeTrack;
std::unique_ptr <IHashRouter> mHashRouter;
std::unique_ptr <Validations> mValidations;
std::unique_ptr <ProofOfWorkFactory> mProofOfWorkFactory;
std::unique_ptr <LoadManager> m_loadManager;
DeadlineTimer m_sweepTimer;
bool volatile mShutdown;
std::unique_ptr <DatabaseCon> mRpcDB;
std::unique_ptr <DatabaseCon> mTxnDB;
std::unique_ptr <DatabaseCon> mLedgerDB;
std::unique_ptr <DatabaseCon> mWalletDB;
std::unique_ptr <SSLContext> m_peerSSLContext;
std::unique_ptr <SSLContext> m_wsSSLContext;
std::unique_ptr <Peers> m_peers;
OwnedArray <PeerDoor> m_peerDoors;
std::unique_ptr <RPCDoor> m_rpcDoor;
std::unique_ptr <WSDoor> m_wsPublicDoor;
std::unique_ptr <WSDoor> m_wsPrivateDoor;
std::unique_ptr <WSDoor> m_wsProxyDoor;
WaitableEvent m_stop;
};
//------------------------------------------------------------------------------

View File

@@ -41,6 +41,7 @@
#include "../ripple/validators/ripple_validators.h"
#include "beast/beast/Asio.h"
#include "beast/beast/asio/io_latency_probe.h"
# include "main/CollectorManager.h"
#include "main/CollectorManager.cpp"