helper function for subscribe to ensure cleanup (#402)

This commit is contained in:
CJ Cobb
2022-11-22 13:39:14 -05:00
committed by GitHub
parent cf7a6ecc89
commit 0f7e1d5517
2 changed files with 77 additions and 23 deletions

View File

@@ -155,7 +155,9 @@ SubscriptionManager::subLedger(
boost::asio::yield_context& yield,
std::shared_ptr<WsBase> session)
{
ledgerSubscribers_.subscribe(session);
subscribeHelper(session, ledgerSubscribers_, [this](session_ptr session) {
unsubLedger(session);
});
auto ledgerRange = backend_->fetchLedgerRange();
assert(ledgerRange);
@@ -185,7 +187,9 @@ SubscriptionManager::unsubLedger(std::shared_ptr<WsBase> session)
void
SubscriptionManager::subTransactions(std::shared_ptr<WsBase> session)
{
txSubscribers_.subscribe(session);
subscribeHelper(session, txSubscribers_, [this](session_ptr session) {
unsubTransactions(session);
});
}
void
@@ -199,12 +203,13 @@ SubscriptionManager::subAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase>& session)
{
accountSubscribers_.subscribe(session, account);
std::unique_lock lk(cleanupMtx_);
cleanupFuncs_[session].emplace_back([this, account](session_ptr session) {
unsubAccount(account, session);
});
subscribeHelper(
session,
account,
accountSubscribers_,
[this, account](session_ptr session) {
unsubAccount(account, session);
});
}
void
@@ -220,11 +225,10 @@ SubscriptionManager::subBook(
ripple::Book const& book,
std::shared_ptr<WsBase> session)
{
bookSubscribers_.subscribe(session, book);
std::unique_lock lk(cleanupMtx_);
cleanupFuncs_[session].emplace_back(
[this, book](session_ptr session) { unsubBook(book, session); });
subscribeHelper(
session, book, bookSubscribers_, [this, book](session_ptr session) {
unsubBook(book, session);
});
}
void
@@ -238,11 +242,10 @@ SubscriptionManager::unsubBook(
void
SubscriptionManager::subBookChanges(std::shared_ptr<WsBase> session)
{
bookChangesSubscribers_.subscribe(session);
std::unique_lock lk(cleanupMtx_);
cleanupFuncs_[session].emplace_back(
[this](session_ptr session) { unsubBookChanges(session); });
subscribeHelper(
session, bookChangesSubscribers_, [this](session_ptr session) {
unsubBookChanges(session);
});
}
void
@@ -408,13 +411,21 @@ SubscriptionManager::subProposedAccount(
ripple::AccountID const& account,
std::shared_ptr<WsBase> session)
{
accountProposedSubscribers_.subscribe(session, account);
subscribeHelper(
session,
account,
accountProposedSubscribers_,
[this, account](session_ptr session) {
unsubProposedAccount(account, session);
});
}
void
SubscriptionManager::subManifest(std::shared_ptr<WsBase> session)
{
manifestSubscribers_.subscribe(session);
subscribeHelper(session, manifestSubscribers_, [this](session_ptr session) {
unsubManifest(session);
});
}
void
@@ -426,7 +437,10 @@ SubscriptionManager::unsubManifest(std::shared_ptr<WsBase> session)
void
SubscriptionManager::subValidation(std::shared_ptr<WsBase> session)
{
validationsSubscribers_.subscribe(session);
subscribeHelper(
session, validationsSubscribers_, [this](session_ptr session) {
unsubValidation(session);
});
}
void
@@ -446,7 +460,10 @@ SubscriptionManager::unsubProposedAccount(
void
SubscriptionManager::subProposedTransactions(std::shared_ptr<WsBase> session)
{
txProposedSubscribers_.subscribe(session);
subscribeHelper(
session, txProposedSubscribers_, [this](session_ptr session) {
unsubProposedTransactions(session);
});
}
void
@@ -454,6 +471,28 @@ SubscriptionManager::unsubProposedTransactions(std::shared_ptr<WsBase> session)
{
txProposedSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subscribeHelper(
std::shared_ptr<WsBase>& session,
Subscription& subs,
CleanupFunction&& func)
{
subs.subscribe(session);
std::unique_lock lk(cleanupMtx_);
cleanupFuncs_[session].push_back(std::move(func));
}
template <typename Key>
void
SubscriptionManager::subscribeHelper(
std::shared_ptr<WsBase>& session,
Key const& k,
SubscriptionMap<Key>& subs,
CleanupFunction&& func)
{
subs.subscribe(session, k);
std::unique_lock lk(cleanupMtx_);
cleanupFuncs_[session].push_back(std::move(func));
}
void
SubscriptionManager::cleanup(std::shared_ptr<WsBase> session)