refactor: Remove SubscriptionManagerRunner (#1623)

This commit is contained in:
cyan317
2024-09-06 10:34:23 +01:00
committed by GitHub
parent dbb8d9eedd
commit 44c07e7332
5 changed files with 35 additions and 47 deletions

View File

@@ -101,9 +101,7 @@ ClioApplication::run()
auto backend = data::make_Backend(config_); auto backend = data::make_Backend(config_);
// Manages clients subscribed to streams // Manages clients subscribed to streams
auto subscriptionsRunner = feed::SubscriptionManagerRunner(config_, backend); auto subscriptions = feed::SubscriptionManager::make_SubscriptionManager(config_, backend);
auto const subscriptions = subscriptionsRunner.getManager();
// Tracks which ledgers have been validated by the network // Tracks which ledgers have been validated by the network
auto ledgers = etl::NetworkValidatedLedgers::make_ValidatedLedgers(); auto ledgers = etl::NetworkValidatedLedgers::make_ValidatedLedgers();

View File

@@ -30,6 +30,7 @@
#include "feed/impl/TransactionFeed.hpp" #include "feed/impl/TransactionFeed.hpp"
#include "util/async/AnyExecutionContext.hpp" #include "util/async/AnyExecutionContext.hpp"
#include "util/async/context/BasicExecutionContext.hpp" #include "util/async/context/BasicExecutionContext.hpp"
#include "util/config/Config.hpp"
#include "util/log/Logger.hpp" #include "util/log/Logger.hpp"
#include <boost/asio/executor_work_guard.hpp> #include <boost/asio/executor_work_guard.hpp>
@@ -44,6 +45,7 @@
#include <cstdint> #include <cstdint>
#include <memory> #include <memory>
#include <string> #include <string>
#include <utility>
#include <vector> #include <vector>
/** /**
@@ -67,16 +69,36 @@ class SubscriptionManager : public SubscriptionManagerInterface {
impl::ProposedTransactionFeed proposedTransactionFeed_; impl::ProposedTransactionFeed proposedTransactionFeed_;
public: public:
/**
* @brief Factory function to create a new SubscriptionManager with a PoolExecutionContext.
*
* @param config The configuration to use
* @param backend The backend to use
* @return A shared pointer to a new instance of SubscriptionManager
*/
static std::shared_ptr<SubscriptionManager>
make_SubscriptionManager(util::Config const& config, std::shared_ptr<data::BackendInterface const> const& backend)
{
auto const workersNum = config.valueOr<std::uint64_t>("subscription_workers", 1);
util::Logger logger{"Subscriptions"};
LOG(logger.info()) << "Starting subscription manager with " << workersNum << " workers";
return std::make_shared<feed::SubscriptionManager>(util::async::PoolExecutionContext(workersNum), backend);
}
/** /**
* @brief Construct a new Subscription Manager object * @brief Construct a new Subscription Manager object
* *
* @param executor The executor to use to publish the feeds * @param executor The executor to use to publish the feeds
* @param backend The backend to use * @param backend The backend to use
*/ */
template <class ExecutorCtx> SubscriptionManager(
SubscriptionManager(ExecutorCtx& executor, std::shared_ptr<data::BackendInterface const> const& backend) util::async::AnyExecutionContext&& executor,
std::shared_ptr<data::BackendInterface const> const& backend
)
: backend_(backend) : backend_(backend)
, ctx_(executor) , ctx_(std::move(executor))
, manifestFeed_(ctx_, "manifest") , manifestFeed_(ctx_, "manifest")
, validationsFeed_(ctx_, "validations") , validationsFeed_(ctx_, "validations")
, ledgerFeed_(ctx_) , ledgerFeed_(ctx_)
@@ -291,41 +313,4 @@ public:
report() const final; report() const final;
}; };
/**
* @brief The help class to run the subscription manager. The container of PoolExecutionContext which is used to publish
* the feeds.
*/
class SubscriptionManagerRunner {
std::uint64_t workersNum_;
using ActualExecutionCtx = util::async::PoolExecutionContext;
ActualExecutionCtx ctx_;
std::shared_ptr<SubscriptionManager> subscriptionManager_;
util::Logger logger_{"Subscriptions"};
public:
/**
* @brief Construct a new Subscription Manager Runner object
*
* @param config The configuration
* @param backend The backend to use
*/
SubscriptionManagerRunner(util::Config const& config, std::shared_ptr<data::BackendInterface> const& backend)
: workersNum_(config.valueOr<std::uint64_t>("subscription_workers", 1))
, ctx_(workersNum_)
, subscriptionManager_(std::make_shared<SubscriptionManager>(ctx_, backend))
{
LOG(logger_.info()) << "Starting subscription manager with " << workersNum_ << " workers";
}
/**
* @brief Get the subscription manager
*
* @return The subscription manager
*/
std::shared_ptr<SubscriptionManager>
getManager()
{
return subscriptionManager_;
}
};
} // namespace feed } // namespace feed

View File

@@ -369,7 +369,7 @@ public:
* @brief Block until all operations are completed * @brief Block until all operations are completed
*/ */
void void
join() noexcept join() const noexcept
{ {
context_.join(); context_.join();
} }

View File

@@ -21,6 +21,7 @@
#include "feed/impl/LedgerFeed.hpp" #include "feed/impl/LedgerFeed.hpp"
#include "util/TestObject.hpp" #include "util/TestObject.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/json/parse.hpp> #include <boost/json/parse.hpp>
#include <gmock/gmock.h> #include <gmock/gmock.h>
@@ -57,11 +58,13 @@ TEST_F(FeedLedgerTest, SubPub)
"reserve_base":3, "reserve_base":3,
"reserve_inc":2 "reserve_inc":2
})"; })";
boost::asio::spawn(ctx, [this](boost::asio::yield_context yield) { boost::asio::io_context ioContext;
boost::asio::spawn(ioContext, [this](boost::asio::yield_context yield) {
auto res = testFeedPtr->sub(yield, backend, sessionPtr); auto res = testFeedPtr->sub(yield, backend, sessionPtr);
// check the response // check the response
EXPECT_EQ(res, json::parse(LedgerResponse)); EXPECT_EQ(res, json::parse(LedgerResponse));
}); });
ioContext.run();
EXPECT_EQ(testFeedPtr->count(), 1); EXPECT_EQ(testFeedPtr->count(), 1);
constexpr static auto ledgerPub = constexpr static auto ledgerPub =

View File

@@ -28,6 +28,7 @@
#include "util/async/context/SyncExecutionContext.hpp" #include "util/async/context/SyncExecutionContext.hpp"
#include "web/interface/ConnectionBase.hpp" #include "web/interface/ConnectionBase.hpp"
#include <boost/asio/io_context.hpp>
#include <boost/asio/spawn.hpp> #include <boost/asio/spawn.hpp>
#include <boost/json/object.hpp> #include <boost/json/object.hpp>
#include <boost/json/parse.hpp> #include <boost/json/parse.hpp>
@@ -57,13 +58,12 @@ class SubscriptionManagerBaseTest : public util::prometheus::WithPrometheus, pub
protected: protected:
std::shared_ptr<SubscriptionManager> subscriptionManagerPtr; std::shared_ptr<SubscriptionManager> subscriptionManagerPtr;
std::shared_ptr<web::ConnectionBase> session; std::shared_ptr<web::ConnectionBase> session;
Execution ctx{2};
MockSession* sessionPtr = nullptr; MockSession* sessionPtr = nullptr;
void void
SetUp() override SetUp() override
{ {
subscriptionManagerPtr = std::make_shared<SubscriptionManager>(ctx, backend); subscriptionManagerPtr = std::make_shared<SubscriptionManager>(Execution(2), backend);
session = std::make_shared<MockSession>(); session = std::make_shared<MockSession>();
session->apiSubVersion = 1; session->apiSubVersion = 1;
sessionPtr = dynamic_cast<MockSession*>(session.get()); sessionPtr = dynamic_cast<MockSession*>(session.get());
@@ -264,11 +264,13 @@ TEST_F(SubscriptionManagerTest, LedgerTest)
"reserve_base":3, "reserve_base":3,
"reserve_inc":2 "reserve_inc":2
})"; })";
boost::asio::io_context ctx;
boost::asio::spawn(ctx, [this](boost::asio::yield_context yield) { boost::asio::spawn(ctx, [this](boost::asio::yield_context yield) {
auto const res = subscriptionManagerPtr->subLedger(yield, session); auto const res = subscriptionManagerPtr->subLedger(yield, session);
// check the response // check the response
EXPECT_EQ(res, json::parse(LedgerResponse)); EXPECT_EQ(res, json::parse(LedgerResponse));
}); });
ctx.run();
EXPECT_EQ(subscriptionManagerPtr->report()["ledger"], 1); EXPECT_EQ(subscriptionManagerPtr->report()["ledger"], 1);
// test publish // test publish