Compare commits

...

2 Commits

Author SHA1 Message Date
cyan317
4f3b6e98ad Subscribe cleanup (#940)
Fix #939
2023-12-19 15:52:12 +00:00
cyan317
27279ceb6d Fix messages pile up (#921)
Fix #924
2023-12-19 15:51:39 +00:00
5 changed files with 48 additions and 4 deletions

View File

@@ -35,6 +35,12 @@ Subscription::unsubscribe(SessionPtrType const& session)
boost::asio::post(strand_, [this, session]() { removeSession(session, subscribers_, subCount_); });
}
bool
Subscription::hasSession(SessionPtrType const& session)
{
return subscribers_.contains(session);
}
void
Subscription::publish(std::shared_ptr<std::string> const& message)
{
@@ -334,6 +340,8 @@ SubscriptionManager::unsubProposedTransactions(SessionPtrType session)
void
SubscriptionManager::subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func)
{
if (subs.hasSession(session))
return;
subs.subscribe(session);
std::scoped_lock lk(cleanupMtx_);
cleanupFuncs_[session].push_back(std::move(func));
@@ -347,6 +355,8 @@ SubscriptionManager::subscribeHelper(
SubscriptionMap<Key>& subs,
CleanupFunction&& func)
{
if (subs.hasSession(session, k))
return;
subs.subscribe(session, k);
std::scoped_lock lk(cleanupMtx_);
cleanupFuncs_[session].push_back(std::move(func));

View File

@@ -139,6 +139,15 @@ public:
void
unsubscribe(SessionPtrType const& session);
/**
* @brief Check if a session has been in subscribers list.
*
* @param session The session to check
* @return true if the session is in the subscribers list; false otherwise
*/
bool
hasSession(SessionPtrType const& session);
/**
* @brief Sends the given message to all subscribers.
*
@@ -232,6 +241,22 @@ public:
});
}
/**
* @brief Check if a session has been in subscribers list.
*
* @param session The session to check
* @param key The key for the subscription to check
* @return true if the session is in the subscribers list; false otherwise
*/
bool
hasSession(SessionPtrType const& session, Key const& key)
{
if (!subscribers_.contains(key))
return false;
return subscribers_[key].contains(session);
}
/**
* @brief Sends the given message to all subscribers.
*

View File

@@ -63,7 +63,7 @@ protected:
if (!ec_ && ec != boost::asio::error::operation_aborted)
{
ec_ = ec;
LOG(perfLog_.info()) << tag() << ": " << what << ": " << ec.message();
LOG(perfLog_.error()) << tag() << ": " << what << ": " << ec.message();
boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
(*handler_)(ec, derived().shared_from_this());
}
@@ -106,14 +106,14 @@ public:
void
onWrite(boost::system::error_code ec, std::size_t)
{
messages_.pop();
sending_ = false;
if (ec)
{
wsFail(ec, "Failed to write");
}
else
{
messages_.pop();
sending_ = false;
maybeSendNext();
}
}

View File

@@ -161,7 +161,6 @@ TEST_F(SubscriptionManagerSimpleBackendTest, ReportCurrentSubscriber)
EXPECT_EQ(reportReturn["books"], result);
};
checkResult(subManagerPtr->report(), 1);
subManagerPtr->cleanup(session2);
subManagerPtr->cleanup(session2); // clean a removed session
std::this_thread::sleep_for(20ms);
checkResult(subManagerPtr->report(), 0);

View File

@@ -53,6 +53,8 @@ TEST_F(SubscriptionTest, SubscriptionCount)
ctx.restart();
ctx.run();
EXPECT_EQ(sub.count(), 2);
EXPECT_TRUE(sub.hasSession(session1));
EXPECT_TRUE(sub.hasSession(session2));
EXPECT_FALSE(sub.empty());
sub.unsubscribe(session1);
ctx.restart();
@@ -67,6 +69,8 @@ TEST_F(SubscriptionTest, SubscriptionCount)
ctx.run();
EXPECT_EQ(sub.count(), 0);
EXPECT_TRUE(sub.empty());
EXPECT_FALSE(sub.hasSession(session1));
EXPECT_FALSE(sub.hasSession(session2));
}
// send interface will be called when publish called
@@ -131,6 +135,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount)
ctx.restart();
ctx.run();
EXPECT_EQ(subMap.count(), 3);
EXPECT_TRUE(subMap.hasSession(session1, "topic1"));
EXPECT_TRUE(subMap.hasSession(session2, "topic1"));
EXPECT_TRUE(subMap.hasSession(session3, "topic2"));
subMap.unsubscribe(session1, "topic1");
ctx.restart();
ctx.run();
@@ -139,6 +146,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount)
subMap.unsubscribe(session3, "topic2");
ctx.restart();
ctx.run();
EXPECT_FALSE(subMap.hasSession(session1, "topic1"));
EXPECT_FALSE(subMap.hasSession(session2, "topic1"));
EXPECT_FALSE(subMap.hasSession(session3, "topic2"));
EXPECT_EQ(subMap.count(), 0);
subMap.unsubscribe(session3, "topic2");
subMap.unsubscribe(session3, "no exist");