mirror of
https://github.com/XRPLF/clio.git
synced 2026-04-29 15:37:53 +00:00
feat: Published subscription message counters (#1618)
This PR adds counters to track the amount of published messages for each subscription stream.
This commit is contained in:
@@ -104,13 +104,17 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson)
|
||||
boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
|
||||
notified_.clear();
|
||||
signal_.emit(pubMsg);
|
||||
|
||||
// Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts
|
||||
// However, if the same connection subscribe both stream and account, it will still receive the message twice.
|
||||
// notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for now, since rippled
|
||||
// acts like this.
|
||||
notified_.clear();
|
||||
|
||||
for (auto const& account : affectedAccounts)
|
||||
accountSignal_.emit(account, pubMsg);
|
||||
|
||||
++pubCount_.get();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
#include "feed/impl/TrackableSignalMap.hpp"
|
||||
#include "feed/impl/Util.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "util/prometheus/Counter.hpp"
|
||||
#include "util/prometheus/Gauge.hpp"
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
@@ -54,6 +55,7 @@ class ProposedTransactionFeed {
|
||||
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
|
||||
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
|
||||
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
|
||||
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
|
||||
|
||||
TrackableSignalMap<ripple::AccountID, Subscriber, std::shared_ptr<std::string>> accountSignal_;
|
||||
TrackableSignal<Subscriber, std::shared_ptr<std::string>> signal_;
|
||||
@@ -67,7 +69,7 @@ public:
|
||||
: strand_(boost::asio::make_strand(ioContext))
|
||||
, subAllCount_(getSubscriptionsGaugeInt("tx_proposed"))
|
||||
, subAccountCount_(getSubscriptionsGaugeInt("account_proposed"))
|
||||
|
||||
, pubCount_(getPublishedMessagesCounterInt("tx_proposed"))
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
@@ -36,7 +36,10 @@
|
||||
namespace feed::impl {
|
||||
|
||||
SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name)
|
||||
: strand_(boost::asio::make_strand(ioContext)), subCount_(getSubscriptionsGaugeInt(name)), name_(name)
|
||||
: strand_(boost::asio::make_strand(ioContext))
|
||||
, subCount_(getSubscriptionsGaugeInt(name))
|
||||
, pubCount_(getPublishedMessagesCounterInt(name))
|
||||
, name_(name)
|
||||
{
|
||||
}
|
||||
|
||||
@@ -70,6 +73,7 @@ SingleFeedBase::pub(std::string msg) const
|
||||
boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable {
|
||||
auto const msgPtr = std::make_shared<std::string>(std::move(msg));
|
||||
signal_.emit(msgPtr);
|
||||
++pubCount_.get();
|
||||
});
|
||||
}
|
||||
|
||||
|
||||
@@ -22,6 +22,7 @@
|
||||
#include "feed/Types.hpp"
|
||||
#include "feed/impl/TrackableSignal.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "util/prometheus/Counter.hpp"
|
||||
#include "util/prometheus/Gauge.hpp"
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
@@ -40,6 +41,7 @@ namespace feed::impl {
|
||||
class SingleFeedBase {
|
||||
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
|
||||
std::reference_wrapper<util::prometheus::GaugeInt> subCount_;
|
||||
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
|
||||
TrackableSignal<Subscriber, std::shared_ptr<std::string> const&> signal_;
|
||||
util::Logger logger_{"Subscriptions"};
|
||||
std::string name_;
|
||||
|
||||
@@ -284,23 +284,29 @@ TransactionFeed::pub(
|
||||
affectedBooks = std::move(affectedBooks)]() {
|
||||
notified_.clear();
|
||||
signal_.emit(allVersionsMsgs);
|
||||
|
||||
// clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
|
||||
// rippled SENDS the same message twice
|
||||
notified_.clear();
|
||||
txProposedsignal_.emit(allVersionsMsgs);
|
||||
notified_.clear();
|
||||
|
||||
// check duplicate for account and proposed_account, this prevents sending the same message multiple times
|
||||
// if it affects multiple accounts watched by the same connection
|
||||
for (auto const& account : affectedAccounts) {
|
||||
accountSignal_.emit(account, allVersionsMsgs);
|
||||
accountProposedSignal_.emit(account, allVersionsMsgs);
|
||||
}
|
||||
|
||||
notified_.clear();
|
||||
|
||||
// check duplicate for books, this prevents sending the same message multiple times if it affects multiple
|
||||
// books watched by the same connection
|
||||
for (auto const& book : affectedBooks) {
|
||||
bookSignal_.emit(book, allVersionsMsgs);
|
||||
}
|
||||
|
||||
++pubCount_.get();
|
||||
}
|
||||
);
|
||||
}
|
||||
|
||||
@@ -26,6 +26,7 @@
|
||||
#include "feed/impl/TrackableSignalMap.hpp"
|
||||
#include "feed/impl/Util.hpp"
|
||||
#include "util/log/Logger.hpp"
|
||||
#include "util/prometheus/Counter.hpp"
|
||||
#include "util/prometheus/Gauge.hpp"
|
||||
|
||||
#include <boost/asio/io_context.hpp>
|
||||
@@ -67,6 +68,7 @@ class TransactionFeed {
|
||||
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
|
||||
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
|
||||
std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_;
|
||||
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
|
||||
|
||||
TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountSignal_;
|
||||
TrackableSignalMap<ripple::Book, Subscriber, AllVersionTransactionsType const&> bookSignal_;
|
||||
@@ -89,6 +91,7 @@ public:
|
||||
, subAllCount_(getSubscriptionsGaugeInt("tx"))
|
||||
, subAccountCount_(getSubscriptionsGaugeInt("account"))
|
||||
, subBookCount_(getSubscriptionsGaugeInt("book"))
|
||||
, pubCount_(getPublishedMessagesCounterInt("tx"))
|
||||
{
|
||||
}
|
||||
|
||||
|
||||
@@ -19,6 +19,7 @@
|
||||
|
||||
#pragma once
|
||||
|
||||
#include "util/prometheus/Counter.hpp"
|
||||
#include "util/prometheus/Gauge.hpp"
|
||||
#include "util/prometheus/Label.hpp"
|
||||
#include "util/prometheus/Prometheus.hpp"
|
||||
@@ -38,4 +39,14 @@ getSubscriptionsGaugeInt(std::string const& counterName)
|
||||
fmt::format("Current subscribers number on the {} stream", counterName)
|
||||
);
|
||||
}
|
||||
|
||||
inline util::prometheus::CounterInt&
|
||||
getPublishedMessagesCounterInt(std::string const& counterName)
|
||||
{
|
||||
return PrometheusService::counterInt(
|
||||
"subscriptions_published_count",
|
||||
util::prometheus::Labels({util::prometheus::Label{"stream", counterName}}),
|
||||
fmt::format("Total published messages on the {} stream", counterName)
|
||||
);
|
||||
}
|
||||
} // namespace feed::impl
|
||||
|
||||
Reference in New Issue
Block a user