Compare commits

...

2 Commits

Author SHA1 Message Date
Alex Kremer
2e2740d4c5 feat: Published subscription message counters (#1618)
This PR adds counters to track the amount of published messages for each
subscription stream.
2024-08-29 16:48:04 +01:00
Sergey Kuznetsov
5004dc4e15 fix: Fix logging in SubscriptionSource (#1617)
For #1616. Later should be ported to develop as well.
2024-08-29 15:59:02 +01:00
10 changed files with 50 additions and 11 deletions

View File

@@ -314,6 +314,7 @@ LoadBalancer::getETLState() noexcept
void
LoadBalancer::chooseForwardingSource()
{
LOG(log_.info()) << "Choosing a new source to forward subscriptions";
hasForwardingSource_ = false;
for (auto& source : sources_) {
if (not hasForwardingSource_ and source->isConnected()) {

View File

@@ -46,7 +46,7 @@
namespace etl::impl {
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend)
: log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
: log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
{
try {
boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip), std::stoi(grpcPort)};

View File

@@ -69,7 +69,7 @@ SubscriptionSource::SubscriptionSource(
std::chrono::steady_clock::duration const connectionTimeout,
std::chrono::steady_clock::duration const retryDelay
)
: log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort))
: log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
, wsConnectionBuilder_(ip, wsPort)
, validatedLedgers_(std::move(validatedLedgers))
, subscriptions_(std::move(subscriptions))
@@ -133,6 +133,7 @@ void
SubscriptionSource::setForwarding(bool isForwarding)
{
isForwarding_ = isForwarding;
LOG(log_.info()) << "Forwarding set to " << isForwarding_;
}
std::chrono::steady_clock::time_point
@@ -168,6 +169,7 @@ SubscriptionSource::subscribe()
wsConnection_ = std::move(connection).value();
isConnected_ = true;
onConnect_();
LOG(log_.info()) << "Connected";
auto const& subscribeCommand = getSubscribeCommandJson();
auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
@@ -224,10 +226,11 @@ SubscriptionSource::handleMessage(std::string const& message)
auto validatedLedgers = boost::json::value_to<std::string>(result.at(JS(validated_ledgers)));
setValidatedRange(std::move(validatedLedgers));
}
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object;
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) {
LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: "
<< object;
if (object.contains(JS(ledger_index))) {
ledgerIndex = object.at(JS(ledger_index)).as_int64();
}
@@ -245,11 +248,16 @@ SubscriptionSource::handleMessage(std::string const& message)
// 2 - Validated transaction
// Only forward proposed transaction, validated transactions are sent by Clio itself
if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
LOG(log_.debug()) << "Forwarding proposed transaction: " << object;
subscriptions_->forwardProposedTransaction(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
LOG(log_.debug()) << "Forwarding validation: " << object;
subscriptions_->forwardValidation(object);
} else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
LOG(log_.debug()) << "Forwarding manifest: " << object;
subscriptions_->forwardManifest(object);
} else {
LOG(log_.error()) << "Unknown message: " << object;
}
}
}
@@ -261,7 +269,7 @@ SubscriptionSource::handleMessage(std::string const& message)
return std::nullopt;
} catch (std::exception const& e) {
LOG(log_.error()) << "Exception in handleMessage : " << e.what();
LOG(log_.error()) << "Exception in handleMessage: " << e.what();
return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())};
}
}
@@ -273,13 +281,11 @@ SubscriptionSource::handleError(util::requests::RequestError const& error, boost
isForwarding_ = false;
if (not stop_) {
onDisconnect_();
LOG(log_.info()) << "Disconnected";
}
if (wsConnection_ != nullptr) {
auto const err = wsConnection_->close(yield);
if (err) {
LOG(log_.error()) << "Error closing websocket connection: " << err->message();
}
wsConnection_->close(yield);
wsConnection_.reset();
}

View File

@@ -104,13 +104,17 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson)
boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
notified_.clear();
signal_.emit(pubMsg);
// Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts
// However, if the same connection subscribe both stream and account, it will still receive the message twice.
// notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for now, since rippled
// acts like this.
notified_.clear();
for (auto const& account : affectedAccounts)
accountSignal_.emit(account, pubMsg);
++pubCount_.get();
});
}

View File

@@ -24,6 +24,7 @@
#include "feed/impl/TrackableSignalMap.hpp"
#include "feed/impl/Util.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp>
@@ -54,6 +55,7 @@ class ProposedTransactionFeed {
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
TrackableSignalMap<ripple::AccountID, Subscriber, std::shared_ptr<std::string>> accountSignal_;
TrackableSignal<Subscriber, std::shared_ptr<std::string>> signal_;
@@ -67,7 +69,7 @@ public:
: strand_(boost::asio::make_strand(ioContext))
, subAllCount_(getSubscriptionsGaugeInt("tx_proposed"))
, subAccountCount_(getSubscriptionsGaugeInt("account_proposed"))
, pubCount_(getPublishedMessagesCounterInt("tx_proposed"))
{
}

View File

@@ -36,7 +36,10 @@
namespace feed::impl {
SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name)
: strand_(boost::asio::make_strand(ioContext)), subCount_(getSubscriptionsGaugeInt(name)), name_(name)
: strand_(boost::asio::make_strand(ioContext))
, subCount_(getSubscriptionsGaugeInt(name))
, pubCount_(getPublishedMessagesCounterInt(name))
, name_(name)
{
}
@@ -70,6 +73,7 @@ SingleFeedBase::pub(std::string msg) const
boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable {
auto const msgPtr = std::make_shared<std::string>(std::move(msg));
signal_.emit(msgPtr);
++pubCount_.get();
});
}

View File

@@ -22,6 +22,7 @@
#include "feed/Types.hpp"
#include "feed/impl/TrackableSignal.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp>
@@ -40,6 +41,7 @@ namespace feed::impl {
class SingleFeedBase {
boost::asio::strand<boost::asio::io_context::executor_type> strand_;
std::reference_wrapper<util::prometheus::GaugeInt> subCount_;
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
TrackableSignal<Subscriber, std::shared_ptr<std::string> const&> signal_;
util::Logger logger_{"Subscriptions"};
std::string name_;

View File

@@ -284,23 +284,29 @@ TransactionFeed::pub(
affectedBooks = std::move(affectedBooks)]() {
notified_.clear();
signal_.emit(allVersionsMsgs);
// clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
// rippled SENDS the same message twice
notified_.clear();
txProposedsignal_.emit(allVersionsMsgs);
notified_.clear();
// check duplicate for account and proposed_account, this prevents sending the same message multiple times
// if it affects multiple accounts watched by the same connection
for (auto const& account : affectedAccounts) {
accountSignal_.emit(account, allVersionsMsgs);
accountProposedSignal_.emit(account, allVersionsMsgs);
}
notified_.clear();
// check duplicate for books, this prevents sending the same message multiple times if it affects multiple
// books watched by the same connection
for (auto const& book : affectedBooks) {
bookSignal_.emit(book, allVersionsMsgs);
}
++pubCount_.get();
}
);
}

View File

@@ -26,6 +26,7 @@
#include "feed/impl/TrackableSignalMap.hpp"
#include "feed/impl/Util.hpp"
#include "util/log/Logger.hpp"
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp"
#include <boost/asio/io_context.hpp>
@@ -67,6 +68,7 @@ class TransactionFeed {
std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_;
std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountSignal_;
TrackableSignalMap<ripple::Book, Subscriber, AllVersionTransactionsType const&> bookSignal_;
@@ -89,6 +91,7 @@ public:
, subAllCount_(getSubscriptionsGaugeInt("tx"))
, subAccountCount_(getSubscriptionsGaugeInt("account"))
, subBookCount_(getSubscriptionsGaugeInt("book"))
, pubCount_(getPublishedMessagesCounterInt("tx"))
{
}

View File

@@ -19,6 +19,7 @@
#pragma once
#include "util/prometheus/Counter.hpp"
#include "util/prometheus/Gauge.hpp"
#include "util/prometheus/Label.hpp"
#include "util/prometheus/Prometheus.hpp"
@@ -38,4 +39,14 @@ getSubscriptionsGaugeInt(std::string const& counterName)
fmt::format("Current subscribers number on the {} stream", counterName)
);
}
inline util::prometheus::CounterInt&
getPublishedMessagesCounterInt(std::string const& counterName)
{
return PrometheusService::counterInt(
"subscriptions_published_count",
util::prometheus::Labels({util::prometheus::Label{"stream", counterName}}),
fmt::format("Total published messages on the {} stream", counterName)
);
}
} // namespace feed::impl