mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-20 19:56:00 +00:00
@@ -35,6 +35,12 @@ Subscription::unsubscribe(SessionPtrType const& session)
|
|||||||
boost::asio::post(strand_, [this, session]() { removeSession(session, subscribers_, subCount_); });
|
boost::asio::post(strand_, [this, session]() { removeSession(session, subscribers_, subCount_); });
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool
|
||||||
|
Subscription::hasSession(SessionPtrType const& session)
|
||||||
|
{
|
||||||
|
return subscribers_.contains(session);
|
||||||
|
}
|
||||||
|
|
||||||
void
|
void
|
||||||
Subscription::publish(std::shared_ptr<std::string> const& message)
|
Subscription::publish(std::shared_ptr<std::string> const& message)
|
||||||
{
|
{
|
||||||
@@ -341,6 +347,8 @@ SubscriptionManager::unsubProposedTransactions(SessionPtrType session)
|
|||||||
void
|
void
|
||||||
SubscriptionManager::subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func)
|
SubscriptionManager::subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func)
|
||||||
{
|
{
|
||||||
|
if (subs.hasSession(session))
|
||||||
|
return;
|
||||||
subs.subscribe(session);
|
subs.subscribe(session);
|
||||||
std::scoped_lock const lk(cleanupMtx_);
|
std::scoped_lock const lk(cleanupMtx_);
|
||||||
cleanupFuncs_[session].push_back(std::move(func));
|
cleanupFuncs_[session].push_back(std::move(func));
|
||||||
@@ -354,6 +362,8 @@ SubscriptionManager::subscribeHelper(
|
|||||||
SubscriptionMap<Key>& subs,
|
SubscriptionMap<Key>& subs,
|
||||||
CleanupFunction&& func)
|
CleanupFunction&& func)
|
||||||
{
|
{
|
||||||
|
if (subs.hasSession(session, k))
|
||||||
|
return;
|
||||||
subs.subscribe(session, k);
|
subs.subscribe(session, k);
|
||||||
std::scoped_lock const lk(cleanupMtx_);
|
std::scoped_lock const lk(cleanupMtx_);
|
||||||
cleanupFuncs_[session].push_back(std::move(func));
|
cleanupFuncs_[session].push_back(std::move(func));
|
||||||
|
|||||||
@@ -139,6 +139,15 @@ public:
|
|||||||
void
|
void
|
||||||
unsubscribe(SessionPtrType const& session);
|
unsubscribe(SessionPtrType const& session);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Check if a session has been in subscribers list.
|
||||||
|
*
|
||||||
|
* @param session The session to check
|
||||||
|
* @return true if the session is in the subscribers list; false otherwise
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
hasSession(SessionPtrType const& session);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Sends the given message to all subscribers.
|
* @brief Sends the given message to all subscribers.
|
||||||
*
|
*
|
||||||
@@ -232,6 +241,22 @@ public:
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Check if a session has been in subscribers list.
|
||||||
|
*
|
||||||
|
* @param session The session to check
|
||||||
|
* @param key The key for the subscription to check
|
||||||
|
* @return true if the session is in the subscribers list; false otherwise
|
||||||
|
*/
|
||||||
|
bool
|
||||||
|
hasSession(SessionPtrType const& session, Key const& key)
|
||||||
|
{
|
||||||
|
if (!subscribers_.contains(key))
|
||||||
|
return false;
|
||||||
|
|
||||||
|
return subscribers_[key].contains(session);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Sends the given message to all subscribers.
|
* @brief Sends the given message to all subscribers.
|
||||||
*
|
*
|
||||||
|
|||||||
@@ -162,7 +162,6 @@ TEST_F(SubscriptionManagerSimpleBackendTest, ReportCurrentSubscriber)
|
|||||||
EXPECT_EQ(reportReturn["books"], result);
|
EXPECT_EQ(reportReturn["books"], result);
|
||||||
};
|
};
|
||||||
checkResult(subManagerPtr->report(), 1);
|
checkResult(subManagerPtr->report(), 1);
|
||||||
subManagerPtr->cleanup(session2);
|
|
||||||
subManagerPtr->cleanup(session2); // clean a removed session
|
subManagerPtr->cleanup(session2); // clean a removed session
|
||||||
std::this_thread::sleep_for(20ms);
|
std::this_thread::sleep_for(20ms);
|
||||||
checkResult(subManagerPtr->report(), 0);
|
checkResult(subManagerPtr->report(), 0);
|
||||||
|
|||||||
@@ -53,6 +53,8 @@ TEST_F(SubscriptionTest, SubscriptionCount)
|
|||||||
ctx.restart();
|
ctx.restart();
|
||||||
ctx.run();
|
ctx.run();
|
||||||
EXPECT_EQ(sub.count(), 2);
|
EXPECT_EQ(sub.count(), 2);
|
||||||
|
EXPECT_TRUE(sub.hasSession(session1));
|
||||||
|
EXPECT_TRUE(sub.hasSession(session2));
|
||||||
EXPECT_FALSE(sub.empty());
|
EXPECT_FALSE(sub.empty());
|
||||||
sub.unsubscribe(session1);
|
sub.unsubscribe(session1);
|
||||||
ctx.restart();
|
ctx.restart();
|
||||||
@@ -67,6 +69,8 @@ TEST_F(SubscriptionTest, SubscriptionCount)
|
|||||||
ctx.run();
|
ctx.run();
|
||||||
EXPECT_EQ(sub.count(), 0);
|
EXPECT_EQ(sub.count(), 0);
|
||||||
EXPECT_TRUE(sub.empty());
|
EXPECT_TRUE(sub.empty());
|
||||||
|
EXPECT_FALSE(sub.hasSession(session1));
|
||||||
|
EXPECT_FALSE(sub.hasSession(session2));
|
||||||
}
|
}
|
||||||
|
|
||||||
// send interface will be called when publish called
|
// send interface will be called when publish called
|
||||||
@@ -133,6 +137,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount)
|
|||||||
ctx.restart();
|
ctx.restart();
|
||||||
ctx.run();
|
ctx.run();
|
||||||
EXPECT_EQ(subMap.count(), 3);
|
EXPECT_EQ(subMap.count(), 3);
|
||||||
|
EXPECT_TRUE(subMap.hasSession(session1, "topic1"));
|
||||||
|
EXPECT_TRUE(subMap.hasSession(session2, "topic1"));
|
||||||
|
EXPECT_TRUE(subMap.hasSession(session3, "topic2"));
|
||||||
subMap.unsubscribe(session1, "topic1");
|
subMap.unsubscribe(session1, "topic1");
|
||||||
ctx.restart();
|
ctx.restart();
|
||||||
ctx.run();
|
ctx.run();
|
||||||
@@ -141,6 +148,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount)
|
|||||||
subMap.unsubscribe(session3, "topic2");
|
subMap.unsubscribe(session3, "topic2");
|
||||||
ctx.restart();
|
ctx.restart();
|
||||||
ctx.run();
|
ctx.run();
|
||||||
|
EXPECT_FALSE(subMap.hasSession(session1, "topic1"));
|
||||||
|
EXPECT_FALSE(subMap.hasSession(session2, "topic1"));
|
||||||
|
EXPECT_FALSE(subMap.hasSession(session3, "topic2"));
|
||||||
EXPECT_EQ(subMap.count(), 0);
|
EXPECT_EQ(subMap.count(), 0);
|
||||||
subMap.unsubscribe(session3, "topic2");
|
subMap.unsubscribe(session3, "topic2");
|
||||||
subMap.unsubscribe(session3, "no exist");
|
subMap.unsubscribe(session3, "no exist");
|
||||||
|
|||||||
Reference in New Issue
Block a user