chore: Add counter for total messages waiting to be sent (#1691)

This commit is contained in:
cyan317
2024-10-16 17:06:27 +01:00
committed by Alex Kremer
parent 2951b4aaa0
commit e2aeaa0956
2 changed files with 76 additions and 16 deletions

View File

@@ -23,6 +23,9 @@
#include "rpc/common/Types.hpp"
#include "util/Taggable.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
#include "web/dosguard/DOSGuardInterface.hpp"
#include "web/interface/Concepts.hpp"
#include "web/interface/ConnectionBase.hpp"
@@ -71,6 +74,8 @@ template <template <typename> typename Derived, SomeServerHandler HandlerType>
class WsBase : public ConnectionBase, public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
std::reference_wrapper<util::prometheus::GaugeInt> messagesLength_;
boost::beast::flat_buffer buffer_;
std::reference_wrapper<dosguard::DOSGuardInterface> dosGuard_;
bool sending_ = false;
@@ -103,15 +108,26 @@ public:
std::shared_ptr<HandlerType> const& handler,
boost::beast::flat_buffer&& buffer
)
: ConnectionBase(tagFactory, ip), buffer_(std::move(buffer)), dosGuard_(dosGuard), handler_(handler)
: ConnectionBase(tagFactory, ip)
, messagesLength_(PrometheusService::gaugeInt(
"ws_messages_length",
util::prometheus::Labels(),
"The total length of messages in the queue"
))
, buffer_(std::move(buffer))
, dosGuard_(dosGuard)
, handler_(handler)
{
upgraded = true; // NOLINT (cppcoreguidelines-pro-type-member-init)
LOG(perfLog_.debug()) << tag() << "session created";
}
~WsBase() override
{
LOG(perfLog_.debug()) << tag() << "session closed";
if (!messages_.empty())
messagesLength_.get() -= messages_.size();
dosGuard_.get().decrement(clientIp);
}
@@ -134,7 +150,9 @@ public:
void
onWrite(boost::system::error_code ec, std::size_t)
{
LOG(perfLog_.info()) << tag() << "xinmeng sent message";
messages_.pop();
--messagesLength_.get();
sending_ = false;
if (ec) {
wsFail(ec, "Failed to write");
@@ -165,6 +183,7 @@ public:
derived().ws().get_executor(),
[this, self = derived().shared_from_this(), msg = std::move(msg)]() {
messages_.push(msg);
++messagesLength_.get();
maybeSendNext();
}
);