mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-18 18:15:50 +00:00
Per XLS-0095, we are taking steps to rename ripple(d) to xrpl(d). This change specifically removes all copyright notices referencing Ripple, XRPLF, and certain affiliated contributors upon mutual agreement, so the notice in the LICENSE.md file applies throughout. Copyright notices referencing external contributions remain as-is. Duplicate verbiage is also removed.
254 lines
6.3 KiB
C++
254 lines
6.3 KiB
C++
#ifndef BEAST_ASIO_IO_LATENCY_PROBE_H_INCLUDED
|
|
#define BEAST_ASIO_IO_LATENCY_PROBE_H_INCLUDED
|
|
|
|
#include <xrpl/beast/utility/instrumentation.h>
|
|
|
|
#include <boost/asio/basic_waitable_timer.hpp>
|
|
#include <boost/asio/io_context.hpp>
|
|
#include <boost/asio/post.hpp>
|
|
|
|
#include <chrono>
|
|
#include <condition_variable>
|
|
#include <mutex>
|
|
#include <stdexcept>
|
|
|
|
namespace beast {
|
|
|
|
/** Measures handler latency on an io_context queue. */
|
|
template <class Clock>
|
|
class io_latency_probe
|
|
{
|
|
private:
|
|
using duration = typename Clock::duration;
|
|
using time_point = typename Clock::time_point;
|
|
|
|
std::recursive_mutex m_mutex;
|
|
std::condition_variable_any m_cond;
|
|
std::size_t m_count;
|
|
duration const m_period;
|
|
boost::asio::io_context& m_ios;
|
|
boost::asio::basic_waitable_timer<std::chrono::steady_clock> m_timer;
|
|
bool m_cancel;
|
|
|
|
public:
|
|
io_latency_probe(duration const& period, boost::asio::io_context& ios)
|
|
: m_count(1)
|
|
, m_period(period)
|
|
, m_ios(ios)
|
|
, m_timer(m_ios)
|
|
, m_cancel(false)
|
|
{
|
|
}
|
|
|
|
~io_latency_probe()
|
|
{
|
|
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
|
|
cancel(lock, true);
|
|
}
|
|
|
|
/** Return the io_context associated with the latency probe. */
|
|
/** @{ */
|
|
boost::asio::io_context&
|
|
get_io_context()
|
|
{
|
|
return m_ios;
|
|
}
|
|
|
|
boost::asio::io_context const&
|
|
get_io_context() const
|
|
{
|
|
return m_ios;
|
|
}
|
|
/** @} */
|
|
|
|
/** Cancel all pending i/o.
|
|
Any handlers which have already been queued will still be called.
|
|
*/
|
|
/** @{ */
|
|
void
|
|
cancel()
|
|
{
|
|
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
|
|
cancel(lock, true);
|
|
}
|
|
|
|
void
|
|
cancel_async()
|
|
{
|
|
std::unique_lock<decltype(m_mutex)> lock(m_mutex);
|
|
cancel(lock, false);
|
|
}
|
|
/** @} */
|
|
|
|
/** Measure one sample of i/o latency.
|
|
Handler will be called with this signature:
|
|
void Handler (Duration d);
|
|
*/
|
|
template <class Handler>
|
|
void
|
|
sample_one(Handler&& handler)
|
|
{
|
|
std::lock_guard lock(m_mutex);
|
|
if (m_cancel)
|
|
throw std::logic_error("io_latency_probe is canceled");
|
|
boost::asio::post(
|
|
m_ios,
|
|
sample_op<Handler>(
|
|
std::forward<Handler>(handler), Clock::now(), false, this));
|
|
}
|
|
|
|
/** Initiate continuous i/o latency sampling.
|
|
Handler will be called with this signature:
|
|
void Handler (std::chrono::milliseconds);
|
|
*/
|
|
template <class Handler>
|
|
void
|
|
sample(Handler&& handler)
|
|
{
|
|
std::lock_guard lock(m_mutex);
|
|
if (m_cancel)
|
|
throw std::logic_error("io_latency_probe is canceled");
|
|
boost::asio::post(
|
|
m_ios,
|
|
sample_op<Handler>(
|
|
std::forward<Handler>(handler), Clock::now(), true, this));
|
|
}
|
|
|
|
private:
|
|
void
|
|
cancel(std::unique_lock<decltype(m_mutex)>& lock, bool wait)
|
|
{
|
|
if (!m_cancel)
|
|
{
|
|
--m_count;
|
|
m_cancel = true;
|
|
}
|
|
|
|
if (wait)
|
|
m_cond.wait(lock, [this] { return this->m_count == 0; });
|
|
}
|
|
|
|
void
|
|
addref()
|
|
{
|
|
std::lock_guard lock(m_mutex);
|
|
++m_count;
|
|
}
|
|
|
|
void
|
|
release()
|
|
{
|
|
std::lock_guard lock(m_mutex);
|
|
if (--m_count == 0)
|
|
m_cond.notify_all();
|
|
}
|
|
|
|
template <class Handler>
|
|
struct sample_op
|
|
{
|
|
Handler m_handler;
|
|
time_point m_start;
|
|
bool m_repeat;
|
|
io_latency_probe* m_probe;
|
|
|
|
sample_op(
|
|
Handler const& handler,
|
|
time_point const& start,
|
|
bool repeat,
|
|
io_latency_probe* probe)
|
|
: m_handler(handler)
|
|
, m_start(start)
|
|
, m_repeat(repeat)
|
|
, m_probe(probe)
|
|
{
|
|
XRPL_ASSERT(
|
|
m_probe,
|
|
"beast::io_latency_probe::sample_op::sample_op : non-null "
|
|
"probe input");
|
|
m_probe->addref();
|
|
}
|
|
|
|
sample_op(sample_op&& from) noexcept
|
|
: m_handler(std::move(from.m_handler))
|
|
, m_start(from.m_start)
|
|
, m_repeat(from.m_repeat)
|
|
, m_probe(from.m_probe)
|
|
{
|
|
XRPL_ASSERT(
|
|
m_probe,
|
|
"beast::io_latency_probe::sample_op::sample_op(sample_op&&) : "
|
|
"non-null probe input");
|
|
from.m_probe = nullptr;
|
|
}
|
|
|
|
sample_op(sample_op const&) = delete;
|
|
sample_op
|
|
operator=(sample_op const&) = delete;
|
|
sample_op&
|
|
operator=(sample_op&&) = delete;
|
|
|
|
~sample_op()
|
|
{
|
|
if (m_probe)
|
|
m_probe->release();
|
|
}
|
|
|
|
void
|
|
operator()() const
|
|
{
|
|
if (!m_probe)
|
|
return;
|
|
typename Clock::time_point const now(Clock::now());
|
|
typename Clock::duration const elapsed(now - m_start);
|
|
|
|
m_handler(elapsed);
|
|
|
|
{
|
|
std::lock_guard lock(m_probe->m_mutex);
|
|
if (m_probe->m_cancel)
|
|
return;
|
|
}
|
|
|
|
if (m_repeat)
|
|
{
|
|
// Calculate when we want to sample again, and
|
|
// adjust for the expected latency.
|
|
//
|
|
typename Clock::time_point const when(
|
|
now + m_probe->m_period - 2 * elapsed);
|
|
|
|
if (when <= now)
|
|
{
|
|
// The latency is too high to maintain the desired
|
|
// period so don't bother with a timer.
|
|
//
|
|
boost::asio::post(
|
|
m_probe->m_ios,
|
|
sample_op<Handler>(m_handler, now, m_repeat, m_probe));
|
|
}
|
|
else
|
|
{
|
|
m_probe->m_timer.expires_after(when - now);
|
|
m_probe->m_timer.async_wait(
|
|
sample_op<Handler>(m_handler, now, m_repeat, m_probe));
|
|
}
|
|
}
|
|
}
|
|
|
|
void
|
|
operator()(boost::system::error_code const& ec)
|
|
{
|
|
if (!m_probe)
|
|
return;
|
|
typename Clock::time_point const now(Clock::now());
|
|
boost::asio::post(
|
|
m_probe->m_ios,
|
|
sample_op<Handler>(m_handler, now, m_repeat, m_probe));
|
|
}
|
|
};
|
|
};
|
|
|
|
} // namespace beast
|
|
|
|
#endif
|