mirror of
				https://github.com/XRPLF/clio.git
				synced 2025-11-04 11:55:51 +00:00 
			
		
		
		
	Compare commits
	
		
			10 Commits
		
	
	
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					8b0e68f48e | ||
| 
						 | 
					189098d092 | ||
| 
						 | 
					854749a05e | ||
| 
						 | 
					f57706be3d | ||
| 
						 | 
					bb0d912f2b | ||
| 
						 | 
					d02d6affdb | ||
| 
						 | 
					0054e4b64c | ||
| 
						 | 
					9fe9e7c9d2 | ||
| 
						 | 
					2e2740d4c5 | ||
| 
						 | 
					5004dc4e15 | 
							
								
								
									
										4
									
								
								.github/workflows/nightly.yml
									
									
									
									
										vendored
									
									
								
							
							
						
						
									
										4
									
								
								.github/workflows/nightly.yml
									
									
									
									
										vendored
									
									
								
							@@ -13,12 +13,15 @@ jobs:
 | 
			
		||||
        include:
 | 
			
		||||
          - os: macos14
 | 
			
		||||
            build_type: Release
 | 
			
		||||
            static: false
 | 
			
		||||
          - os: heavy
 | 
			
		||||
            build_type: Release
 | 
			
		||||
            static: true
 | 
			
		||||
            container:
 | 
			
		||||
              image: rippleci/clio_ci:latest
 | 
			
		||||
          - os: heavy
 | 
			
		||||
            build_type: Debug
 | 
			
		||||
            static: true
 | 
			
		||||
            container:
 | 
			
		||||
              image: rippleci/clio_ci:latest
 | 
			
		||||
    runs-on: [self-hosted, "${{ matrix.os }}"]
 | 
			
		||||
@@ -50,6 +53,7 @@ jobs:
 | 
			
		||||
          conan_profile: ${{ steps.conan.outputs.conan_profile }}
 | 
			
		||||
          conan_cache_hit: ${{ steps.restore_cache.outputs.conan_cache_hit }}
 | 
			
		||||
          build_type: ${{ matrix.build_type }}
 | 
			
		||||
          static: ${{ matrix.static }}
 | 
			
		||||
 | 
			
		||||
      - name: Build Clio
 | 
			
		||||
        uses: ./.github/actions/build_clio
 | 
			
		||||
 
 | 
			
		||||
@@ -109,10 +109,13 @@ LoadBalancer::LoadBalancer(
 | 
			
		||||
            validatedLedgers,
 | 
			
		||||
            forwardingTimeout,
 | 
			
		||||
            [this]() {
 | 
			
		||||
                if (not hasForwardingSource_)
 | 
			
		||||
                if (not hasForwardingSource_.lock().get())
 | 
			
		||||
                    chooseForwardingSource();
 | 
			
		||||
            },
 | 
			
		||||
            [this](bool wasForwarding) {
 | 
			
		||||
                if (wasForwarding)
 | 
			
		||||
                    chooseForwardingSource();
 | 
			
		||||
            },
 | 
			
		||||
            [this]() { chooseForwardingSource(); },
 | 
			
		||||
            [this]() {
 | 
			
		||||
                if (forwardingCache_.has_value())
 | 
			
		||||
                    forwardingCache_->invalidate();
 | 
			
		||||
@@ -314,11 +317,13 @@ LoadBalancer::getETLState() noexcept
 | 
			
		||||
void
 | 
			
		||||
LoadBalancer::chooseForwardingSource()
 | 
			
		||||
{
 | 
			
		||||
    hasForwardingSource_ = false;
 | 
			
		||||
    LOG(log_.info()) << "Choosing a new source to forward subscriptions";
 | 
			
		||||
    auto hasForwardingSourceLock = hasForwardingSource_.lock();
 | 
			
		||||
    hasForwardingSourceLock.get() = false;
 | 
			
		||||
    for (auto& source : sources_) {
 | 
			
		||||
        if (not hasForwardingSource_ and source->isConnected()) {
 | 
			
		||||
        if (not hasForwardingSourceLock.get() and source->isConnected()) {
 | 
			
		||||
            source->setForwarding(true);
 | 
			
		||||
            hasForwardingSource_ = true;
 | 
			
		||||
            hasForwardingSourceLock.get() = true;
 | 
			
		||||
        } else {
 | 
			
		||||
            source->setForwarding(false);
 | 
			
		||||
        }
 | 
			
		||||
 
 | 
			
		||||
@@ -25,6 +25,7 @@
 | 
			
		||||
#include "etl/Source.hpp"
 | 
			
		||||
#include "etl/impl/ForwardingCache.hpp"
 | 
			
		||||
#include "feed/SubscriptionManagerInterface.hpp"
 | 
			
		||||
#include "util/Mutex.hpp"
 | 
			
		||||
#include "util/config/Config.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
 | 
			
		||||
@@ -38,7 +39,6 @@
 | 
			
		||||
#include <org/xrpl/rpc/v1/ledger.pb.h>
 | 
			
		||||
#include <ripple/proto/org/xrpl/rpc/v1/xrp_ledger.grpc.pb.h>
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <memory>
 | 
			
		||||
@@ -74,7 +74,10 @@ private:
 | 
			
		||||
    std::optional<ETLState> etlState_;
 | 
			
		||||
    std::uint32_t downloadRanges_ =
 | 
			
		||||
        DEFAULT_DOWNLOAD_RANGES; /*< The number of markers to use when downloading initial ledger */
 | 
			
		||||
    std::atomic_bool hasForwardingSource_{false};
 | 
			
		||||
 | 
			
		||||
    // Using mutext instead of atomic_bool because choosing a new source to
 | 
			
		||||
    // forward messages should be done with a mutual exclusion otherwise there will be a race condition
 | 
			
		||||
    util::Mutex<bool> hasForwardingSource_{false};
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    /**
 | 
			
		||||
 
 | 
			
		||||
@@ -51,7 +51,7 @@ namespace etl {
 | 
			
		||||
class SourceBase {
 | 
			
		||||
public:
 | 
			
		||||
    using OnConnectHook = std::function<void()>;
 | 
			
		||||
    using OnDisconnectHook = std::function<void()>;
 | 
			
		||||
    using OnDisconnectHook = std::function<void(bool)>;
 | 
			
		||||
    using OnLedgerClosedHook = std::function<void()>;
 | 
			
		||||
 | 
			
		||||
    virtual ~SourceBase() = default;
 | 
			
		||||
 
 | 
			
		||||
@@ -46,7 +46,7 @@
 | 
			
		||||
namespace etl::impl {
 | 
			
		||||
 | 
			
		||||
GrpcSource::GrpcSource(std::string const& ip, std::string const& grpcPort, std::shared_ptr<BackendInterface> backend)
 | 
			
		||||
    : log_(fmt::format("ETL_Grpc[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
 | 
			
		||||
    : log_(fmt::format("GrpcSource[{}:{}]", ip, grpcPort)), backend_(std::move(backend))
 | 
			
		||||
{
 | 
			
		||||
    try {
 | 
			
		||||
        boost::asio::ip::tcp::endpoint const endpoint{boost::asio::ip::make_address(ip), std::stoi(grpcPort)};
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,8 @@
 | 
			
		||||
#include "rpc/JS.hpp"
 | 
			
		||||
#include "util/Retry.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/prometheus/Label.hpp"
 | 
			
		||||
#include "util/prometheus/Prometheus.hpp"
 | 
			
		||||
#include "util/requests/Types.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/algorithm/string/classification.hpp>
 | 
			
		||||
@@ -66,22 +68,28 @@ SubscriptionSource::SubscriptionSource(
 | 
			
		||||
    OnConnectHook onConnect,
 | 
			
		||||
    OnDisconnectHook onDisconnect,
 | 
			
		||||
    OnLedgerClosedHook onLedgerClosed,
 | 
			
		||||
    std::chrono::steady_clock::duration const connectionTimeout,
 | 
			
		||||
    std::chrono::steady_clock::duration const wsTimeout,
 | 
			
		||||
    std::chrono::steady_clock::duration const retryDelay
 | 
			
		||||
)
 | 
			
		||||
    : log_(fmt::format("GrpcSource[{}:{}]", ip, wsPort))
 | 
			
		||||
    : log_(fmt::format("SubscriptionSource[{}:{}]", ip, wsPort))
 | 
			
		||||
    , wsConnectionBuilder_(ip, wsPort)
 | 
			
		||||
    , validatedLedgers_(std::move(validatedLedgers))
 | 
			
		||||
    , subscriptions_(std::move(subscriptions))
 | 
			
		||||
    , strand_(boost::asio::make_strand(ioContext))
 | 
			
		||||
    , wsTimeout_(wsTimeout)
 | 
			
		||||
    , retry_(util::makeRetryExponentialBackoff(retryDelay, RETRY_MAX_DELAY, strand_))
 | 
			
		||||
    , onConnect_(std::move(onConnect))
 | 
			
		||||
    , onDisconnect_(std::move(onDisconnect))
 | 
			
		||||
    , onLedgerClosed_(std::move(onLedgerClosed))
 | 
			
		||||
    , lastMessageTimeSecondsSinceEpoch_(PrometheusService::gaugeInt(
 | 
			
		||||
          "subscription_source_last_message_time",
 | 
			
		||||
          util::prometheus::Labels({{"source", fmt::format("{}:{}", ip, wsPort)}}),
 | 
			
		||||
          "Seconds since epoch of the last message received from rippled subscription streams"
 | 
			
		||||
      ))
 | 
			
		||||
{
 | 
			
		||||
    wsConnectionBuilder_.addHeader({boost::beast::http::field::user_agent, "clio-client"})
 | 
			
		||||
        .addHeader({"X-User", "clio-client"})
 | 
			
		||||
        .setConnectionTimeout(connectionTimeout);
 | 
			
		||||
        .setConnectionTimeout(wsTimeout_);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
SubscriptionSource::~SubscriptionSource()
 | 
			
		||||
@@ -133,6 +141,7 @@ void
 | 
			
		||||
SubscriptionSource::setForwarding(bool isForwarding)
 | 
			
		||||
{
 | 
			
		||||
    isForwarding_ = isForwarding;
 | 
			
		||||
    LOG(log_.info()) << "Forwarding set to " << isForwarding_;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
std::chrono::steady_clock::time_point
 | 
			
		||||
@@ -166,20 +175,22 @@ SubscriptionSource::subscribe()
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            wsConnection_ = std::move(connection).value();
 | 
			
		||||
            isConnected_ = true;
 | 
			
		||||
            onConnect_();
 | 
			
		||||
 | 
			
		||||
            auto const& subscribeCommand = getSubscribeCommandJson();
 | 
			
		||||
            auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield);
 | 
			
		||||
            auto const writeErrorOpt = wsConnection_->write(subscribeCommand, yield, wsTimeout_);
 | 
			
		||||
            if (writeErrorOpt) {
 | 
			
		||||
                handleError(writeErrorOpt.value(), yield);
 | 
			
		||||
                return;
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            isConnected_ = true;
 | 
			
		||||
            LOG(log_.info()) << "Connected";
 | 
			
		||||
            onConnect_();
 | 
			
		||||
 | 
			
		||||
            retry_.reset();
 | 
			
		||||
 | 
			
		||||
            while (!stop_) {
 | 
			
		||||
                auto const message = wsConnection_->read(yield);
 | 
			
		||||
                auto const message = wsConnection_->read(yield, wsTimeout_);
 | 
			
		||||
                if (not message) {
 | 
			
		||||
                    handleError(message.error(), yield);
 | 
			
		||||
                    return;
 | 
			
		||||
@@ -224,10 +235,11 @@ SubscriptionSource::handleMessage(std::string const& message)
 | 
			
		||||
                auto validatedLedgers = boost::json::value_to<std::string>(result.at(JS(validated_ledgers)));
 | 
			
		||||
                setValidatedRange(std::move(validatedLedgers));
 | 
			
		||||
            }
 | 
			
		||||
            LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
 | 
			
		||||
            LOG(log_.debug()) << "Received a message on ledger subscription stream. Message: " << object;
 | 
			
		||||
 | 
			
		||||
        } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_LedgerClosed) {
 | 
			
		||||
            LOG(log_.info()) << "Received a message on ledger subscription stream. Message : " << object;
 | 
			
		||||
            LOG(log_.debug()) << "Received a message of type 'ledgerClosed' on ledger subscription stream. Message: "
 | 
			
		||||
                              << object;
 | 
			
		||||
            if (object.contains(JS(ledger_index))) {
 | 
			
		||||
                ledgerIndex = object.at(JS(ledger_index)).as_int64();
 | 
			
		||||
            }
 | 
			
		||||
@@ -245,10 +257,13 @@ SubscriptionSource::handleMessage(std::string const& message)
 | 
			
		||||
                // 2 - Validated transaction
 | 
			
		||||
                // Only forward proposed transaction, validated transactions are sent by Clio itself
 | 
			
		||||
                if (object.contains(JS(transaction)) and !object.contains(JS(meta))) {
 | 
			
		||||
                    LOG(log_.debug()) << "Forwarding proposed transaction: " << object;
 | 
			
		||||
                    subscriptions_->forwardProposedTransaction(object);
 | 
			
		||||
                } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ValidationReceived) {
 | 
			
		||||
                    LOG(log_.debug()) << "Forwarding validation: " << object;
 | 
			
		||||
                    subscriptions_->forwardValidation(object);
 | 
			
		||||
                } else if (object.contains(JS(type)) && object.at(JS(type)) == JS_ManifestReceived) {
 | 
			
		||||
                    LOG(log_.debug()) << "Forwarding manifest: " << object;
 | 
			
		||||
                    subscriptions_->forwardManifest(object);
 | 
			
		||||
                }
 | 
			
		||||
            }
 | 
			
		||||
@@ -261,7 +276,7 @@ SubscriptionSource::handleMessage(std::string const& message)
 | 
			
		||||
 | 
			
		||||
        return std::nullopt;
 | 
			
		||||
    } catch (std::exception const& e) {
 | 
			
		||||
        LOG(log_.error()) << "Exception in handleMessage : " << e.what();
 | 
			
		||||
        LOG(log_.error()) << "Exception in handleMessage: " << e.what();
 | 
			
		||||
        return util::requests::RequestError{fmt::format("Error handling message: {}", e.what())};
 | 
			
		||||
    }
 | 
			
		||||
}
 | 
			
		||||
@@ -270,16 +285,14 @@ void
 | 
			
		||||
SubscriptionSource::handleError(util::requests::RequestError const& error, boost::asio::yield_context yield)
 | 
			
		||||
{
 | 
			
		||||
    isConnected_ = false;
 | 
			
		||||
    isForwarding_ = false;
 | 
			
		||||
    bool const wasForwarding = isForwarding_.exchange(false);
 | 
			
		||||
    if (not stop_) {
 | 
			
		||||
        onDisconnect_();
 | 
			
		||||
        LOG(log_.info()) << "Disconnected";
 | 
			
		||||
        onDisconnect_(wasForwarding);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (wsConnection_ != nullptr) {
 | 
			
		||||
        auto const err = wsConnection_->close(yield);
 | 
			
		||||
        if (err) {
 | 
			
		||||
            LOG(log_.error()) << "Error closing websocket connection: " << err->message();
 | 
			
		||||
        }
 | 
			
		||||
        wsConnection_->close(yield);
 | 
			
		||||
        wsConnection_.reset();
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -306,7 +319,11 @@ SubscriptionSource::logError(util::requests::RequestError const& error) const
 | 
			
		||||
void
 | 
			
		||||
SubscriptionSource::setLastMessageTime()
 | 
			
		||||
{
 | 
			
		||||
    lastMessageTime_.lock().get() = std::chrono::steady_clock::now();
 | 
			
		||||
    lastMessageTimeSecondsSinceEpoch_.get().set(
 | 
			
		||||
        std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch()).count()
 | 
			
		||||
    );
 | 
			
		||||
    auto lock = lastMessageTime_.lock();
 | 
			
		||||
    lock.get() = std::chrono::steady_clock::now();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
void
 | 
			
		||||
 
 | 
			
		||||
@@ -19,12 +19,13 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "etl/ETLHelpers.hpp"
 | 
			
		||||
#include "etl/NetworkValidatedLedgersInterface.hpp"
 | 
			
		||||
#include "etl/Source.hpp"
 | 
			
		||||
#include "feed/SubscriptionManagerInterface.hpp"
 | 
			
		||||
#include "util/Mutex.hpp"
 | 
			
		||||
#include "util/Retry.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
#include "util/requests/Types.hpp"
 | 
			
		||||
#include "util/requests/WsConnection.hpp"
 | 
			
		||||
 | 
			
		||||
@@ -37,6 +38,7 @@
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <functional>
 | 
			
		||||
#include <future>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <optional>
 | 
			
		||||
@@ -71,6 +73,8 @@ private:
 | 
			
		||||
 | 
			
		||||
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;
 | 
			
		||||
 | 
			
		||||
    std::chrono::steady_clock::duration wsTimeout_;
 | 
			
		||||
 | 
			
		||||
    util::Retry retry_;
 | 
			
		||||
 | 
			
		||||
    OnConnectHook onConnect_;
 | 
			
		||||
@@ -83,9 +87,11 @@ private:
 | 
			
		||||
 | 
			
		||||
    util::Mutex<std::chrono::steady_clock::time_point> lastMessageTime_;
 | 
			
		||||
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> lastMessageTimeSecondsSinceEpoch_;
 | 
			
		||||
 | 
			
		||||
    std::future<void> runFuture_;
 | 
			
		||||
 | 
			
		||||
    static constexpr std::chrono::seconds CONNECTION_TIMEOUT{30};
 | 
			
		||||
    static constexpr std::chrono::seconds WS_TIMEOUT{30};
 | 
			
		||||
    static constexpr std::chrono::seconds RETRY_MAX_DELAY{30};
 | 
			
		||||
    static constexpr std::chrono::seconds RETRY_DELAY{1};
 | 
			
		||||
 | 
			
		||||
@@ -103,7 +109,7 @@ public:
 | 
			
		||||
     * @param onNewLedger The onNewLedger hook. Called when a new ledger is received
 | 
			
		||||
     * @param onLedgerClosed The onLedgerClosed hook. Called when the ledger is closed but only if the source is
 | 
			
		||||
     * forwarding
 | 
			
		||||
     * @param connectionTimeout The connection timeout. Defaults to 30 seconds
 | 
			
		||||
     * @param wsTimeout A timeout for websocket operations. Defaults to 30 seconds
 | 
			
		||||
     * @param retryDelay The retry delay. Defaults to 1 second
 | 
			
		||||
     */
 | 
			
		||||
    SubscriptionSource(
 | 
			
		||||
@@ -115,7 +121,7 @@ public:
 | 
			
		||||
        OnConnectHook onConnect,
 | 
			
		||||
        OnDisconnectHook onDisconnect,
 | 
			
		||||
        OnLedgerClosedHook onLedgerClosed,
 | 
			
		||||
        std::chrono::steady_clock::duration const connectionTimeout = CONNECTION_TIMEOUT,
 | 
			
		||||
        std::chrono::steady_clock::duration const wsTimeout = WS_TIMEOUT,
 | 
			
		||||
        std::chrono::steady_clock::duration const retryDelay = RETRY_DELAY
 | 
			
		||||
    );
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -104,13 +104,17 @@ ProposedTransactionFeed::pub(boost::json::object const& receivedTxJson)
 | 
			
		||||
    boost::asio::post(strand_, [this, pubMsg = std::move(pubMsg), affectedAccounts = std::move(affectedAccounts)]() {
 | 
			
		||||
        notified_.clear();
 | 
			
		||||
        signal_.emit(pubMsg);
 | 
			
		||||
 | 
			
		||||
        // Prevent the same connection from receiving the same message twice if it is subscribed to multiple accounts
 | 
			
		||||
        // However, if the same connection subscribe both stream and account, it will still receive the message twice.
 | 
			
		||||
        // notified_ can be cleared before signal_ emit to improve this, but let's keep it as is for now, since rippled
 | 
			
		||||
        // acts like this.
 | 
			
		||||
        notified_.clear();
 | 
			
		||||
 | 
			
		||||
        for (auto const& account : affectedAccounts)
 | 
			
		||||
            accountSignal_.emit(account, pubMsg);
 | 
			
		||||
 | 
			
		||||
        ++pubCount_.get();
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,7 @@
 | 
			
		||||
#include "feed/impl/TrackableSignalMap.hpp"
 | 
			
		||||
#include "feed/impl/Util.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
@@ -54,6 +55,7 @@ class ProposedTransactionFeed {
 | 
			
		||||
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
 | 
			
		||||
 | 
			
		||||
    TrackableSignalMap<ripple::AccountID, Subscriber, std::shared_ptr<std::string>> accountSignal_;
 | 
			
		||||
    TrackableSignal<Subscriber, std::shared_ptr<std::string>> signal_;
 | 
			
		||||
@@ -67,7 +69,7 @@ public:
 | 
			
		||||
        : strand_(boost::asio::make_strand(ioContext))
 | 
			
		||||
        , subAllCount_(getSubscriptionsGaugeInt("tx_proposed"))
 | 
			
		||||
        , subAccountCount_(getSubscriptionsGaugeInt("account_proposed"))
 | 
			
		||||
 | 
			
		||||
        , pubCount_(getPublishedMessagesCounterInt("tx_proposed"))
 | 
			
		||||
    {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -36,7 +36,10 @@
 | 
			
		||||
namespace feed::impl {
 | 
			
		||||
 | 
			
		||||
SingleFeedBase::SingleFeedBase(boost::asio::io_context& ioContext, std::string const& name)
 | 
			
		||||
    : strand_(boost::asio::make_strand(ioContext)), subCount_(getSubscriptionsGaugeInt(name)), name_(name)
 | 
			
		||||
    : strand_(boost::asio::make_strand(ioContext))
 | 
			
		||||
    , subCount_(getSubscriptionsGaugeInt(name))
 | 
			
		||||
    , pubCount_(getPublishedMessagesCounterInt(name))
 | 
			
		||||
    , name_(name)
 | 
			
		||||
{
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -70,6 +73,7 @@ SingleFeedBase::pub(std::string msg) const
 | 
			
		||||
    boost::asio::post(strand_, [this, msg = std::move(msg)]() mutable {
 | 
			
		||||
        auto const msgPtr = std::make_shared<std::string>(std::move(msg));
 | 
			
		||||
        signal_.emit(msgPtr);
 | 
			
		||||
        ++pubCount_.get();
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@
 | 
			
		||||
#include "feed/Types.hpp"
 | 
			
		||||
#include "feed/impl/TrackableSignal.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
@@ -40,6 +41,7 @@ namespace feed::impl {
 | 
			
		||||
class SingleFeedBase {
 | 
			
		||||
    boost::asio::strand<boost::asio::io_context::executor_type> strand_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
 | 
			
		||||
    TrackableSignal<Subscriber, std::shared_ptr<std::string> const&> signal_;
 | 
			
		||||
    util::Logger logger_{"Subscriptions"};
 | 
			
		||||
    std::string name_;
 | 
			
		||||
 
 | 
			
		||||
@@ -284,23 +284,29 @@ TransactionFeed::pub(
 | 
			
		||||
         affectedBooks = std::move(affectedBooks)]() {
 | 
			
		||||
            notified_.clear();
 | 
			
		||||
            signal_.emit(allVersionsMsgs);
 | 
			
		||||
 | 
			
		||||
            // clear the notified set. If the same connection subscribes both transactions + proposed_transactions,
 | 
			
		||||
            // rippled SENDS the same message twice
 | 
			
		||||
            notified_.clear();
 | 
			
		||||
            txProposedsignal_.emit(allVersionsMsgs);
 | 
			
		||||
            notified_.clear();
 | 
			
		||||
 | 
			
		||||
            // check duplicate for account and proposed_account, this prevents sending the same message multiple times
 | 
			
		||||
            // if it affects multiple accounts watched by the same connection
 | 
			
		||||
            for (auto const& account : affectedAccounts) {
 | 
			
		||||
                accountSignal_.emit(account, allVersionsMsgs);
 | 
			
		||||
                accountProposedSignal_.emit(account, allVersionsMsgs);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            notified_.clear();
 | 
			
		||||
 | 
			
		||||
            // check duplicate for books, this prevents sending the same message multiple times if it affects multiple
 | 
			
		||||
            // books watched by the same connection
 | 
			
		||||
            for (auto const& book : affectedBooks) {
 | 
			
		||||
                bookSignal_.emit(book, allVersionsMsgs);
 | 
			
		||||
            }
 | 
			
		||||
 | 
			
		||||
            ++pubCount_.get();
 | 
			
		||||
        }
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -26,6 +26,7 @@
 | 
			
		||||
#include "feed/impl/TrackableSignalMap.hpp"
 | 
			
		||||
#include "feed/impl/Util.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
@@ -67,6 +68,7 @@ class TransactionFeed {
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subAllCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subAccountCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> subBookCount_;
 | 
			
		||||
    std::reference_wrapper<util::prometheus::CounterInt> pubCount_;
 | 
			
		||||
 | 
			
		||||
    TrackableSignalMap<ripple::AccountID, Subscriber, AllVersionTransactionsType const&> accountSignal_;
 | 
			
		||||
    TrackableSignalMap<ripple::Book, Subscriber, AllVersionTransactionsType const&> bookSignal_;
 | 
			
		||||
@@ -89,6 +91,7 @@ public:
 | 
			
		||||
        , subAllCount_(getSubscriptionsGaugeInt("tx"))
 | 
			
		||||
        , subAccountCount_(getSubscriptionsGaugeInt("account"))
 | 
			
		||||
        , subBookCount_(getSubscriptionsGaugeInt("book"))
 | 
			
		||||
        , pubCount_(getPublishedMessagesCounterInt("tx"))
 | 
			
		||||
    {
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -19,6 +19,7 @@
 | 
			
		||||
 | 
			
		||||
#pragma once
 | 
			
		||||
 | 
			
		||||
#include "util/prometheus/Counter.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
#include "util/prometheus/Label.hpp"
 | 
			
		||||
#include "util/prometheus/Prometheus.hpp"
 | 
			
		||||
@@ -38,4 +39,14 @@ getSubscriptionsGaugeInt(std::string const& counterName)
 | 
			
		||||
        fmt::format("Current subscribers number on the {} stream", counterName)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
inline util::prometheus::CounterInt&
 | 
			
		||||
getPublishedMessagesCounterInt(std::string const& counterName)
 | 
			
		||||
{
 | 
			
		||||
    return PrometheusService::counterInt(
 | 
			
		||||
        "subscriptions_published_count",
 | 
			
		||||
        util::prometheus::Labels({util::prometheus::Label{"stream", counterName}}),
 | 
			
		||||
        fmt::format("Total published messages on the {} stream", counterName)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
}  // namespace feed::impl
 | 
			
		||||
 
 | 
			
		||||
@@ -11,18 +11,16 @@ target_sources(clio_server PRIVATE Main.cpp)
 | 
			
		||||
target_link_libraries(clio_server PRIVATE clio)
 | 
			
		||||
 | 
			
		||||
if (static)
 | 
			
		||||
  target_link_options(clio_server PRIVATE -static)
 | 
			
		||||
 | 
			
		||||
  if (is_gcc AND NOT san)
 | 
			
		||||
  if (san)
 | 
			
		||||
    message(FATAL_ERROR "Static linkage not allowed when using sanitizers")
 | 
			
		||||
  elseif (is_appleclang)
 | 
			
		||||
    message(FATAL_ERROR "Static linkage not supported on AppleClang")
 | 
			
		||||
  else ()
 | 
			
		||||
    target_link_options(
 | 
			
		||||
      # For now let's assume that we only using libstdc++ under gcc.
 | 
			
		||||
      # Note: -static-libstdc++ can statically link both libstdc++ and libc++
 | 
			
		||||
      clio_server PRIVATE -static-libstdc++ -static-libgcc
 | 
			
		||||
    )
 | 
			
		||||
  endif ()
 | 
			
		||||
 | 
			
		||||
  if (is_appleclang)
 | 
			
		||||
    message(FATAL_ERROR "Static linkage not supported on AppleClang")
 | 
			
		||||
  endif ()
 | 
			
		||||
endif ()
 | 
			
		||||
 | 
			
		||||
set_target_properties(clio_server PROPERTIES RUNTIME_OUTPUT_DIRECTORY ${CMAKE_BINARY_DIR})
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@
 | 
			
		||||
#include "data/BackendInterface.hpp"
 | 
			
		||||
#include "rpc/Counters.hpp"
 | 
			
		||||
#include "rpc/Errors.hpp"
 | 
			
		||||
#include "rpc/RPCHelpers.hpp"
 | 
			
		||||
#include "rpc/WorkQueue.hpp"
 | 
			
		||||
#include "rpc/common/HandlerProvider.hpp"
 | 
			
		||||
#include "rpc/common/Types.hpp"
 | 
			
		||||
@@ -134,8 +135,13 @@ public:
 | 
			
		||||
    Result
 | 
			
		||||
    buildResponse(web::Context const& ctx)
 | 
			
		||||
    {
 | 
			
		||||
        if (forwardingProxy_.shouldForward(ctx))
 | 
			
		||||
        if (forwardingProxy_.shouldForward(ctx)) {
 | 
			
		||||
            // Disallow forwarding of the admin api, only user api is allowed for security reasons.
 | 
			
		||||
            if (isAdminCmd(ctx.method, ctx.params))
 | 
			
		||||
                return Result{Status{RippledError::rpcNO_PERMISSION}};
 | 
			
		||||
 | 
			
		||||
            return forwardingProxy_.forward(ctx);
 | 
			
		||||
        }
 | 
			
		||||
 | 
			
		||||
        if (backend_->isTooBusy()) {
 | 
			
		||||
            LOG(log_.error()) << "Database is too busy. Rejecting request";
 | 
			
		||||
 
 | 
			
		||||
@@ -36,6 +36,7 @@
 | 
			
		||||
#include <boost/json/array.hpp>
 | 
			
		||||
#include <boost/json/object.hpp>
 | 
			
		||||
#include <boost/json/parse.hpp>
 | 
			
		||||
#include <boost/json/serialize.hpp>
 | 
			
		||||
#include <boost/json/string.hpp>
 | 
			
		||||
#include <boost/json/value.hpp>
 | 
			
		||||
#include <boost/json/value_to.hpp>
 | 
			
		||||
@@ -49,6 +50,7 @@
 | 
			
		||||
#include <ripple/basics/chrono.h>
 | 
			
		||||
#include <ripple/basics/strHex.h>
 | 
			
		||||
#include <ripple/beast/utility/Zero.h>
 | 
			
		||||
#include <ripple/json/json_reader.h>
 | 
			
		||||
#include <ripple/json/json_value.h>
 | 
			
		||||
#include <ripple/protocol/AccountID.h>
 | 
			
		||||
#include <ripple/protocol/Book.h>
 | 
			
		||||
@@ -1273,6 +1275,31 @@ specifiesCurrentOrClosedLedger(boost::json::object const& request)
 | 
			
		||||
    return false;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
bool
 | 
			
		||||
isAdminCmd(std::string const& method, boost::json::object const& request)
 | 
			
		||||
{
 | 
			
		||||
    if (method == JS(ledger)) {
 | 
			
		||||
        auto const requestStr = boost::json::serialize(request);
 | 
			
		||||
        Json::Value jv;
 | 
			
		||||
        Json::Reader{}.parse(requestStr, jv);
 | 
			
		||||
        // rippled considers string/non-zero int/non-empty array/ non-empty json as true.
 | 
			
		||||
        // Use rippled's API asBool to get the same result.
 | 
			
		||||
        // https://github.com/XRPLF/rippled/issues/5119
 | 
			
		||||
        auto const isFieldSet = [&jv](auto const field) { return jv.isMember(field) and jv[field].asBool(); };
 | 
			
		||||
 | 
			
		||||
        // According to doc
 | 
			
		||||
        // https://xrpl.org/docs/references/http-websocket-apis/public-api-methods/ledger-methods/ledger,
 | 
			
		||||
        // full/accounts/type are admin only, but type only works when full/accounts are set, so we don't need to check
 | 
			
		||||
        // type.
 | 
			
		||||
        if (isFieldSet(JS(full)) or isFieldSet(JS(accounts)))
 | 
			
		||||
            return true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    if (method == JS(feature) and request.contains(JS(vetoed)))
 | 
			
		||||
        return true;
 | 
			
		||||
    return false;
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
std::variant<ripple::uint256, Status>
 | 
			
		||||
getNFTID(boost::json::object const& request)
 | 
			
		||||
{
 | 
			
		||||
 
 | 
			
		||||
@@ -557,6 +557,16 @@ parseIssue(boost::json::object const& issue);
 | 
			
		||||
bool
 | 
			
		||||
specifiesCurrentOrClosedLedger(boost::json::object const& request);
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief Check whether a request requires administrative privileges on rippled side.
 | 
			
		||||
 *
 | 
			
		||||
 * @param method The method name to check
 | 
			
		||||
 * @param request The request to check
 | 
			
		||||
 * @return true if the request requires ADMIN role
 | 
			
		||||
 */
 | 
			
		||||
bool
 | 
			
		||||
isAdminCmd(std::string const& method, boost::json::object const& request);
 | 
			
		||||
 | 
			
		||||
/**
 | 
			
		||||
 * @brief Get the NFTID from the request
 | 
			
		||||
 *
 | 
			
		||||
 
 | 
			
		||||
@@ -39,7 +39,6 @@
 | 
			
		||||
#include <ripple/protocol/jss.h>
 | 
			
		||||
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <limits>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <optional>
 | 
			
		||||
#include <string>
 | 
			
		||||
@@ -57,8 +56,8 @@ class AccountTxHandler {
 | 
			
		||||
    std::shared_ptr<BackendInterface> sharedPtrBackend_;
 | 
			
		||||
 | 
			
		||||
public:
 | 
			
		||||
    // no max limit
 | 
			
		||||
    static auto constexpr LIMIT_MIN = 1;
 | 
			
		||||
    static auto constexpr LIMIT_MAX = 1000;
 | 
			
		||||
    static auto constexpr LIMIT_DEFAULT = 200;
 | 
			
		||||
 | 
			
		||||
    /**
 | 
			
		||||
@@ -133,7 +132,7 @@ public:
 | 
			
		||||
            {JS(limit),
 | 
			
		||||
             validation::Type<uint32_t>{},
 | 
			
		||||
             validation::Min(1u),
 | 
			
		||||
             modifiers::Clamp<int32_t>{LIMIT_MIN, std::numeric_limits<int32_t>::max()}},
 | 
			
		||||
             modifiers::Clamp<int32_t>{LIMIT_MIN, LIMIT_MAX}},
 | 
			
		||||
            {JS(marker),
 | 
			
		||||
             meta::WithCustomError{
 | 
			
		||||
                 validation::Type<boost::json::object>{},
 | 
			
		||||
 
 | 
			
		||||
@@ -39,8 +39,10 @@
 | 
			
		||||
#include <boost/beast/websocket/stream_base.hpp>
 | 
			
		||||
#include <boost/system/errc.hpp>
 | 
			
		||||
 | 
			
		||||
#include <atomic>
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <expected>
 | 
			
		||||
#include <memory>
 | 
			
		||||
#include <optional>
 | 
			
		||||
#include <string>
 | 
			
		||||
#include <utility>
 | 
			
		||||
@@ -123,15 +125,20 @@ private:
 | 
			
		||||
    static void
 | 
			
		||||
    withTimeout(Operation&& operation, boost::asio::yield_context yield, std::chrono::steady_clock::duration timeout)
 | 
			
		||||
    {
 | 
			
		||||
        auto isCompleted = std::make_shared<bool>(false);
 | 
			
		||||
        boost::asio::cancellation_signal cancellationSignal;
 | 
			
		||||
        auto cyield = boost::asio::bind_cancellation_slot(cancellationSignal.slot(), yield);
 | 
			
		||||
 | 
			
		||||
        boost::asio::steady_timer timer{boost::asio::get_associated_executor(cyield), timeout};
 | 
			
		||||
        timer.async_wait([&cancellationSignal](boost::system::error_code errorCode) {
 | 
			
		||||
            if (!errorCode)
 | 
			
		||||
 | 
			
		||||
        // The timer below can be called with no error code even if the operation is completed before the timeout, so we
 | 
			
		||||
        // need an additional flag here
 | 
			
		||||
        timer.async_wait([&cancellationSignal, isCompleted](boost::system::error_code errorCode) {
 | 
			
		||||
            if (!errorCode and not*isCompleted)
 | 
			
		||||
                cancellationSignal.emit(boost::asio::cancellation_type::terminal);
 | 
			
		||||
        });
 | 
			
		||||
        operation(cyield);
 | 
			
		||||
        *isCompleted = true;
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    static boost::system::error_code
 | 
			
		||||
 
 | 
			
		||||
@@ -69,10 +69,8 @@ namespace web {
 | 
			
		||||
 * @tparam HandlerType The executor to handle the requests
 | 
			
		||||
 */
 | 
			
		||||
template <
 | 
			
		||||
    template <typename>
 | 
			
		||||
    class PlainSessionType,
 | 
			
		||||
    template <typename>
 | 
			
		||||
    class SslSessionType,
 | 
			
		||||
    template <typename> class PlainSessionType,
 | 
			
		||||
    template <typename> class SslSessionType,
 | 
			
		||||
    SomeServerHandler HandlerType>
 | 
			
		||||
class Detector : public std::enable_shared_from_this<Detector<PlainSessionType, SslSessionType, HandlerType>> {
 | 
			
		||||
    using std::enable_shared_from_this<Detector<PlainSessionType, SslSessionType, HandlerType>>::shared_from_this;
 | 
			
		||||
@@ -191,10 +189,8 @@ public:
 | 
			
		||||
 * @tparam HandlerType The handler to process the request and return response.
 | 
			
		||||
 */
 | 
			
		||||
template <
 | 
			
		||||
    template <typename>
 | 
			
		||||
    class PlainSessionType,
 | 
			
		||||
    template <typename>
 | 
			
		||||
    class SslSessionType,
 | 
			
		||||
    template <typename> class PlainSessionType,
 | 
			
		||||
    template <typename> class SslSessionType,
 | 
			
		||||
    SomeServerHandler HandlerType>
 | 
			
		||||
class Server : public std::enable_shared_from_this<Server<PlainSessionType, SslSessionType, HandlerType>> {
 | 
			
		||||
    using std::enable_shared_from_this<Server<PlainSessionType, SslSessionType, HandlerType>>::shared_from_this;
 | 
			
		||||
 
 | 
			
		||||
@@ -23,6 +23,9 @@
 | 
			
		||||
#include "rpc/common/Types.hpp"
 | 
			
		||||
#include "util/Taggable.hpp"
 | 
			
		||||
#include "util/log/Logger.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
#include "util/prometheus/Label.hpp"
 | 
			
		||||
#include "util/prometheus/Prometheus.hpp"
 | 
			
		||||
#include "web/DOSGuard.hpp"
 | 
			
		||||
#include "web/interface/Concepts.hpp"
 | 
			
		||||
#include "web/interface/ConnectionBase.hpp"
 | 
			
		||||
@@ -71,6 +74,8 @@ template <template <typename> typename Derived, SomeServerHandler HandlerType>
 | 
			
		||||
class WsBase : public ConnectionBase, public std::enable_shared_from_this<WsBase<Derived, HandlerType>> {
 | 
			
		||||
    using std::enable_shared_from_this<WsBase<Derived, HandlerType>>::shared_from_this;
 | 
			
		||||
 | 
			
		||||
    std::reference_wrapper<util::prometheus::GaugeInt> messagesLength_;
 | 
			
		||||
 | 
			
		||||
    boost::beast::flat_buffer buffer_;
 | 
			
		||||
    std::reference_wrapper<web::DOSGuard> dosGuard_;
 | 
			
		||||
    bool sending_ = false;
 | 
			
		||||
@@ -103,15 +108,26 @@ public:
 | 
			
		||||
        std::shared_ptr<HandlerType> const& handler,
 | 
			
		||||
        boost::beast::flat_buffer&& buffer
 | 
			
		||||
    )
 | 
			
		||||
        : ConnectionBase(tagFactory, ip), buffer_(std::move(buffer)), dosGuard_(dosGuard), handler_(handler)
 | 
			
		||||
        : ConnectionBase(tagFactory, ip)
 | 
			
		||||
        , messagesLength_(PrometheusService::gaugeInt(
 | 
			
		||||
              "ws_messages_length",
 | 
			
		||||
              util::prometheus::Labels(),
 | 
			
		||||
              "The total length of messages in the queue"
 | 
			
		||||
          ))
 | 
			
		||||
        , buffer_(std::move(buffer))
 | 
			
		||||
        , dosGuard_(dosGuard)
 | 
			
		||||
        , handler_(handler)
 | 
			
		||||
    {
 | 
			
		||||
        upgraded = true;  // NOLINT (cppcoreguidelines-pro-type-member-init)
 | 
			
		||||
 | 
			
		||||
        LOG(perfLog_.debug()) << tag() << "session created";
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
    ~WsBase() override
 | 
			
		||||
    {
 | 
			
		||||
        LOG(perfLog_.debug()) << tag() << "session closed";
 | 
			
		||||
        if (!messages_.empty())
 | 
			
		||||
            messagesLength_.get() -= messages_.size();
 | 
			
		||||
        dosGuard_.get().decrement(clientIp);
 | 
			
		||||
    }
 | 
			
		||||
 | 
			
		||||
@@ -135,6 +151,7 @@ public:
 | 
			
		||||
    onWrite(boost::system::error_code ec, std::size_t)
 | 
			
		||||
    {
 | 
			
		||||
        messages_.pop();
 | 
			
		||||
        --messagesLength_.get();
 | 
			
		||||
        sending_ = false;
 | 
			
		||||
        if (ec) {
 | 
			
		||||
            wsFail(ec, "Failed to write");
 | 
			
		||||
@@ -165,6 +182,7 @@ public:
 | 
			
		||||
            derived().ws().get_executor(),
 | 
			
		||||
            [this, self = derived().shared_from_this(), msg = std::move(msg)]() {
 | 
			
		||||
                messages_.push(msg);
 | 
			
		||||
                ++messagesLength_.get();
 | 
			
		||||
                maybeSendNext();
 | 
			
		||||
            }
 | 
			
		||||
        );
 | 
			
		||||
 
 | 
			
		||||
@@ -271,15 +271,12 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0Disconnects)
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
 | 
			
		||||
    sourceFactory_.callbacksAt(0).onDisconnect();
 | 
			
		||||
    sourceFactory_.callbacksAt(0).onDisconnect(true);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(LoadBalancerOnDisconnectHookTests, source1Disconnects)
 | 
			
		||||
{
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
 | 
			
		||||
    sourceFactory_.callbacksAt(1).onDisconnect();
 | 
			
		||||
    sourceFactory_.callbacksAt(1).onDisconnect(false);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
 | 
			
		||||
@@ -288,29 +285,25 @@ TEST_F(LoadBalancerOnDisconnectHookTests, source0DisconnectsAndConnectsBack)
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
 | 
			
		||||
    sourceFactory_.callbacksAt(0).onDisconnect();
 | 
			
		||||
    sourceFactory_.callbacksAt(0).onDisconnect(true);
 | 
			
		||||
 | 
			
		||||
    sourceFactory_.callbacksAt(0).onConnect();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(LoadBalancerOnDisconnectHookTests, source1DisconnectsAndConnectsBack)
 | 
			
		||||
{
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
 | 
			
		||||
    sourceFactory_.callbacksAt(1).onDisconnect();
 | 
			
		||||
 | 
			
		||||
    sourceFactory_.callbacksAt(1).onDisconnect(false);
 | 
			
		||||
    sourceFactory_.callbacksAt(1).onConnect();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(LoadBalancerOnConnectHookTests, bothSourcesDisconnectAndConnectBack)
 | 
			
		||||
{
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).Times(2).WillRepeatedly(Return(false));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false)).Times(2);
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).Times(2).WillRepeatedly(Return(false));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false)).Times(2);
 | 
			
		||||
    sourceFactory_.callbacksAt(0).onDisconnect();
 | 
			
		||||
    sourceFactory_.callbacksAt(1).onDisconnect();
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(false));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(false));
 | 
			
		||||
    sourceFactory_.callbacksAt(0).onDisconnect(true);
 | 
			
		||||
    sourceFactory_.callbacksAt(1).onDisconnect(false);
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(true));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(true));
 | 
			
		||||
@@ -353,12 +346,7 @@ TEST_F(LoadBalancer3SourcesTests, forwardingUpdate)
 | 
			
		||||
    sourceFactory_.callbacksAt(1).onConnect();
 | 
			
		||||
 | 
			
		||||
    // Source 0 got disconnected
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), isConnected()).WillOnce(Return(false));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(0), setForwarding(false));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), isConnected()).WillOnce(Return(true));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(1), setForwarding(true));
 | 
			
		||||
    EXPECT_CALL(sourceFactory_.sourceAt(2), setForwarding(false));  // only source 1 must be forwarding
 | 
			
		||||
    sourceFactory_.callbacksAt(0).onDisconnect();
 | 
			
		||||
    sourceFactory_.callbacksAt(0).onDisconnect(false);
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct LoadBalancerLoadInitialLedgerTests : LoadBalancerOnConnectHookTests {
 | 
			
		||||
 
 | 
			
		||||
@@ -18,31 +18,35 @@
 | 
			
		||||
//==============================================================================
 | 
			
		||||
 | 
			
		||||
#include "etl/impl/SubscriptionSource.hpp"
 | 
			
		||||
#include "util/AssignRandomPort.hpp"
 | 
			
		||||
#include "util/Fixtures.hpp"
 | 
			
		||||
#include "util/MockNetworkValidatedLedgers.hpp"
 | 
			
		||||
#include "util/MockPrometheus.hpp"
 | 
			
		||||
#include "util/MockSubscriptionManager.hpp"
 | 
			
		||||
#include "util/TestWsServer.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/io_context.hpp>
 | 
			
		||||
#include <boost/asio/spawn.hpp>
 | 
			
		||||
#include <boost/json/object.hpp>
 | 
			
		||||
#include <boost/json/serialize.hpp>
 | 
			
		||||
#include <fmt/core.h>
 | 
			
		||||
#include <gmock/gmock.h>
 | 
			
		||||
#include <gtest/gtest.h>
 | 
			
		||||
 | 
			
		||||
#include <chrono>
 | 
			
		||||
#include <cstdint>
 | 
			
		||||
#include <cstdlib>
 | 
			
		||||
#include <optional>
 | 
			
		||||
#include <string>
 | 
			
		||||
#include <thread>
 | 
			
		||||
#include <utility>
 | 
			
		||||
 | 
			
		||||
using namespace etl::impl;
 | 
			
		||||
using testing::MockFunction;
 | 
			
		||||
using testing::StrictMock;
 | 
			
		||||
 | 
			
		||||
struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
 | 
			
		||||
    SubscriptionSourceConnectionTests()
 | 
			
		||||
struct SubscriptionSourceConnectionTestsBase : public NoLoggerFixture {
 | 
			
		||||
    SubscriptionSourceConnectionTestsBase()
 | 
			
		||||
    {
 | 
			
		||||
        subscriptionSource_.run();
 | 
			
		||||
    }
 | 
			
		||||
@@ -54,7 +58,7 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
 | 
			
		||||
    StrictMockSubscriptionManagerSharedPtr subscriptionManager_;
 | 
			
		||||
 | 
			
		||||
    StrictMock<MockFunction<void()>> onConnectHook_;
 | 
			
		||||
    StrictMock<MockFunction<void()>> onDisconnectHook_;
 | 
			
		||||
    StrictMock<MockFunction<void(bool)>> onDisconnectHook_;
 | 
			
		||||
    StrictMock<MockFunction<void()>> onLedgerClosedHook_;
 | 
			
		||||
 | 
			
		||||
    SubscriptionSource subscriptionSource_{
 | 
			
		||||
@@ -66,8 +70,8 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
 | 
			
		||||
        onConnectHook_.AsStdFunction(),
 | 
			
		||||
        onDisconnectHook_.AsStdFunction(),
 | 
			
		||||
        onLedgerClosedHook_.AsStdFunction(),
 | 
			
		||||
        std::chrono::milliseconds(1),
 | 
			
		||||
        std::chrono::milliseconds(1)
 | 
			
		||||
        std::chrono::milliseconds(5),
 | 
			
		||||
        std::chrono::milliseconds(5)
 | 
			
		||||
    };
 | 
			
		||||
 | 
			
		||||
    [[maybe_unused]] TestWsConnection
 | 
			
		||||
@@ -92,15 +96,17 @@ struct SubscriptionSourceConnectionTests : public NoLoggerFixture {
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
struct SubscriptionSourceConnectionTests : util::prometheus::WithPrometheus, SubscriptionSourceConnectionTestsBase {};
 | 
			
		||||
 | 
			
		||||
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed)
 | 
			
		||||
{
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(SubscriptionSourceConnectionTests, ConnectionFailed_Retry_ConnectionFailed)
 | 
			
		||||
{
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -112,7 +118,19 @@ TEST_F(SubscriptionSourceConnectionTests, ReadError)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(SubscriptionSourceConnectionTests, ReadTimeout)
 | 
			
		||||
{
 | 
			
		||||
    boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
 | 
			
		||||
        auto connection = serverConnection(yield);
 | 
			
		||||
        std::this_thread::sleep_for(std::chrono::milliseconds{10});
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -126,7 +144,7 @@ TEST_F(SubscriptionSourceConnectionTests, ReadError_Reconnect)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call()).Times(2);
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -139,14 +157,14 @@ TEST_F(SubscriptionSourceConnectionTests, IsConnected)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call()).WillOnce([this]() { EXPECT_TRUE(subscriptionSource_.isConnected()); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() {
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() {
 | 
			
		||||
        EXPECT_FALSE(subscriptionSource_.isConnected());
 | 
			
		||||
        subscriptionSource_.stop();
 | 
			
		||||
    });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct SubscriptionSourceReadTests : public SubscriptionSourceConnectionTests {
 | 
			
		||||
struct SubscriptionSourceReadTestsBase : public SubscriptionSourceConnectionTestsBase {
 | 
			
		||||
    [[maybe_unused]] TestWsConnection
 | 
			
		||||
    connectAndSendMessage(std::string const message, boost::asio::yield_context yield)
 | 
			
		||||
    {
 | 
			
		||||
@@ -157,6 +175,8 @@ struct SubscriptionSourceReadTests : public SubscriptionSourceConnectionTests {
 | 
			
		||||
    }
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
struct SubscriptionSourceReadTests : util::prometheus::WithPrometheus, SubscriptionSourceReadTestsBase {};
 | 
			
		||||
 | 
			
		||||
TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
 | 
			
		||||
{
 | 
			
		||||
    boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
 | 
			
		||||
@@ -167,7 +187,7 @@ TEST_F(SubscriptionSourceReadTests, GotWrongMessage_Reconnect)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call()).Times(2);
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -179,7 +199,7 @@ TEST_F(SubscriptionSourceReadTests, GotResult)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -191,7 +211,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndex)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(*networkValidatedLedgers_, push(123));
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
@@ -206,7 +226,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAsString_Reconnect)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call()).Times(2);
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -220,7 +240,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersAsNumber_Reconn
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call()).Times(2);
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -242,7 +262,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgers)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
 | 
			
		||||
    EXPECT_TRUE(subscriptionSource_.hasLedger(123));
 | 
			
		||||
@@ -268,7 +288,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithValidatedLedgersWrongValue_Reco
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call()).Times(2);
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -286,7 +306,7 @@ TEST_F(SubscriptionSourceReadTests, GotResultWithLedgerIndexAndValidatedLedgers)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(*networkValidatedLedgers_, push(123));
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
 | 
			
		||||
@@ -306,7 +326,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosed)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -321,7 +341,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedForwardingIsSet)
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onLedgerClosedHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() {
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() {
 | 
			
		||||
        EXPECT_FALSE(subscriptionSource_.isForwarding());
 | 
			
		||||
        subscriptionSource_.stop();
 | 
			
		||||
    });
 | 
			
		||||
@@ -336,7 +356,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndex)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(*networkValidatedLedgers_, push(123));
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
@@ -351,7 +371,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAsString_Recon
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call()).Times(2);
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -365,7 +385,7 @@ TEST_F(SubscriptionSourceReadTests, GorLedgerClosedWithValidatedLedgersAsNumber_
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call()).Times(2);
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([]() {}).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -382,7 +402,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithValidatedLedgers)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
 | 
			
		||||
    EXPECT_FALSE(subscriptionSource_.hasLedger(0));
 | 
			
		||||
@@ -406,7 +426,7 @@ TEST_F(SubscriptionSourceReadTests, GotLedgerClosedWithLedgerIndexAndValidatedLe
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(*networkValidatedLedgers_, push(123));
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
 | 
			
		||||
@@ -425,7 +445,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingFalse)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -440,7 +460,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionIsForwardingTrue)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message));
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
@@ -456,7 +476,7 @@ TEST_F(SubscriptionSourceReadTests, GotTransactionWithMetaIsForwardingFalse)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(*subscriptionManager_, forwardProposedTransaction(message)).Times(0);
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
@@ -469,7 +489,7 @@ TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingFalse)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -484,7 +504,7 @@ TEST_F(SubscriptionSourceReadTests, GotValidationReceivedIsForwardingTrue)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(*subscriptionManager_, forwardValidation(message));
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
@@ -497,7 +517,7 @@ TEST_F(SubscriptionSourceReadTests, GotManiefstReceivedIsForwardingFalse)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
@@ -512,7 +532,7 @@ TEST_F(SubscriptionSourceReadTests, GotManifestReceivedIsForwardingTrue)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(true)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(*subscriptionManager_, forwardManifest(message));
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
@@ -525,7 +545,7 @@ TEST_F(SubscriptionSourceReadTests, LastMessageTime)
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call()).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
 | 
			
		||||
    auto const actualLastTimeMessage = subscriptionSource_.lastMessageTime();
 | 
			
		||||
@@ -533,3 +553,27 @@ TEST_F(SubscriptionSourceReadTests, LastMessageTime)
 | 
			
		||||
    auto const diff = std::chrono::duration_cast<std::chrono::milliseconds>(now - actualLastTimeMessage);
 | 
			
		||||
    EXPECT_LT(diff, std::chrono::milliseconds(100));
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct SubscriptionSourcePrometheusCounterTests : util::prometheus::WithMockPrometheus,
 | 
			
		||||
                                                  SubscriptionSourceReadTestsBase {};
 | 
			
		||||
 | 
			
		||||
TEST_F(SubscriptionSourcePrometheusCounterTests, LastMessageTime)
 | 
			
		||||
{
 | 
			
		||||
    auto& lastMessageTimeMock = makeMock<util::prometheus::GaugeInt>(
 | 
			
		||||
        "subscription_source_last_message_time", fmt::format("{{source=\"127.0.0.1:{}\"}}", wsServer_.port())
 | 
			
		||||
    );
 | 
			
		||||
    boost::asio::spawn(ioContext_, [this](boost::asio::yield_context yield) {
 | 
			
		||||
        auto connection = connectAndSendMessage("some_message", yield);
 | 
			
		||||
        connection.close(yield);
 | 
			
		||||
    });
 | 
			
		||||
 | 
			
		||||
    EXPECT_CALL(onConnectHook_, Call());
 | 
			
		||||
    EXPECT_CALL(onDisconnectHook_, Call(false)).WillOnce([this]() { subscriptionSource_.stop(); });
 | 
			
		||||
    EXPECT_CALL(lastMessageTimeMock, set).WillOnce([](int64_t value) {
 | 
			
		||||
        auto const now =
 | 
			
		||||
            std::chrono::duration_cast<std::chrono::seconds>(std::chrono::system_clock::now().time_since_epoch())
 | 
			
		||||
                .count();
 | 
			
		||||
        EXPECT_LE(now - value, 1);
 | 
			
		||||
    });
 | 
			
		||||
    ioContext_.run();
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -24,6 +24,7 @@
 | 
			
		||||
#include "rpc/common/Types.hpp"
 | 
			
		||||
#include "util/Fixtures.hpp"
 | 
			
		||||
#include "util/MockPrometheus.hpp"
 | 
			
		||||
#include "util/NameGenerator.hpp"
 | 
			
		||||
#include "util/TestObject.hpp"
 | 
			
		||||
 | 
			
		||||
#include <boost/asio/impl/spawn.hpp>
 | 
			
		||||
@@ -538,3 +539,67 @@ TEST_F(RPCHelpersTest, ParseIssue)
 | 
			
		||||
        std::runtime_error
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
struct IsAdminCmdParamTestCaseBundle {
 | 
			
		||||
    std::string testName;
 | 
			
		||||
    std::string method;
 | 
			
		||||
    std::string testJson;
 | 
			
		||||
    bool expected;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
struct IsAdminCmdParameterTest : public TestWithParam<IsAdminCmdParamTestCaseBundle> {};
 | 
			
		||||
 | 
			
		||||
static auto
 | 
			
		||||
generateTestValuesForParametersTest()
 | 
			
		||||
{
 | 
			
		||||
    return std::vector<IsAdminCmdParamTestCaseBundle>{
 | 
			
		||||
        {"ledgerEntry", "ledger_entry", R"({"type": false})", false},
 | 
			
		||||
 | 
			
		||||
        {"featureVetoedTrue", "feature", R"({"vetoed": true, "feature": "foo"})", true},
 | 
			
		||||
        {"featureVetoedFalse", "feature", R"({"vetoed": false, "feature": "foo"})", true},
 | 
			
		||||
        {"featureVetoedIsStr", "feature", R"({"vetoed": "String"})", true},
 | 
			
		||||
 | 
			
		||||
        {"ledger", "ledger", R"({})", false},
 | 
			
		||||
        {"ledgerWithType", "ledger", R"({"type": "fee"})", false},
 | 
			
		||||
        {"ledgerFullTrue", "ledger", R"({"full": true})", true},
 | 
			
		||||
        {"ledgerFullFalse", "ledger", R"({"full": false})", false},
 | 
			
		||||
        {"ledgerFullIsStr", "ledger", R"({"full": "String"})", true},
 | 
			
		||||
        {"ledgerFullIsEmptyStr", "ledger", R"({"full": ""})", false},
 | 
			
		||||
        {"ledgerFullIsNumber1", "ledger", R"({"full": 1})", true},
 | 
			
		||||
        {"ledgerFullIsNumber0", "ledger", R"({"full": 0})", false},
 | 
			
		||||
        {"ledgerFullIsNull", "ledger", R"({"full": null})", false},
 | 
			
		||||
        {"ledgerFullIsFloat0", "ledger", R"({"full": 0.0})", false},
 | 
			
		||||
        {"ledgerFullIsFloat1", "ledger", R"({"full": 0.1})", true},
 | 
			
		||||
        {"ledgerFullIsArray", "ledger", R"({"full": [1]})", true},
 | 
			
		||||
        {"ledgerFullIsEmptyArray", "ledger", R"({"full": []})", false},
 | 
			
		||||
        {"ledgerFullIsObject", "ledger", R"({"full": {"key": 1}})", true},
 | 
			
		||||
        {"ledgerFullIsEmptyObject", "ledger", R"({"full": {}})", false},
 | 
			
		||||
 | 
			
		||||
        {"ledgerAccountsTrue", "ledger", R"({"accounts": true})", true},
 | 
			
		||||
        {"ledgerAccountsFalse", "ledger", R"({"accounts": false})", false},
 | 
			
		||||
        {"ledgerAccountsIsStr", "ledger", R"({"accounts": "String"})", true},
 | 
			
		||||
        {"ledgerAccountsIsEmptyStr", "ledger", R"({"accounts": ""})", false},
 | 
			
		||||
        {"ledgerAccountsIsNumber1", "ledger", R"({"accounts": 1})", true},
 | 
			
		||||
        {"ledgerAccountsIsNumber0", "ledger", R"({"accounts": 0})", false},
 | 
			
		||||
        {"ledgerAccountsIsNull", "ledger", R"({"accounts": null})", false},
 | 
			
		||||
        {"ledgerAccountsIsFloat0", "ledger", R"({"accounts": 0.0})", false},
 | 
			
		||||
        {"ledgerAccountsIsFloat1", "ledger", R"({"accounts": 0.1})", true},
 | 
			
		||||
        {"ledgerAccountsIsArray", "ledger", R"({"accounts": [1]})", true},
 | 
			
		||||
        {"ledgerAccountsIsEmptyArray", "ledger", R"({"accounts": []})", false},
 | 
			
		||||
        {"ledgerAccountsIsObject", "ledger", R"({"accounts": {"key": 1}})", true},
 | 
			
		||||
        {"ledgerAccountsIsEmptyObject", "ledger", R"({"accounts": {}})", false},
 | 
			
		||||
    };
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
INSTANTIATE_TEST_CASE_P(
 | 
			
		||||
    IsAdminCmdTest,
 | 
			
		||||
    IsAdminCmdParameterTest,
 | 
			
		||||
    ValuesIn(generateTestValuesForParametersTest()),
 | 
			
		||||
    tests::util::NameGenerator
 | 
			
		||||
);
 | 
			
		||||
 | 
			
		||||
TEST_P(IsAdminCmdParameterTest, Test)
 | 
			
		||||
{
 | 
			
		||||
    auto const testBundle = GetParam();
 | 
			
		||||
    EXPECT_EQ(isAdminCmd(testBundle.method, boost::json::parse(testBundle.testJson).as_object()), testBundle.expected);
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -769,14 +769,13 @@ TEST_F(RPCAccountTxHandlerTest, LimitAndMarker)
 | 
			
		||||
 | 
			
		||||
    auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1);
 | 
			
		||||
    auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}};
 | 
			
		||||
    ON_CALL(*backend, fetchAccountTransactions).WillByDefault(Return(transCursor));
 | 
			
		||||
    EXPECT_CALL(
 | 
			
		||||
        *backend,
 | 
			
		||||
        fetchAccountTransactions(
 | 
			
		||||
            testing::_, testing::_, false, testing::Optional(testing::Eq(TransactionsCursor{10, 11})), testing::_
 | 
			
		||||
        )
 | 
			
		||||
    )
 | 
			
		||||
        .Times(1);
 | 
			
		||||
        .WillOnce(Return(transCursor));
 | 
			
		||||
 | 
			
		||||
    runSpawn([&, this](auto yield) {
 | 
			
		||||
        auto const handler = AnyHandler{AccountTxHandler{backend}};
 | 
			
		||||
@@ -804,6 +803,73 @@ TEST_F(RPCAccountTxHandlerTest, LimitAndMarker)
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(RPCAccountTxHandlerTest, LimitIsCapped)
 | 
			
		||||
{
 | 
			
		||||
    backend->setRange(MINSEQ, MAXSEQ);
 | 
			
		||||
 | 
			
		||||
    auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1);
 | 
			
		||||
    auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}};
 | 
			
		||||
    EXPECT_CALL(*backend, fetchAccountTransactions(testing::_, testing::_, false, testing::_, testing::_))
 | 
			
		||||
        .WillOnce(Return(transCursor));
 | 
			
		||||
 | 
			
		||||
    runSpawn([&, this](auto yield) {
 | 
			
		||||
        auto const handler = AnyHandler{AccountTxHandler{backend}};
 | 
			
		||||
        auto static const input = json::parse(fmt::format(
 | 
			
		||||
            R"({{
 | 
			
		||||
                "account": "{}",
 | 
			
		||||
                "ledger_index_min": {},
 | 
			
		||||
                "ledger_index_max": {},
 | 
			
		||||
                "limit": 100000,
 | 
			
		||||
                "forward": false
 | 
			
		||||
            }})",
 | 
			
		||||
            ACCOUNT,
 | 
			
		||||
            -1,
 | 
			
		||||
            -1
 | 
			
		||||
        ));
 | 
			
		||||
        auto const output = handler.process(input, Context{yield});
 | 
			
		||||
        ASSERT_TRUE(output);
 | 
			
		||||
        EXPECT_EQ(output.result->at("account").as_string(), ACCOUNT);
 | 
			
		||||
        EXPECT_EQ(output.result->at("ledger_index_min").as_uint64(), MINSEQ);
 | 
			
		||||
        EXPECT_EQ(output.result->at("ledger_index_max").as_uint64(), MAXSEQ);
 | 
			
		||||
        EXPECT_EQ(output.result->at("limit").as_uint64(), AccountTxHandler::LIMIT_MAX);
 | 
			
		||||
        EXPECT_EQ(output.result->at("transactions").as_array().size(), 2);
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(RPCAccountTxHandlerTest, LimitAllowedUpToCap)
 | 
			
		||||
{
 | 
			
		||||
    backend->setRange(MINSEQ, MAXSEQ);
 | 
			
		||||
 | 
			
		||||
    auto const transactions = genTransactions(MINSEQ + 1, MAXSEQ - 1);
 | 
			
		||||
    auto const transCursor = TransactionsAndCursor{transactions, TransactionsCursor{12, 34}};
 | 
			
		||||
    EXPECT_CALL(*backend, fetchAccountTransactions(testing::_, testing::_, false, testing::_, testing::_))
 | 
			
		||||
        .WillOnce(Return(transCursor));
 | 
			
		||||
 | 
			
		||||
    runSpawn([&, this](auto yield) {
 | 
			
		||||
        auto const handler = AnyHandler{AccountTxHandler{backend}};
 | 
			
		||||
        auto static const input = json::parse(fmt::format(
 | 
			
		||||
            R"({{
 | 
			
		||||
                "account": "{}",
 | 
			
		||||
                "ledger_index_min": {},
 | 
			
		||||
                "ledger_index_max": {},
 | 
			
		||||
                "limit": {},
 | 
			
		||||
                "forward": false
 | 
			
		||||
            }})",
 | 
			
		||||
            ACCOUNT,
 | 
			
		||||
            -1,
 | 
			
		||||
            -1,
 | 
			
		||||
            AccountTxHandler::LIMIT_MAX - 1
 | 
			
		||||
        ));
 | 
			
		||||
        auto const output = handler.process(input, Context{yield});
 | 
			
		||||
        ASSERT_TRUE(output);
 | 
			
		||||
        EXPECT_EQ(output.result->at("account").as_string(), ACCOUNT);
 | 
			
		||||
        EXPECT_EQ(output.result->at("ledger_index_min").as_uint64(), MINSEQ);
 | 
			
		||||
        EXPECT_EQ(output.result->at("ledger_index_max").as_uint64(), MAXSEQ);
 | 
			
		||||
        EXPECT_EQ(output.result->at("limit").as_uint64(), AccountTxHandler::LIMIT_MAX - 1);
 | 
			
		||||
        EXPECT_EQ(output.result->at("transactions").as_array().size(), 2);
 | 
			
		||||
    });
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(RPCAccountTxHandlerTest, SpecificLedgerIndex)
 | 
			
		||||
{
 | 
			
		||||
    backend->setRange(MINSEQ, MAXSEQ);
 | 
			
		||||
 
 | 
			
		||||
@@ -22,6 +22,7 @@
 | 
			
		||||
#include "util/MockPrometheus.hpp"
 | 
			
		||||
#include "util/TestHttpSyncClient.hpp"
 | 
			
		||||
#include "util/config/Config.hpp"
 | 
			
		||||
#include "util/prometheus/Gauge.hpp"
 | 
			
		||||
#include "util/prometheus/Label.hpp"
 | 
			
		||||
#include "util/prometheus/Prometheus.hpp"
 | 
			
		||||
#include "web/DOSGuard.hpp"
 | 
			
		||||
@@ -42,6 +43,7 @@
 | 
			
		||||
#include <boost/json/parse.hpp>
 | 
			
		||||
#include <boost/system/system_error.hpp>
 | 
			
		||||
#include <fmt/core.h>
 | 
			
		||||
#include <gmock/gmock.h>
 | 
			
		||||
#include <gtest/gtest.h>
 | 
			
		||||
 | 
			
		||||
#include <chrono>
 | 
			
		||||
@@ -202,6 +204,8 @@ private:
 | 
			
		||||
    std::optional<std::thread> runner;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
struct WebServerTestsWithMockPrometheus : WebServerTest, prometheus::WithMockPrometheus {};
 | 
			
		||||
 | 
			
		||||
class EchoExecutor {
 | 
			
		||||
public:
 | 
			
		||||
    void
 | 
			
		||||
@@ -263,7 +267,7 @@ makeServerSync(
 | 
			
		||||
 | 
			
		||||
}  // namespace
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, Http)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, Http)
 | 
			
		||||
{
 | 
			
		||||
    auto e = std::make_shared<EchoExecutor>();
 | 
			
		||||
    auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
 | 
			
		||||
@@ -271,8 +275,13 @@ TEST_F(WebServerTest, Http)
 | 
			
		||||
    EXPECT_EQ(res, R"({"Hello":1})");
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, Ws)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, Ws)
 | 
			
		||||
{
 | 
			
		||||
    ::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
 | 
			
		||||
        makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(1));
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(-1));
 | 
			
		||||
 | 
			
		||||
    auto e = std::make_shared<EchoExecutor>();
 | 
			
		||||
    auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
 | 
			
		||||
    WebSocketSyncClient wsClient;
 | 
			
		||||
@@ -282,7 +291,7 @@ TEST_F(WebServerTest, Ws)
 | 
			
		||||
    wsClient.disconnect();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, HttpInternalError)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, HttpInternalError)
 | 
			
		||||
{
 | 
			
		||||
    auto e = std::make_shared<ExceptionExecutor>();
 | 
			
		||||
    auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
 | 
			
		||||
@@ -293,8 +302,13 @@ TEST_F(WebServerTest, HttpInternalError)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, WsInternalError)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, WsInternalError)
 | 
			
		||||
{
 | 
			
		||||
    ::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
 | 
			
		||||
        makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(1));
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(-1));
 | 
			
		||||
 | 
			
		||||
    auto e = std::make_shared<ExceptionExecutor>();
 | 
			
		||||
    auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
 | 
			
		||||
    WebSocketSyncClient wsClient;
 | 
			
		||||
@@ -307,8 +321,13 @@ TEST_F(WebServerTest, WsInternalError)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, WsInternalErrorNotJson)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, WsInternalErrorNotJson)
 | 
			
		||||
{
 | 
			
		||||
    ::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
 | 
			
		||||
        makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(1));
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(-1));
 | 
			
		||||
 | 
			
		||||
    auto e = std::make_shared<ExceptionExecutor>();
 | 
			
		||||
    auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuard, e);
 | 
			
		||||
    WebSocketSyncClient wsClient;
 | 
			
		||||
@@ -321,7 +340,7 @@ TEST_F(WebServerTest, WsInternalErrorNotJson)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, Https)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, Https)
 | 
			
		||||
{
 | 
			
		||||
    auto e = std::make_shared<EchoExecutor>();
 | 
			
		||||
    auto sslCtx = parseCertsForTest();
 | 
			
		||||
@@ -331,8 +350,13 @@ TEST_F(WebServerTest, Https)
 | 
			
		||||
    EXPECT_EQ(res, R"({"Hello":1})");
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, Wss)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, Wss)
 | 
			
		||||
{
 | 
			
		||||
    ::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
 | 
			
		||||
        makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(1));
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(-1));
 | 
			
		||||
 | 
			
		||||
    auto e = std::make_shared<EchoExecutor>();
 | 
			
		||||
    auto sslCtx = parseCertsForTest();
 | 
			
		||||
    auto const ctxSslRef = sslCtx ? std::optional<std::reference_wrapper<ssl::context>>{sslCtx.value()} : std::nullopt;
 | 
			
		||||
@@ -345,7 +369,7 @@ TEST_F(WebServerTest, Wss)
 | 
			
		||||
    wsClient.disconnect();
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, HttpRequestOverload)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, HttpRequestOverload)
 | 
			
		||||
{
 | 
			
		||||
    auto e = std::make_shared<EchoExecutor>();
 | 
			
		||||
    auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuardOverload, e);
 | 
			
		||||
@@ -358,8 +382,13 @@ TEST_F(WebServerTest, HttpRequestOverload)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, WsRequestOverload)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, WsRequestOverload)
 | 
			
		||||
{
 | 
			
		||||
    ::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
 | 
			
		||||
        makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(1)).Times(2);
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(-1)).Times(2);
 | 
			
		||||
 | 
			
		||||
    auto e = std::make_shared<EchoExecutor>();
 | 
			
		||||
    auto const server = makeServerSync(cfg, ctx, std::nullopt, dosGuardOverload, e);
 | 
			
		||||
    WebSocketSyncClient wsClient;
 | 
			
		||||
@@ -377,7 +406,7 @@ TEST_F(WebServerTest, WsRequestOverload)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, HttpPayloadOverload)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, HttpPayloadOverload)
 | 
			
		||||
{
 | 
			
		||||
    std::string const s100(100, 'a');
 | 
			
		||||
    auto e = std::make_shared<EchoExecutor>();
 | 
			
		||||
@@ -389,8 +418,13 @@ TEST_F(WebServerTest, HttpPayloadOverload)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, WsPayloadOverload)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, WsPayloadOverload)
 | 
			
		||||
{
 | 
			
		||||
    ::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
 | 
			
		||||
        makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(1));
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(-1));
 | 
			
		||||
 | 
			
		||||
    std::string const s100(100, 'a');
 | 
			
		||||
    auto e = std::make_shared<EchoExecutor>();
 | 
			
		||||
    auto server = makeServerSync(cfg, ctx, std::nullopt, dosGuardOverload, e);
 | 
			
		||||
@@ -404,7 +438,7 @@ TEST_F(WebServerTest, WsPayloadOverload)
 | 
			
		||||
    );
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
TEST_F(WebServerTest, WsTooManyConnection)
 | 
			
		||||
TEST_F(WebServerTestsWithMockPrometheus, WsTooManyConnection)
 | 
			
		||||
{
 | 
			
		||||
    auto e = std::make_shared<EchoExecutor>();
 | 
			
		||||
    auto server = makeServerSync(cfg, ctx, std::nullopt, dosGuardOverload, e);
 | 
			
		||||
@@ -510,10 +544,17 @@ struct WebServerAdminTestParams {
 | 
			
		||||
    std::string expectedResponse;
 | 
			
		||||
};
 | 
			
		||||
 | 
			
		||||
class WebServerAdminTest : public WebServerTest, public ::testing::WithParamInterface<WebServerAdminTestParams> {};
 | 
			
		||||
class WebServerAdminTest : public WebServerTest,
 | 
			
		||||
                           public ::testing::WithParamInterface<WebServerAdminTestParams>,
 | 
			
		||||
                           public prometheus::WithMockPrometheus {};
 | 
			
		||||
 | 
			
		||||
TEST_P(WebServerAdminTest, WsAdminCheck)
 | 
			
		||||
{
 | 
			
		||||
    ::testing::StrictMock<util::prometheus::MockCounterImplInt>& wsMessagesCounterMock =
 | 
			
		||||
        makeMock<util::prometheus::GaugeInt>("ws_messages_length", "");
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(1));
 | 
			
		||||
    EXPECT_CALL(wsMessagesCounterMock, add(-1));
 | 
			
		||||
 | 
			
		||||
    auto e = std::make_shared<AdminCheckExecutor>();
 | 
			
		||||
    Config const serverConfig{parse(GetParam().config)};
 | 
			
		||||
    auto server = makeServerSync(serverConfig, ctx, std::nullopt, dosGuardOverload, e);
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user