Add Unittests for subscription module (#488)

Fix #492
This commit is contained in:
cyan317
2023-02-03 09:07:02 +00:00
committed by GitHub
parent 1186622e58
commit 19455b4d6c
11 changed files with 1858 additions and 107 deletions

View File

@@ -21,57 +21,6 @@
#include <subscriptions/SubscriptionManager.h>
#include <webserver/WsBase.h>
template <class T>
inline void
sendToSubscribers(
std::shared_ptr<Message> const& message,
T& subscribers,
std::atomic_uint64_t& counter)
{
for (auto it = subscribers.begin(); it != subscribers.end();)
{
auto& session = *it;
if (session->dead())
{
it = subscribers.erase(it);
--counter;
}
else
{
session->send(message);
++it;
}
}
}
template <class T>
inline void
addSession(
std::shared_ptr<WsBase> session,
T& subscribers,
std::atomic_uint64_t& counter)
{
if (!subscribers.contains(session))
{
subscribers.insert(session);
++counter;
}
}
template <class T>
inline void
removeSession(
std::shared_ptr<WsBase> session,
T& subscribers,
std::atomic_uint64_t& counter)
{
if (subscribers.contains(session))
{
subscribers.erase(session);
--counter;
}
}
void
Subscription::subscribe(std::shared_ptr<WsBase> const& session)
{
@@ -96,55 +45,6 @@ Subscription::publish(std::shared_ptr<Message> const& message)
});
}
template <class Key>
void
SubscriptionMap<Key>::subscribe(
std::shared_ptr<WsBase> const& session,
Key const& account)
{
boost::asio::post(strand_, [this, session, account]() {
addSession(session, subscribers_[account], subCount_);
});
}
template <class Key>
void
SubscriptionMap<Key>::unsubscribe(
std::shared_ptr<WsBase> const& session,
Key const& account)
{
boost::asio::post(strand_, [this, account, session]() {
if (!subscribers_.contains(account))
return;
if (!subscribers_[account].contains(session))
return;
--subCount_;
subscribers_[account].erase(session);
if (subscribers_[account].size() == 0)
{
subscribers_.erase(account);
}
});
}
template <class Key>
void
SubscriptionMap<Key>::publish(
std::shared_ptr<Message>& message,
Key const& account)
{
boost::asio::post(strand_, [this, account, message]() {
if (!subscribers_.contains(account))
return;
sendToSubscribers(message, subscribers_[account], subCount_);
});
}
boost::json::object
getLedgerPubMessage(
ripple::LedgerInfo const& lgrInfo,
@@ -345,8 +245,6 @@ SubscriptionManager::pubTransaction(
for (auto const& node : meta->getNodes())
{
if (!node.isFieldPresent(ripple::sfLedgerEntryType))
assert(false);
if (node.getFieldU16(ripple::sfLedgerEntryType) == ripple::ltOFFER)
{
ripple::SField const* field = nullptr;
@@ -388,9 +286,6 @@ SubscriptionManager::pubBookChanges(
ripple::LedgerInfo const& lgrInfo,
std::vector<Backend::TransactionAndMetadata> const& transactions)
{
if (bookChangesSubscribers_.empty())
return;
auto const json = RPC::computeBookChanges(lgrInfo, transactions);
auto const bookChangesMsg =
std::make_shared<Message>(boost::json::serialize(json));

View File

@@ -95,7 +95,7 @@ public:
unsubscribe(std::shared_ptr<WsBase> const& session, Key const& key);
void
publish(std::shared_ptr<Message>& message, Key const& key);
publish(std::shared_ptr<Message> const& message, Key const& key);
std::uint64_t
count()
@@ -104,6 +104,106 @@ public:
}
};
template <class T>
inline void
sendToSubscribers(
std::shared_ptr<Message> const& message,
T& subscribers,
std::atomic_uint64_t& counter)
{
for (auto it = subscribers.begin(); it != subscribers.end();)
{
auto& session = *it;
if (session->dead())
{
it = subscribers.erase(it);
--counter;
}
else
{
session->send(message);
++it;
}
}
}
template <class T>
inline void
addSession(
std::shared_ptr<WsBase> session,
T& subscribers,
std::atomic_uint64_t& counter)
{
if (!subscribers.contains(session))
{
subscribers.insert(session);
++counter;
}
}
template <class T>
inline void
removeSession(
std::shared_ptr<WsBase> session,
T& subscribers,
std::atomic_uint64_t& counter)
{
if (subscribers.contains(session))
{
subscribers.erase(session);
--counter;
}
}
template <class Key>
void
SubscriptionMap<Key>::subscribe(
std::shared_ptr<WsBase> const& session,
Key const& account)
{
boost::asio::post(strand_, [this, session, account]() {
addSession(session, subscribers_[account], subCount_);
});
}
template <class Key>
void
SubscriptionMap<Key>::unsubscribe(
std::shared_ptr<WsBase> const& session,
Key const& account)
{
boost::asio::post(strand_, [this, account, session]() {
if (!subscribers_.contains(account))
return;
if (!subscribers_[account].contains(session))
return;
--subCount_;
subscribers_[account].erase(session);
if (subscribers_[account].size() == 0)
{
subscribers_.erase(account);
}
});
}
template <class Key>
void
SubscriptionMap<Key>::publish(
std::shared_ptr<Message> const& message,
Key const& account)
{
boost::asio::post(strand_, [this, account, message]() {
if (!subscribers_.contains(account))
return;
sendToSubscribers(message, subscribers_[account], subCount_);
});
}
class SubscriptionManager
{
using session_ptr = std::shared_ptr<WsBase>;