cleanup websocket sessions that are subscribed to books or accounts (#146)

This commit is contained in:
Nathan Nichols
2022-06-03 12:46:45 -05:00
committed by GitHub
parent 458fac776c
commit f16a05ae7a
8 changed files with 228 additions and 74 deletions

View File

@@ -30,6 +30,7 @@ SimpleCache::update(
{
if (isBackground && deletes_.count(obj.key))
continue;
auto& e = map_[obj.key];
if (seq > e.seq)
{
@@ -45,6 +46,7 @@ SimpleCache::update(
}
}
}
std::optional<LedgerObject>
SimpleCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const
{
@@ -58,6 +60,7 @@ SimpleCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const
return {};
return {{e->first, e->second.blob}};
}
std::optional<LedgerObject>
SimpleCache::getPredecessor(ripple::uint256 const& key, uint32_t seq) const
{

View File

@@ -17,6 +17,7 @@ class SimpleCache
uint32_t seq = 0;
Blob blob;
};
std::map<ripple::uint256, CacheEntry> map_;
mutable std::shared_mutex mtx_;
uint32_t latestSeq_ = 0;

View File

@@ -45,6 +45,8 @@ doServerInfo(Context const& context)
info[JS(counters)] = boost::json::object{};
info[JS(counters)].as_object()[JS(rpc)] = context.counters.report();
info[JS(counters)].as_object()["subscriptions"] =
context.subscriptions->report();
auto serverInfoRippled = context.balancer->forwardToRippled(
{{"counters", "server_info"}}, context.clientIp, context.yield);

View File

@@ -275,11 +275,29 @@ subscribeToBooks(
}
}
void
unsubscribeToBooks(
std::vector<ripple::Book> const& books,
std::shared_ptr<WsBase> session,
SubscriptionManager& manager)
{
for (auto const& book : books)
{
manager.unsubBook(book, session);
}
}
Result
doSubscribe(Context const& context)
{
auto request = context.params;
if (!request.contains(JS(streams)) && !request.contains(JS(accounts)) &&
!request.contains(JS(accounts_proposed)) &&
!request.contains(JS(books)))
return Status{
Error::rpcINVALID_PARAMS, "does not contain valid subscription"};
if (request.contains(JS(streams)))
{
if (!request.at(JS(streams)).is_array())
@@ -315,6 +333,7 @@ doSubscribe(Context const& context)
if (status)
return status;
}
std::vector<ripple::Book> books;
boost::json::array snapshot;
if (request.contains(JS(books)))
@@ -355,6 +374,12 @@ doUnsubscribe(Context const& context)
{
auto request = context.params;
if (!request.contains(JS(streams)) && !request.contains(JS(accounts)) &&
!request.contains(JS(accounts_proposed)) &&
!request.contains(JS(books)))
return Status{
Error::rpcINVALID_PARAMS, "does not contain valid subscription"};
if (request.contains(JS(streams)))
{
if (!request.at(JS(streams)).is_array())
@@ -391,6 +416,22 @@ doUnsubscribe(Context const& context)
return status;
}
std::vector<ripple::Book> books;
if (request.contains(JS(books)))
{
auto parsed =
validateAndGetBooks(context.yield, request, context.backend);
if (auto status = std::get_if<Status>(&parsed))
return *status;
auto [bks, snap] =
std::get<std::pair<std::vector<ripple::Book>, boost::json::array>>(
parsed);
books = std::move(bks);
}
if (request.contains(JS(streams)))
unsubscribeToStreams(request, context.session, *context.subscriptions);
@@ -401,6 +442,9 @@ doUnsubscribe(Context const& context)
unsubscribeToAccountsProposed(
request, context.session, *context.subscriptions);
if (request.contains("books"))
unsubscribeToBooks(books, context.session, *context.subscriptions);
boost::json::object response = {{"status", "success"}};
return response;
}

View File

@@ -5,17 +5,17 @@
template <class T>
inline void
sendToSubscribers(
std::shared_ptr<Message>& message,
std::shared_ptr<Message> const& message,
T& subscribers,
boost::asio::io_context::strand& strand)
std::atomic_uint64_t& counter)
{
boost::asio::post(strand, [&subscribers, message]() {
for (auto it = subscribers.begin(); it != subscribers.end();)
{
auto& session = *it;
if (session->dead())
{
it = subscribers.erase(it);
--counter;
}
else
{
@@ -23,7 +23,6 @@ sendToSubscribers(
++it;
}
}
});
}
template <class T>
@@ -31,11 +30,13 @@ inline void
addSession(
std::shared_ptr<WsBase> session,
T& subscribers,
boost::asio::io_context::strand& strand)
std::atomic_uint64_t& counter)
{
boost::asio::post(strand, [&subscribers, s = std::move(session)]() {
subscribers.emplace(s);
});
if (!subscribers.contains(session))
{
subscribers.insert(session);
++counter;
}
}
template <class T>
@@ -43,29 +44,37 @@ inline void
removeSession(
std::shared_ptr<WsBase> session,
T& subscribers,
boost::asio::io_context::strand& strand)
std::atomic_uint64_t& counter)
{
boost::asio::post(strand, [&subscribers, s = std::move(session)]() {
subscribers.erase(s);
});
if (subscribers.contains(session))
{
subscribers.erase(session);
--counter;
}
}
void
Subscription::subscribe(std::shared_ptr<WsBase> const& session)
{
addSession(session, subscribers_, strand_);
boost::asio::post(strand_, [this, session]() {
addSession(session, subscribers_, subCount_);
});
}
void
Subscription::unsubscribe(std::shared_ptr<WsBase> const& session)
{
removeSession(session, subscribers_, strand_);
boost::asio::post(strand_, [this, session]() {
removeSession(session, subscribers_, subCount_);
});
}
void
Subscription::publish(std::shared_ptr<Message>& message)
{
sendToSubscribers(message, subscribers_, strand_);
boost::asio::post(strand_, [this, message]() {
sendToSubscribers(message, subscribers_, subCount_);
});
}
template <class Key>
@@ -74,7 +83,9 @@ SubscriptionMap<Key>::subscribe(
std::shared_ptr<WsBase> const& session,
Key const& account)
{
addSession(session, subscribers_[account], strand_);
boost::asio::post(strand_, [this, session, account]() {
addSession(session, subscribers_[account], subCount_);
});
}
template <class Key>
@@ -83,7 +94,22 @@ SubscriptionMap<Key>::unsubscribe(
std::shared_ptr<WsBase> const& session,
Key const& account)
{
removeSession(session, subscribers_[account], strand_);
boost::asio::post(strand_, [this, account, session]() {
if (!subscribers_.contains(account))
return;
if (!subscribers_[account].contains(session))
return;
--subCount_;
subscribers_[account].erase(session);
if (subscribers_[account].size() == 0)
{
subscribers_.erase(account);
}
});
}
template <class Key>
@@ -92,7 +118,12 @@ SubscriptionMap<Key>::publish(
std::shared_ptr<Message>& message,
Key const& account)
{
sendToSubscribers(message, subscribers_[account], strand_);
boost::asio::post(strand_, [this, account, message]() {
if (!subscribers_.contains(account))
return;
sendToSubscribers(message, subscribers_[account], subCount_);
});
}
boost::json::object
@@ -122,7 +153,7 @@ getLedgerPubMessage(
boost::json::object
SubscriptionManager::subLedger(
boost::asio::yield_context& yield,
std::shared_ptr<WsBase>& session)
std::shared_ptr<WsBase> session)
{
ledgerSubscribers_.subscribe(session);
@@ -146,19 +177,19 @@ SubscriptionManager::subLedger(
}
void
SubscriptionManager::unsubLedger(std::shared_ptr<WsBase>& session)
SubscriptionManager::unsubLedger(std::shared_ptr<WsBase> session)
{
ledgerSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subTransactions(std::shared_ptr<WsBase>& session)
SubscriptionManager::subTransactions(std::shared_ptr<WsBase> session)
{
txSubscribers_.subscribe(session);
}
void
SubscriptionManager::unsubTransactions(std::shared_ptr<WsBase>& session)
SubscriptionManager::unsubTransactions(std::shared_ptr<WsBase> session)
{
txSubscribers_.unsubscribe(session);
}
@@ -169,6 +200,11 @@ SubscriptionManager::subAccount(
std::shared_ptr<WsBase>& session)
{
accountSubscribers_.subscribe(session, account);
std::unique_lock lk(cleanupMtx_);
cleanupFuncs_[session].emplace_back([this, account](session_ptr session) {
unsubAccount(account, session);
});
}
void
@@ -182,15 +218,19 @@ SubscriptionManager::unsubAccount(
void
SubscriptionManager::subBook(
ripple::Book const& book,
std::shared_ptr<WsBase>& session)
std::shared_ptr<WsBase> session)
{
bookSubscribers_.subscribe(session, book);
std::unique_lock lk(cleanupMtx_);
cleanupFuncs_[session].emplace_back(
[this, book](session_ptr session) { unsubBook(book, session); });
}
void
SubscriptionManager::unsubBook(
ripple::Book const& book,
std::shared_ptr<WsBase>& session)
std::shared_ptr<WsBase> session)
{
bookSubscribers_.unsubscribe(session, book);
}
@@ -335,31 +375,31 @@ SubscriptionManager::forwardValidation(boost::json::object const& response)
void
SubscriptionManager::subProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session)
std::shared_ptr<WsBase> session)
{
accountProposedSubscribers_.subscribe(session, account);
}
void
SubscriptionManager::subManifest(std::shared_ptr<WsBase>& session)
SubscriptionManager::subManifest(std::shared_ptr<WsBase> session)
{
manifestSubscribers_.subscribe(session);
}
void
SubscriptionManager::unsubManifest(std::shared_ptr<WsBase>& session)
SubscriptionManager::unsubManifest(std::shared_ptr<WsBase> session)
{
manifestSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subValidation(std::shared_ptr<WsBase>& session)
SubscriptionManager::subValidation(std::shared_ptr<WsBase> session)
{
validationsSubscribers_.subscribe(session);
}
void
SubscriptionManager::unsubValidation(std::shared_ptr<WsBase>& session)
SubscriptionManager::unsubValidation(std::shared_ptr<WsBase> session)
{
validationsSubscribers_.unsubscribe(session);
}
@@ -367,19 +407,34 @@ SubscriptionManager::unsubValidation(std::shared_ptr<WsBase>& session)
void
SubscriptionManager::unsubProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session)
std::shared_ptr<WsBase> session)
{
accountProposedSubscribers_.unsubscribe(session, account);
}
void
SubscriptionManager::subProposedTransactions(std::shared_ptr<WsBase>& session)
SubscriptionManager::subProposedTransactions(std::shared_ptr<WsBase> session)
{
txProposedSubscribers_.subscribe(session);
}
void
SubscriptionManager::unsubProposedTransactions(std::shared_ptr<WsBase>& session)
SubscriptionManager::unsubProposedTransactions(std::shared_ptr<WsBase> session)
{
txProposedSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::cleanup(std::shared_ptr<WsBase> session)
{
std::unique_lock lk(cleanupMtx_);
if (!cleanupFuncs_.contains(session))
return;
for (auto f : cleanupFuncs_[session])
{
f(session);
}
cleanupFuncs_.erase(session);
}

View File

@@ -11,6 +11,7 @@ class Subscription
{
boost::asio::io_context::strand strand_;
std::unordered_set<std::shared_ptr<WsBase>> subscribers_ = {};
std::atomic_uint64_t subCount_ = 0;
public:
Subscription() = delete;
@@ -31,15 +32,23 @@ public:
void
publish(std::shared_ptr<Message>& message);
std::uint64_t
count()
{
return subCount_.load();
}
};
template <class Key>
class SubscriptionMap
{
using subscribers = std::unordered_set<std::shared_ptr<WsBase>>;
using ptr = std::shared_ptr<WsBase>;
using subscribers = std::set<ptr>;
boost::asio::io_context::strand strand_;
std::unordered_map<Key, subscribers> subscribers_ = {};
std::atomic_uint64_t subCount_ = 0;
public:
SubscriptionMap() = delete;
@@ -60,10 +69,18 @@ public:
void
publish(std::shared_ptr<Message>& message, Key const& key);
std::uint64_t
count()
{
return subCount_.load();
}
};
class SubscriptionManager
{
using session_ptr = std::shared_ptr<WsBase>;
std::vector<std::thread> workers_;
boost::asio::io_context ioc_;
std::optional<boost::asio::io_context::work> work_;
@@ -133,9 +150,7 @@ public:
}
boost::json::object
subLedger(
boost::asio::yield_context& yield,
std::shared_ptr<WsBase>& session);
subLedger(boost::asio::yield_context& yield, session_ptr session);
void
pubLedger(
@@ -145,13 +160,13 @@ public:
std::uint32_t txnCount);
void
unsubLedger(std::shared_ptr<WsBase>& session);
unsubLedger(session_ptr session);
void
subTransactions(std::shared_ptr<WsBase>& session);
subTransactions(session_ptr session);
void
unsubTransactions(std::shared_ptr<WsBase>& session);
unsubTransactions(session_ptr session);
void
pubTransaction(
@@ -159,32 +174,28 @@ public:
ripple::LedgerInfo const& lgrInfo);
void
subAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
subAccount(ripple::AccountID const& account, session_ptr& session);
void
unsubAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
unsubAccount(ripple::AccountID const& account, session_ptr& session);
void
subBook(ripple::Book const& book, std::shared_ptr<WsBase>& session);
subBook(ripple::Book const& book, session_ptr session);
void
unsubBook(ripple::Book const& book, std::shared_ptr<WsBase>& session);
unsubBook(ripple::Book const& book, session_ptr session);
void
subManifest(std::shared_ptr<WsBase>& session);
subManifest(session_ptr session);
void
unsubManifest(std::shared_ptr<WsBase>& session);
unsubManifest(session_ptr session);
void
subValidation(std::shared_ptr<WsBase>& session);
subValidation(session_ptr session);
void
unsubValidation(std::shared_ptr<WsBase>& session);
unsubValidation(session_ptr session);
void
forwardProposedTransaction(boost::json::object const& response);
@@ -196,26 +207,51 @@ public:
forwardValidation(boost::json::object const& response);
void
subProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
subProposedAccount(ripple::AccountID const& account, session_ptr session);
void
unsubProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session);
unsubProposedAccount(ripple::AccountID const& account, session_ptr session);
void
subProposedTransactions(std::shared_ptr<WsBase>& session);
subProposedTransactions(session_ptr session);
void
unsubProposedTransactions(std::shared_ptr<WsBase>& session);
unsubProposedTransactions(session_ptr session);
void
cleanup(session_ptr session);
boost::json::object
report()
{
boost::json::object counts = {};
counts["ledger"] = ledgerSubscribers_.count();
counts["transactions"] = txSubscribers_.count();
counts["transactions_proposed"] = txProposedSubscribers_.count();
counts["manifests"] = manifestSubscribers_.count();
counts["validations"] = validationsSubscribers_.count();
counts["account"] = accountSubscribers_.count();
counts["accounts_proposed"] = accountProposedSubscribers_.count();
counts["books"] = bookSubscribers_.count();
return counts;
}
private:
void
sendAll(
std::string const& pubMsg,
std::unordered_set<std::shared_ptr<WsBase>>& subs);
sendAll(std::string const& pubMsg, std::unordered_set<session_ptr>& subs);
/**
* This is how we chose to cleanup subscriptions that have been closed.
* Each time we add a subscriber, we add the opposite lambda that
* unsubscribes that subscriber when cleanup is called with the session that
* closed.
*/
using CleanupFunction = std::function<void(session_ptr)>;
std::mutex cleanupMtx_;
std::unordered_map<session_ptr, std::vector<CleanupFunction>>
cleanupFuncs_ = {};
};
#endif // SUBSCRIPTION_MANAGER_H

View File

@@ -228,6 +228,7 @@ public:
std::move(req_),
lambda_,
backend_,
subscriptions_,
balancer_,
etl_,
dosGuard_,
@@ -275,6 +276,7 @@ handle_request(
request<Body, boost::beast::http::basic_fields<Allocator>>&& req,
Send&& send,
std::shared_ptr<BackendInterface const> backend,
std::shared_ptr<SubscriptionManager> subscriptions,
std::shared_ptr<ETLLoadBalancer> balancer,
std::shared_ptr<ReportingETL const> etl,
DOSGuard& dosGuard,
@@ -349,7 +351,15 @@ handle_request(
RPC::make_error(RPC::Error::rpcNOT_READY))));
std::optional<RPC::Context> context = RPC::make_HttpContext(
yc, request, backend, nullptr, balancer, etl, *range, counters, ip);
yc,
request,
backend,
subscriptions,
balancer,
etl,
*range,
counters,
ip);
if (!context)
return send(httpResponse(

View File

@@ -99,6 +99,9 @@ class WsSession : public WsBase,
BOOST_LOG_TRIVIAL(info)
<< "wsFail: " << what << ": " << ec.message();
boost::beast::get_lowest_layer(derived().ws()).socket().close(ec);
if (auto manager = subscriptions_.lock(); manager)
manager->cleanup(derived().shared_from_this());
}
}