mirror of
https://github.com/XRPLF/clio.git
synced 2025-11-04 20:05:51 +00:00
Compare commits
3 Commits
2.0
...
release/2.
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1a7e701ec7 | ||
|
|
4f3b6e98ad | ||
|
|
27279ceb6d |
@@ -22,6 +22,7 @@ class Clio(ConanFile):
|
||||
'boost/1.82.0',
|
||||
'cassandra-cpp-driver/2.16.2',
|
||||
'fmt/10.0.0',
|
||||
'protobuf/3.21.12',
|
||||
'grpc/1.50.1',
|
||||
'openssl/1.1.1u',
|
||||
'xrpl/1.12.0',
|
||||
|
||||
@@ -35,6 +35,12 @@ Subscription::unsubscribe(SessionPtrType const& session)
|
||||
boost::asio::post(strand_, [this, session]() { removeSession(session, subscribers_, subCount_); });
|
||||
}
|
||||
|
||||
bool
|
||||
Subscription::hasSession(SessionPtrType const& session)
|
||||
{
|
||||
return subscribers_.contains(session);
|
||||
}
|
||||
|
||||
void
|
||||
Subscription::publish(std::shared_ptr<std::string> const& message)
|
||||
{
|
||||
@@ -334,6 +340,8 @@ SubscriptionManager::unsubProposedTransactions(SessionPtrType session)
|
||||
void
|
||||
SubscriptionManager::subscribeHelper(SessionPtrType const& session, Subscription& subs, CleanupFunction&& func)
|
||||
{
|
||||
if (subs.hasSession(session))
|
||||
return;
|
||||
subs.subscribe(session);
|
||||
std::scoped_lock lk(cleanupMtx_);
|
||||
cleanupFuncs_[session].push_back(std::move(func));
|
||||
@@ -347,6 +355,8 @@ SubscriptionManager::subscribeHelper(
|
||||
SubscriptionMap<Key>& subs,
|
||||
CleanupFunction&& func)
|
||||
{
|
||||
if (subs.hasSession(session, k))
|
||||
return;
|
||||
subs.subscribe(session, k);
|
||||
std::scoped_lock lk(cleanupMtx_);
|
||||
cleanupFuncs_[session].push_back(std::move(func));
|
||||
|
||||
@@ -139,6 +139,15 @@ public:
|
||||
void
|
||||
unsubscribe(SessionPtrType const& session);
|
||||
|
||||
/**
|
||||
* @brief Check if a session has been in subscribers list.
|
||||
*
|
||||
* @param session The session to check
|
||||
* @return true if the session is in the subscribers list; false otherwise
|
||||
*/
|
||||
bool
|
||||
hasSession(SessionPtrType const& session);
|
||||
|
||||
/**
|
||||
* @brief Sends the given message to all subscribers.
|
||||
*
|
||||
@@ -232,6 +241,22 @@ public:
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Check if a session has been in subscribers list.
|
||||
*
|
||||
* @param session The session to check
|
||||
* @param key The key for the subscription to check
|
||||
* @return true if the session is in the subscribers list; false otherwise
|
||||
*/
|
||||
bool
|
||||
hasSession(SessionPtrType const& session, Key const& key)
|
||||
{
|
||||
if (!subscribers_.contains(key))
|
||||
return false;
|
||||
|
||||
return subscribers_[key].contains(session);
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Sends the given message to all subscribers.
|
||||
*
|
||||
|
||||
@@ -63,7 +63,7 @@ protected:
|
||||
if (!ec_ && ec != boost::asio::error::operation_aborted)
|
||||
{
|
||||
ec_ = ec;
|
||||
LOG(perfLog_.info()) << tag() << ": " << what << ": " << ec.message();
|
||||
LOG(perfLog_.error()) << tag() << ": " << what << ": " << ec.message();
|
||||
boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
|
||||
(*handler_)(ec, derived().shared_from_this());
|
||||
}
|
||||
@@ -106,14 +106,14 @@ public:
|
||||
void
|
||||
onWrite(boost::system::error_code ec, std::size_t)
|
||||
{
|
||||
messages_.pop();
|
||||
sending_ = false;
|
||||
if (ec)
|
||||
{
|
||||
wsFail(ec, "Failed to write");
|
||||
}
|
||||
else
|
||||
{
|
||||
messages_.pop();
|
||||
sending_ = false;
|
||||
maybeSendNext();
|
||||
}
|
||||
}
|
||||
|
||||
@@ -161,7 +161,6 @@ TEST_F(SubscriptionManagerSimpleBackendTest, ReportCurrentSubscriber)
|
||||
EXPECT_EQ(reportReturn["books"], result);
|
||||
};
|
||||
checkResult(subManagerPtr->report(), 1);
|
||||
subManagerPtr->cleanup(session2);
|
||||
subManagerPtr->cleanup(session2); // clean a removed session
|
||||
std::this_thread::sleep_for(20ms);
|
||||
checkResult(subManagerPtr->report(), 0);
|
||||
|
||||
@@ -53,6 +53,8 @@ TEST_F(SubscriptionTest, SubscriptionCount)
|
||||
ctx.restart();
|
||||
ctx.run();
|
||||
EXPECT_EQ(sub.count(), 2);
|
||||
EXPECT_TRUE(sub.hasSession(session1));
|
||||
EXPECT_TRUE(sub.hasSession(session2));
|
||||
EXPECT_FALSE(sub.empty());
|
||||
sub.unsubscribe(session1);
|
||||
ctx.restart();
|
||||
@@ -67,6 +69,8 @@ TEST_F(SubscriptionTest, SubscriptionCount)
|
||||
ctx.run();
|
||||
EXPECT_EQ(sub.count(), 0);
|
||||
EXPECT_TRUE(sub.empty());
|
||||
EXPECT_FALSE(sub.hasSession(session1));
|
||||
EXPECT_FALSE(sub.hasSession(session2));
|
||||
}
|
||||
|
||||
// send interface will be called when publish called
|
||||
@@ -131,6 +135,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount)
|
||||
ctx.restart();
|
||||
ctx.run();
|
||||
EXPECT_EQ(subMap.count(), 3);
|
||||
EXPECT_TRUE(subMap.hasSession(session1, "topic1"));
|
||||
EXPECT_TRUE(subMap.hasSession(session2, "topic1"));
|
||||
EXPECT_TRUE(subMap.hasSession(session3, "topic2"));
|
||||
subMap.unsubscribe(session1, "topic1");
|
||||
ctx.restart();
|
||||
ctx.run();
|
||||
@@ -139,6 +146,9 @@ TEST_F(SubscriptionMapTest, SubscriptionMapCount)
|
||||
subMap.unsubscribe(session3, "topic2");
|
||||
ctx.restart();
|
||||
ctx.run();
|
||||
EXPECT_FALSE(subMap.hasSession(session1, "topic1"));
|
||||
EXPECT_FALSE(subMap.hasSession(session2, "topic1"));
|
||||
EXPECT_FALSE(subMap.hasSession(session3, "topic2"));
|
||||
EXPECT_EQ(subMap.count(), 0);
|
||||
subMap.unsubscribe(session3, "topic2");
|
||||
subMap.unsubscribe(session3, "no exist");
|
||||
|
||||
Reference in New Issue
Block a user