rippled
Loading...
Searching...
No Matches
StatsDCollector.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of Beast: https://github.com/vinniefalco/Beast
4 Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpl/beast/core/List.h>
21#include <xrpl/beast/insight/CounterImpl.h>
22#include <xrpl/beast/insight/EventImpl.h>
23#include <xrpl/beast/insight/GaugeImpl.h>
24#include <xrpl/beast/insight/Hook.h>
25#include <xrpl/beast/insight/HookImpl.h>
26#include <xrpl/beast/insight/MeterImpl.h>
27#include <xrpl/beast/insight/StatsDCollector.h>
28#include <xrpl/beast/net/IPEndpoint.h>
29#include <xrpl/beast/utility/Journal.h>
30#include <xrpl/beast/utility/instrumentation.h>
31
32#include <boost/asio/basic_waitable_timer.hpp>
33#include <boost/asio/buffer.hpp>
34#include <boost/asio/error.hpp>
35#include <boost/asio/io_service.hpp>
36#include <boost/asio/ip/udp.hpp>
37#include <boost/asio/strand.hpp>
38#include <boost/system/detail/error_code.hpp>
39
#include <chrono>
#include <cstddef>
#include <deque>
#include <functional>
#include <iostream>
#include <limits>
#include <memory>
#include <mutex>
#include <optional>
#include <sstream>
#include <string>
#include <thread>
#include <utility>
#include <vector>
53
54#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
55#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
56#endif
57
58namespace beast {
59namespace insight {
60
61namespace detail {
62
63class StatsDCollectorImp;
64
65//------------------------------------------------------------------------------
66
67class StatsDMetricBase : public List<StatsDMetricBase>::Node
68{
69public:
70 virtual void
72 virtual ~StatsDMetricBase() = default;
73 StatsDMetricBase() = default;
76 operator=(StatsDMetricBase const&) = delete;
77};
78
79//------------------------------------------------------------------------------
80
82{
83public:
85 HandlerType const& handler,
87
88 ~StatsDHookImpl() override;
89
90 void
91 do_process() override;
92
93private:
96
99};
100
101//------------------------------------------------------------------------------
102
104{
105public:
107 std::string const& name,
109
110 ~StatsDCounterImpl() override;
111
112 void
113 increment(CounterImpl::value_type amount) override;
114
115 void
116 flush();
117 void
119 void
120 do_process() override;
121
122private:
125
130};
131
132//------------------------------------------------------------------------------
133
135{
136public:
138 std::string const& name,
140
141 ~StatsDEventImpl() = default;
142
143 void
144 notify(EventImpl::value_type const& value) override;
145
146 void
147 do_notify(EventImpl::value_type const& value);
148 void
150
151private:
154
157};
158
159//------------------------------------------------------------------------------
160
162{
163public:
165 std::string const& name,
167
168 ~StatsDGaugeImpl() override;
169
170 void
171 set(GaugeImpl::value_type value) override;
172 void
173 increment(GaugeImpl::difference_type amount) override;
174
175 void
176 flush();
177 void
179 void
181 void
182 do_process() override;
183
184private:
187
193};
194
195//------------------------------------------------------------------------------
196
198{
199public:
200 explicit StatsDMeterImpl(
201 std::string const& name,
203
204 ~StatsDMeterImpl() override;
205
206 void
207 increment(MeterImpl::value_type amount) override;
208
209 void
210 flush();
211 void
213 void
214 do_process() override;
215
216private:
219
224};
225
226//------------------------------------------------------------------------------
227
229 : public StatsDCollector,
230 public std::enable_shared_from_this<StatsDCollectorImp>
231{
232private:
// Upper bound, in bytes, on the payload gathered into a single datagram
// by send_buffers(). Declared as std::size_t so it compares against the
// running payload size without a signed/unsigned conversion.
//
// NOTE(review): 1472 presumably corresponds to a 1500-byte Ethernet MTU
// less 20 bytes of IPv4 header and 8 bytes of UDP header; an earlier value
// of 484 was left commented out in the original — confirm the rationale.
static constexpr std::size_t max_packet_size = 1472;
237
241 boost::asio::io_service m_io_service;
243 boost::asio::io_service::strand m_strand;
244 boost::asio::basic_waitable_timer<std::chrono::steady_clock> m_timer;
245 boost::asio::ip::udp::socket m_socket;
249
250 // Must come last for order of init
252
253 static boost::asio::ip::udp::endpoint
255 {
256 return boost::asio::ip::udp::endpoint(ep.address(), ep.port());
257 }
258
259public:
261 IP::Endpoint const& address,
262 std::string const& prefix,
263 Journal journal)
264 : m_journal(journal)
265 , m_address(address)
267 , m_work(std::ref(m_io_service))
272 {
273 }
274
276 {
277 boost::system::error_code ec;
278 m_timer.cancel(ec);
279
280 m_work.reset();
281 m_thread.join();
282 }
283
284 Hook
285 make_hook(HookImpl::HandlerType const& handler) override
286 {
288 handler, shared_from_this()));
289 }
290
291 Counter
292 make_counter(std::string const& name) override
293 {
295 name, shared_from_this()));
296 }
297
298 Event
299 make_event(std::string const& name) override
300 {
302 name, shared_from_this()));
303 }
304
305 Gauge
306 make_gauge(std::string const& name) override
307 {
309 name, shared_from_this()));
310 }
311
312 Meter
313 make_meter(std::string const& name) override
314 {
316 name, shared_from_this()));
317 }
318
319 //--------------------------------------------------------------------------
320
321 void
323 {
325 metrics_.push_back(metric);
326 }
327
328 void
330 {
332 metrics_.erase(metrics_.iterator_to(metric));
333 }
334
335 //--------------------------------------------------------------------------
336
337 boost::asio::io_service&
339 {
340 return m_io_service;
341 }
342
343 std::string const&
344 prefix() const
345 {
346 return m_prefix;
347 }
348
349 void
351 {
352 m_data.emplace_back(buffer);
353 }
354
355 void
357 {
358 m_io_service.dispatch(m_strand.wrap(std::bind(
359 &StatsDCollectorImp::do_post_buffer, this, std::move(buffer))));
360 }
361
362 // The keepAlive parameter makes sure the buffers sent to
363 // boost::asio::async_send do not go away until the call is finished
364 void
367 boost::system::error_code ec,
369 {
370 if (ec == boost::asio::error::operation_aborted)
371 return;
372
373 if (ec)
374 {
375 if (auto stream = m_journal.error())
376 stream << "async_send failed: " << ec.message();
377 return;
378 }
379 }
380
381 void
383 {
384 (void)buffers;
385#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
386 for (auto const& buffer : buffers)
387 {
388 std::string const s(
389 boost::asio::buffer_cast<char const*>(buffer),
390 boost::asio::buffer_size(buffer));
391 std::cerr << s;
392 }
393 std::cerr << '\n';
394#endif
395 }
396
397 // Send what we have
398 void
400 {
401 if (m_data.empty())
402 return;
403
404 // Break up the array of strings into blocks
405 // that each fit into one UDP packet.
406 //
408 buffers.reserve(m_data.size());
409 std::size_t size(0);
410
411 auto keepAlive =
413 m_data.clear();
414
415 for (auto const& s : *keepAlive)
416 {
417 std::size_t const length(s.size());
418 XRPL_ASSERT(
419 !s.empty(),
420 "beast::insight::detail::StatsDCollectorImp::send_buffers : "
421 "non-empty payload");
422 if (!buffers.empty() && (size + length) > max_packet_size)
423 {
424 log(buffers);
425 m_socket.async_send(
426 buffers,
427 std::bind(
429 this,
430 keepAlive,
431 std::placeholders::_1,
432 std::placeholders::_2));
433 buffers.clear();
434 size = 0;
435 }
436
437 buffers.emplace_back(&s[0], length);
438 size += length;
439 }
440
441 if (!buffers.empty())
442 {
443 log(buffers);
444 m_socket.async_send(
445 buffers,
446 std::bind(
448 this,
449 keepAlive,
450 std::placeholders::_1,
451 std::placeholders::_2));
452 }
453 }
454
455 void
457 {
458 using namespace std::chrono_literals;
459 m_timer.expires_from_now(1s);
460 m_timer.async_wait(std::bind(
461 &StatsDCollectorImp::on_timer, this, std::placeholders::_1));
462 }
463
464 void
465 on_timer(boost::system::error_code ec)
466 {
467 if (ec == boost::asio::error::operation_aborted)
468 return;
469
470 if (ec)
471 {
472 if (auto stream = m_journal.error())
473 stream << "on_timer failed: " << ec.message();
474 return;
475 }
476
478
479 for (auto& m : metrics_)
480 m.do_process();
481
482 send_buffers();
483
484 set_timer();
485 }
486
487 void
489 {
490 boost::system::error_code ec;
491
492 if (m_socket.connect(to_endpoint(m_address), ec))
493 {
494 if (auto stream = m_journal.error())
495 stream << "Connect failed: " << ec.message();
496 return;
497 }
498
499 set_timer();
500
501 m_io_service.run();
502
503 m_socket.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
504
505 m_socket.close();
506
507 m_io_service.poll();
508 }
509};
510
511//------------------------------------------------------------------------------
512
514 HandlerType const& handler,
516 : m_impl(impl), m_handler(handler)
517{
518 m_impl->add(*this);
519}
520
522{
523 m_impl->remove(*this);
524}
525
526void
531
532//------------------------------------------------------------------------------
533
535 std::string const& name,
537 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
538{
539 m_impl->add(*this);
540}
541
543{
544 m_impl->remove(*this);
545}
546
547void
555
556void
558{
559 if (m_dirty)
560 {
561 m_dirty = false;
563 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|c"
564 << "\n";
565 m_value = 0;
566 m_impl->post_buffer(ss.str());
567 }
568}
569
570void
572{
573 m_value += amount;
574 m_dirty = true;
575}
576
577void
582
583//------------------------------------------------------------------------------
584
586 std::string const& name,
588 : m_impl(impl), m_name(name)
589{
590}
591
592void
600
601void
603{
605 ss << m_impl->prefix() << "." << m_name << ":" << value.count() << "|ms"
606 << "\n";
607 m_impl->post_buffer(ss.str());
608}
609
610//------------------------------------------------------------------------------
611
613 std::string const& name,
615 : m_impl(impl), m_name(name), m_last_value(0), m_value(0), m_dirty(false)
616{
617 m_impl->add(*this);
618}
619
621{
622 m_impl->remove(*this);
623}
624
625void
633
634void
642
643void
645{
646 if (m_dirty)
647 {
648 m_dirty = false;
650 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|g"
651 << "\n";
652 m_impl->post_buffer(ss.str());
653 }
654}
655
656void
658{
659 m_value = value;
660
661 if (m_value != m_last_value)
662 {
664 m_dirty = true;
665 }
666}
667
668void
670{
672
673 if (amount > 0)
674 {
675 GaugeImpl::value_type const d(
676 static_cast<GaugeImpl::value_type>(amount));
677 value +=
680 : d;
681 }
682 else if (amount < 0)
683 {
684 GaugeImpl::value_type const d(
685 static_cast<GaugeImpl::value_type>(-amount));
686 value = (d >= value) ? 0 : value - d;
687 }
688
689 do_set(value);
690}
691
692void
697
698//------------------------------------------------------------------------------
699
701 std::string const& name,
703 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
704{
705 m_impl->add(*this);
706}
707
709{
710 m_impl->remove(*this);
711}
712
713void
721
722void
724{
725 if (m_dirty)
726 {
727 m_dirty = false;
729 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|m"
730 << "\n";
731 m_value = 0;
732 m_impl->post_buffer(ss.str());
733 }
734}
735
736void
738{
739 m_value += amount;
740 m_dirty = true;
741}
742
743void
748
749} // namespace detail
750
751//------------------------------------------------------------------------------
752
755 IP::Endpoint const& address,
756 std::string const& prefix,
757 Journal journal)
758{
760 address, prefix, journal);
761}
762
763} // namespace insight
764} // namespace beast
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:38
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:75
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:61
A generic endpoint for log messages.
Definition Journal.h:60
Stream error() const
Definition Journal.h:346
Intrusive doubly linked list.
Definition List.h:279
A metric for measuring an integral value.
Definition Counter.h:39
A metric for reporting event timing.
Definition Event.h:41
A metric for measuring an integral value.
Definition Gauge.h:40
A reference to a handler for performing polled collection.
Definition Hook.h:32
A metric for measuring an integral value.
Definition Meter.h:38
A Collector that reports metrics to a StatsD server.
static std::shared_ptr< StatsDCollector > New(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
static boost::asio::ip::udp::endpoint to_endpoint(IP::Endpoint const &ep)
Counter make_counter(std::string const &name) override
Create a counter with the specified name.
void do_post_buffer(std::string const &buffer)
std::optional< boost::asio::io_service::work > m_work
StatsDCollectorImp(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Meter make_meter(std::string const &name) override
Create a meter with the specified name.
Event make_event(std::string const &name) override
Create an event with the specified name.
void on_timer(boost::system::error_code ec)
Hook make_hook(HookImpl::HandlerType const &handler) override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > m_timer
void log(std::vector< boost::asio::const_buffer > const &buffers)
Gauge make_gauge(std::string const &name) override
Create a gauge with the specified name.
void on_send(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
boost::asio::io_service::strand m_strand
void do_increment(CounterImpl::value_type amount)
StatsDCounterImpl & operator=(StatsDCounterImpl const &)
void increment(CounterImpl::value_type amount) override
StatsDCounterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
void notify(EventImpl::value_type const &value) override
StatsDEventImpl & operator=(StatsDEventImpl const &)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDEventImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
void do_notify(EventImpl::value_type const &value)
void do_increment(GaugeImpl::difference_type amount)
void increment(GaugeImpl::difference_type amount) override
std::shared_ptr< StatsDCollectorImp > m_impl
void set(GaugeImpl::value_type value) override
void do_set(GaugeImpl::value_type value)
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)
StatsDGaugeImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDHookImpl & operator=(StatsDHookImpl const &)
StatsDHookImpl(HandlerType const &handler, std::shared_ptr< StatsDCollectorImp > const &impl)
StatsDMeterImpl & operator=(StatsDMeterImpl const &)
void increment(MeterImpl::value_type amount) override
void do_increment(MeterImpl::value_type amount)
StatsDMeterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase(StatsDMetricBase const &)=delete
T clear(T... args)
T emplace_back(T... args)
T empty(T... args)
T is_same_v
T join(T... args)
T max(T... args)
STL namespace.
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)