mirror of
				https://github.com/XRPLF/clio.git
				synced 2025-11-04 11:55:51 +00:00 
			
		
		
		
	Compare commits
	
		
			2 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					2e2740d4c5 | ||
| 
						 | 
					5004dc4e15 | 
@@ -314,6 +314,7 @@ LoadBalancer::getETLState() noexcept
 | 
			
		||||
void
 | 
			
		||||
LoadBalancer::chooseForwardingSource()
 | 
			
		||||
{
 | 
			
		||||
    LOG(log_.info()) << "Choosing a new source to forward subscriptions";
 | 
			
		||||
    hasForwardingSource_ = false;
 | 
			
		||||
    for (auto& source : sources_) {
 | 
			
		||||
        if (not hasForwardingSource_ and source->isConnected()) {
 | 
			
		||||
 
 | 
			
		||||
@@ -46,7 +46,7 @@
 | 
			
		||||
namespace etl::impl {
 | 
			
		||||
 | 
			
		||||
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend)
 | 
			
		||||
    : log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
 | 
			
		||||
    : log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
 | 
			
		||||
{
 | 
			
		||||
    try {
 | 
			
		||||
        boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip), std::stoi(grpcPort)};
 | 
			
		||||
 
 | 
			
		||||
@@ -69,7 +69,7 @@ SubscriptionSource::SubscriptionSource(
 | 
			
		||||
    std::chrono::steady_clock::duration const connectionTimeout,
 | 
			
		||||
    std::chrono::steady_clock::duration const retryDelay
 | 
			
		||||
)
 | 
			
		||||
    : log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort))
 | 
			
		||||
    : log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
 | 
			
		||||
    , wsConnectionBuilder_(ip, wsPort)
 | 
			
		||||
    , validatedLedgers_(std::move(validatedLedgers))
 | 
			
		||||
    , subscriptions_(std::move(subscriptions))
 | 
			
		||||
@@ -133,6 +133,7 @@ void
 | 
			
		||||
SubscriptionSource::setForwarding(bool isForwarding)
 | 
			
		||||
{
 | 
			
		||||
    isForwarding_ = isForwarding;
 | 
			
		||||
    LOG(log_.info()) << "Forwarding set to " << isForwarding_;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
std::chrono::steady_clock::time_point
 | 
			
		||||
@@ -168,6 +169,7 @@ SubscriptionSource::subscribe()
 | 
			
		||||
            wsConnection_ = std::move(connection).value();
 | 
			
		||||
            isConnected_ = true;
 | 
			
		||||
            onConnect_();
 | 
			
		||||
            LOG(log_.info()) << "Connected";
 | 
			
		||||
 | 
			
		||||
            auto const& subscribeCommand = getSubscribeCommandJson();
 | 
			
		||||
            auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
 | 
			
		||||
@@ -224,10 +226,11 @@ SubscriptionSource::handleMessage(std::string const& message)
 | 
			
		||||
                auto validatedLedgers = boost::json::value_to<std::string>(result.at(JS(validated_ledgers)));
 | 
			
		||||
                setValidatedRange(std::move(validatedLedgers));
 | 
			
		||||
            }
 | 
			
		||||
            LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
 | 
			
		||||
            LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object;
 | 
			
		||||
 | 
			
		||||
        } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) {
 | 
			
		||||
            LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
 | 
			
		||||
            LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: "
 | 
			
		||||
                              << object;
 | 
			
		||||
            if (object.contains(JS(ledger_index))) {
 | 
			
		||||
                ledgerIndex = object.at(JS(ledger_index)).as_int64();
 | 
			
		||||
            }
 | 
			
		||||
@@ -245,11 +248,16 @@ SubscriptionSource::handleMessage(std::string const& message)
 | 
			
		||||
                // 2 - Validated transaction
 | 
			
		||||
                // Only forward proposed transaction, validated transactions are sent by Clio itself
 | 
			
		||||
                if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
 | 
			
		||||
                    LOG(log_.debug()) << "Forwarding proposed transaction: " << object;
 | 
			
		||||
                    subscriptions_->forwardProposedTransaction(object);
 | 
			
		||||
                } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
 | 
			
		||||
                    LOG(log_.debug()) << "Forwarding validation: " << object;
 | 
			
		||||
                    subscriptions_->forwardValidation(object);
 | 
			
		||||
                } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
 | 
			
		||||
                    LOG(log_.debug()) << "Forwarding manifest: " << object;
 | 
			
		||||
                    subscriptions_->forwardManifest(object);
 | 
			
		||||
                } else {
 | 
			
		||||
                    LOG(log_.error()) << "Unknown message: " << object;
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
        }
 | 
			
		||||
@@ -261,7 +269,7 @@ SubscriptionSource::handleMessage(std::string const& message)
 | 
			
		||||
 | 
			
		||||
        return std::nullopt;
 | 
			
		||||
    } catch (std::exception const& e) {
 | 
			
		||||
        LOG(log_.error()) << "Exception in handleMessage : " << e.what();
 | 
			
		||||
        LOG(log_.error()) << "Exception in handleMessage: " << e.what();
 | 
			
		||||
        return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())};
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -273,13 +281,11 @@ SubscriptionSource::handleError(util::requests::RequestError const& error, boost
 | 
			
		||||
    isForwarding_ = false;
 | 
			
		||||
    if (not stop_) {
 | 
			
		||||
        onDisconnect_();
 | 
			
		||||
        LOG(log_.info()) << "Disconnected";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (wsConnection_ != nullptr) {
 | 
			
		||||
        auto const err = wsConnection_->close(yield);
 | 
			
		||||
        if (err) {
 | 
			
		||||
            LOG(log_.error()) << "Error closing websocket connection: " << err->message();
 | 
			
		||||
        }
 | 
			
		||||
        wsConnection_->close(yield);
 | 
			
		||||
        wsConnection_.reset();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -104,13 +104,17 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson)
 | 
			
		||||
    boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
 | 
			
		||||
        notified_.clear();
 | 
			
		||||
        signal_.emit(pubMsg);
 | 
			
		||||
 | 
			
		||||
        // Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts
 | 
			
		||||
        // However, if the same connection subscribe both stream and account, it will still receive the message twice.
 | 
			
		||||
        // notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for now, since rippled
 | 
			
		||||
        // acts like this.
 | 
			
		||||
        notified_.clear();
 | 
			
		||||
 | 
			
		||||
        for (auto const& account : affectedAccounts)
 | 
			
		||||
            accountSignal_.emit(account, pubMsg);
 | 
			
		||||
 | 
			
		||||
        ++pubCount_.get();
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,7 @@
 | 
			
		||||
#include "feed/impl/TrackableSignalMap.hpp"
 | 
			
		||||
#include "feed/impl/Util.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
@@ -54,6 +55,7 @@ class ProposedTransactionFeed {
 | 
			
		||||
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
 | 
			
		||||
 | 
			
		||||
    TrackableSignalMap<ripple::AccountID, Subscriber, std::shared_ptr<std::string>> accountSignal_;
 | 
			
		||||
    TrackableSignal<Subscriber, std::shared_ptr<std::string>> signal_;
 | 
			
		||||
@@ -67,7 +69,7 @@ public:
 | 
			
		||||
        : strand_(boost::asio::make_strand(ioContext))
 | 
			
		||||
        , subAllCount_(getSubscriptionsGaugeInt("tx_proposed"))
 | 
			
		||||
        , subAccountCount_(getSubscriptionsGaugeInt("account_proposed"))
 | 
			
		||||
 | 
			
		||||
        , pubCount_(getPublishedMessagesCounterInt("tx_proposed"))
 | 
			
		||||
    {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -36,7 +36,10 @@
 | 
			
		||||
namespace feed::impl {
 | 
			
		||||
 | 
			
		||||
SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name)
 | 
			
		||||
    : strand_(boost::asio::make_strand(ioContext)), subCount_(getSubscriptionsGaugeInt(name)), name_(name)
 | 
			
		||||
    : strand_(boost::asio::make_strand(ioContext))
 | 
			
		||||
    , subCount_(getSubscriptionsGaugeInt(name))
 | 
			
		||||
    , pubCount_(getPublishedMessagesCounterInt(name))
 | 
			
		||||
    , name_(name)
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -70,6 +73,7 @@ SingleFeedBase::pub(std::string msg) const
 | 
			
		||||
    boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable {
 | 
			
		||||
        auto const msgPtr = std::make_shared<std::string>(std::move(msg));
 | 
			
		||||
        signal_.emit(msgPtr);
 | 
			
		||||
        ++pubCount_.get();
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@
 | 
			
		||||
#include "feed/Types.hpp"
 | 
			
		||||
#include "feed/impl/TrackableSignal.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
@@ -40,6 +41,7 @@ namespace feed::impl {
 | 
			
		||||
class SingleFeedBase {
 | 
			
		||||
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
 | 
			
		||||
    TrackableSignal<Subscriber, std::shared_ptr<std::string> const&> signal_;
 | 
			
		||||
    util::Logger logger_{"Subscriptions"};
 | 
			
		||||
    std::string name_;
 | 
			
		||||
 
 | 
			
		||||
@@ -284,23 +284,29 @@ TransactionFeed::pub(
 | 
			
		||||
         affectedBooks = std::move(affectedBooks)]() {
 | 
			
		||||
            notified_.clear();
 | 
			
		||||
            signal_.emit(allVersionsMsgs);
 | 
			
		||||
 | 
			
		||||
            // clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
 | 
			
		||||
            // rippled SENDS the same message twice
 | 
			
		||||
            notified_.clear();
 | 
			
		||||
            txProposedsignal_.emit(allVersionsMsgs);
 | 
			
		||||
            notified_.clear();
 | 
			
		||||
 | 
			
		||||
            // check duplicate for account and proposed_account, this prevents sending the same message multiple times
 | 
			
		||||
            // if it affects multiple accounts watched by the same connection
 | 
			
		||||
            for (auto const& account : affectedAccounts) {
 | 
			
		||||
                accountSignal_.emit(account, allVersionsMsgs);
 | 
			
		||||
                accountProposedSignal_.emit(account, allVersionsMsgs);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            notified_.clear();
 | 
			
		||||
 | 
			
		||||
            // check duplicate for books, this prevents sending the same message multiple times if it affects multiple
 | 
			
		||||
            // books watched by the same connection
 | 
			
		||||
            for (auto const& book : affectedBooks) {
 | 
			
		||||
                bookSignal_.emit(book, allVersionsMsgs);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            ++pubCount_.get();
 | 
			
		||||
        }
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -26,6 +26,7 @@
 | 
			
		||||
#include "feed/impl/TrackableSignalMap.hpp"
 | 
			
		||||
#include "feed/impl/Util.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
@@ -67,6 +68,7 @@ class TransactionFeed {
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
 | 
			
		||||
 | 
			
		||||
    TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountSignal_;
 | 
			
		||||
    TrackableSignalMap<ripple::Book, Subscriber, AllVersionTransactionsType const&> bookSignal_;
 | 
			
		||||
@@ -89,6 +91,7 @@ public:
 | 
			
		||||
        , subAllCount_(getSubscriptionsGaugeInt("tx"))
 | 
			
		||||
        , subAccountCount_(getSubscriptionsGaugeInt("account"))
 | 
			
		||||
        , subBookCount_(getSubscriptionsGaugeInt("book"))
 | 
			
		||||
        , pubCount_(getPublishedMessagesCounterInt("tx"))
 | 
			
		||||
    {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -19,6 +19,7 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
#include "util/prometheus/Label.hpp"
 | 
			
		||||
#include "util/prometheus/Prometheus.hpp"
 | 
			
		||||
@@ -38,4 +39,14 @@ getSubscriptionsGaugeInt(std::string const& counterName)
 | 
			
		||||
        fmt::format("Current subscribers number on the {} stream", counterName)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
inline util::prometheus::CounterInt&
 | 
			
		||||
getPublishedMessagesCounterInt(std::string const& counterName)
 | 
			
		||||
{
 | 
			
		||||
    return PrometheusService::counterInt(
 | 
			
		||||
        "subscriptions_published_count",
 | 
			
		||||
        util::prometheus::Labels({util::prometheus::Label{"stream", counterName}}),
 | 
			
		||||
        fmt::format("Total published messages on the {} stream", counterName)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
}  // namespace feed::impl
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user