diff --git a/src/app/ClioApplication.cpp b/src/app/ClioApplication.cpp index fdb9dbe6..54b4bb70 100644 --- a/src/app/ClioApplication.cpp +++ b/src/app/ClioApplication.cpp @@ -101,9 +101,7 @@ ClioApplication::run() auto backend = data::make_Backend(config_); // Manages clients subscribed to streams - auto subscriptionsRunner = feed::SubscriptionManagerRunner(config_, backend); - - auto const subscriptions = subscriptionsRunner.getManager(); + auto subscriptions = feed::SubscriptionManager::make_SubscriptionManager(config_, backend); // Tracks which ledgers have been validated by the network auto ledgers = etl::NetworkValidatedLedgers::make_ValidatedLedgers(); diff --git a/src/feed/SubscriptionManager.hpp b/src/feed/SubscriptionManager.hpp index e79cc0c3..d4d3ff14 100644 --- a/src/feed/SubscriptionManager.hpp +++ b/src/feed/SubscriptionManager.hpp @@ -30,6 +30,7 @@ #include "feed/impl/TransactionFeed.hpp" #include "util/async/AnyExecutionContext.hpp" #include "util/async/context/BasicExecutionContext.hpp" +#include "util/config/Config.hpp" #include "util/log/Logger.hpp" #include @@ -44,6 +45,7 @@ #include #include #include +#include #include /** @@ -67,16 +69,36 @@ class SubscriptionManager : public SubscriptionManagerInterface { impl::ProposedTransactionFeed proposedTransactionFeed_; public: + /** + * @brief Factory function to create a new SubscriptionManager with a PoolExecutionContext. + * + * @param config The configuration to use + * @param backend The backend to use + * @return A shared pointer to a new instance of SubscriptionManager + */ + static std::shared_ptr + make_SubscriptionManager(util::Config const& config, std::shared_ptr const& backend) + { + auto const workersNum = config.valueOr("subscription_workers", 1); + + util::Logger logger{"Subscriptions"}; + LOG(logger.info()) << "Starting subscription manager with " << workersNum << " workers"; + + return std::make_shared(util::async::PoolExecutionContext(workersNum), backend); + } + /** * @brief Construct a new Subscription Manager object * * @param executor The executor to use to publish the feeds * @param backend The backend to use */ - template - SubscriptionManager(ExecutorCtx& executor, std::shared_ptr const& backend) + SubscriptionManager( + util::async::AnyExecutionContext&& executor, + std::shared_ptr const& backend + ) : backend_(backend) - , ctx_(executor) + , ctx_(std::move(executor)) , manifestFeed_(ctx_, "manifest") , validationsFeed_(ctx_, "validations") , ledgerFeed_(ctx_) @@ -291,41 +313,4 @@ public: report() const final; }; -/** - * @brief The help class to run the subscription manager. The container of PoolExecutionContext which is used to publish - * the feeds. - */ -class SubscriptionManagerRunner { - std::uint64_t workersNum_; - using ActualExecutionCtx = util::async::PoolExecutionContext; - ActualExecutionCtx ctx_; - std::shared_ptr subscriptionManager_; - util::Logger logger_{"Subscriptions"}; - -public: - /** - * @brief Construct a new Subscription Manager Runner object - * - * @param config The configuration - * @param backend The backend to use - */ - SubscriptionManagerRunner(util::Config const& config, std::shared_ptr const& backend) - : workersNum_(config.valueOr("subscription_workers", 1)) - , ctx_(workersNum_) - , subscriptionManager_(std::make_shared(ctx_, backend)) - { - LOG(logger_.info()) << "Starting subscription manager with " << workersNum_ << " workers"; - } - - /** - * @brief Get the subscription manager - * - * @return The subscription manager - */ - std::shared_ptr - getManager() - { - return subscriptionManager_; - } -}; } // namespace feed diff --git a/src/util/async/context/BasicExecutionContext.hpp b/src/util/async/context/BasicExecutionContext.hpp index 05ad1597..53370db4 100644 --- a/src/util/async/context/BasicExecutionContext.hpp +++ b/src/util/async/context/BasicExecutionContext.hpp @@ -369,7 +369,7 @@ public: * @brief Block until all operations are completed */ void - join() noexcept + join() const noexcept { context_.join(); } diff --git a/tests/unit/feed/LedgerFeedTests.cpp b/tests/unit/feed/LedgerFeedTests.cpp index 316dad30..2e53ee16 100644 --- a/tests/unit/feed/LedgerFeedTests.cpp +++ b/tests/unit/feed/LedgerFeedTests.cpp @@ -21,6 +21,7 @@ #include "feed/impl/LedgerFeed.hpp" #include "util/TestObject.hpp" +#include #include #include #include @@ -57,11 +58,13 @@ TEST_F(FeedLedgerTest, SubPub) "reserve_base":3, "reserve_inc":2 })"; - boost::asio::spawn(ctx, [this](boost::asio::yield_context yield) { + boost::asio::io_context ioContext; + boost::asio::spawn(ioContext, [this](boost::asio::yield_context yield) { auto res = testFeedPtr->sub(yield, backend, sessionPtr); // check the response EXPECT_EQ(res, json::parse(LedgerResponse)); }); + ioContext.run(); EXPECT_EQ(testFeedPtr->count(), 1); constexpr static auto ledgerPub = diff --git a/tests/unit/feed/SubscriptionManagerTests.cpp b/tests/unit/feed/SubscriptionManagerTests.cpp index f63bfe10..75ad194a 100644 --- a/tests/unit/feed/SubscriptionManagerTests.cpp +++ b/tests/unit/feed/SubscriptionManagerTests.cpp @@ -28,6 +28,7 @@ #include "util/async/context/SyncExecutionContext.hpp" #include "web/interface/ConnectionBase.hpp" +#include #include #include #include @@ -57,13 +58,12 @@ class SubscriptionManagerBaseTest : public util::prometheus::WithPrometheus, pub protected: std::shared_ptr subscriptionManagerPtr; std::shared_ptr session; - Execution ctx{2}; MockSession* sessionPtr = nullptr; void SetUp() override { - subscriptionManagerPtr = std::make_shared(ctx, backend); + subscriptionManagerPtr = std::make_shared(Execution(2), backend); session = std::make_shared(); session->apiSubVersion = 1; sessionPtr = dynamic_cast(session.get()); @@ -264,11 +264,13 @@ TEST_F(SubscriptionManagerTest, LedgerTest) "reserve_base":3, "reserve_inc":2 })"; + boost::asio::io_context ctx; boost::asio::spawn(ctx, [this](boost::asio::yield_context yield) { auto const res = subscriptionManagerPtr->subLedger(yield, session); // check the response EXPECT_EQ(res, json::parse(LedgerResponse)); }); + ctx.run(); EXPECT_EQ(subscriptionManagerPtr->report()["ledger"], 1); // test publish