From 99bc0abffcad5a531c17eeaa5aadfbd7334b4f56 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Fri, 1 Feb 2013 08:59:57 -0600 Subject: [PATCH] updates write_flag code to be robust and exception/thread safe --- websocketpp/connection.hpp | 15 +++-- websocketpp/impl/connection_impl.hpp | 91 ++++++++++++++++------------ 2 files changed, 62 insertions(+), 44 deletions(-) diff --git a/websocketpp/connection.hpp b/websocketpp/connection.hpp index 37da591d6d..c642f4027a 100644 --- a/websocketpp/connection.hpp +++ b/websocketpp/connection.hpp @@ -167,7 +167,7 @@ public: , m_internal_state(session::internal_state::USER_INIT) , m_msg_manager(new con_msg_manager_type()) , m_send_buffer_size(0) - , m_temp_lock(false) + , m_write_flag(false) , m_is_server(is_server) , m_alog(alog) , m_elog(elog) @@ -784,10 +784,8 @@ private: * TODO: unit tests * * @param msg The message to push - * - * @return whether or not the queue was empty. */ - bool write_push(message_ptr msg); + void write_push(message_ptr msg); /// Pop a message from the write queue /** @@ -834,7 +832,7 @@ private: mutable mutex_type m_connection_state_lock; - /// The lock used to protect shared state involved in sending messages + /// The lock used to protect the message queue /** * Serializes access to the write queue as well as shared state within the * processor. @@ -875,7 +873,12 @@ private: * Lock m_write_lock */ std::vector m_send_buffer; - bool m_temp_lock; + + /// True if there is currently an outstanding transport write + /** + * Lock m_write_lock + */ + bool m_write_flag; // connection data request_type m_request; diff --git a/websocketpp/impl/connection_impl.hpp b/websocketpp/impl/connection_impl.hpp index 5e2378e849..1640dfd37b 100644 --- a/websocketpp/impl/connection_impl.hpp +++ b/websocketpp/impl/connection_impl.hpp @@ -103,7 +103,8 @@ lib::error_code connection::send(typename config::message_type::ptr msg) outgoing_msg = msg; scoped_lock_type lock(m_write_lock); - needs_writing = write_push(outgoing_msg); + write_push(outgoing_msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); } else { outgoing_msg = m_msg_manager->get_message(); @@ -118,7 +119,8 @@ lib::error_code connection::send(typename config::message_type::ptr msg) return ec; } - needs_writing = write_push(outgoing_msg); + write_push(outgoing_msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); } if (needs_writing) { @@ -152,7 +154,8 @@ void connection::ping(const std::string& payload) { bool needs_writing = false; { scoped_lock_type lock(m_write_lock); - needs_writing = write_push(msg); + write_push(msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); } if (needs_writing) { @@ -186,7 +189,8 @@ void connection::pong(const std::string& payload, lib::error_code& ec) { bool needs_writing = false; { scoped_lock_type lock(m_write_lock); - needs_writing = write_push(msg); + write_push(msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); } if (needs_writing) { @@ -1036,32 +1040,36 @@ void connection::write_frame() { message_ptr msg; { - scoped_lock_type lock(m_write_lock); - - if (m_temp_lock) { - return; - } else { - m_temp_lock = true; - } - - if (m_send_queue.empty()) { - return; - } - - msg = write_pop(); - - if (!msg) { - m_elog.write(log::elevel::warn,"found empty message in write queue"); - throw; - } + scoped_lock_type lock(m_write_lock); + + // Check the write flag. If true, there is an outstanding transport + // write already. In this case we just return. The write handler will + // start a new write if the write queue isn't empty. If false, we set + // the write flag and proceed to initiate a transport write. + if (m_write_flag) { + return; + } + + // Get the next message in the queue. This will return an empty + // message if the queue was empty. + msg = write_pop(); + + if (!msg) { + return; + } + + // At this point we own the next message to be sent and are + // responsible for holding the write flag until it is successfully + // sent or there is some error + m_write_flag = true; } - + const std::string& header = msg->get_header(); - const std::string& payload = msg->get_payload(); - + const std::string& payload = msg->get_payload(); + m_send_buffer.push_back(transport::buffer(header.c_str(),header.size())); m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size())); - + if (m_alog.static_test(log::alevel::frame_header)) { std::stringstream s; s << "Dispatching write with " << header.size() @@ -1073,7 +1081,7 @@ void connection::write_frame() { if (m_alog.static_test(log::alevel::frame_payload)) { m_alog.write(log::alevel::frame_payload,"Payload: "+utility::to_hex(payload)); } - + transport_con_type::async_write( m_send_buffer, lib::bind( @@ -1092,7 +1100,8 @@ void connection::handle_write_frame(bool terminate, m_send_buffer.clear(); if (ec) { - m_elog.write(log::elevel::devel,"error in handle_write_frame: "+ec.message()); + m_elog.write(log::elevel::fatal,"error in handle_write_frame: "+ec.message()); + this->terminate(); return; } @@ -1107,9 +1116,10 @@ void connection::handle_write_frame(bool terminate, { scoped_lock_type lock(m_write_lock); - needs_writing = !m_send_queue.empty(); + // release write flag + m_write_flag = false; - m_temp_lock = false; + needs_writing = !m_send_queue.empty(); } if (needs_writing) { @@ -1335,11 +1345,11 @@ lib::error_code connection::send_close_frame(close::status::value code, msg->set_terminal(true); } - // Concurrency review bool needs_writing = false; { scoped_lock_type lock(m_write_lock); - needs_writing = write_push(msg); + write_push(msg); + needs_writing = !m_write_flag && !m_send_queue.empty(); } if (needs_writing) { @@ -1399,26 +1409,31 @@ connection::get_processor(int version) const { } template -bool connection::write_push(typename config::message_type::ptr msg) +void connection::write_push(typename config::message_type::ptr msg) { - bool empty = m_send_queue.empty(); + if (!msg) { + return; + } m_send_buffer_size += msg->get_payload().size(); - m_send_queue.push(msg); std::stringstream s; s << "write_push: message count: " << m_send_queue.size() << " buffer size: " << m_send_buffer_size; m_alog.write(log::alevel::devel,s.str()); - - return empty; } template typename config::message_type::ptr connection::write_pop() { - message_ptr msg = m_send_queue.front(); + message_ptr msg; + + if (m_send_queue.empty()) { + return msg; + } + + msg = m_send_queue.front(); m_send_buffer_size -= msg->get_payload().size(); m_send_queue.pop();