Compare commits

...

2 Commits

Author SHA1 Message Date
cyan317
4f3b6e98ad Subscribe cleanup (#940)
Fix #939
2023-12-19 15:52:12 +00:00
cyan317
27279ceb6d Fix messages pile up (#921)
Fix #924
2023-12-19 15:51:39 +00:00
5 changed files with 48 additions and 4 deletions

View File

@@ -35,6 +35,12 @@ Subscription::unsubscribe(SessionPtrType const& session)
boost::asio::post(strand_, [this, session]() { removeSession(session, subscribers_, subCount_); }); boost::asio::post(strand_, [this, session]() { removeSession(session, subscribers_, subCount_); });
} }
bool
Subscription::hasSession(SessionPtrType const& session)
{
return subscribers_.contains(session);
}
void void
Subscription::publish(std::shared_ptr<std::string> const& message) Subscription::publish(std::shared_ptr<std::string> const& message)
{ {
@@ -334,6 +340,8 @@ SubscriptionManager::unsubProposedTransactions(SessionPtrType session)
void void
SubscriptionManager::subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func) SubscriptionManager::subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func)
{ {
if (subs.hasSession(session))
return;
subs.subscribe(session); subs.subscribe(session);
std::scoped_lock lk(cleanupMtx_); std::scoped_lock lk(cleanupMtx_);
cleanupFuncs_[session].push_back(std::move(func)); cleanupFuncs_[session].push_back(std::move(func));
@@ -347,6 +355,8 @@ SubscriptionManager::subscribeHelper(
SubscriptionMap<Key>& subs, SubscriptionMap<Key>& subs,
CleanupFunction&& func) CleanupFunction&& func)
{ {
if (subs.hasSession(session, k))
return;
subs.subscribe(session, k); subs.subscribe(session, k);
std::scoped_lock lk(cleanupMtx_); std::scoped_lock lk(cleanupMtx_);
cleanupFuncs_[session].push_back(std::move(func)); cleanupFuncs_[session].push_back(std::move(func));

View File

@@ -139,6 +139,15 @@ public:
void void
unsubscribe(SessionPtrType const& session); unsubscribe(SessionPtrType const& session);
/**
* @brief Check if a session has been in subscribers list.
*
* @param session The session to check
* @return true if the session is in the subscribers list; false otherwise
*/
bool
hasSession(SessionPtrType const& session);
/** /**
* @brief Sends the given message to all subscribers. * @brief Sends the given message to all subscribers.
* *
@@ -232,6 +241,22 @@ public:
}); });
} }
/**
* @brief Check if a session has been in subscribers list.
*
* @param session The session to check
* @param key The key for the subscription to check
* @return true if the session is in the subscribers list; false otherwise
*/
bool
hasSession(SessionPtrType const& session, Key const& key)
{
if (!subscribers_.contains(key))
return false;
return subscribers_[key].contains(session);
}
/** /**
* @brief Sends the given message to all subscribers. * @brief Sends the given message to all subscribers.
* *

View File

@@ -63,7 +63,7 @@ protected:
if (!ec_ && ec != boost::asio::error::operation_aborted) if (!ec_ && ec != boost::asio::error::operation_aborted)
{ {
ec_ = ec; ec_ = ec;
LOG(perfLog_.info()) << tag() << ": " << what << ": " << ec.message(); LOG(perfLog_.error()) << tag() << ": " << what << ": " << ec.message();
boost::beast::get_lowest_layer(derived().ws()).socket().close(ec); boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
(*handler_)(ec, derived().shared_from_this()); (*handler_)(ec, derived().shared_from_this());
} }
@@ -106,14 +106,14 @@ public:
void void
onWrite(boost::system::error_code ec, std::size_t) onWrite(boost::system::error_code ec, std::size_t)
{ {
messages_.pop();
sending_ = false;
if (ec) if (ec)
{ {
wsFail(ec, "Failed to write"); wsFail(ec, "Failed to write");
} }
else else
{ {
messages_.pop();
sending_ = false;
maybeSendNext(); maybeSendNext();
} }
} }

View File

@@ -161,7 +161,6 @@ TEST_F(SubscriptionManagerSimpleBackendTest, ReportCurrentSubscriber)
EXPECT_EQ(reportReturn["books"], result); EXPECT_EQ(reportReturn["books"], result);
}; };
checkResult(subManagerPtr->report(), 1); checkResult(subManagerPtr->report(), 1);
subManagerPtr->cleanup(session2);
subManagerPtr->cleanup(session2); // clean a removed session subManagerPtr->cleanup(session2); // clean a removed session
std::this_thread::sleep_for(20ms); std::this_thread::sleep_for(20ms);
checkResult(subManagerPtr->report(), 0); checkResult(subManagerPtr->report(), 0);

View File

@@ -53,6 +53,8 @@ TEST_F(SubscriptionTest, SubscriptionCount)
ctx.restart(); ctx.restart();
ctx.run(); ctx.run();
EXPECT_EQ(sub.count(), 2); EXPECT_EQ(sub.count(), 2);
EXPECT_TRUE(sub.hasSession(session1));
EXPECT_TRUE(sub.hasSession(session2));
EXPECT_FALSE(sub.empty()); EXPECT_FALSE(sub.empty());
sub.unsubscribe(session1); sub.unsubscribe(session1);
ctx.restart(); ctx.restart();
@@ -67,6 +69,8 @@ TEST_F(SubscriptionTest, SubscriptionCount)
ctx.run(); ctx.run();
EXPECT_EQ(sub.count(), 0); EXPECT_EQ(sub.count(), 0);
EXPECT_TRUE(sub.empty()); EXPECT_TRUE(sub.empty());
EXPECT_FALSE(sub.hasSession(session1));
EXPECT_FALSE(sub.hasSession(session2));
} }
// send interface will be called when publish called // send interface will be called when publish called
@@ -131,6 +135,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount)
ctx.restart(); ctx.restart();
ctx.run(); ctx.run();
EXPECT_EQ(subMap.count(), 3); EXPECT_EQ(subMap.count(), 3);
EXPECT_TRUE(subMap.hasSession(session1, "topic1"));
EXPECT_TRUE(subMap.hasSession(session2, "topic1"));
EXPECT_TRUE(subMap.hasSession(session3, "topic2"));
subMap.unsubscribe(session1, "topic1"); subMap.unsubscribe(session1, "topic1");
ctx.restart(); ctx.restart();
ctx.run(); ctx.run();
@@ -139,6 +146,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount)
subMap.unsubscribe(session3, "topic2"); subMap.unsubscribe(session3, "topic2");
ctx.restart(); ctx.restart();
ctx.run(); ctx.run();
EXPECT_FALSE(subMap.hasSession(session1, "topic1"));
EXPECT_FALSE(subMap.hasSession(session2, "topic1"));
EXPECT_FALSE(subMap.hasSession(session3, "topic2"));
EXPECT_EQ(subMap.count(), 0); EXPECT_EQ(subMap.count(), 0);
subMap.unsubscribe(session3, "topic2"); subMap.unsubscribe(session3, "topic2");
subMap.unsubscribe(session3, "no exist"); subMap.unsubscribe(session3, "no exist");