rippled
Loading...
Searching...
No Matches
StatsDCollector.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of Beast: https://github.com/vinniefalco/Beast
4 Copyright 2013, Vinnie Falco <vinnie.falco@gmail.com>
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpl/beast/core/List.h>
21#include <xrpl/beast/insight/CounterImpl.h>
22#include <xrpl/beast/insight/EventImpl.h>
23#include <xrpl/beast/insight/GaugeImpl.h>
24#include <xrpl/beast/insight/HookImpl.h>
25#include <xrpl/beast/insight/MeterImpl.h>
26#include <xrpl/beast/insight/StatsDCollector.h>
27#include <xrpl/beast/net/IPAddressConversion.h>
28#include <xrpl/beast/utility/instrumentation.h>
29#include <boost/asio/ip/tcp.hpp>
30#include <climits>
31#include <deque>
32#include <functional>
33#include <mutex>
34#include <optional>
35#include <set>
36#include <sstream>
37#include <thread>
38
39#ifndef BEAST_STATSDCOLLECTOR_TRACING_ENABLED
40#define BEAST_STATSDCOLLECTOR_TRACING_ENABLED 0
41#endif
42
43namespace beast {
44namespace insight {
45
46namespace detail {
47
48class StatsDCollectorImp;
49
50//------------------------------------------------------------------------------
51
52class StatsDMetricBase : public List<StatsDMetricBase>::Node
53{
54public:
55 virtual void
57 virtual ~StatsDMetricBase() = default;
58 StatsDMetricBase() = default;
61 operator=(StatsDMetricBase const&) = delete;
62};
63
64//------------------------------------------------------------------------------
65
67{
68public:
70 HandlerType const& handler,
72
73 ~StatsDHookImpl() override;
74
75 void
76 do_process() override;
77
78private:
81
84};
85
86//------------------------------------------------------------------------------
87
89{
90public:
92 std::string const& name,
94
95 ~StatsDCounterImpl() override;
96
97 void
98 increment(CounterImpl::value_type amount) override;
99
100 void
101 flush();
102 void
104 void
105 do_process() override;
106
107private:
110
115};
116
117//------------------------------------------------------------------------------
118
120{
121public:
123 std::string const& name,
125
126 ~StatsDEventImpl() = default;
127
128 void
129 notify(EventImpl::value_type const& value) override;
130
131 void
132 do_notify(EventImpl::value_type const& value);
133 void
135
136private:
139
142};
143
144//------------------------------------------------------------------------------
145
147{
148public:
150 std::string const& name,
152
153 ~StatsDGaugeImpl() override;
154
155 void
156 set(GaugeImpl::value_type value) override;
157 void
158 increment(GaugeImpl::difference_type amount) override;
159
160 void
161 flush();
162 void
164 void
166 void
167 do_process() override;
168
169private:
172
178};
179
180//------------------------------------------------------------------------------
181
183{
184public:
185 explicit StatsDMeterImpl(
186 std::string const& name,
188
189 ~StatsDMeterImpl() override;
190
191 void
192 increment(MeterImpl::value_type amount) override;
193
194 void
195 flush();
196 void
198 void
199 do_process() override;
200
201private:
204
209};
210
211//------------------------------------------------------------------------------
212
214 : public StatsDCollector,
215 public std::enable_shared_from_this<StatsDCollectorImp>
216{
217private:
218 enum {
219 // max_packet_size = 484
220 max_packet_size = 1472
221 };
222
226 boost::asio::io_service m_io_service;
228 boost::asio::io_service::strand m_strand;
229 boost::asio::basic_waitable_timer<std::chrono::steady_clock> m_timer;
230 boost::asio::ip::udp::socket m_socket;
234
235 // Must come last for order of init
237
238 static boost::asio::ip::udp::endpoint
240 {
241 return boost::asio::ip::udp::endpoint(ep.address(), ep.port());
242 }
243
244public:
246 IP::Endpoint const& address,
247 std::string const& prefix,
248 Journal journal)
249 : m_journal(journal)
250 , m_address(address)
252 , m_work(std::ref(m_io_service))
257 {
258 }
259
261 {
262 boost::system::error_code ec;
263 m_timer.cancel(ec);
264
265 m_work.reset();
266 m_thread.join();
267 }
268
269 Hook
270 make_hook(HookImpl::HandlerType const& handler) override
271 {
272 return Hook(std::make_shared<detail::StatsDHookImpl>(
273 handler, shared_from_this()));
274 }
275
276 Counter
277 make_counter(std::string const& name) override
278 {
279 return Counter(std::make_shared<detail::StatsDCounterImpl>(
280 name, shared_from_this()));
281 }
282
283 Event
284 make_event(std::string const& name) override
285 {
286 return Event(std::make_shared<detail::StatsDEventImpl>(
287 name, shared_from_this()));
288 }
289
290 Gauge
291 make_gauge(std::string const& name) override
292 {
293 return Gauge(std::make_shared<detail::StatsDGaugeImpl>(
294 name, shared_from_this()));
295 }
296
297 Meter
298 make_meter(std::string const& name) override
299 {
300 return Meter(std::make_shared<detail::StatsDMeterImpl>(
301 name, shared_from_this()));
302 }
303
304 //--------------------------------------------------------------------------
305
306 void
308 {
310 metrics_.push_back(metric);
311 }
312
313 void
315 {
317 metrics_.erase(metrics_.iterator_to(metric));
318 }
319
320 //--------------------------------------------------------------------------
321
322 boost::asio::io_service&
324 {
325 return m_io_service;
326 }
327
328 std::string const&
329 prefix() const
330 {
331 return m_prefix;
332 }
333
334 void
336 {
337 m_data.emplace_back(buffer);
338 }
339
340 void
342 {
343 m_io_service.dispatch(m_strand.wrap(std::bind(
344 &StatsDCollectorImp::do_post_buffer, this, std::move(buffer))));
345 }
346
347 // The keepAlive parameter makes sure the buffers sent to
348 // boost::asio::async_send do not go away until the call is finished
349 void
352 boost::system::error_code ec,
354 {
355 if (ec == boost::asio::error::operation_aborted)
356 return;
357
358 if (ec)
359 {
360 if (auto stream = m_journal.error())
361 stream << "async_send failed: " << ec.message();
362 return;
363 }
364 }
365
366 void
368 {
369 (void)buffers;
370#if BEAST_STATSDCOLLECTOR_TRACING_ENABLED
371 for (auto const& buffer : buffers)
372 {
373 std::string const s(
374 boost::asio::buffer_cast<char const*>(buffer),
375 boost::asio::buffer_size(buffer));
376 std::cerr << s;
377 }
378 std::cerr << '\n';
379#endif
380 }
381
382 // Send what we have
383 void
385 {
386 if (m_data.empty())
387 return;
388
389 // Break up the array of strings into blocks
390 // that each fit into one UDP packet.
391 //
393 buffers.reserve(m_data.size());
394 std::size_t size(0);
395
396 auto keepAlive =
397 std::make_shared<std::deque<std::string>>(std::move(m_data));
398 m_data.clear();
399
400 for (auto const& s : *keepAlive)
401 {
402 std::size_t const length(s.size());
403 XRPL_ASSERT(
404 !s.empty(),
405 "beast::insight::detail::StatsDCollectorImp::send_buffers : "
406 "non-empty payload");
407 if (!buffers.empty() && (size + length) > max_packet_size)
408 {
409 log(buffers);
410 m_socket.async_send(
411 buffers,
412 std::bind(
414 this,
415 keepAlive,
416 std::placeholders::_1,
417 std::placeholders::_2));
418 buffers.clear();
419 size = 0;
420 }
421
422 buffers.emplace_back(&s[0], length);
423 size += length;
424 }
425
426 if (!buffers.empty())
427 {
428 log(buffers);
429 m_socket.async_send(
430 buffers,
431 std::bind(
433 this,
434 keepAlive,
435 std::placeholders::_1,
436 std::placeholders::_2));
437 }
438 }
439
440 void
442 {
443 using namespace std::chrono_literals;
444 m_timer.expires_from_now(1s);
445 m_timer.async_wait(std::bind(
446 &StatsDCollectorImp::on_timer, this, std::placeholders::_1));
447 }
448
449 void
450 on_timer(boost::system::error_code ec)
451 {
452 if (ec == boost::asio::error::operation_aborted)
453 return;
454
455 if (ec)
456 {
457 if (auto stream = m_journal.error())
458 stream << "on_timer failed: " << ec.message();
459 return;
460 }
461
463
464 for (auto& m : metrics_)
465 m.do_process();
466
467 send_buffers();
468
469 set_timer();
470 }
471
472 void
474 {
475 boost::system::error_code ec;
476
477 if (m_socket.connect(to_endpoint(m_address), ec))
478 {
479 if (auto stream = m_journal.error())
480 stream << "Connect failed: " << ec.message();
481 return;
482 }
483
484 set_timer();
485
486 m_io_service.run();
487
488 m_socket.shutdown(boost::asio::ip::udp::socket::shutdown_send, ec);
489
490 m_socket.close();
491
492 m_io_service.poll();
493 }
494};
495
496//------------------------------------------------------------------------------
497
499 HandlerType const& handler,
501 : m_impl(impl), m_handler(handler)
502{
503 m_impl->add(*this);
504}
505
507{
508 m_impl->remove(*this);
509}
510
511void
513{
514 m_handler();
515}
516
517//------------------------------------------------------------------------------
518
520 std::string const& name,
522 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
523{
524 m_impl->add(*this);
525}
526
528{
529 m_impl->remove(*this);
530}
531
532void
534{
535 m_impl->get_io_service().dispatch(std::bind(
537 std::static_pointer_cast<StatsDCounterImpl>(shared_from_this()),
538 amount));
539}
540
541void
543{
544 if (m_dirty)
545 {
546 m_dirty = false;
548 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|c"
549 << "\n";
550 m_value = 0;
551 m_impl->post_buffer(ss.str());
552 }
553}
554
555void
557{
558 m_value += amount;
559 m_dirty = true;
560}
561
562void
564{
565 flush();
566}
567
568//------------------------------------------------------------------------------
569
571 std::string const& name,
573 : m_impl(impl), m_name(name)
574{
575}
576
577void
579{
580 m_impl->get_io_service().dispatch(std::bind(
582 std::static_pointer_cast<StatsDEventImpl>(shared_from_this()),
583 value));
584}
585
586void
588{
590 ss << m_impl->prefix() << "." << m_name << ":" << value.count() << "|ms"
591 << "\n";
592 m_impl->post_buffer(ss.str());
593}
594
595//------------------------------------------------------------------------------
596
598 std::string const& name,
600 : m_impl(impl), m_name(name), m_last_value(0), m_value(0), m_dirty(false)
601{
602 m_impl->add(*this);
603}
604
606{
607 m_impl->remove(*this);
608}
609
610void
612{
613 m_impl->get_io_service().dispatch(std::bind(
615 std::static_pointer_cast<StatsDGaugeImpl>(shared_from_this()),
616 value));
617}
618
619void
621{
622 m_impl->get_io_service().dispatch(std::bind(
624 std::static_pointer_cast<StatsDGaugeImpl>(shared_from_this()),
625 amount));
626}
627
628void
630{
631 if (m_dirty)
632 {
633 m_dirty = false;
635 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|g"
636 << "\n";
637 m_impl->post_buffer(ss.str());
638 }
639}
640
641void
643{
644 m_value = value;
645
646 if (m_value != m_last_value)
647 {
649 m_dirty = true;
650 }
651}
652
653void
655{
657
658 if (amount > 0)
659 {
660 GaugeImpl::value_type const d(
661 static_cast<GaugeImpl::value_type>(amount));
662 value +=
665 : d;
666 }
667 else if (amount < 0)
668 {
669 GaugeImpl::value_type const d(
670 static_cast<GaugeImpl::value_type>(-amount));
671 value = (d >= value) ? 0 : value - d;
672 }
673
674 do_set(value);
675}
676
677void
679{
680 flush();
681}
682
683//------------------------------------------------------------------------------
684
686 std::string const& name,
688 : m_impl(impl), m_name(name), m_value(0), m_dirty(false)
689{
690 m_impl->add(*this);
691}
692
694{
695 m_impl->remove(*this);
696}
697
698void
700{
701 m_impl->get_io_service().dispatch(std::bind(
703 std::static_pointer_cast<StatsDMeterImpl>(shared_from_this()),
704 amount));
705}
706
707void
709{
710 if (m_dirty)
711 {
712 m_dirty = false;
714 ss << m_impl->prefix() << "." << m_name << ":" << m_value << "|m"
715 << "\n";
716 m_value = 0;
717 m_impl->post_buffer(ss.str());
718 }
719}
720
721void
723{
724 m_value += amount;
725 m_dirty = true;
726}
727
728void
730{
731 flush();
732}
733
734} // namespace detail
735
736//------------------------------------------------------------------------------
737
740 IP::Endpoint const& address,
741 std::string const& prefix,
742 Journal journal)
743{
744 return std::make_shared<detail::StatsDCollectorImp>(
745 address, prefix, journal);
746}
747
748} // namespace insight
749} // namespace beast
T bind(T... args)
A version-independent IP address and port combination.
Definition: IPEndpoint.h:39
Address const & address() const
Returns the address portion of this endpoint.
Definition: IPEndpoint.h:76
Port port() const
Returns the port number on the endpoint.
Definition: IPEndpoint.h:62
A generic endpoint for log messages.
Definition: Journal.h:59
Stream error() const
Definition: Journal.h:335
Intrusive doubly linked list.
Definition: List.h:280
A metric for measuring an integral value.
Definition: Counter.h:39
A metric for reporting event timing.
Definition: Event.h:41
A metric for measuring an integral value.
Definition: Gauge.h:40
A reference to a handler for performing polled collection.
Definition: Hook.h:32
A metric for measuring an integral value.
Definition: Meter.h:38
A Collector that reports metrics to a StatsD server.
static std::shared_ptr< StatsDCollector > New(IP::Endpoint const &address, std::string const &prefix, Journal journal)
Create a StatsD collector.
void remove(StatsDMetricBase &metric)
static boost::asio::ip::udp::endpoint to_endpoint(IP::Endpoint const &ep)
Counter make_counter(std::string const &name) override
Create a counter with the specified name.
void do_post_buffer(std::string const &buffer)
std::optional< boost::asio::io_service::work > m_work
StatsDCollectorImp(IP::Endpoint const &address, std::string const &prefix, Journal journal)
boost::asio::io_service & get_io_service()
Meter make_meter(std::string const &name) override
Create a meter with the specified name.
Event make_event(std::string const &name) override
Create an event with the specified name.
void on_timer(boost::system::error_code ec)
Hook make_hook(HookImpl::HandlerType const &handler) override
boost::asio::basic_waitable_timer< std::chrono::steady_clock > m_timer
void log(std::vector< boost::asio::const_buffer > const &buffers)
Gauge make_gauge(std::string const &name) override
Create a gauge with the specified name.
void on_send(std::shared_ptr< std::deque< std::string > >, boost::system::error_code ec, std::size_t)
boost::asio::io_service::strand m_strand
void do_increment(CounterImpl::value_type amount)
StatsDCounterImpl & operator=(StatsDCounterImpl const &)
void increment(CounterImpl::value_type amount) override
StatsDCounterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
void notify(EventImpl::value_type const &value) override
StatsDEventImpl & operator=(StatsDEventImpl const &)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDEventImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
void do_notify(EventImpl::value_type const &value)
void do_increment(GaugeImpl::difference_type amount)
void increment(GaugeImpl::difference_type amount) override
std::shared_ptr< StatsDCollectorImp > m_impl
void set(GaugeImpl::value_type value) override
void do_set(GaugeImpl::value_type value)
StatsDGaugeImpl & operator=(StatsDGaugeImpl const &)
StatsDGaugeImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDHookImpl & operator=(StatsDHookImpl const &)
StatsDHookImpl(HandlerType const &handler, std::shared_ptr< StatsDCollectorImp > const &impl)
StatsDMeterImpl & operator=(StatsDMeterImpl const &)
void increment(MeterImpl::value_type amount) override
void do_increment(MeterImpl::value_type amount)
StatsDMeterImpl(std::string const &name, std::shared_ptr< StatsDCollectorImp > const &impl)
std::shared_ptr< StatsDCollectorImp > m_impl
StatsDMetricBase & operator=(StatsDMetricBase const &)=delete
StatsDMetricBase(StatsDMetricBase const &)=delete
T clear(T... args)
T emplace_back(T... args)
T empty(T... args)
T join(T... args)
T max(T... args)
STL namespace.
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)