From 7ef6ea4ee3e3bffd60363d1b9633ed8e67bdb57b Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Sun, 8 Jan 2012 20:19:41 -0600 Subject: [PATCH] updates connection to use control message pool --- src/connection.hpp | 72 +++++++++++++++++++++++++++------------------- 1 file changed, 42 insertions(+), 30 deletions(-) diff --git a/src/connection.hpp b/src/connection.hpp index d6f55d6dde..dc5bb0885a 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -134,29 +134,23 @@ public: } // Valid for OPEN state - void send(const utf8_string& payload,bool binary = false) { - /*binary_string_ptr msg; - if (binary) { - msg = m_processor->prepare_frame(frame::opcode::BINARY,!m_endpoint.is_server(),payload); - } else { - msg = m_processor->prepare_frame(frame::opcode::TEXT,!m_endpoint.is_server(),payload); - } - - // TODO: decide which of these to use. Direct function call better - // ensures that writes triggered by reads sent immediately prior to a - // close frame get written before the acknowledgement close frame. - // The async option will probably reduce latency - // See Autobahn test 7.1.3 - //write_message(msg); - - m_endpoint.endpoint_base::m_io_service.post( - boost::bind( - &type::write_message, - type::shared_from_this(), - msg));*/ + /// convenience overload for sending a one off text message. + void send(const std::string& payload, frame::opcode::value op = frame::opcode::TEXT) { + websocketpp::message::data::ptr msg = get_control_message2(); + + if (!msg) { + throw exception("Endpoint send queue is full",error::SEND_QUEUE_FULL); + } + if (op != frame::opcode::TEXT && op != frame::opcode::BINARY) { + throw exception("opcode must be either TEXT or BINARY",error::GENERIC); + } + + msg->reset(op); + msg->set_payload(payload); + send(msg); } void send(message::data_ptr msg) { - m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand()); + m_processor->prepare_frame(msg); write_message(msg); } @@ -204,6 +198,10 @@ public: return m_endpoint.get_data_message(); } + message::data_ptr get_control_message2() { + return m_endpoint.get_control_message(); + } + message::control_ptr get_control_message() { return m_control_message; } @@ -254,17 +252,20 @@ public: if (error == boost::asio::error::eof) { // got unexpected EOF // TODO: log error + m_endpoint.elog().at(log::elevel::ERROR) << "Remote connection dropped unexpectedly" << log::endl; terminate(false); } else if (error == boost::asio::error::operation_aborted) { // got unexpected abort (likely our server issued an abort on // all connections on this io_service) // TODO: log error + m_endpoint.elog().at(log::elevel::ERROR) << "Terminating due to abort: " << error << log::endl; terminate(true); } else { // Other unexpected error // TODO: log error + m_endpoint.elog().at(log::elevel::ERROR) << "Terminating due to unknown error: " << error << log::endl; terminate(false); } } @@ -428,7 +429,7 @@ protected: // TODO: optimize control messages and handle case where endpoint is // out of messages - message::data_ptr msg = get_data_message(); + message::data_ptr msg = get_control_message2(); if (!msg) { // server is out of resources, close connection. @@ -440,7 +441,7 @@ protected: } msg->reset(frame::opcode::CLOSE); - m_processor->prepare_close_frame(msg,!m_endpoint.is_server(),rand(),code,reason); + m_processor->prepare_close_frame(msg,code,reason); write_message(msg); m_write_state = INTURRUPT; @@ -479,11 +480,19 @@ protected: // TODO: optimize control messages and handle case where endpoint is // out of messages - message::data_ptr msg = get_data_message(); + message::data_ptr msg = get_control_message2(); + + if (!msg) { + // server is out of resources, close connection. + m_endpoint.elog().at(log::elevel::ERROR) + << "Server has run out of message buffers." + << log::endl; + terminate(true); + return; + } + msg->reset(frame::opcode::CLOSE); m_processor->prepare_close_frame(msg, - !m_endpoint.is_server(), - rand(), m_local_close_code, m_local_close_reason); write_message(msg); @@ -493,20 +502,20 @@ protected: void send_ping(const std::string& payload) { // TODO: optimize control messages and handle case where // endpoint is out of messages - message::data_ptr control = get_data_message(); + message::data_ptr control = get_control_message2(); control->reset(frame::opcode::PING); control->set_payload(payload); - m_processor->prepare_frame(control,!m_endpoint.is_server(),rand()); + m_processor->prepare_frame(control); write_message(control); } void send_pong(const std::string& payload) { // TODO: optimize control messages and handle case where // endpoint is out of messages - message::data_ptr control = get_data_message(); + message::data_ptr control = get_control_message2(); control->reset(frame::opcode::PONG); control->set_payload(payload); - m_processor->prepare_frame(control,!m_endpoint.is_server(),rand()); + m_processor->prepare_frame(control); write_message(control); } @@ -551,6 +560,8 @@ protected: data.push_back(boost::asio::buffer(m_write_queue.front()->get_header())); data.push_back(boost::asio::buffer(m_write_queue.front()->get_payload())); + m_endpoint.alog().at(log::alevel::DEVEL) << "write header: " << to_hex(m_write_queue.front()->get_header()) << log::endl; + boost::asio::async_write( socket_type::get_socket(), data, @@ -564,6 +575,7 @@ protected: // if we are in an inturrupted state and had nothing else to write // it is safe to terminate the connection. if (m_write_state == INTURRUPT) { + m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) << "Exit after inturrupt" << log::endl; terminate(false); } }