diff --git a/src/backend/SimpleCache.cpp b/src/backend/SimpleCache.cpp index 01c46572..b2f890d3 100644 --- a/src/backend/SimpleCache.cpp +++ b/src/backend/SimpleCache.cpp @@ -30,6 +30,7 @@ SimpleCache::update( { if (isBackground && deletes_.count(obj.key)) continue; + auto& e = map_[obj.key]; if (seq > e.seq) { @@ -45,6 +46,7 @@ SimpleCache::update( } } } + std::optional SimpleCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const { @@ -58,6 +60,7 @@ SimpleCache::getSuccessor(ripple::uint256 const& key, uint32_t seq) const return {}; return {{e->first, e->second.blob}}; } + std::optional SimpleCache::getPredecessor(ripple::uint256 const& key, uint32_t seq) const { diff --git a/src/backend/SimpleCache.h b/src/backend/SimpleCache.h index db78d7f8..21347e20 100644 --- a/src/backend/SimpleCache.h +++ b/src/backend/SimpleCache.h @@ -17,6 +17,7 @@ class SimpleCache uint32_t seq = 0; Blob blob; }; + std::map map_; mutable std::shared_mutex mtx_; uint32_t latestSeq_ = 0; diff --git a/src/rpc/handlers/ServerInfo.cpp b/src/rpc/handlers/ServerInfo.cpp index 377c4cb7..6d07588a 100644 --- a/src/rpc/handlers/ServerInfo.cpp +++ b/src/rpc/handlers/ServerInfo.cpp @@ -45,6 +45,8 @@ doServerInfo(Context const& context) info[JS(counters)] = boost::json::object{}; info[JS(counters)].as_object()[JS(rpc)] = context.counters.report(); + info[JS(counters)].as_object()["subscriptions"] = + context.subscriptions->report(); auto serverInfoRippled = context.balancer->forwardToRippled( {{"counters", "server_info"}}, context.clientIp, context.yield); diff --git a/src/rpc/handlers/Subscribe.cpp b/src/rpc/handlers/Subscribe.cpp index b01f6c49..f65eef55 100644 --- a/src/rpc/handlers/Subscribe.cpp +++ b/src/rpc/handlers/Subscribe.cpp @@ -275,11 +275,29 @@ subscribeToBooks( } } +void +unsubscribeToBooks( + std::vector const& books, + std::shared_ptr session, + SubscriptionManager& manager) +{ + for (auto const& book : books) + { + manager.unsubBook(book, session); + } +} + Result doSubscribe(Context const& context) { auto request = context.params; + if (!request.contains(JS(streams)) && !request.contains(JS(accounts)) && + !request.contains(JS(accounts_proposed)) && + !request.contains(JS(books))) + return Status{ + Error::rpcINVALID_PARAMS, "does not contain valid subscription"}; + if (request.contains(JS(streams))) { if (!request.at(JS(streams)).is_array()) @@ -315,6 +333,7 @@ doSubscribe(Context const& context) if (status) return status; } + std::vector books; boost::json::array snapshot; if (request.contains(JS(books))) @@ -355,6 +374,12 @@ doUnsubscribe(Context const& context) { auto request = context.params; + if (!request.contains(JS(streams)) && !request.contains(JS(accounts)) && + !request.contains(JS(accounts_proposed)) && + !request.contains(JS(books))) + return Status{ + Error::rpcINVALID_PARAMS, "does not contain valid subscription"}; + if (request.contains(JS(streams))) { if (!request.at(JS(streams)).is_array()) @@ -391,6 +416,22 @@ doUnsubscribe(Context const& context) return status; } + std::vector books; + if (request.contains(JS(books))) + { + auto parsed = + validateAndGetBooks(context.yield, request, context.backend); + + if (auto status = std::get_if(&parsed)) + return *status; + + auto [bks, snap] = + std::get, boost::json::array>>( + parsed); + + books = std::move(bks); + } + if (request.contains(JS(streams))) unsubscribeToStreams(request, context.session, *context.subscriptions); @@ -401,6 +442,9 @@ doUnsubscribe(Context const& context) unsubscribeToAccountsProposed( request, context.session, *context.subscriptions); + if (request.contains("books")) + unsubscribeToBooks(books, context.session, *context.subscriptions); + boost::json::object response = {{"status", "success"}}; return response; } diff --git a/src/subscriptions/SubscriptionManager.cpp b/src/subscriptions/SubscriptionManager.cpp index 8dc7bbaf..7cca7f5d 100644 --- a/src/subscriptions/SubscriptionManager.cpp +++ b/src/subscriptions/SubscriptionManager.cpp @@ -5,25 +5,24 @@ template inline void sendToSubscribers( - std::shared_ptr& message, + std::shared_ptr const& message, T& subscribers, - boost::asio::io_context::strand& strand) + std::atomic_uint64_t& counter) { - boost::asio::post(strand, [&subscribers, message]() { - for (auto it = subscribers.begin(); it != subscribers.end();) + for (auto it = subscribers.begin(); it != subscribers.end();) + { + auto& session = *it; + if (session->dead()) { - auto& session = *it; - if (session->dead()) - { - it = subscribers.erase(it); - } - else - { - session->send(message); - ++it; - } + it = subscribers.erase(it); + --counter; } - }); + else + { + session->send(message); + ++it; + } + } } template @@ -31,11 +30,13 @@ inline void addSession( std::shared_ptr session, T& subscribers, - boost::asio::io_context::strand& strand) + std::atomic_uint64_t& counter) { - boost::asio::post(strand, [&subscribers, s = std::move(session)]() { - subscribers.emplace(s); - }); + if (!subscribers.contains(session)) + { + subscribers.insert(session); + ++counter; + } } template @@ -43,29 +44,37 @@ inline void removeSession( std::shared_ptr session, T& subscribers, - boost::asio::io_context::strand& strand) + std::atomic_uint64_t& counter) { - boost::asio::post(strand, [&subscribers, s = std::move(session)]() { - subscribers.erase(s); - }); + if (subscribers.contains(session)) + { + subscribers.erase(session); + --counter; + } } void Subscription::subscribe(std::shared_ptr const& session) { - addSession(session, subscribers_, strand_); + boost::asio::post(strand_, [this, session]() { + addSession(session, subscribers_, subCount_); + }); } void Subscription::unsubscribe(std::shared_ptr const& session) { - removeSession(session, subscribers_, strand_); + boost::asio::post(strand_, [this, session]() { + removeSession(session, subscribers_, subCount_); + }); } void Subscription::publish(std::shared_ptr& message) { - sendToSubscribers(message, subscribers_, strand_); + boost::asio::post(strand_, [this, message]() { + sendToSubscribers(message, subscribers_, subCount_); + }); } template @@ -74,7 +83,9 @@ SubscriptionMap::subscribe( std::shared_ptr const& session, Key const& account) { - addSession(session, subscribers_[account], strand_); + boost::asio::post(strand_, [this, session, account]() { + addSession(session, subscribers_[account], subCount_); + }); } template @@ -83,7 +94,22 @@ SubscriptionMap::unsubscribe( std::shared_ptr const& session, Key const& account) { - removeSession(session, subscribers_[account], strand_); + boost::asio::post(strand_, [this, account, session]() { + if (!subscribers_.contains(account)) + return; + + if (!subscribers_[account].contains(session)) + return; + + --subCount_; + + subscribers_[account].erase(session); + + if (subscribers_[account].size() == 0) + { + subscribers_.erase(account); + } + }); } template @@ -92,7 +118,12 @@ SubscriptionMap::publish( std::shared_ptr& message, Key const& account) { - sendToSubscribers(message, subscribers_[account], strand_); + boost::asio::post(strand_, [this, account, message]() { + if (!subscribers_.contains(account)) + return; + + sendToSubscribers(message, subscribers_[account], subCount_); + }); } boost::json::object @@ -122,7 +153,7 @@ getLedgerPubMessage( boost::json::object SubscriptionManager::subLedger( boost::asio::yield_context& yield, - std::shared_ptr& session) + std::shared_ptr session) { ledgerSubscribers_.subscribe(session); @@ -146,19 +177,19 @@ SubscriptionManager::subLedger( } void -SubscriptionManager::unsubLedger(std::shared_ptr& session) +SubscriptionManager::unsubLedger(std::shared_ptr session) { ledgerSubscribers_.unsubscribe(session); } void -SubscriptionManager::subTransactions(std::shared_ptr& session) +SubscriptionManager::subTransactions(std::shared_ptr session) { txSubscribers_.subscribe(session); } void -SubscriptionManager::unsubTransactions(std::shared_ptr& session) +SubscriptionManager::unsubTransactions(std::shared_ptr session) { txSubscribers_.unsubscribe(session); } @@ -169,6 +200,11 @@ SubscriptionManager::subAccount( std::shared_ptr& session) { accountSubscribers_.subscribe(session, account); + + std::unique_lock lk(cleanupMtx_); + cleanupFuncs_[session].emplace_back([this, account](session_ptr session) { + unsubAccount(account, session); + }); } void @@ -182,15 +218,19 @@ SubscriptionManager::unsubAccount( void SubscriptionManager::subBook( ripple::Book const& book, - std::shared_ptr& session) + std::shared_ptr session) { bookSubscribers_.subscribe(session, book); + + std::unique_lock lk(cleanupMtx_); + cleanupFuncs_[session].emplace_back( + [this, book](session_ptr session) { unsubBook(book, session); }); } void SubscriptionManager::unsubBook( ripple::Book const& book, - std::shared_ptr& session) + std::shared_ptr session) { bookSubscribers_.unsubscribe(session, book); } @@ -335,31 +375,31 @@ SubscriptionManager::forwardValidation(boost::json::object const& response) void SubscriptionManager::subProposedAccount( ripple::AccountID const& account, - std::shared_ptr& session) + std::shared_ptr session) { accountProposedSubscribers_.subscribe(session, account); } void -SubscriptionManager::subManifest(std::shared_ptr& session) +SubscriptionManager::subManifest(std::shared_ptr session) { manifestSubscribers_.subscribe(session); } void -SubscriptionManager::unsubManifest(std::shared_ptr& session) +SubscriptionManager::unsubManifest(std::shared_ptr session) { manifestSubscribers_.unsubscribe(session); } void -SubscriptionManager::subValidation(std::shared_ptr& session) +SubscriptionManager::subValidation(std::shared_ptr session) { validationsSubscribers_.subscribe(session); } void -SubscriptionManager::unsubValidation(std::shared_ptr& session) +SubscriptionManager::unsubValidation(std::shared_ptr session) { validationsSubscribers_.unsubscribe(session); } @@ -367,19 +407,34 @@ SubscriptionManager::unsubValidation(std::shared_ptr& session) void SubscriptionManager::unsubProposedAccount( ripple::AccountID const& account, - std::shared_ptr& session) + std::shared_ptr session) { accountProposedSubscribers_.unsubscribe(session, account); } void -SubscriptionManager::subProposedTransactions(std::shared_ptr& session) +SubscriptionManager::subProposedTransactions(std::shared_ptr session) { txProposedSubscribers_.subscribe(session); } void -SubscriptionManager::unsubProposedTransactions(std::shared_ptr& session) +SubscriptionManager::unsubProposedTransactions(std::shared_ptr session) { txProposedSubscribers_.unsubscribe(session); } + +void +SubscriptionManager::cleanup(std::shared_ptr session) +{ + std::unique_lock lk(cleanupMtx_); + if (!cleanupFuncs_.contains(session)) + return; + + for (auto f : cleanupFuncs_[session]) + { + f(session); + } + + cleanupFuncs_.erase(session); +} diff --git a/src/subscriptions/SubscriptionManager.h b/src/subscriptions/SubscriptionManager.h index 9d62e2aa..70bf8028 100644 --- a/src/subscriptions/SubscriptionManager.h +++ b/src/subscriptions/SubscriptionManager.h @@ -11,6 +11,7 @@ class Subscription { boost::asio::io_context::strand strand_; std::unordered_set> subscribers_ = {}; + std::atomic_uint64_t subCount_ = 0; public: Subscription() = delete; @@ -31,15 +32,23 @@ public: void publish(std::shared_ptr& message); + + std::uint64_t + count() + { + return subCount_.load(); + } }; template class SubscriptionMap { - using subscribers = std::unordered_set>; + using ptr = std::shared_ptr; + using subscribers = std::set; boost::asio::io_context::strand strand_; std::unordered_map subscribers_ = {}; + std::atomic_uint64_t subCount_ = 0; public: SubscriptionMap() = delete; @@ -60,10 +69,18 @@ public: void publish(std::shared_ptr& message, Key const& key); + + std::uint64_t + count() + { + return subCount_.load(); + } }; class SubscriptionManager { + using session_ptr = std::shared_ptr; + std::vector workers_; boost::asio::io_context ioc_; std::optional work_; @@ -133,9 +150,7 @@ public: } boost::json::object - subLedger( - boost::asio::yield_context& yield, - std::shared_ptr& session); + subLedger(boost::asio::yield_context& yield, session_ptr session); void pubLedger( @@ -145,13 +160,13 @@ public: std::uint32_t txnCount); void - unsubLedger(std::shared_ptr& session); + unsubLedger(session_ptr session); void - subTransactions(std::shared_ptr& session); + subTransactions(session_ptr session); void - unsubTransactions(std::shared_ptr& session); + unsubTransactions(session_ptr session); void pubTransaction( @@ -159,32 +174,28 @@ public: ripple::LedgerInfo const& lgrInfo); void - subAccount( - ripple::AccountID const& account, - std::shared_ptr& session); + subAccount(ripple::AccountID const& account, session_ptr& session); void - unsubAccount( - ripple::AccountID const& account, - std::shared_ptr& session); + unsubAccount(ripple::AccountID const& account, session_ptr& session); void - subBook(ripple::Book const& book, std::shared_ptr& session); + subBook(ripple::Book const& book, session_ptr session); void - unsubBook(ripple::Book const& book, std::shared_ptr& session); + unsubBook(ripple::Book const& book, session_ptr session); void - subManifest(std::shared_ptr& session); + subManifest(session_ptr session); void - unsubManifest(std::shared_ptr& session); + unsubManifest(session_ptr session); void - subValidation(std::shared_ptr& session); + subValidation(session_ptr session); void - unsubValidation(std::shared_ptr& session); + unsubValidation(session_ptr session); void forwardProposedTransaction(boost::json::object const& response); @@ -196,26 +207,51 @@ public: forwardValidation(boost::json::object const& response); void - subProposedAccount( - ripple::AccountID const& account, - std::shared_ptr& session); + subProposedAccount(ripple::AccountID const& account, session_ptr session); void - unsubProposedAccount( - ripple::AccountID const& account, - std::shared_ptr& session); + unsubProposedAccount(ripple::AccountID const& account, session_ptr session); void - subProposedTransactions(std::shared_ptr& session); + subProposedTransactions(session_ptr session); void - unsubProposedTransactions(std::shared_ptr& session); + unsubProposedTransactions(session_ptr session); + + void + cleanup(session_ptr session); + + boost::json::object + report() + { + boost::json::object counts = {}; + + counts["ledger"] = ledgerSubscribers_.count(); + counts["transactions"] = txSubscribers_.count(); + counts["transactions_proposed"] = txProposedSubscribers_.count(); + counts["manifests"] = manifestSubscribers_.count(); + counts["validations"] = validationsSubscribers_.count(); + counts["account"] = accountSubscribers_.count(); + counts["accounts_proposed"] = accountProposedSubscribers_.count(); + counts["books"] = bookSubscribers_.count(); + + return counts; + } private: void - sendAll( - std::string const& pubMsg, - std::unordered_set>& subs); + sendAll(std::string const& pubMsg, std::unordered_set& subs); + + /** + * This is how we chose to cleanup subscriptions that have been closed. + * Each time we add a subscriber, we add the opposite lambda that + * unsubscribes that subscriber when cleanup is called with the session that + * closed. + */ + using CleanupFunction = std::function; + std::mutex cleanupMtx_; + std::unordered_map> + cleanupFuncs_ = {}; }; #endif // SUBSCRIPTION_MANAGER_H diff --git a/src/webserver/HttpBase.h b/src/webserver/HttpBase.h index af6611d8..da05563c 100644 --- a/src/webserver/HttpBase.h +++ b/src/webserver/HttpBase.h @@ -228,6 +228,7 @@ public: std::move(req_), lambda_, backend_, + subscriptions_, balancer_, etl_, dosGuard_, @@ -275,6 +276,7 @@ handle_request( request>&& req, Send&& send, std::shared_ptr backend, + std::shared_ptr subscriptions, std::shared_ptr balancer, std::shared_ptr etl, DOSGuard& dosGuard, @@ -349,7 +351,15 @@ handle_request( RPC::make_error(RPC::Error::rpcNOT_READY)))); std::optional context = RPC::make_HttpContext( - yc, request, backend, nullptr, balancer, etl, *range, counters, ip); + yc, + request, + backend, + subscriptions, + balancer, + etl, + *range, + counters, + ip); if (!context) return send(httpResponse( diff --git a/src/webserver/WsBase.h b/src/webserver/WsBase.h index 7373259c..9a6bbc6a 100644 --- a/src/webserver/WsBase.h +++ b/src/webserver/WsBase.h @@ -99,6 +99,9 @@ class WsSession : public WsBase, BOOST_LOG_TRIVIAL(info) << "wsFail: " << what << ": " << ec.message(); boost::beast::get_lowest_layer(derived().ws()).socket().close(ec); + + if (auto manager = subscriptions_.lock(); manager) + manager->cleanup(derived().shared_from_this()); } }