diff --git a/src/feed/SubscriptionManager.cpp b/src/feed/SubscriptionManager.cpp index 859bf25e..0fec96ce 100644 --- a/src/feed/SubscriptionManager.cpp +++ b/src/feed/SubscriptionManager.cpp @@ -35,6 +35,12 @@ Subscription::unsubscribe(SessionPtrType const& session) boost::asio::post(strand_, [this, session]() { removeSession(session, subscribers_, subCount_); }); } +bool +Subscription::hasSession(SessionPtrType const& session) +{ + return subscribers_.contains(session); +} + void Subscription::publish(std::shared_ptr const& message) { @@ -334,6 +340,8 @@ SubscriptionManager::unsubProposedTransactions(SessionPtrType session) void SubscriptionManager::subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func) { + if (subs.hasSession(session)) + return; subs.subscribe(session); std::scoped_lock lk(cleanupMtx_); cleanupFuncs_[session].push_back(std::move(func)); @@ -347,6 +355,8 @@ SubscriptionManager::subscribeHelper( SubscriptionMap& subs, CleanupFunction&& func) { + if (subs.hasSession(session, k)) + return; subs.subscribe(session, k); std::scoped_lock lk(cleanupMtx_); cleanupFuncs_[session].push_back(std::move(func)); diff --git a/src/feed/SubscriptionManager.h b/src/feed/SubscriptionManager.h index cc255597..fe7e4670 100644 --- a/src/feed/SubscriptionManager.h +++ b/src/feed/SubscriptionManager.h @@ -139,6 +139,15 @@ public: void unsubscribe(SessionPtrType const& session); + /** + * @brief Check if a session has been in subscribers list. + * + * @param session The session to check + * @return true if the session is in the subscribers list; false otherwise + */ + bool + hasSession(SessionPtrType const& session); + /** * @brief Sends the given message to all subscribers. * @@ -232,6 +241,22 @@ public: }); } + /** + * @brief Check if a session has been in subscribers list. + * + * @param session The session to check + * @param key The key for the subscription to check + * @return true if the session is in the subscribers list; false otherwise + */ + bool + hasSession(SessionPtrType const& session, Key const& key) + { + if (!subscribers_.contains(key)) + return false; + + return subscribers_[key].contains(session); + } + /** * @brief Sends the given message to all subscribers. * diff --git a/unittests/SubscriptionManagerTests.cpp b/unittests/SubscriptionManagerTests.cpp index 5e053961..83b05ad1 100644 --- a/unittests/SubscriptionManagerTests.cpp +++ b/unittests/SubscriptionManagerTests.cpp @@ -161,7 +161,6 @@ TEST_F(SubscriptionManagerSimpleBackendTest, ReportCurrentSubscriber) EXPECT_EQ(reportReturn["books"], result); }; checkResult(subManagerPtr->report(), 1); - subManagerPtr->cleanup(session2); subManagerPtr->cleanup(session2); // clean a removed session std::this_thread::sleep_for(20ms); checkResult(subManagerPtr->report(), 0); diff --git a/unittests/SubscriptionTests.cpp b/unittests/SubscriptionTests.cpp index c0f0ecf8..0bc1597b 100644 --- a/unittests/SubscriptionTests.cpp +++ b/unittests/SubscriptionTests.cpp @@ -53,6 +53,8 @@ TEST_F(SubscriptionTest, SubscriptionCount) ctx.restart(); ctx.run(); EXPECT_EQ(sub.count(), 2); + EXPECT_TRUE(sub.hasSession(session1)); + EXPECT_TRUE(sub.hasSession(session2)); EXPECT_FALSE(sub.empty()); sub.unsubscribe(session1); ctx.restart(); @@ -67,6 +69,8 @@ TEST_F(SubscriptionTest, SubscriptionCount) ctx.run(); EXPECT_EQ(sub.count(), 0); EXPECT_TRUE(sub.empty()); + EXPECT_FALSE(sub.hasSession(session1)); + EXPECT_FALSE(sub.hasSession(session2)); } // send interface will be called when publish called @@ -131,6 +135,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount) ctx.restart(); ctx.run(); EXPECT_EQ(subMap.count(), 3); + EXPECT_TRUE(subMap.hasSession(session1, "topic1")); + EXPECT_TRUE(subMap.hasSession(session2, "topic1")); + EXPECT_TRUE(subMap.hasSession(session3, "topic2")); subMap.unsubscribe(session1, "topic1"); ctx.restart(); ctx.run(); @@ -139,6 +146,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount) subMap.unsubscribe(session3, "topic2"); ctx.restart(); ctx.run(); + EXPECT_FALSE(subMap.hasSession(session1, "topic1")); + EXPECT_FALSE(subMap.hasSession(session2, "topic1")); + EXPECT_FALSE(subMap.hasSession(session3, "topic2")); EXPECT_EQ(subMap.count(), 0); subMap.unsubscribe(session3, "topic2"); subMap.unsubscribe(session3, "no exist");