rippled
Loading...
Searching...
No Matches
StatsDCollector.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of Beast: https://github.com/vinniefalco/Beast
4 Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpl/beast/core/List.h>
21#include <xrpl/beast/insight/CounterImpl.h>
22#include <xrpl/beast/insight/EventImpl.h>
23#include <xrpl/beast/insight/GaugeImpl.h>
24#include <xrpl/beast/insight/Hook.h>
25#include <xrpl/beast/insight/HookImpl.h>
26#include <xrpl/beast/insight/MeterImpl.h>
27#include <xrpl/beast/insight/StatsDCollector.h>
28#include <xrpl/beast/net/IPEndpoint.h>
29#include <xrpl/beast/utility/Journal.h>
30#include <xrpl/beast/utility/instrumentation.h>
31
32#include <boost/asio/basic_waitable_timer.hpp>
33#include <boost/asio/bind_executor.hpp>
34#include <boost/asio/buffer.hpp>
35#include <boost/asio/error.hpp>
36#include <boost/asio/executor_work_guard.hpp>
37#include <boost/asio/io_context.hpp>
38#include <boost/asio/ip/udp.hpp>
39#include <boost/asio/strand.hpp>
40#include <boost/system/detail/error_code.hpp>
41
42#include <chrono>
43#include <cstddef>
44#include <deque>
45#include <functional>
46#include <limits>
47#include <memory>
48#include <mutex>
49#include <optional>
50#include <sstream>
51#include <string>
52#include <thread>
53#include <utility>
54#include <vector>
55
56#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
57#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
58#endif
59
60namespace beast {
61namespace insight {
62
63namespace detail {
64
65class StatsDCollectorImp;
66
67//------------------------------------------------------------------------------
68
69class StatsDMetricBase : public List<StatsDMetricBase>::Node
70{
71public:
72 virtual void
74 virtual ~StatsDMetricBase() = default;
75 StatsDMetricBase() = default;
78 operator=(StatsDMetricBase const&) = delete;
79};
80
81//------------------------------------------------------------------------------
82
84{
85public:
87 HandlerType const& handler,
89
90 ~StatsDHookImpl() override;
91
92 void
93 do_process() override;
94
95private:
98
101};
102
103//------------------------------------------------------------------------------
104
106{
107public:
109 std::string const& name,
111
112 ~StatsDCounterImpl() override;
113
114 void
115 increment(CounterImpl::value_type amount) override;
116
117 void
118 flush();
119 void
121 void
122 do_process() override;
123
124private:
127
132};
133
134//------------------------------------------------------------------------------
135
137{
138public:
140 std::string const& name,
142
143 ~StatsDEventImpl() = default;
144
145 void
146 notify(EventImpl::value_type const& value) override;
147
148 void
149 do_notify(EventImpl::value_type const& value);
150 void
152
153private:
156
159};
160
161//------------------------------------------------------------------------------
162
164{
165public:
167 std::string const& name,
169
170 ~StatsDGaugeImpl() override;
171
172 void
173 set(GaugeImpl::value_type value) override;
174 void
175 increment(GaugeImpl::difference_type amount) override;
176
177 void
178 flush();
179 void
181 void
183 void
184 do_process() override;
185
186private:
189
195};
196
197//------------------------------------------------------------------------------
198
200{
201public:
202 explicit StatsDMeterImpl(
203 std::string const& name,
205
206 ~StatsDMeterImpl() override;
207
208 void
209 increment(MeterImpl::value_type amount) override;
210
211 void
212 flush();
213 void
215 void
216 do_process() override;
217
218private:
221
226};
227
228//------------------------------------------------------------------------------
229
231 : public StatsDCollector,
232 public std::enable_shared_from_this<StatsDCollectorImp>
233{
234private:
235 enum {
236 // max_packet_size = 484
237 max_packet_size = 1472
238 };
239
243 boost::asio::io_context m_io_context;
244 std::optional<boost::asio::executor_work_guard<
245 boost::asio::io_context::executor_type>>
247 boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
248 boost::asio::basic_waitable_timer<std::chrono::steady_clock> m_timer;
249 boost::asio::ip::udp::socket m_socket;
253
254 // Must come last for order of init
256
257 static boost::asio::ip::udp::endpoint
259 {
260 return boost::asio::ip::udp::endpoint(ep.address(), ep.port());
261 }
262
263public:
265 IP::Endpoint const& address,
266 std::string const& prefix,
267 Journal journal)
268 : m_journal(journal)
269 , m_address(address)
271 , m_work(boost::asio::make_work_guard(m_io_context))
272 , m_strand(boost::asio::make_strand(m_io_context))
276 {
277 }
278
280 {
281 try
282 {
283 m_timer.cancel();
284 }
285 catch (boost::system::system_error const&)
286 {
287 // ignored
288 }
289
290 m_work.reset();
291 m_thread.join();
292 }
293
294 Hook
295 make_hook(HookImpl::HandlerType const& handler) override
296 {
298 handler, shared_from_this()));
299 }
300
301 Counter
302 make_counter(std::string const& name) override
303 {
305 name, shared_from_this()));
306 }
307
308 Event
309 make_event(std::string const& name) override
310 {
312 name, shared_from_this()));
313 }
314
315 Gauge
316 make_gauge(std::string const& name) override
317 {
319 name, shared_from_this()));
320 }
321
322 Meter
323 make_meter(std::string const& name) override
324 {
326 name, shared_from_this()));
327 }
328
329 //--------------------------------------------------------------------------
330
331 void
333 {
335 metrics_.push_back(metric);
336 }
337
338 void
340 {
342 metrics_.erase(metrics_.iterator_to(metric));
343 }
344
345 //--------------------------------------------------------------------------
346
347 boost::asio::io_context&
349 {
350 return m_io_context;
351 }
352
353 std::string const&
354 prefix() const
355 {
356 return m_prefix;
357 }
358
359 void
361 {
362 m_data.emplace_back(buffer);
363 }
364
365 void
367 {
368 boost::asio::dispatch(
370 boost::asio::bind_executor(
371 m_strand,
372 std::bind(
374 this,
375 std::move(buffer))));
376 }
377
378 // The keepAlive parameter makes sure the buffers sent to
379 // boost::asio::async_send do not go away until the call is finished
380 void
383 boost::system::error_code ec,
385 {
386 if (ec == boost::asio::error::operation_aborted)
387 return;
388
389 if (ec)
390 {
391 if (auto stream = m_journal.error())
392 stream << "async_send failed: " << ec.message();
393 return;
394 }
395 }
396
397 void
399 {
400 (void)buffers;
401#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
402 for (auto const& buffer : buffers)
403 {
404 std::string const s(
405 buffer.data(), boost::asio::buffer_size(buffer));
406 std::cerr << s;
407 }
408 std::cerr << '\n';
409#endif
410 }
411
412 // Send what we have
413 void
415 {
416 if (m_data.empty())
417 return;
418
419 // Break up the array of strings into blocks
420 // that each fit into one UDP packet.
421 //
423 buffers.reserve(m_data.size());
424 std::size_t size(0);
425
426 auto keepAlive =
428 m_data.clear();
429
430 for (auto const& s : *keepAlive)
431 {
432 std::size_t const length(s.size());
433 XRPL_ASSERT(
434 !s.empty(),
435 "beast::insight::detail::StatsDCollectorImp::send_buffers : "
436 "non-empty payload");
437 if (!buffers.empty() && (size + length) > max_packet_size)
438 {
439 log(buffers);
440 m_socket.async_send(
441 buffers,
442 std::bind(
444 this,
445 keepAlive,
446 std::placeholders::_1,
447 std::placeholders::_2));
448 buffers.clear();
449 size = 0;
450 }
451
452 buffers.emplace_back(&s[0], length);
453 size += length;
454 }
455
456 if (!buffers.empty())
457 {
458 log(buffers);
459 m_socket.async_send(
460 buffers,
461 std::bind(
463 this,
464 keepAlive,
465 std::placeholders::_1,
466 std::placeholders::_2));
467 }
468 }
469
470 void
472 {
473 using namespace std::chrono_literals;
474 m_timer.expires_after(1s);
475 m_timer.async_wait(std::bind(
476 &StatsDCollectorImp::on_timer, this, std::placeholders::_1));
477 }
478
479 void
480 on_timer(boost::system::error_code ec)
481 {
482 if (ec == boost::asio::error::operation_aborted)
483 return;
484
485 if (ec)
486 {
487 if (auto stream = m_journal.error())
488 stream << "on_timer failed: " << ec.message();
489 return;
490 }
491
493
494 for (auto& m : metrics_)
495 m.do_process();
496
497 send_buffers();
498
499 set_timer();
500 }
501
502 void
504 {
505 boost::system::error_code ec;
506
507 if (m_socket.connect(to_endpoint(m_address), ec))
508 {
509 if (auto stream = m_journal.error())
510 stream << "Connect failed: " << ec.message();
511 return;
512 }
513
514 set_timer();
515
516 m_io_context.run();
517
518 m_socket.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
519
520 m_socket.close();
521
522 m_io_context.poll();
523 }
524};
525
526//------------------------------------------------------------------------------
527
529 HandlerType const& handler,
531 : m_impl(impl), m_handler(handler)
532{
533 m_impl->add(*this);
534}
535
537{
538 m_impl->remove(*this);
539}
540
541void
546
547//------------------------------------------------------------------------------
548
550 std::string const& name,
552 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
553{
554 m_impl->add(*this);
555}
556
558{
559 m_impl->remove(*this);
560}
561
562void
564{
565 boost::asio::dispatch(
566 m_impl->get_io_context(),
567 std::bind(
570 amount));
571}
572
573void
575{
576 if (m_dirty)
577 {
578 m_dirty = false;
580 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|c"
581 << "\n";
582 m_value = 0;
583 m_impl->post_buffer(ss.str());
584 }
585}
586
587void
589{
590 m_value += amount;
591 m_dirty = true;
592}
593
594void
599
600//------------------------------------------------------------------------------
601
603 std::string const& name,
605 : m_impl(impl), m_name(name)
606{
607}
608
609void
611{
612 boost::asio::dispatch(
613 m_impl->get_io_context(),
614 std::bind(
617 value));
618}
619
620void
622{
624 ss << m_impl->prefix() << "." << m_name << ":" << value.count() << "|ms"
625 << "\n";
626 m_impl->post_buffer(ss.str());
627}
628
629//------------------------------------------------------------------------------
630
632 std::string const& name,
634 : m_impl(impl), m_name(name), m_last_value(0), m_value(0), m_dirty(false)
635{
636 m_impl->add(*this);
637}
638
640{
641 m_impl->remove(*this);
642}
643
644void
646{
647 boost::asio::dispatch(
648 m_impl->get_io_context(),
649 std::bind(
652 value));
653}
654
655void
657{
658 boost::asio::dispatch(
659 m_impl->get_io_context(),
660 std::bind(
663 amount));
664}
665
666void
668{
669 if (m_dirty)
670 {
671 m_dirty = false;
673 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|g"
674 << "\n";
675 m_impl->post_buffer(ss.str());
676 }
677}
678
679void
681{
682 m_value = value;
683
684 if (m_value != m_last_value)
685 {
687 m_dirty = true;
688 }
689}
690
691void
693{
695
696 if (amount > 0)
697 {
698 GaugeImpl::value_type const d(
699 static_cast<GaugeImpl::value_type>(amount));
700 value +=
703 : d;
704 }
705 else if (amount < 0)
706 {
707 GaugeImpl::value_type const d(
708 static_cast<GaugeImpl::value_type>(-amount));
709 value = (d >= value) ? 0 : value - d;
710 }
711
712 do_set(value);
713}
714
715void
720
721//------------------------------------------------------------------------------
722
724 std::string const& name,
726 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
727{
728 m_impl->add(*this);
729}
730
732{
733 m_impl->remove(*this);
734}
735
736void
738{
739 boost::asio::dispatch(
740 m_impl->get_io_context(),
741 std::bind(
744 amount));
745}
746
747void
749{
750 if (m_dirty)
751 {
752 m_dirty = false;
754 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|m"
755 << "\n";
756 m_value = 0;
757 m_impl->post_buffer(ss.str());
758 }
759}
760
761void
763{
764 m_value += amount;
765 m_dirty = true;
766}
767
768void
773
774} // namespace detail
775
776//------------------------------------------------------------------------------
777
780 IP::Endpoint const& address,
781 std::string const& prefix,
782 Journal journal)
783{
785 address, prefix, journal);
786}
787
788} // namespace insight
789} // namespace beast
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:38
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:75
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:61
A generic endpoint for log messages.
Definition Journal.h:60
Stream error() const
Definition Journal.h:346
Intrusive doubly linked list.
Definition List.h:279
A metric for measuring an integral value.
Definition Counter.h:39
A metric for reporting event timing.
Definition Event.h:41
A metric for measuring an integral value.
Definition Gauge.h:40
A reference to a handler for performing polled collection.
Definition Hook.h:32
A metric for measuring an integral value.
Definition Meter.h:38
A Collector that reports metrics to a StatsD server.
static std::shared_ptr< StatsDCollector > New(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
static boost::asio::ip::udp::endpoint to_endpoint(IP::Endpoint const &ep)
Counter make_counter(std::string const &name) override
Create a counter with the specified name.
void do_post_buffer(std::string const &buffer)
StatsDCollectorImp(IP::Endpoint const &address, std::string const &prefix, Journal journal)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > m_work
Meter make_meter(std::string const &name) override
Create a meter with the specified name.
Event make_event(std::string const &name) override
Create an event with the specified name.
void on_timer(boost::system::error_code ec)
Hook make_hook(HookImpl::HandlerType const &handler) override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > m_timer
void log(std::vector< boost::asio::const_buffer > const &buffers)
Gauge make_gauge(std::string const &name) override
Create a gauge with the specified name.
void on_send(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
boost::asio::strand< boost::asio::io_context::executor_type > m_strand
void do_increment(CounterImpl::value_type amount)
StatsDCounterImpl & operator=(StatsDCounterImpl const &)
void increment(CounterImpl::value_type amount) override
StatsDCounterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
void notify(EventImpl::value_type const &value) override
StatsDEventImpl & operator=(StatsDEventImpl const &)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDEventImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
void do_notify(EventImpl::value_type const &value)
void do_increment(GaugeImpl::difference_type amount)
void increment(GaugeImpl::difference_type amount) override
std::shared_ptr< StatsDCollectorImp > m_impl
void set(GaugeImpl::value_type value) override
void do_set(GaugeImpl::value_type value)
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)
StatsDGaugeImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDHookImpl & operator=(StatsDHookImpl const &)
StatsDHookImpl(HandlerType const &handler, std::shared_ptr< StatsDCollectorImp > const &impl)
StatsDMeterImpl & operator=(StatsDMeterImpl const &)
void increment(MeterImpl::value_type amount) override
void do_increment(MeterImpl::value_type amount)
StatsDMeterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase(StatsDMetricBase const &)=delete
T clear(T... args)
T emplace_back(T... args)
T empty(T... args)
T is_same_v
T join(T... args)
T max(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)