diff --git a/src/connection.hpp b/src/connection.hpp index e3d6030476..07ac48bbe7 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -506,14 +506,19 @@ public: } // try and read more - if (m_state != session::state::CLOSED && - m_processor->get_bytes_needed() > 0) { + if (m_state != session::state::CLOSED && m_processor->get_bytes_needed() > 0) { // TODO: read timeout timer? + // TODO: make a configuration option for this. + // bytes_to_get = 1 will result in the fastest failing on bad UTF8 + // bytes_to_get = bytes_needed will result in less processing overhead + //size_t bytes_to_get = static_cast(m_processor->get_bytes_needed()); + size_t bytes_to_get = 1; + boost::asio::async_read( socket_type::get_socket(), m_buf, - boost::asio::transfer_at_least(static_cast(m_processor->get_bytes_needed())), + boost::asio::transfer_at_least(bytes_to_get), boost::bind( &type::handle_read_frame, type::shared_from_this(), @@ -634,9 +639,20 @@ public: msg->reset(frame::opcode::CLOSE); m_processor->prepare_close_frame(msg,code,reason); - write_message(msg); - m_write_state = INTURRUPT; + m_endpoint.endpoint_base::m_io_service.post( + m_strand.wrap(boost::bind( + &type::write_message, + type::shared_from_this(), + msg + )) + ); + + // By setting inturrupt here we flush all outgoing messages from the message + // queue. Doing so is compliant behavior but "non-strict". The default will be + // fully compliant. This should be a configurable setting. Production environments + // should choose the appropriate option. + // m_write_state = INTURRUPT; } // send an acknowledgement close frame @@ -687,10 +703,18 @@ public: m_processor->prepare_close_frame(msg, m_local_close_code, m_local_close_reason); - write_message(msg); - m_write_state = INTURRUPT; + + m_endpoint.endpoint_base::m_io_service.post( + m_strand.wrap(boost::bind( + &type::write_message, + type::shared_from_this(), + msg + )) + ); + //m_write_state = INTURRUPT; } + // must be called async using ioservice.post void write_message(message::data_ptr msg) { if (m_write_state == INTURRUPT) { return; @@ -722,23 +746,19 @@ public: break; } - if (m_write_queue.size() > 0) { + if (!m_write_queue.empty()) { if (m_write_state == IDLE) { m_write_state = WRITING; } - - //std::vector data; - - + m_write_buf.push_back(boost::asio::buffer(m_write_queue.front()->get_header())); m_write_buf.push_back(boost::asio::buffer(m_write_queue.front()->get_payload())); - m_endpoint.alog().at(log::alevel::DEVEL) << "write header: " << zsutil::to_hex(m_write_queue.front()->get_header()) << log::endl; + //m_endpoint.alog().at(log::alevel::DEVEL) << "write header: " << zsutil::to_hex(m_write_queue.front()->get_header()) << log::endl; boost::asio::async_write( socket_type::get_socket(), m_write_buf, - //m_write_queue.front()->get_buffer(), m_strand.wrap(boost::bind( &type::handle_write, type::shared_from_this(), @@ -778,13 +798,22 @@ public: m_write_buffer -= m_write_queue.front()->get_payload().size(); m_write_buf.clear(); + + frame::opcode::value code = m_write_queue.front()->get_opcode(); + m_write_queue.pop(); if (m_write_state == WRITING) { m_write_state = IDLE; } - write(); + if (code != frame::opcode::CLOSE) { + write(); + } else { + m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) + << "Exit after writing close frame" << log::endl; + terminate(false); + } } /// Ends the connection by cleaning up based on current state