From 0f7e1d55175a2e9746e07361d5c6c5f51a92c8a2 Mon Sep 17 00:00:00 2001 From: CJ Cobb <46455409+cjcobb23@users.noreply.github.com> Date: Tue, 22 Nov 2022 13:39:14 -0500 Subject: [PATCH] helper function for subscribe to ensure cleanup (#402) --- src/subscriptions/SubscriptionManager.cpp | 83 +++++++++++++++++------ src/subscriptions/SubscriptionManager.h | 17 ++++- 2 files changed, 77 insertions(+), 23 deletions(-) diff --git a/src/subscriptions/SubscriptionManager.cpp b/src/subscriptions/SubscriptionManager.cpp index 3856d3212..d826d4ba1 100644 --- a/src/subscriptions/SubscriptionManager.cpp +++ b/src/subscriptions/SubscriptionManager.cpp @@ -155,7 +155,9 @@ SubscriptionManager::subLedger( boost::asio::yield_context& yield, std::shared_ptr session) { - ledgerSubscribers_.subscribe(session); + subscribeHelper(session, ledgerSubscribers_, [this](session_ptr session) { + unsubLedger(session); + }); auto ledgerRange = backend_->fetchLedgerRange(); assert(ledgerRange); @@ -185,7 +187,9 @@ SubscriptionManager::unsubLedger(std::shared_ptr session) void SubscriptionManager::subTransactions(std::shared_ptr session) { - txSubscribers_.subscribe(session); + subscribeHelper(session, txSubscribers_, [this](session_ptr session) { + unsubTransactions(session); + }); } void @@ -199,12 +203,13 @@ SubscriptionManager::subAccount( ripple::AccountID const& account, std::shared_ptr& session) { - accountSubscribers_.subscribe(session, account); - - std::unique_lock lk(cleanupMtx_); - cleanupFuncs_[session].emplace_back([this, account](session_ptr session) { - unsubAccount(account, session); - }); + subscribeHelper( + session, + account, + accountSubscribers_, + [this, account](session_ptr session) { + unsubAccount(account, session); + }); } void @@ -220,11 +225,10 @@ SubscriptionManager::subBook( ripple::Book const& book, std::shared_ptr session) { - bookSubscribers_.subscribe(session, book); - - std::unique_lock lk(cleanupMtx_); - cleanupFuncs_[session].emplace_back( - [this, book](session_ptr session) { unsubBook(book, session); }); + subscribeHelper( + session, book, bookSubscribers_, [this, book](session_ptr session) { + unsubBook(book, session); + }); } void @@ -238,11 +242,10 @@ SubscriptionManager::unsubBook( void SubscriptionManager::subBookChanges(std::shared_ptr session) { - bookChangesSubscribers_.subscribe(session); - - std::unique_lock lk(cleanupMtx_); - cleanupFuncs_[session].emplace_back( - [this](session_ptr session) { unsubBookChanges(session); }); + subscribeHelper( + session, bookChangesSubscribers_, [this](session_ptr session) { + unsubBookChanges(session); + }); } void @@ -408,13 +411,21 @@ SubscriptionManager::subProposedAccount( ripple::AccountID const& account, std::shared_ptr session) { - accountProposedSubscribers_.subscribe(session, account); + subscribeHelper( + session, + account, + accountProposedSubscribers_, + [this, account](session_ptr session) { + unsubProposedAccount(account, session); + }); } void SubscriptionManager::subManifest(std::shared_ptr session) { - manifestSubscribers_.subscribe(session); + subscribeHelper(session, manifestSubscribers_, [this](session_ptr session) { + unsubManifest(session); + }); } void @@ -426,7 +437,10 @@ SubscriptionManager::unsubManifest(std::shared_ptr session) void SubscriptionManager::subValidation(std::shared_ptr session) { - validationsSubscribers_.subscribe(session); + subscribeHelper( + session, validationsSubscribers_, [this](session_ptr session) { + unsubValidation(session); + }); } void @@ -446,7 +460,10 @@ SubscriptionManager::unsubProposedAccount( void SubscriptionManager::subProposedTransactions(std::shared_ptr session) { - txProposedSubscribers_.subscribe(session); + subscribeHelper( + session, txProposedSubscribers_, [this](session_ptr session) { + unsubProposedTransactions(session); + }); } void @@ -454,6 +471,28 @@ SubscriptionManager::unsubProposedTransactions(std::shared_ptr session) { txProposedSubscribers_.unsubscribe(session); } +void +SubscriptionManager::subscribeHelper( + std::shared_ptr& session, + Subscription& subs, + CleanupFunction&& func) +{ + subs.subscribe(session); + std::unique_lock lk(cleanupMtx_); + cleanupFuncs_[session].push_back(std::move(func)); +} +template +void +SubscriptionManager::subscribeHelper( + std::shared_ptr& session, + Key const& k, + SubscriptionMap& subs, + CleanupFunction&& func) +{ + subs.subscribe(session, k); + std::unique_lock lk(cleanupMtx_); + cleanupFuncs_[session].push_back(std::move(func)); +} void SubscriptionManager::cleanup(std::shared_ptr session) diff --git a/src/subscriptions/SubscriptionManager.h b/src/subscriptions/SubscriptionManager.h index 01de599ba..bcc268edb 100644 --- a/src/subscriptions/SubscriptionManager.h +++ b/src/subscriptions/SubscriptionManager.h @@ -259,13 +259,28 @@ private: void sendAll(std::string const& pubMsg, std::unordered_set& subs); + using CleanupFunction = std::function; + + void + subscribeHelper( + std::shared_ptr& session, + Subscription& subs, + CleanupFunction&& func); + + template + void + subscribeHelper( + std::shared_ptr& session, + Key const& k, + SubscriptionMap& subs, + CleanupFunction&& func); + /** * This is how we chose to cleanup subscriptions that have been closed. * Each time we add a subscriber, we add the opposite lambda that * unsubscribes that subscriber when cleanup is called with the session that * closed. */ - using CleanupFunction = std::function; std::mutex cleanupMtx_; std::unordered_map> cleanupFuncs_ = {};