From f32ddff3673e81d05a85309f7d9e32634c24c537 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Mon, 12 Mar 2012 10:03:54 -0500 Subject: [PATCH] concurrency review part 1 --- src/common.hpp | 3 +- src/connection.hpp | 310 +++++++++++++++++++++++++++++++------------ src/endpoint.hpp | 8 +- src/roles/client.hpp | 8 +- src/roles/server.hpp | 1 + 5 files changed, 239 insertions(+), 91 deletions(-) diff --git a/src/common.hpp b/src/common.hpp index f7e60348dd..2ec6d525bb 100644 --- a/src/common.hpp +++ b/src/common.hpp @@ -71,8 +71,7 @@ namespace websocketpp { CONNECTING = 0, OPEN = 1, CLOSING = 2, - CLOSED = 3, - DETACHED = 4, + CLOSED = 3 }; } } diff --git a/src/connection.hpp b/src/connection.hpp index 90658c0601..7e653d1409 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -63,7 +63,8 @@ template < class connection : public role< connection >, public socket< connection >, - public boost::enable_shared_from_this< connection > + public boost::enable_shared_from_this< connection >, + boost::noncopyable { public: typedef connection_traits< connection > traits; @@ -110,46 +111,84 @@ public: m_write_buffer(0), m_write_state(IDLE), m_remote_close_code(close::status::ABNORMAL_CLOSE), - m_read_state(READING) + m_read_state(READING), + m_strand(e.endpoint_base::m_io_service), + m_detached(false) { socket_type::init(); + // This should go away m_control_message = message::control_ptr(new message::control()); } - // SHOULD BE PROTECTED + /// Destroy the connection + connection() { + if (m_state != session::state::CLOSED) { + terminate(true); + } + } + + // copy/assignment constructors require C++11 + // boost::noncopyable is being used in the meantime. + // connection(connection const&) = delete; + // connection& operator=(connection const&) = delete + + /// Start the websocket connection async read loop + /* Begins the connection's async read loop. First any socket level + * initialization will happen (TLS handshake, etc) then the handshake and + * frame reads will start. + * + * Visibility: protected + * State: Should only be called once by the endpoint. + * Concurrency: safe as long as state is valid + */ void start() { // initialize the socket. socket_type::async_init( - boost::bind( + m_strand.wrap(boost::bind( &type::handle_socket_init, type::shared_from_this(), boost::asio::placeholders::error - ) + )) ); } - // END PROTECTED - // Valid always + /// Return current connection state + /* Visibility: public + * State: valid always + * Concurrency: callable from anywhere + */ session::state::value get_state() const { + // TODO: ensure read of m_state is atomic return m_state; } - /// Signals to the connection that its endpoint is going away. - /* The connection should not use any references to the endpoint after this + /// Detaches the connection from its endpoint + /* Called by the m_endpoint's destructor. In state DETACHED m_endpoint is + * no longer avaliable. The connection may stick around if the end user + * application needs to read state from it (ie close reasons, etc) but no + * operations requring the endpoint can be performed. + * + * Visibility: protected + * State: Should only be called once by the endpoint. + * Concurrency: safe as long as state is valid */ void detach() { - // TODO: lock connection state - m_state = session::state::DETACHED; + // TODO: ensure write of m_detached is atomic + m_detached = true; } - // Valid for OPEN state - /// convenience overload for sending a one off text message. + /// convenience overload for sending a one off message. + /* Creates a message, fills in payload, and queues a write as a message of + * type op. Default type is TEXT. + * + * Visibility: public + * State: Valid from OPEN, ignored otherwise + * Concurrency: callable from any thread + */ void send(const std::string& payload, frame::opcode::value op = frame::opcode::TEXT) { - // TODO: do we need a lock? - if (m_state != session::state::OPEN) { - return; - } + // TODO: ensure read of m_state is atomic + if (m_state != session::state::OPEN) {return;} websocketpp::message::data::ptr msg = get_control_message2(); @@ -164,42 +203,138 @@ public: msg->set_payload(payload); send(msg); } + + /// Send message + /* Prepares (if necessary) and sends the given message + * + * Visibility: public + * State: Valid from OPEN, ignored otherwise + * Concurrency: callable from any thread + */ void send(message::data_ptr msg) { - // TODO: do we need a lock? - if (m_state != session::state::OPEN) { - return; - } + // TODO: need to make sure access to m_state is atomic + if (m_state != session::state::OPEN) {return;} m_processor->prepare_frame(msg); - write_message(msg); + + m_endpoint.endpoint_base::m_io_service.post( + m_strand.wrap(boost::bind( + &type::write_message, + type::shared_from_this(), + msg + )) + ); } - // TODO: overloads without code or reason? + /// Close connection + /* Closes the websocket connection with the given status code and reason. + * From state OPEN a clean connection close is initiated. From any other + * state the socket will be closed and the connection cleaned up. + * + * There is no feedback directly from close. Feedback will be provided via + * the on_fail or on_close callbacks. + * + * Visibility: public + * State: Valid from OPEN, ignored otherwise + * Concurrency: callable from any thread + */ void close(close::status::value code, const std::string& reason = "") { + // TODO: need to make sure access to m_state is atomic if (m_state == session::state::CONNECTING) { - terminate(true); + m_endpoint.endpoint_base::m_io_service.post( + m_strand.wrap(boost::bind( + &type::terminate, + type::shared_from_this(), + true + )) + ); + } else if (m_state == session::state::OPEN) { + m_endpoint.endpoint_base::m_io_service.post( + m_strand.wrap(boost::bind( + &type::send_close, + type::shared_from_this(), + code, + reason + )) + ); } else { - send_close(code, reason); + // in CLOSING state we are already closing, nothing to do + // in CLOSED state we are already closed, nothing to do } } - void ping(const std::string& payload) { - // TODO: do we need a lock? - if (m_state != session::state::OPEN) { - return; - } - - send_ping(payload); - } - void pong(const std::string& payload) { - // TODO: do we need a lock? - if (m_state != session::state::OPEN) { - return; - } - - send_pong(payload); - } + /// Send Ping + /* Initiates a ping with the given payload. + * + * There is no feedback directly from ping. Feedback will be provided via + * the on_pong or on_pong_timeout callbacks. + * + * Visibility: public + * State: Valid from OPEN, ignored otherwise + * Concurrency: callable from any thread + */ + void ping(const std::string& payload) { + // TODO: need to make sure access to m_state is atomic + if (m_state != session::state::OPEN) {return;} + if (m_detached) {return;} + + // TODO: optimize control messages and handle case where + // endpoint is out of messages + message::data_ptr control = get_control_message2(); + control->reset(frame::opcode::PING); + control->set_payload(payload); + m_processor->prepare_frame(control); + + m_endpoint.endpoint_base::m_io_service.post( + m_strand.wrap(boost::bind( + &type::write_message, + type::shared_from_this(), + control + )) + ); + } + + /// Send Pong + /* Initiates a pong with the given payload. + * + * There is no feedback from pong. + * + * Visibility: public + * State: Valid from OPEN, ignored otherwise + * Concurrency: callable from any thread + */ + void pong(const std::string& payload) { + // TODO: need to make sure access to m_state is atomic + if (m_state != session::state::OPEN) {return;} + if (m_detached) {return;} + + // TODO: optimize control messages and handle case where + // endpoint is out of messages + message::data_ptr control = get_control_message2(); + control->reset(frame::opcode::PONG); + control->set_payload(payload); + m_processor->prepare_frame(control); + + m_endpoint.endpoint_base::m_io_service.post( + m_strand.wrap(boost::bind( + &type::write_message, + type::shared_from_this(), + control + )) + ); + } + + /// Return send buffer size (payload bytes) + /* Initiates a pong with the given payload. + * + * There is no feedback from pong. + * + * Visibility: public + * State: Valid from any state. + * Concurrency: callable from any thread + */ uint64_t buffered_amount() const { + // TODO: ensure m_write_buffer is accessed atomically return m_write_buffer; } @@ -398,7 +533,7 @@ public: response = get_handler()->on_ping(type::shared_from_this(), msg->get_payload()); if (response) { - send_pong(msg->get_payload()); + pong(msg->get_payload()); } break; case frame::opcode::PONG: @@ -436,7 +571,17 @@ public: } } + /// Send a close frame + /* Initiates a close handshake by sending a close frame with the given code + * and reason. + * + * Visibility: protected + * State: Valid for OPEN, ignored otherwise. + * Concurrency: Must be called within m_strand + */ void send_close(close::status::value code, const std::string& reason) { + if (m_detached) {return;} + if (m_state != session::state::OPEN) { m_endpoint.elog().at(log::elevel::WARN) << "Tried to disconnect a session that wasn't open" @@ -462,11 +607,11 @@ public: m_timer.expires_from_now(boost::posix_time::milliseconds(1000)); m_timer.async_wait( - boost::bind( + m_strand.wrap(boost::bind( &type::fail_on_expire, type::shared_from_this(), boost::asio::placeholders::error - ) + )) ); m_local_close_code = code; @@ -544,26 +689,6 @@ public: m_write_state = INTURRUPT; } - void send_ping(const std::string& payload) { - // TODO: optimize control messages and handle case where - // endpoint is out of messages - message::data_ptr control = get_control_message2(); - control->reset(frame::opcode::PING); - control->set_payload(payload); - m_processor->prepare_frame(control); - write_message(control); - } - - void send_pong(const std::string& payload) { - // TODO: optimize control messages and handle case where - // endpoint is out of messages - message::data_ptr control = get_control_message2(); - control->reset(frame::opcode::PONG); - control->set_payload(payload); - m_processor->prepare_frame(control); - write_message(control); - } - void write_message(message::data_ptr msg) { if (m_write_state == INTURRUPT) { return; @@ -612,11 +737,11 @@ public: socket_type::get_socket(), m_write_buf, //m_write_queue.front()->get_buffer(), - boost::bind( + m_strand.wrap(boost::bind( &type::handle_write, type::shared_from_this(), boost::asio::placeholders::error - ) + )) ); } else { // if we are in an inturrupted state and had nothing else to write @@ -660,23 +785,27 @@ public: write(); } - // terminate cleans up a connection and removes it from the endpoint's - // connection list. + /// Ends the connection by cleaning up based on current state + /* Terminate will review the outstanding resources and close each + * appropriately. Attached handlers will recieve an on_fail or on_close call + * + * TODO: should we protect against long running handlers? + * + * Visibility: protected + * State: Valid from any state except CLOSED. + * Concurrency: Must be called from within m_strand + */ void terminate(bool failed_by_me) { - m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) - << "terminate called" << log::endl; - - if (m_state == session::state::CLOSED) { - // shouldn't be here - } + // If state is closed it either means terminate was called twice or + // something other than this library called it. In either case running + // it will only cause problems + if (m_state == session::state::CLOSED) {return;} + // TODO: ensure any other timers are accounted for here. // cancel the close timeout m_timer.cancel(); - - - // If this was a websocket connection notify the application handler - // about the close using either on_fail or on_close + // version -1 is an HTTP (non-websocket) connection. if (role_type::get_version() != -1) { // TODO: note, calling shutdown on the ssl socket for an HTTP // connection seems to cause shutdown to block for a very long time. @@ -692,16 +821,23 @@ public: if (old_state == session::state::CONNECTING) { get_handler()->on_fail(type::shared_from_this()); } else if (old_state == session::state::OPEN || - old_state == session::state::CLOSING) { + old_state == session::state::CLOSING) + { get_handler()->on_close(type::shared_from_this()); - } else { - // if we were already closed something is wrong } - log_close_result(); + + if (!m_detached) { + log_close_result(); + } + } + // finally remove this connection from the endpoint's list. This will - // remove the last shared pointer to the connection held by WS++. - m_endpoint.remove_connection(type::shared_from_this()); + // remove the last shared pointer to the connection held by WS++. If we + // are DETACHED this has already been done and can't be done again. + if (!m_detached) { + m_endpoint.remove_connection(type::shared_from_this()); + } } // this is called when an async asio call encounters an error @@ -742,6 +878,10 @@ public: handler_ptr get_handler() { return m_handler; } + + boost::asio::strand& get_strand() { + return m_strand; + } public: //protected: TODO: figure out why VCPP2010 doesn't like protected here endpoint_type& m_endpoint; @@ -779,6 +919,8 @@ public: // concurrency support boost::recursive_mutex m_lock; + boost::asio::strand m_strand; + bool m_detached; // TODO: this should be atomic }; // connection related types that it and its policy classes need. diff --git a/src/endpoint.hpp b/src/endpoint.hpp index 437de53870..4021e0f917 100644 --- a/src/endpoint.hpp +++ b/src/endpoint.hpp @@ -73,7 +73,7 @@ class endpoint : public endpoint_base, public role< endpoint >, public socket< endpoint >, - boost::noncopyable + boost::noncopyable { public: /// Type of the traits class that stores endpoint related types. @@ -137,7 +137,7 @@ public: m_pool->set_callback(boost::bind(&type::on_new_message,this)); } - /// Destroy and endpoint + /// Destroy an endpoint ~endpoint() { // Tell the memory pool we don't want to be notified about newly freed // messages any more (because we wont be here) @@ -146,6 +146,8 @@ public: // Detach any connections that are still alive at this point boost::lock_guard lock(get_lock()); + typename std::set::iterator it; + while (!m_connections.empty()) { remove_connection(*m_connections.begin()); } @@ -319,7 +321,7 @@ protected: boost::lock_guard lock(get_lock()); // TODO: is this safe to use? - //con->detach(); + con->detach(); m_connections.erase(con); diff --git a/src/roles/client.hpp b/src/roles/client.hpp index 73cf2501cf..2e1eafb46d 100644 --- a/src/roles/client.hpp +++ b/src/roles/client.hpp @@ -208,14 +208,18 @@ public: // Optional virtual bool on_ping(connection_ptr con,std::string) {return true;} virtual void on_pong(connection_ptr con,std::string) {} + virtual void on_pong_timeout(connection_ptr con,std::string) {} }; client (boost::asio::io_service& m) : m_endpoint(static_cast< endpoint_type& >(*this)), m_io_service(m), - m_gen(m_rng,boost::random::uniform_int_distribution<>(std::numeric_limits::min(), - std::numeric_limits::max())) {} + m_gen(m_rng, + boost::random::uniform_int_distribution<>( + std::numeric_limits::min(), + std::numeric_limits::max() + )) {} connection_ptr connect(const std::string& u); diff --git a/src/roles/server.hpp b/src/roles/server.hpp index 9364f2c3f9..0c1cc781b3 100644 --- a/src/roles/server.hpp +++ b/src/roles/server.hpp @@ -180,6 +180,7 @@ public: virtual bool on_ping(connection_ptr con,std::string) {return true;} virtual void on_pong(connection_ptr con,std::string) {} + virtual void on_pong_timeout(connection_ptr con,std::string) {} virtual void http(connection_ptr con) {} };