rippled
Loading...
Searching...
No Matches
StatsDCollector.cpp
1#include <xrpl/beast/core/List.h>
2#include <xrpl/beast/insight/CounterImpl.h>
3#include <xrpl/beast/insight/EventImpl.h>
4#include <xrpl/beast/insight/GaugeImpl.h>
5#include <xrpl/beast/insight/Hook.h>
6#include <xrpl/beast/insight/HookImpl.h>
7#include <xrpl/beast/insight/MeterImpl.h>
8#include <xrpl/beast/insight/StatsDCollector.h>
9#include <xrpl/beast/net/IPEndpoint.h>
10#include <xrpl/beast/utility/Journal.h>
11#include <xrpl/beast/utility/instrumentation.h>
12
13#include <boost/asio/basic_waitable_timer.hpp>
14#include <boost/asio/bind_executor.hpp>
15#include <boost/asio/buffer.hpp>
16#include <boost/asio/error.hpp>
17#include <boost/asio/executor_work_guard.hpp>
18#include <boost/asio/io_context.hpp>
19#include <boost/asio/ip/udp.hpp>
20#include <boost/asio/strand.hpp>
21#include <boost/system/detail/error_code.hpp>
22
23#include <chrono>
24#include <cstddef>
25#include <deque>
26#include <functional>
27#include <limits>
28#include <memory>
29#include <mutex>
30#include <optional>
31#include <sstream>
32#include <string>
33#include <thread>
34#include <utility>
35#include <vector>
36
37#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
38#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
39#endif
40
41namespace beast {
42namespace insight {
43
44namespace detail {
45
46class StatsDCollectorImp;
47
48//------------------------------------------------------------------------------
49
50class StatsDMetricBase : public List<StatsDMetricBase>::Node
51{
52public:
53 virtual void
55 virtual ~StatsDMetricBase() = default;
56 StatsDMetricBase() = default;
59 operator=(StatsDMetricBase const&) = delete;
60};
61
62//------------------------------------------------------------------------------
63
65{
66public:
68
69 ~StatsDHookImpl() override;
70
71 void
72 do_process() override;
73
74private:
77
80};
81
82//------------------------------------------------------------------------------
83
85{
86public:
88
89 ~StatsDCounterImpl() override;
90
91 void
92 increment(CounterImpl::value_type amount) override;
93
94 void
95 flush();
96 void
98 void
99 do_process() override;
100
101private:
104
109};
110
111//------------------------------------------------------------------------------
112
114{
115public:
117
118 ~StatsDEventImpl() = default;
119
120 void
121 notify(EventImpl::value_type const& value) override;
122
123 void
124 do_notify(EventImpl::value_type const& value);
125 void
127
128private:
131
134};
135
136//------------------------------------------------------------------------------
137
139{
140public:
142
143 ~StatsDGaugeImpl() override;
144
145 void
146 set(GaugeImpl::value_type value) override;
147 void
148 increment(GaugeImpl::difference_type amount) override;
149
150 void
151 flush();
152 void
154 void
156 void
157 do_process() override;
158
159private:
162
168};
169
170//------------------------------------------------------------------------------
171
173{
174public:
175 explicit StatsDMeterImpl(
176 std::string const& name,
178
179 ~StatsDMeterImpl() override;
180
181 void
182 increment(MeterImpl::value_type amount) override;
183
184 void
185 flush();
186 void
188 void
189 do_process() override;
190
191private:
194
199};
200
201//------------------------------------------------------------------------------
202
204 public std::enable_shared_from_this<StatsDCollectorImp>
205{
206private:
207 enum {
208 // max_packet_size = 484
209 max_packet_size = 1472
210 };
211
215 boost::asio::io_context m_io_context;
217 boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
218 boost::asio::basic_waitable_timer<std::chrono::steady_clock> m_timer;
219 boost::asio::ip::udp::socket m_socket;
223
224 // Must come last for order of init
226
// Builds a Boost.Asio UDP endpoint from the address and port carried by `ep`.
227 static boost::asio::ip::udp::endpoint
229 {
230 return boost::asio::ip::udp::endpoint(ep.address(), ep.port());
231 }
232
233public:
235 : m_journal(journal)
236 , m_address(address)
238 , m_work(boost::asio::make_work_guard(m_io_context))
239 , m_strand(boost::asio::make_strand(m_io_context))
243 {
244 }
245
247 {
248 try
249 {
250 m_timer.cancel();
251 }
252 catch (boost::system::system_error const&)
253 {
254 // ignored
255 }
256
257 m_work.reset();
258 m_thread.join();
259 }
260
261 Hook
262 make_hook(HookImpl::HandlerType const& handler) override
263 {
265 }
266
267 Counter
268 make_counter(std::string const& name) override
269 {
271 }
272
273 Event
274 make_event(std::string const& name) override
275 {
277 }
278
279 Gauge
280 make_gauge(std::string const& name) override
281 {
283 }
284
285 Meter
286 make_meter(std::string const& name) override
287 {
289 }
290
291 //--------------------------------------------------------------------------
292
293 void
295 {
297 metrics_.push_back(metric);
298 }
299
300 void
302 {
304 metrics_.erase(metrics_.iterator_to(metric));
305 }
306
307 //--------------------------------------------------------------------------
308
// Gives callers access to the collector's io_context by reference. The metric
// implementations in this file pass it to boost::asio::dispatch.
309 boost::asio::io_context&
311 {
312 return m_io_context;
313 }
314
// Returns the prefix string held in m_prefix, by const reference.
// The metric flush code in this file writes it, followed by ".", ahead of
// each metric name when formatting a StatsD line.
315 std::string const&
316 prefix() const
317 {
318 return m_prefix;
319 }
320
// Appends `buffer` to m_data, the pending queue that send_buffers later
// drains and transmits.
321 void
323 {
324 m_data.emplace_back(buffer);
325 }
326
// Queues `buffer` for sending by dispatching a call to do_post_buffer whose
// handler is bound to m_strand.
//
// The string is moved into the bound call, so the handler owns its own copy.
// `this` is bound as a raw pointer: the collector has to still exist when the
// handler runs.
327 void
329 {
330 boost::asio::dispatch(
332 boost::asio::bind_executor(
333 m_strand, std::bind(&StatsDCollectorImp::do_post_buffer, this, std::move(buffer))));
334 }
335
// Completion handler for a send issued with async_send.
//
// The handler only inspects the error code; it never touches the shared
// string queue it is bound with. That queue is passed in purely so the
// strings stay alive for as long as the handler does.
336 // The keepAlive parameter makes sure the buffers sent to
337 // boost::asio::async_send do not go away until the call is finished
338 void
341 boost::system::error_code ec,
343 {
// An aborted operation is not reported.
344 if (ec == boost::asio::error::operation_aborted)
345 return;
346
// Any other failure is written to the journal's error stream when that
// stream is enabled. The send is not retried.
347 if (ec)
348 {
349 if (auto stream = m_journal.error())
350 stream << "async_send failed: " << ec.message();
351 return;
352 }
353 }
354
// Debug aid: dumps a batch of outgoing buffers to std::cerr.
//
// Only active when BEAST_STATSDCOLLECTOR_TRACING_ENABLED is non-zero; this
// file defaults the macro to 0, in which case the function does nothing.
//
// NOTE(review): the tracing branch uses std::cerr, but <iostream> is not among
// the #include lines visible at the top of this file -- confirm it arrives
// transitively before enabling tracing.
355 void
357 {
// Keeps `buffers` "used" so the non-tracing build has no unused-parameter
// warning.
358 (void)buffers;
359#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
360 for (auto const& buffer : buffers)
361 {
// NOTE(review): Boost.Asio documents const_buffer::data() as returning a
// void pointer, which the std::string (pointer, length) constructor may not
// accept without a cast -- confirm this line compiles with tracing on.
362 std::string const s(buffer.data(), boost::asio::buffer_size(buffer));
363 std::cerr << s;
364 }
// One newline after the whole batch, not one per buffer.
365 std::cerr << '\n';
366#endif
367 }
368
// Transmits everything currently queued in m_data.
//
// The queued strings are grouped, in order, into batches whose combined
// length does not exceed max_packet_size, and each batch is handed to
// m_socket.async_send as one scatter/gather send. Does nothing when the queue
// is empty.
//
// A single string longer than max_packet_size is still sent, alone in its own
// batch, because the size check is skipped while the current batch is empty.
369 // Send what we have
370 void
372 {
373 if (m_data.empty())
374 return;
375
376 // Break up the array of strings into blocks
377 // that each fit into one UDP packet.
378 //
380 buffers.reserve(m_data.size());
// Running byte total of the batch being assembled.
381 std::size_t size(0);
382
// Move the whole queue into a shared deque. The buffers built below only
// point into these strings, so the deque is bound into every completion
// handler to keep the bytes alive until the sends finish.
383 auto keepAlive = std::make_shared<std::deque<std::string>>(std::move(m_data));
// Leave the moved-from queue in a definite empty state for new posts.
384 m_data.clear();
385
386 for (auto const& s : *keepAlive)
387 {
388 std::size_t const length(s.size());
389 XRPL_ASSERT(
390 !s.empty(),
391 "beast::insight::detail::StatsDCollectorImp::send_buffers : "
392 "non-empty payload");
// Adding this string would overflow the packet: flush the batch collected
// so far and start a new one.
393 if (!buffers.empty() && (size + length) > max_packet_size)
394 {
395 log(buffers);
396 m_socket.async_send(
397 buffers,
398 std::bind(
400 this,
401 keepAlive,
402 std::placeholders::_1,
403 std::placeholders::_2));
404 buffers.clear();
405 size = 0;
406 }
407
// Reference the string's bytes in place; nothing is copied here.
408 buffers.emplace_back(&s[0], length);
409 size += length;
410 }
411
// Flush the final, partially filled batch.
412 if (!buffers.empty())
413 {
414 log(buffers);
415 m_socket.async_send(
416 buffers,
417 std::bind(
419 this,
420 keepAlive,
421 std::placeholders::_1,
422 std::placeholders::_2));
423 }
424 }
425
// Arms m_timer to expire one second from now and registers on_timer as the
// wait handler. on_timer calls this again after each successful pass, which
// is what keeps the periodic processing going.
426 void
428 {
429 using namespace std::chrono_literals;
430 m_timer.expires_after(1s);
431 m_timer.async_wait(std::bind(&StatsDCollectorImp::on_timer, this, std::placeholders::_1));
432 }
433
// Timer handler: runs one collection pass.
//
// On a clean expiry it calls do_process() on every registered metric, sends
// whatever that queued via send_buffers(), and re-arms the timer.
//
// @param ec  Result of the timer wait.
434 void
435 on_timer(boost::system::error_code ec)
436 {
// An aborted wait ends the cycle quietly; the timer is not re-armed.
437 if (ec == boost::asio::error::operation_aborted)
438 return;
439
// Any other error is logged and also ends the cycle: returning here skips
// set_timer(), so no further passes are scheduled.
440 if (ec)
441 {
442 if (auto stream = m_journal.error())
443 stream << "on_timer failed: " << ec.message();
444 return;
445 }
446
448
449 for (auto& m : metrics_)
450 m.do_process();
451
452 send_buffers();
453
454 set_timer();
455 }
456
457 void
459 {
460 boost::system::error_code ec;
461
462 if (m_socket.connect(to_endpoint(m_address), ec))
463 {
464 if (auto stream = m_journal.error())
465 stream << "Connect failed: " << ec.message();
466 return;
467 }
468
469 set_timer();
470
471 m_io_context.run();
472
473 m_socket.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
474
475 m_socket.close();
476
477 m_io_context.poll();
478 }
479};
480
481//------------------------------------------------------------------------------
482
484 HandlerType const& handler,
486 : m_impl(impl), m_handler(handler)
487{
488 m_impl->add(*this);
489}
490
492{
493 m_impl->remove(*this);
494}
495
496void
501
502//------------------------------------------------------------------------------
503
505 std::string const& name,
507 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
508{
509 m_impl->add(*this);
510}
511
513{
514 m_impl->remove(*this);
515}
516
517void
519{
520 boost::asio::dispatch(
521 m_impl->get_io_context(),
522 std::bind(
525 amount));
526}
527
528void
530{
531 if (m_dirty)
532 {
533 m_dirty = false;
535 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|c"
536 << "\n";
537 m_value = 0;
538 m_impl->post_buffer(ss.str());
539 }
540}
541
542void
544{
545 m_value += amount;
546 m_dirty = true;
547}
548
549void
554
555//------------------------------------------------------------------------------
556
558 std::string const& name,
560 : m_impl(impl), m_name(name)
561{
562}
563
564void
566{
567 boost::asio::dispatch(
568 m_impl->get_io_context(),
569 std::bind(
572 value));
573}
574
575void
577{
579 ss << m_impl->prefix() << "." << m_name << ":" << value.count() << "|ms"
580 << "\n";
581 m_impl->post_buffer(ss.str());
582}
583
584//------------------------------------------------------------------------------
585
587 std::string const& name,
589 : m_impl(impl), m_name(name), m_last_value(0), m_value(0), m_dirty(false)
590{
591 m_impl->add(*this);
592}
593
595{
596 m_impl->remove(*this);
597}
598
599void
601{
602 boost::asio::dispatch(
603 m_impl->get_io_context(),
604 std::bind(
607 value));
608}
609
610void
612{
613 boost::asio::dispatch(
614 m_impl->get_io_context(),
615 std::bind(
618 amount));
619}
620
621void
623{
624 if (m_dirty)
625 {
626 m_dirty = false;
628 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|g"
629 << "\n";
630 m_impl->post_buffer(ss.str());
631 }
632}
633
634void
636{
637 m_value = value;
638
639 if (m_value != m_last_value)
640 {
642 m_dirty = true;
643 }
644}
645
646void
648{
650
651 if (amount > 0)
652 {
653 GaugeImpl::value_type const d(static_cast<GaugeImpl::value_type>(amount));
656 : d;
657 }
658 else if (amount < 0)
659 {
660 GaugeImpl::value_type const d(static_cast<GaugeImpl::value_type>(-amount));
661 value = (d >= value) ? 0 : value - d;
662 }
663
664 do_set(value);
665}
666
667void
672
673//------------------------------------------------------------------------------
674
676 std::string const& name,
678 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
679{
680 m_impl->add(*this);
681}
682
684{
685 m_impl->remove(*this);
686}
687
688void
690{
691 boost::asio::dispatch(
692 m_impl->get_io_context(),
693 std::bind(
696 amount));
697}
698
699void
701{
702 if (m_dirty)
703 {
704 m_dirty = false;
706 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|m"
707 << "\n";
708 m_value = 0;
709 m_impl->post_buffer(ss.str());
710 }
711}
712
713void
715{
716 m_value += amount;
717 m_dirty = true;
718}
719
720void
725
726} // namespace detail
727
728//------------------------------------------------------------------------------
729
// Factory for the StatsD collector: constructs a detail::StatsDCollectorImp
// with std::make_shared, passing `address`, `prefix` and `journal` through to
// its constructor, and returns the resulting shared pointer.
731StatsDCollector::New(IP::Endpoint const& address, std::string const& prefix, Journal journal)
732{
733 return std::make_shared<detail::StatsDCollectorImp>(address, prefix, journal);
734}
735
736} // namespace insight
737} // namespace beast
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:18
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:55
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:41
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Intrusive doubly linked list.
Definition List.h:258
A metric for measuring an integral value.
Definition Counter.h:19
A metric for reporting event timing.
Definition Event.h:21
A metric for measuring an integral value.
Definition Gauge.h:20
A reference to a handler for performing polled collection.
Definition Hook.h:12
A metric for measuring an integral value.
Definition Meter.h:18
A Collector that reports metrics to a StatsD server.
static std::shared_ptr< StatsDCollector > New(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
static boost::asio::ip::udp::endpoint to_endpoint(IP::Endpoint const &ep)
Counter make_counter(std::string const &name) override
Create a counter with the specified name.
void do_post_buffer(std::string const &buffer)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > m_work
StatsDCollectorImp(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Meter make_meter(std::string const &name) override
Create a meter with the specified name.
Event make_event(std::string const &name) override
Create an event with the specified name.
void on_timer(boost::system::error_code ec)
Hook make_hook(HookImpl::HandlerType const &handler) override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > m_timer
void log(std::vector< boost::asio::const_buffer > const &buffers)
Gauge make_gauge(std::string const &name) override
Create a gauge with the specified name.
void on_send(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
boost::asio::strand< boost::asio::io_context::executor_type > m_strand
void do_increment(CounterImpl::value_type amount)
StatsDCounterImpl & operator=(StatsDCounterImpl const &)
void increment(CounterImpl::value_type amount) override
StatsDCounterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
void notify(EventImpl::value_type const &value) override
StatsDEventImpl & operator=(StatsDEventImpl const &)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDEventImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
void do_notify(EventImpl::value_type const &value)
void do_increment(GaugeImpl::difference_type amount)
void increment(GaugeImpl::difference_type amount) override
std::shared_ptr< StatsDCollectorImp > m_impl
void set(GaugeImpl::value_type value) override
void do_set(GaugeImpl::value_type value)
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)
StatsDGaugeImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDHookImpl & operator=(StatsDHookImpl const &)
StatsDHookImpl(HandlerType const &handler, std::shared_ptr< StatsDCollectorImp > const &impl)
StatsDMeterImpl & operator=(StatsDMeterImpl const &)
void increment(MeterImpl::value_type amount) override
void do_increment(MeterImpl::value_type amount)
StatsDMeterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase(StatsDMetricBase const &)=delete
T clear(T... args)
T emplace_back(T... args)
T empty(T... args)
T is_same_v
T join(T... args)
T max(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)