diff --git a/src/connection.hpp b/src/connection.hpp index f3da9d0a16..e894e7ada6 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -48,6 +48,7 @@ #include #include #include +#include namespace websocketpp { @@ -84,8 +85,6 @@ public: //friend class role; //friend class socket; - //friend class role::template connection; - //friend class socket::template connection; friend class role< connection >; friend class socket< connection >; @@ -95,6 +94,11 @@ public: INTURRUPT = 2 }; + enum read_state { + READING = 0, + WAITING = 1 + }; + connection(endpoint_type& e,handler_ptr h) : role_type(e), socket_type(e), @@ -103,9 +107,14 @@ public: m_timer(e.endpoint_base::m_io_service,boost::posix_time::seconds(0)), m_state(session::state::CONNECTING), m_write_buffer(0), - m_write_state(IDLE) + m_write_state(IDLE), + m_remote_close_code(close::status::ABNORMAL_CLOSE), + m_read_state(READING) { socket_type::init(); + + m_control_message = message::control_ptr(new message::control()); + m_read_queue_avaliable.push(message::data_ptr(new message::data())); } // SHOULD BE PROTECTED @@ -127,15 +136,26 @@ public: } // Valid for OPEN state - void send(const utf8_string& payload) { - binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::TEXT, - false,payload)); + void send(const utf8_string& payload,bool binary = false) { + binary_string_ptr msg; + if (binary) { + msg = m_processor->prepare_frame(frame::opcode::BINARY,false,payload); + } else { + msg = m_processor->prepare_frame(frame::opcode::TEXT,false,payload); + } - m_endpoint.endpoint_base::m_io_service.post( + // TODO: decide which of these to use. Direct function call better + // ensures that writes triggered by reads sent immediately prior to a + // close frame get written before the acknowledgement close frame. + // The async option will probably reduce latency + // See Autobahn test 7.1.3 + write_message(msg); + + /*m_endpoint.endpoint_base::m_io_service.post( boost::bind( &type::write_message, type::shared_from_this(), - msg)); + msg)); */ } void send(const binary_string& data) { binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::BINARY, @@ -153,7 +173,7 @@ public: binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PING, false,payload)); - m_endpoint.m_io_service.post( + m_endpoint.endpoint_base::m_io_service.post( boost::bind( &type::write_message, type::shared_from_this(), @@ -162,7 +182,7 @@ public: void pong(const binary_string& payload) { binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PONG, false,payload)); - m_endpoint.m_io_service.post( + m_endpoint.endpoint_base::m_io_service.post( boost::bind( &type::write_message, type::shared_from_this(), @@ -199,18 +219,47 @@ public: // flow control interface message::data_ptr get_data_message() { // if we have one of this type free - if (!m_read_queue_data.empty()) { - message::data_ptr p = m_read_queue_data.front(); - m_read_queue_data.pop(); + if (!m_read_queue_avaliable.empty()) { + message::data_ptr p = m_read_queue_avaliable.front(); + m_read_queue_avaliable.pop(); + m_read_queue_used.insert(p); return p; } else { return message::data_ptr(); } } + void recycle(message::data_ptr p) { + if (m_read_queue_used.erase(p) == 0) { + // tried to recycle a pointer we don't control. + } else { + m_read_queue_avaliable.push(p); + + if (m_read_state == WAITING) { + m_endpoint.endpoint_base::m_io_service.post( + boost::bind( + &type::handle_read_frame, + type::shared_from_this(), + boost::system::error_code() + ) + ); + m_read_state = READING; + } + } + } + message::control_ptr get_control_message() { return m_control_message; } + + // TODO: deprecated. will change to get_rng? + int32_t gen() { + return 0; + } + + typename endpoint::alogger_type& alog() { + return m_endpoint.alog(); + } protected: void handle_socket_init(const boost::system::error_code& error) { if (error) { @@ -272,7 +321,7 @@ protected: case processor::error::PAYLOAD_VIOLATION: send_close(close::status::INVALID_PAYLOAD, e.what()); break; - case processor::error::INTERNAL_SERVER_ERROR: + case processor::error::INTERNAL_ENDPOINT_ERROR: send_close(close::status::INTERNAL_ENDPOINT_ERROR, e.what()); break; case processor::error::SOFT_ERROR: @@ -284,6 +333,7 @@ protected: // we need to wait for a message to be returned by the // client. We exit the read loop. handle_read_frame // will be restarted by recycle() + m_read_state = WAITING; return; default: // Fatal error, forcibly end connection immediately. @@ -314,141 +364,50 @@ protected: ); } } - - void handle_read_frame_old(const boost::system::error_code& error) { - // check if state changed while we were waiting for a read. - if (m_state == session::state::CLOSED) { return; } - - if (error) { - if (error == boost::asio::error::eof) { - // got unexpected EOF - // TODO: log error - terminate(false); - } else if (error == boost::asio::error::operation_aborted) { - // got unexpected abort (likely our server issued an abort on - // all connections on this io_service) - - // TODO: log error - terminate(true); - } else { - // Other unexpected error - - // TODO: log error - terminate(false); - } - } - - // process data from the buffer just read into - std::istream s(&m_buf); - - while (m_state != session::state::CLOSED && m_buf.size() > 0) { - try { - m_processor->consume(s); - - if (m_processor->ready()) { - process_message(); - m_processor->reset(); - } - } catch (const processor::exception& e) { - if (m_processor->ready()) { - m_processor->reset(); - } - - if (e.code() == processor::error::PROTOCOL_VIOLATION) { - send_close(close::status::PROTOCOL_ERROR, e.what()); - } else if (e.code() == processor::error::PAYLOAD_VIOLATION) { - send_close(close::status::INVALID_PAYLOAD, e.what()); - } else if (e.code() == processor::error::INTERNAL_SERVER_ERROR) { - send_close(close::status::POLICY_VIOLATION, e.what()); - } else if (e.code() == processor::error::SOFT_ERROR) { - // ignore and continue processing frames - continue; - } else { - // Fatal error, forcibly end connection immediately. - m_endpoint.elog().at(log::elevel::DEVEL) - << "Dropping TCP due to unrecoverable exception" - << log::endl; - terminate(true); - } - break; - } - } - - // try and read more - if (m_state != session::state::CLOSED && - m_processor->get_bytes_needed() > 0) { - // TODO: read timeout timer? - - boost::asio::async_read( - socket_type::get_socket(), - m_buf, - boost::asio::transfer_at_least(m_processor->get_bytes_needed()), - boost::bind( - &type::handle_read_frame, - type::shared_from_this(), - boost::asio::placeholders::error - ) - ); - } + void process_data(message::data_ptr msg) { + m_endpoint.get_handler()->on_message(type::shared_from_this(),msg); } - void process_message() { + void process_control(message::control_ptr msg) { bool response; - switch (m_processor->get_opcode()) { - case frame::opcode::TEXT: - m_endpoint.get_handler()->on_message( - type::shared_from_this(), - m_processor->get_utf8_payload()); - break; - case frame::opcode::BINARY: - m_endpoint.get_handler()->on_message( - type::shared_from_this(), - m_processor->get_binary_payload()); - break; + switch (msg->get_opcode()) { case frame::opcode::PING: response = m_endpoint.get_handler()->on_ping( type::shared_from_this(), - m_processor->get_binary_payload()); + msg->get_payload() + ); if (response) { // send response ping - write_message(m_processor->prepare_frame(frame::opcode::PONG,false,*m_processor->get_binary_payload())); + write_message(m_processor->prepare_frame(frame::opcode::PONG,false,msg->get_payload())); } break; case frame::opcode::PONG: m_endpoint.get_handler()->on_pong( type::shared_from_this(), - m_processor->get_binary_payload()); + msg->get_payload()); // TODO: disable ping response timer break; case frame::opcode::CLOSE: - m_remote_close_code = m_processor->get_close_code(); - m_remote_close_reason = m_processor->get_close_reason(); + m_remote_close_code = msg->get_close_code(); + m_remote_close_reason = msg->get_close_reason(); // check that the codes we got over the wire are valid - if (close::status::invalid(m_remote_close_code)) { - throw processor::exception("Invalid close code",processor::error::PROTOCOL_VIOLATION); - } - - if (close::status::reserved(m_remote_close_code)) { - throw processor::exception("Reserved close code",processor::error::PROTOCOL_VIOLATION); - } - if (m_state == session::state::OPEN) { // other end is initiating m_endpoint.elog().at(log::elevel::DEVEL) - << "sending close ack" << log::endl; + << "sending close ack" << log::endl; // TODO: send_close_ack(); } else if (m_state == session::state::CLOSING) { // ack of our close m_endpoint.elog().at(log::elevel::DEVEL) - << "got close ack" << log::endl; + << "got close ack" << log::endl; terminate(false); // TODO: start terminate timer (if client) @@ -458,7 +417,6 @@ protected: throw processor::exception("Invalid Opcode",processor::error::PROTOCOL_VIOLATION); break; } - } void send_close(close::status::value code, const std::string& reason) { @@ -515,9 +473,11 @@ protected: // frame throw "shouldn't be here"; } else if (close::status::invalid(m_remote_close_code)) { + // TODO: shouldn't be able to get here now either m_local_close_code = close::status::PROTOCOL_ERROR; m_local_close_reason = "Status code is invalid"; } else if (close::status::reserved(m_remote_close_code)) { + // TODO: shouldn't be able to get here now either m_local_close_code = close::status::PROTOCOL_ERROR; m_local_close_reason = "Status code is reserved"; } else { @@ -537,6 +497,10 @@ protected: } void write_message(binary_string_ptr msg) { + if (m_write_state == INTURRUPT) { + return; + } + m_write_buffer += msg->size(); m_write_queue.push(msg); write(); @@ -722,8 +686,10 @@ protected: bool m_dropped_by_me; // Read queue + read_state m_read_state; message::control_ptr m_control_message; - std::queue m_read_queue_data; + std::queue m_read_queue_avaliable; + std::set m_read_queue_used; }; // connection related types that it and its policy classes need.