updates write_flag code to be robust and exception/thread safe

This commit is contained in:
Peter Thorson
2013-02-01 08:59:57 -06:00
parent 8d050767e1
commit 99bc0abffc
2 changed files with 62 additions and 44 deletions

View File

@@ -167,7 +167,7 @@ public:
, m_internal_state(session::internal_state::USER_INIT)
, m_msg_manager(new con_msg_manager_type())
, m_send_buffer_size(0)
, m_temp_lock(false)
, m_write_flag(false)
, m_is_server(is_server)
, m_alog(alog)
, m_elog(elog)
@@ -784,10 +784,8 @@ private:
* TODO: unit tests
*
* @param msg The message to push
*
* @return whether or not the queue was empty.
*/
bool write_push(message_ptr msg);
void write_push(message_ptr msg);
/// Pop a message from the write queue
/**
@@ -834,7 +832,7 @@ private:
mutable mutex_type m_connection_state_lock;
/// The lock used to protect shared state involved in sending messages
/// The lock used to protect the message queue
/**
* Serializes access to the write queue as well as shared state within the
* processor.
@@ -875,7 +873,12 @@ private:
* Lock m_write_lock
*/
std::vector<transport::buffer> m_send_buffer;
bool m_temp_lock;
/// True if there is currently an outstanding transport write
/**
* Lock m_write_lock
*/
bool m_write_flag;
// connection data
request_type m_request;

View File

@@ -103,7 +103,8 @@ lib::error_code connection<config>::send(typename config::message_type::ptr msg)
outgoing_msg = msg;
scoped_lock_type lock(m_write_lock);
needs_writing = write_push(outgoing_msg);
write_push(outgoing_msg);
needs_writing = !m_write_flag && !m_send_queue.empty();
} else {
outgoing_msg = m_msg_manager->get_message();
@@ -118,7 +119,8 @@ lib::error_code connection<config>::send(typename config::message_type::ptr msg)
return ec;
}
needs_writing = write_push(outgoing_msg);
write_push(outgoing_msg);
needs_writing = !m_write_flag && !m_send_queue.empty();
}
if (needs_writing) {
@@ -152,7 +154,8 @@ void connection<config>::ping(const std::string& payload) {
bool needs_writing = false;
{
scoped_lock_type lock(m_write_lock);
needs_writing = write_push(msg);
write_push(msg);
needs_writing = !m_write_flag && !m_send_queue.empty();
}
if (needs_writing) {
@@ -186,7 +189,8 @@ void connection<config>::pong(const std::string& payload, lib::error_code& ec) {
bool needs_writing = false;
{
scoped_lock_type lock(m_write_lock);
needs_writing = write_push(msg);
write_push(msg);
needs_writing = !m_write_flag && !m_send_queue.empty();
}
if (needs_writing) {
@@ -1036,32 +1040,36 @@ void connection<config>::write_frame() {
message_ptr msg;
{
scoped_lock_type lock(m_write_lock);
if (m_temp_lock) {
return;
} else {
m_temp_lock = true;
}
if (m_send_queue.empty()) {
return;
}
msg = write_pop();
if (!msg) {
m_elog.write(log::elevel::warn,"found empty message in write queue");
throw;
}
scoped_lock_type lock(m_write_lock);
// Check the write flag. If true, there is an outstanding transport
// write already. In this case we just return. The write handler will
// start a new write if the write queue isn't empty. If false, we set
// the write flag and proceed to initiate a transport write.
if (m_write_flag) {
return;
}
// Get the next message in the queue. This will return an empty
// message if the queue was empty.
msg = write_pop();
if (!msg) {
return;
}
// At this point we own the next message to be sent and are
// responsible for holding the write flag until it is successfully
// sent or there is some error
m_write_flag = true;
}
const std::string& header = msg->get_header();
const std::string& payload = msg->get_payload();
const std::string& payload = msg->get_payload();
m_send_buffer.push_back(transport::buffer(header.c_str(),header.size()));
m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size()));
if (m_alog.static_test(log::alevel::frame_header)) {
std::stringstream s;
s << "Dispatching write with " << header.size()
@@ -1073,7 +1081,7 @@ void connection<config>::write_frame() {
if (m_alog.static_test(log::alevel::frame_payload)) {
m_alog.write(log::alevel::frame_payload,"Payload: "+utility::to_hex(payload));
}
transport_con_type::async_write(
m_send_buffer,
lib::bind(
@@ -1092,7 +1100,8 @@ void connection<config>::handle_write_frame(bool terminate,
m_send_buffer.clear();
if (ec) {
m_elog.write(log::elevel::devel,"error in handle_write_frame: "+ec.message());
m_elog.write(log::elevel::fatal,"error in handle_write_frame: "+ec.message());
this->terminate();
return;
}
@@ -1107,9 +1116,10 @@ void connection<config>::handle_write_frame(bool terminate,
{
scoped_lock_type lock(m_write_lock);
needs_writing = !m_send_queue.empty();
// release write flag
m_write_flag = false;
m_temp_lock = false;
needs_writing = !m_send_queue.empty();
}
if (needs_writing) {
@@ -1335,11 +1345,11 @@ lib::error_code connection<config>::send_close_frame(close::status::value code,
msg->set_terminal(true);
}
// Concurrency review
bool needs_writing = false;
{
scoped_lock_type lock(m_write_lock);
needs_writing = write_push(msg);
write_push(msg);
needs_writing = !m_write_flag && !m_send_queue.empty();
}
if (needs_writing) {
@@ -1399,26 +1409,31 @@ connection<config>::get_processor(int version) const {
}
template <typename config>
bool connection<config>::write_push(typename config::message_type::ptr msg)
void connection<config>::write_push(typename config::message_type::ptr msg)
{
bool empty = m_send_queue.empty();
if (!msg) {
return;
}
m_send_buffer_size += msg->get_payload().size();
m_send_queue.push(msg);
std::stringstream s;
s << "write_push: message count: " << m_send_queue.size()
<< " buffer size: " << m_send_buffer_size;
m_alog.write(log::alevel::devel,s.str());
return empty;
}
template <typename config>
typename config::message_type::ptr connection<config>::write_pop()
{
message_ptr msg = m_send_queue.front();
message_ptr msg;
if (m_send_queue.empty()) {
return msg;
}
msg = m_send_queue.front();
m_send_buffer_size -= msg->get_payload().size();
m_send_queue.pop();