rippled
Loading...
Searching...
No Matches
StatsDCollector.cpp
1#include <xrpl/beast/core/List.h>
2#include <xrpl/beast/insight/CounterImpl.h>
3#include <xrpl/beast/insight/EventImpl.h>
4#include <xrpl/beast/insight/GaugeImpl.h>
5#include <xrpl/beast/insight/Hook.h>
6#include <xrpl/beast/insight/HookImpl.h>
7#include <xrpl/beast/insight/MeterImpl.h>
8#include <xrpl/beast/insight/StatsDCollector.h>
9#include <xrpl/beast/net/IPEndpoint.h>
10#include <xrpl/beast/utility/Journal.h>
11#include <xrpl/beast/utility/instrumentation.h>
12
13#include <boost/asio/basic_waitable_timer.hpp>
14#include <boost/asio/bind_executor.hpp>
15#include <boost/asio/buffer.hpp>
16#include <boost/asio/error.hpp>
17#include <boost/asio/executor_work_guard.hpp>
18#include <boost/asio/io_context.hpp>
19#include <boost/asio/ip/udp.hpp>
20#include <boost/asio/strand.hpp>
21#include <boost/system/detail/error_code.hpp>
22
23#include <chrono>
24#include <cstddef>
25#include <deque>
26#include <functional>
27#include <limits>
28#include <memory>
29#include <mutex>
30#include <optional>
31#include <sstream>
32#include <string>
33#include <thread>
34#include <utility>
35#include <vector>
36
37#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
38#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
39#endif
40
41namespace beast {
42namespace insight {
43
44namespace detail {
45
46class StatsDCollectorImp;
47
48//------------------------------------------------------------------------------
49
50class StatsDMetricBase : public List<StatsDMetricBase>::Node
51{
52public:
53 virtual void
55 virtual ~StatsDMetricBase() = default;
56 StatsDMetricBase() = default;
59 operator=(StatsDMetricBase const&) = delete;
60};
61
62//------------------------------------------------------------------------------
63
65{
66public:
68
69 ~StatsDHookImpl() override;
70
71 void
72 do_process() override;
73
74private:
77
80};
81
82//------------------------------------------------------------------------------
83
85{
86public:
88
89 ~StatsDCounterImpl() override;
90
91 void
92 increment(CounterImpl::value_type amount) override;
93
94 void
95 flush();
96 void
98 void
99 do_process() override;
100
101private:
104
109};
110
111//------------------------------------------------------------------------------
112
114{
115public:
117
118 ~StatsDEventImpl() = default;
119
120 void
121 notify(EventImpl::value_type const& value) override;
122
123 void
124 do_notify(EventImpl::value_type const& value);
125 void
127
128private:
131
134};
135
136//------------------------------------------------------------------------------
137
139{
140public:
142
143 ~StatsDGaugeImpl() override;
144
145 void
146 set(GaugeImpl::value_type value) override;
147 void
148 increment(GaugeImpl::difference_type amount) override;
149
150 void
151 flush();
152 void
154 void
156 void
157 do_process() override;
158
159private:
162
168};
169
170//------------------------------------------------------------------------------
171
173{
174public:
175 explicit StatsDMeterImpl(std::string const& name, std::shared_ptr<StatsDCollectorImp> const& impl);
176
177 ~StatsDMeterImpl() override;
178
179 void
180 increment(MeterImpl::value_type amount) override;
181
182 void
183 flush();
184 void
186 void
187 do_process() override;
188
189private:
192
197};
198
199//------------------------------------------------------------------------------
200
201class StatsDCollectorImp : public StatsDCollector, public std::enable_shared_from_this<StatsDCollectorImp>
202{
203private:
// Upper bound, in bytes, on the combined payload that send_buffers() packs
// into a single datagram before handing the batch to async_send. Declared as
// a typed constant (rather than an unnamed enum) so the comparison against
// the running std::size_t byte count involves no implicit conversion.
// Alternative limit retained from the original source for reference:
// max_packet_size = 484
static constexpr std::size_t max_packet_size = 1472;
208
212 boost::asio::io_context m_io_context;
214 boost::asio::strand<boost::asio::io_context::executor_type> m_strand;
215 boost::asio::basic_waitable_timer<std::chrono::steady_clock> m_timer;
216 boost::asio::ip::udp::socket m_socket;
220
221 // Must come last for order of init
223
224 static boost::asio::ip::udp::endpoint
226 {
227 return boost::asio::ip::udp::endpoint(ep.address(), ep.port());
228 }
229
230public:
232 : m_journal(journal)
233 , m_address(address)
235 , m_work(boost::asio::make_work_guard(m_io_context))
236 , m_strand(boost::asio::make_strand(m_io_context))
240 {
241 }
242
244 {
245 try
246 {
247 m_timer.cancel();
248 }
249 catch (boost::system::system_error const&)
250 {
251 // ignored
252 }
253
254 m_work.reset();
255 m_thread.join();
256 }
257
258 Hook
259 make_hook(HookImpl::HandlerType const& handler) override
260 {
262 }
263
264 Counter
265 make_counter(std::string const& name) override
266 {
268 }
269
270 Event
271 make_event(std::string const& name) override
272 {
274 }
275
276 Gauge
277 make_gauge(std::string const& name) override
278 {
280 }
281
282 Meter
283 make_meter(std::string const& name) override
284 {
286 }
287
288 //--------------------------------------------------------------------------
289
290 void
292 {
294 metrics_.push_back(metric);
295 }
296
297 void
299 {
301 metrics_.erase(metrics_.iterator_to(metric));
302 }
303
304 //--------------------------------------------------------------------------
305
306 boost::asio::io_context&
308 {
309 return m_io_context;
310 }
311
312 std::string const&
313 prefix() const
314 {
315 return m_prefix;
316 }
317
318 void
320 {
321 m_data.emplace_back(buffer);
322 }
323
324 void
326 {
327 boost::asio::dispatch(
329 boost::asio::bind_executor(
330 m_strand, std::bind(&StatsDCollectorImp::do_post_buffer, this, std::move(buffer))));
331 }
332
333 // The keepAlive parameter makes sure the buffers sent to
334 // boost::asio::async_send do not go away until the call is finished
335 void
336 on_send(std::shared_ptr<std::deque<std::string>> /*keepAlive*/, boost::system::error_code ec, std::size_t)
337 {
338 if (ec == boost::asio::error::operation_aborted)
339 return;
340
341 if (ec)
342 {
343 if (auto stream = m_journal.error())
344 stream << "async_send failed: " << ec.message();
345 return;
346 }
347 }
348
349 void
351 {
352 (void)buffers;
353#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
354 for (auto const& buffer : buffers)
355 {
356 std::string const s(buffer.data(), boost::asio::buffer_size(buffer));
357 std::cerr << s;
358 }
359 std::cerr << '\n';
360#endif
361 }
362
363 // Send what we have
364 void
366 {
367 if (m_data.empty())
368 return;
369
370 // Break up the array of strings into blocks
371 // that each fit into one UDP packet.
372 //
374 buffers.reserve(m_data.size());
375 std::size_t size(0);
376
377 auto keepAlive = std::make_shared<std::deque<std::string>>(std::move(m_data));
378 m_data.clear();
379
380 for (auto const& s : *keepAlive)
381 {
382 std::size_t const length(s.size());
383 XRPL_ASSERT(
384 !s.empty(),
385 "beast::insight::detail::StatsDCollectorImp::send_buffers : "
386 "non-empty payload");
387 if (!buffers.empty() && (size + length) > max_packet_size)
388 {
389 log(buffers);
390 m_socket.async_send(
391 buffers,
392 std::bind(
393 &StatsDCollectorImp::on_send, this, keepAlive, std::placeholders::_1, std::placeholders::_2));
394 buffers.clear();
395 size = 0;
396 }
397
398 buffers.emplace_back(&s[0], length);
399 size += length;
400 }
401
402 if (!buffers.empty())
403 {
404 log(buffers);
405 m_socket.async_send(
406 buffers,
407 std::bind(&StatsDCollectorImp::on_send, this, keepAlive, std::placeholders::_1, std::placeholders::_2));
408 }
409 }
410
411 void
413 {
414 using namespace std::chrono_literals;
415 m_timer.expires_after(1s);
416 m_timer.async_wait(std::bind(&StatsDCollectorImp::on_timer, this, std::placeholders::_1));
417 }
418
419 void
420 on_timer(boost::system::error_code ec)
421 {
422 if (ec == boost::asio::error::operation_aborted)
423 return;
424
425 if (ec)
426 {
427 if (auto stream = m_journal.error())
428 stream << "on_timer failed: " << ec.message();
429 return;
430 }
431
433
434 for (auto& m : metrics_)
435 m.do_process();
436
437 send_buffers();
438
439 set_timer();
440 }
441
442 void
444 {
445 boost::system::error_code ec;
446
447 if (m_socket.connect(to_endpoint(m_address), ec))
448 {
449 if (auto stream = m_journal.error())
450 stream << "Connect failed: " << ec.message();
451 return;
452 }
453
454 set_timer();
455
456 m_io_context.run();
457
458 m_socket.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
459
460 m_socket.close();
461
462 m_io_context.poll();
463 }
464};
465
466//------------------------------------------------------------------------------
467
469 : m_impl(impl), m_handler(handler)
470{
471 m_impl->add(*this);
472}
473
475{
476 m_impl->remove(*this);
477}
478
479void
484
485//------------------------------------------------------------------------------
486
488 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
489{
490 m_impl->add(*this);
491}
492
494{
495 m_impl->remove(*this);
496}
497
498void
506
507void
509{
510 if (m_dirty)
511 {
512 m_dirty = false;
514 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|c"
515 << "\n";
516 m_value = 0;
517 m_impl->post_buffer(ss.str());
518 }
519}
520
521void
523{
524 m_value += amount;
525 m_dirty = true;
526}
527
528void
533
534//------------------------------------------------------------------------------
535
537 : m_impl(impl), m_name(name)
538{
539}
540
541void
543{
544 boost::asio::dispatch(
545 m_impl->get_io_context(),
547}
548
549void
551{
553 ss << m_impl->prefix() << "." << m_name << ":" << value.count() << "|ms"
554 << "\n";
555 m_impl->post_buffer(ss.str());
556}
557
558//------------------------------------------------------------------------------
559
561 : m_impl(impl), m_name(name), m_last_value(0), m_value(0), m_dirty(false)
562{
563 m_impl->add(*this);
564}
565
567{
568 m_impl->remove(*this);
569}
570
571void
578
579void
587
588void
590{
591 if (m_dirty)
592 {
593 m_dirty = false;
595 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|g"
596 << "\n";
597 m_impl->post_buffer(ss.str());
598 }
599}
600
601void
603{
604 m_value = value;
605
606 if (m_value != m_last_value)
607 {
609 m_dirty = true;
610 }
611}
612
613void
615{
617
618 if (amount > 0)
619 {
620 GaugeImpl::value_type const d(static_cast<GaugeImpl::value_type>(amount));
623 : d;
624 }
625 else if (amount < 0)
626 {
627 GaugeImpl::value_type const d(static_cast<GaugeImpl::value_type>(-amount));
628 value = (d >= value) ? 0 : value - d;
629 }
630
631 do_set(value);
632}
633
634void
639
640//------------------------------------------------------------------------------
641
643 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
644{
645 m_impl->add(*this);
646}
647
649{
650 m_impl->remove(*this);
651}
652
653void
661
662void
664{
665 if (m_dirty)
666 {
667 m_dirty = false;
669 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|m"
670 << "\n";
671 m_value = 0;
672 m_impl->post_buffer(ss.str());
673 }
674}
675
676void
678{
679 m_value += amount;
680 m_dirty = true;
681}
682
683void
688
689} // namespace detail
690
691//------------------------------------------------------------------------------
692
694StatsDCollector::New(IP::Endpoint const& address, std::string const& prefix, Journal journal)
695{
696 return std::make_shared<detail::StatsDCollectorImp>(address, prefix, journal);
697}
698
699} // namespace insight
700} // namespace beast
T bind(T... args)
A version-independent IP address and port combination.
Definition IPEndpoint.h:19
Address const & address() const
Returns the address portion of this endpoint.
Definition IPEndpoint.h:56
Port port() const
Returns the port number on the endpoint.
Definition IPEndpoint.h:42
A generic endpoint for log messages.
Definition Journal.h:41
Stream error() const
Definition Journal.h:319
Intrusive doubly linked list.
Definition List.h:259
A metric for measuring an integral value.
Definition Counter.h:20
A metric for reporting event timing.
Definition Event.h:22
A metric for measuring an integral value.
Definition Gauge.h:21
A reference to a handler for performing polled collection.
Definition Hook.h:13
A metric for measuring an integral value.
Definition Meter.h:19
A Collector that reports metrics to a StatsD server.
static std::shared_ptr< StatsDCollector > New(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
static boost::asio::ip::udp::endpoint to_endpoint(IP::Endpoint const &ep)
Counter make_counter(std::string const &name) override
Create a counter with the specified name.
void do_post_buffer(std::string const &buffer)
std::optional< boost::asio::executor_work_guard< boost::asio::io_context::executor_type > > m_work
StatsDCollectorImp(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Meter make_meter(std::string const &name) override
Create a meter with the specified name.
Event make_event(std::string const &name) override
Create an event with the specified name.
void on_timer(boost::system::error_code ec)
Hook make_hook(HookImpl::HandlerType const &handler) override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > m_timer
void log(std::vector< boost::asio::const_buffer > const &buffers)
Gauge make_gauge(std::string const &name) override
Create a gauge with the specified name.
void on_send(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
boost::asio::strand< boost::asio::io_context::executor_type > m_strand
void do_increment(CounterImpl::value_type amount)
StatsDCounterImpl & operator=(StatsDCounterImpl const &)
void increment(CounterImpl::value_type amount) override
StatsDCounterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
void notify(EventImpl::value_type const &value) override
StatsDEventImpl & operator=(StatsDEventImpl const &)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDEventImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
void do_notify(EventImpl::value_type const &value)
void do_increment(GaugeImpl::difference_type amount)
void increment(GaugeImpl::difference_type amount) override
std::shared_ptr< StatsDCollectorImp > m_impl
void set(GaugeImpl::value_type value) override
void do_set(GaugeImpl::value_type value)
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)
StatsDGaugeImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDHookImpl & operator=(StatsDHookImpl const &)
StatsDHookImpl(HandlerType const &handler, std::shared_ptr< StatsDCollectorImp > const &impl)
StatsDMeterImpl & operator=(StatsDMeterImpl const &)
void increment(MeterImpl::value_type amount) override
void do_increment(MeterImpl::value_type amount)
StatsDMeterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase(StatsDMetricBase const &)=delete
T clear(T... args)
T emplace_back(T... args)
T empty(T... args)
T is_same_v
T join(T... args)
T max(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)