diff --git a/src/subscriptions/Message.h b/src/subscriptions/Message.h new file mode 100644 index 000000000..32c58b1d9 --- /dev/null +++ b/src/subscriptions/Message.h @@ -0,0 +1,40 @@ +#ifndef CLIO_SUBSCRIPTION_MESSAGE_H +#define CLIO_SUBSCRIPTION_MESSAGE_H + +#include + +// This class should only be constructed once, then it can +// be read from in parallel by many websocket senders +class Message +{ + std::string message_; + +public: + Message() = delete; + Message(std::string&& message) : message_(std::move(message)) + { + } + + Message(Message const&) = delete; + Message(Message&&) = delete; + Message& + operator=(Message const&) = delete; + Message& + operator=(Message&&) = delete; + + ~Message() = default; + + char* + data() + { + return message_.data(); + } + + std::size_t + size() + { + return message_.size(); + } +}; + +#endif // CLIO_SUBSCRIPTION_MESSAGE_H \ No newline at end of file diff --git a/src/subscriptions/SubscriptionManager.cpp b/src/subscriptions/SubscriptionManager.cpp index 8cc9eb729..8dc7bbaf0 100644 --- a/src/subscriptions/SubscriptionManager.cpp +++ b/src/subscriptions/SubscriptionManager.cpp @@ -5,7 +5,7 @@ template inline void sendToSubscribers( - std::string const& message, + std::shared_ptr& message, T& subscribers, boost::asio::io_context::strand& strand) { @@ -63,7 +63,7 @@ Subscription::unsubscribe(std::shared_ptr const& session) } void -Subscription::publish(std::string const& message) +Subscription::publish(std::shared_ptr& message) { sendToSubscribers(message, subscribers_, strand_); } @@ -88,7 +88,9 @@ SubscriptionMap::unsubscribe( template void -SubscriptionMap::publish(std::string const& message, Key const& account) +SubscriptionMap::publish( + std::shared_ptr& message, + Key const& account) { sendToSubscribers(message, subscribers_[account], strand_); } @@ -200,8 +202,10 @@ SubscriptionManager::pubLedger( std::string const& ledgerRange, std::uint32_t txnCount) { - ledgerSubscribers_.publish(boost::json::serialize( + auto message = std::make_shared(boost::json::serialize( getLedgerPubMessage(lgrInfo, fees, ledgerRange, txnCount))); + + ledgerSubscribers_.publish(message); } void @@ -250,7 +254,7 @@ SubscriptionManager::pubTransaction( } } - std::string pubMsg{boost::json::serialize(pubObj)}; + auto pubMsg = std::make_shared(boost::json::serialize(pubObj)); txSubscribers_.publish(pubMsg); auto accounts = meta->getAffectedAccounts(); @@ -304,7 +308,7 @@ void SubscriptionManager::forwardProposedTransaction( boost::json::object const& response) { - std::string pubMsg{boost::json::serialize(response)}; + auto pubMsg = std::make_shared(boost::json::serialize(response)); txProposedSubscribers_.publish(pubMsg); auto transaction = response.at("transaction").as_object(); @@ -317,15 +321,15 @@ SubscriptionManager::forwardProposedTransaction( void SubscriptionManager::forwardManifest(boost::json::object const& response) { - std::string pubMsg{boost::json::serialize(response)}; + auto pubMsg = std::make_shared(boost::json::serialize(response)); manifestSubscribers_.publish(pubMsg); } void SubscriptionManager::forwardValidation(boost::json::object const& response) { - std::string pubMsg{boost::json::serialize(response)}; - validationsSubscribers_.publish(std::move(pubMsg)); + auto pubMsg = std::make_shared(boost::json::serialize(response)); + validationsSubscribers_.publish(pubMsg); } void diff --git a/src/subscriptions/SubscriptionManager.h b/src/subscriptions/SubscriptionManager.h index 6bf872d9c..9d62e2aab 100644 --- a/src/subscriptions/SubscriptionManager.h +++ b/src/subscriptions/SubscriptionManager.h @@ -3,6 +3,7 @@ #include #include +#include class WsBase; @@ -29,7 +30,7 @@ public: unsubscribe(std::shared_ptr const& session); void - publish(std::string const& message); + publish(std::shared_ptr& message); }; template @@ -58,7 +59,7 @@ public: unsubscribe(std::shared_ptr const& session, Key const& key); void - publish(std::string const& message, Key const& key); + publish(std::shared_ptr& message, Key const& key); }; class SubscriptionManager diff --git a/src/webserver/WsBase.h b/src/webserver/WsBase.h index 94df204bb..7373259c6 100644 --- a/src/webserver/WsBase.h +++ b/src/webserver/WsBase.h @@ -11,6 +11,7 @@ #include #include #include +#include #include #include @@ -49,7 +50,7 @@ protected: public: // Send, that enables SubscriptionManager to publish to clients virtual void - send(std::string const& msg) = 0; + send(std::shared_ptr msg) = 0; virtual ~WsBase() { @@ -87,7 +88,7 @@ class WsSession : public WsBase, std::mutex mtx_; bool sending_ = false; - std::queue messages_; + std::queue> messages_; void wsFail(boost::beast::error_code ec, char const* what) @@ -138,7 +139,7 @@ public: { sending_ = true; derived().ws().async_write( - net::buffer(messages_.front()), + net::buffer(messages_.front()->data(), messages_.front()->size()), boost::beast::bind_front_handler( &WsSession::on_write, derived().shared_from_this())); } @@ -168,18 +169,25 @@ public: } void - send(std::string const& msg) override + send(std::shared_ptr msg) override { net::dispatch( derived().ws().get_executor(), [this, self = derived().shared_from_this(), - msg = std::string(msg)]() { + msg = std::move(msg)]() { messages_.push(std::move(msg)); maybe_send_next(); }); } + void + send(std::string&& msg) + { + auto sharedMsg = std::make_shared(std::move(msg)); + send(sharedMsg); + } + void run(http::request req) {