Subscribe handler (#591)

Fixes #593
This commit is contained in:
cyan317
2023-04-13 14:14:11 +01:00
committed by GitHub
parent 36bb20806e
commit 0bc84fefbf
17 changed files with 1365 additions and 46 deletions

View File

@@ -66,7 +66,7 @@ getLedgerPubMessage(
boost::json::object
SubscriptionManager::subLedger(boost::asio::yield_context& yield, std::shared_ptr<WsBase> session)
{
subscribeHelper(session, ledgerSubscribers_, [this](session_ptr session) { unsubLedger(session); });
subscribeHelper(session, ledgerSubscribers_, [this](SessionPtrType session) { unsubLedger(session); });
auto ledgerRange = backend_->fetchLedgerRange();
assert(ledgerRange);
@@ -94,7 +94,7 @@ SubscriptionManager::unsubLedger(std::shared_ptr<WsBase> session)
void
SubscriptionManager::subTransactions(std::shared_ptr<WsBase> session)
{
subscribeHelper(session, txSubscribers_, [this](session_ptr session) { unsubTransactions(session); });
subscribeHelper(session, txSubscribers_, [this](SessionPtrType session) { unsubTransactions(session); });
}
void
@@ -104,15 +104,15 @@ SubscriptionManager::unsubTransactions(std::shared_ptr<WsBase> session)
}
void
SubscriptionManager::subAccount(ripple::AccountID const& account, std::shared_ptr<WsBase>& session)
SubscriptionManager::subAccount(ripple::AccountID const& account, std::shared_ptr<WsBase> const& session)
{
subscribeHelper(session, account, accountSubscribers_, [this, account](session_ptr session) {
subscribeHelper(session, account, accountSubscribers_, [this, account](SessionPtrType session) {
unsubAccount(account, session);
});
}
void
SubscriptionManager::unsubAccount(ripple::AccountID const& account, std::shared_ptr<WsBase>& session)
SubscriptionManager::unsubAccount(ripple::AccountID const& account, std::shared_ptr<WsBase> const& session)
{
accountSubscribers_.unsubscribe(session, account);
}
@@ -120,7 +120,8 @@ SubscriptionManager::unsubAccount(ripple::AccountID const& account, std::shared_
void
SubscriptionManager::subBook(ripple::Book const& book, std::shared_ptr<WsBase> session)
{
subscribeHelper(session, book, bookSubscribers_, [this, book](session_ptr session) { unsubBook(book, session); });
subscribeHelper(
session, book, bookSubscribers_, [this, book](SessionPtrType session) { unsubBook(book, session); });
}
void
@@ -132,7 +133,7 @@ SubscriptionManager::unsubBook(ripple::Book const& book, std::shared_ptr<WsBase>
void
SubscriptionManager::subBookChanges(std::shared_ptr<WsBase> session)
{
subscribeHelper(session, bookChangesSubscribers_, [this](session_ptr session) { unsubBookChanges(session); });
subscribeHelper(session, bookChangesSubscribers_, [this](SessionPtrType session) { unsubBookChanges(session); });
}
void
@@ -281,7 +282,7 @@ SubscriptionManager::forwardValidation(boost::json::object const& response)
void
SubscriptionManager::subProposedAccount(ripple::AccountID const& account, std::shared_ptr<WsBase> session)
{
subscribeHelper(session, account, accountProposedSubscribers_, [this, account](session_ptr session) {
subscribeHelper(session, account, accountProposedSubscribers_, [this, account](SessionPtrType session) {
unsubProposedAccount(account, session);
});
}
@@ -289,7 +290,7 @@ SubscriptionManager::subProposedAccount(ripple::AccountID const& account, std::s
void
SubscriptionManager::subManifest(std::shared_ptr<WsBase> session)
{
subscribeHelper(session, manifestSubscribers_, [this](session_ptr session) { unsubManifest(session); });
subscribeHelper(session, manifestSubscribers_, [this](SessionPtrType session) { unsubManifest(session); });
}
void
@@ -301,7 +302,7 @@ SubscriptionManager::unsubManifest(std::shared_ptr<WsBase> session)
void
SubscriptionManager::subValidation(std::shared_ptr<WsBase> session)
{
subscribeHelper(session, validationsSubscribers_, [this](session_ptr session) { unsubValidation(session); });
subscribeHelper(session, validationsSubscribers_, [this](SessionPtrType session) { unsubValidation(session); });
}
void
@@ -320,7 +321,7 @@ void
SubscriptionManager::subProposedTransactions(std::shared_ptr<WsBase> session)
{
subscribeHelper(
session, txProposedSubscribers_, [this](session_ptr session) { unsubProposedTransactions(session); });
session, txProposedSubscribers_, [this](SessionPtrType session) { unsubProposedTransactions(session); });
}
void
@@ -328,17 +329,19 @@ SubscriptionManager::unsubProposedTransactions(std::shared_ptr<WsBase> session)
{
txProposedSubscribers_.unsubscribe(session);
}
void
SubscriptionManager::subscribeHelper(std::shared_ptr<WsBase>& session, Subscription& subs, CleanupFunction&& func)
SubscriptionManager::subscribeHelper(std::shared_ptr<WsBase> const& session, Subscription& subs, CleanupFunction&& func)
{
subs.subscribe(session);
std::scoped_lock lk(cleanupMtx_);
cleanupFuncs_[session].push_back(std::move(func));
}
template <typename Key>
void
SubscriptionManager::subscribeHelper(
std::shared_ptr<WsBase>& session,
std::shared_ptr<WsBase> const& session,
Key const& k,
SubscriptionMap<Key>& subs,
CleanupFunction&& func)