From 9a7631b2da5d9233e323eced32a16d936180d7cd Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Sun, 20 Jan 2013 08:56:11 -0600 Subject: [PATCH] updates endpoint concurrency features --- websocketpp/endpoint.hpp | 87 +++++++++++++++++------ websocketpp/impl/endpoint_impl.hpp | 4 +- websocketpp/transport/asio/connection.hpp | 16 ++--- 3 files changed, 76 insertions(+), 31 deletions(-) diff --git a/websocketpp/endpoint.hpp b/websocketpp/endpoint.hpp index 8e9178a69f..5e07df0618 100644 --- a/websocketpp/endpoint.hpp +++ b/websocketpp/endpoint.hpp @@ -69,7 +69,7 @@ public: typedef typename connection_type::message_handler message_handler; /// Type of message pointers that this endpoint uses typedef typename connection_type::message_ptr message_ptr; - + // TODO: organize these typedef typename connection_type::termination_handler termination_handler; @@ -89,9 +89,10 @@ public: * * The default value for this version is WebSocket++/0.3.0dev * - * @return A reference to the user agent string. + * @return The user agent string. */ - const std::string& get_user_agent() const { + std::string get_user_agent() const { + scoped_lock_type guard(m_mutex); return m_user_agent; } @@ -104,35 +105,73 @@ public: * * The default value for this version is WebSocket++/0.3.0dev * - * @return A reference to the user agent string. + * @param ua The string to set the user agent to. */ void set_user_agent(const std::string& ua) { + scoped_lock_type guard(m_mutex); m_user_agent = ua; } - bool is_server() const {return m_is_server;} + /// Returns whether or not this endpoint is a server. + /** + * @return Whether or not this endpoint is a server + */ + bool is_server() const { + return m_is_server; + } /*************************/ /* Set Handler functions */ /*************************/ - void set_open_handler(open_handler h) {m_open_handler = h;} - void set_close_handler(close_handler h) {m_close_handler = h;} - void set_fail_handler(fail_handler h) {m_fail_handler = h;} - void set_ping_handler(ping_handler h) {m_ping_handler = h;} - void set_pong_handler(pong_handler h) {m_pong_handler = h;} + void set_open_handler(open_handler h) { + scoped_lock_type guard(m_mutex); + m_open_handler = h; + } + void set_close_handler(close_handler h) { + scoped_lock_type guard(m_mutex); + m_close_handler = h; + } + void set_fail_handler(fail_handler h) { + scoped_lock_type guard(m_mutex); + m_fail_handler = h; + } + void set_ping_handler(ping_handler h) { + scoped_lock_type guard(m_mutex); + m_ping_handler = h; + } + void set_pong_handler(pong_handler h) { + scoped_lock_type guard(m_mutex); + m_pong_handler = h; + } void set_pong_timeout_handler(pong_timeout_handler h) { + scoped_lock_type guard(m_mutex); m_pong_timeout_handler = h; } - void set_interrupt_handler(interrupt_handler h) {m_interrupt_handler = h;} - void set_http_handler(http_handler h) {m_http_handler = h;} - void set_validate_handler(validate_handler h) {m_validate_handler = h;} - void set_message_handler(message_handler h) {m_message_handler = h;} + void set_interrupt_handler(interrupt_handler h) { + scoped_lock_type guard(m_mutex); + m_interrupt_handler = h; + } + void set_http_handler(http_handler h) { + scoped_lock_type guard(m_mutex); + m_http_handler = h; + } + void set_validate_handler(validate_handler h) { + scoped_lock_type guard(m_mutex); + m_validate_handler = h; + } + void set_message_handler(message_handler h) { + scoped_lock_type guard(m_mutex); + m_message_handler = h; + } /*************************************/ /* Connection pass through functions */ /*************************************/ - + + /** + * Is thread safe if transport is + */ void interrupt(connection_hdl hdl, lib::error_code & ec) { connection_ptr con = get_con_from_hdl(hdl); if (!con) { @@ -144,7 +183,10 @@ public: ec = con->interrupt(); } - + + /** + * Is thread safe if transport is + */ void interrupt(connection_hdl hdl) { lib::error_code ec; interrupt(hdl,ec); @@ -196,16 +238,19 @@ protected: /// Retrieves a connection_ptr from a connection_hdl /** + * Converting a weak pointer to shared_ptr is not thread safe because the + * pointer could be deleted at any time. + * * @param hdl The connection handle to translate * * @return the connection_ptr. May be NULL if the handle was invalid. */ connection_ptr get_con_from_hdl(connection_hdl hdl) { - return lib::static_pointer_cast(hdl.lock()); + scoped_lock_type lock(m_mutex); + connection_ptr con = lib::static_pointer_cast( + hdl.lock()); + return con; } - - // protected resources - mutex_type m_mutex; private: // dynamic settings std::string m_user_agent; @@ -228,7 +273,7 @@ private: const bool m_is_server; // endpoint state - mutex_type m_state_lock; + mutex_type m_mutex; }; } // namespace websocketpp diff --git a/websocketpp/impl/endpoint_impl.hpp b/websocketpp/impl/endpoint_impl.hpp index 0895625b42..8959cf34cc 100644 --- a/websocketpp/impl/endpoint_impl.hpp +++ b/websocketpp/impl/endpoint_impl.hpp @@ -75,6 +75,8 @@ endpoint::create_connection() { ) ); transport_type::init(con); + + scoped_lock_type lock(m_mutex); m_connections.insert(con); @@ -88,7 +90,7 @@ template void endpoint::remove_connection(connection_ptr con) { std::cout << "remove_connection. New count: " << m_connections.size()-1 << std::endl; - scoped_lock_type lock(m_state_lock); + scoped_lock_type lock(m_mutex); // unregister the termination handler con->set_termination_handler(termination_handler()); diff --git a/websocketpp/transport/asio/connection.hpp b/websocketpp/transport/asio/connection.hpp index 4fe9983262..de942b9bc0 100644 --- a/websocketpp/transport/asio/connection.hpp +++ b/websocketpp/transport/asio/connection.hpp @@ -236,16 +236,14 @@ protected: void set_handle(connection_hdl hdl) { m_connection_hdl = hdl; } - + + /// Trigger the on_interrupt handler + /** + * This needs to be thread safe + * + * Might need a strand at some point? + */ lib::error_code interrupt(inturrupt_handler handler) { - // strand post handle_inturrupt - /*m_strand->post(lib::bind( - &type::handle_inturrupt, - this, - handler - ));*/ - - // would this work? m_io_service->post(handler); return lib::error_code(); }