From 6ca0b49ae0b989cf879c2b3285eceb439300232b Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Wed, 21 Mar 2012 21:57:58 -0600 Subject: [PATCH] more concurrency cleanup, misc features --- src/connection.hpp | 766 +++++++++++++++++++++++++++++++++++---------- 1 file changed, 596 insertions(+), 170 deletions(-) diff --git a/src/connection.hpp b/src/connection.hpp index 07ac48bbe7..ceff2bc6e4 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -106,6 +106,8 @@ public: socket_type(e), m_endpoint(e), m_handler(h), + m_read_threshold(e.get_read_threshold()), + m_silent_close(e.get_silent_close()), m_timer(e.endpoint_base::m_io_service,boost::posix_time::seconds(0)), m_state(session::state::CONNECTING), m_write_buffer(0), @@ -136,7 +138,8 @@ public: // connection& operator=(connection const&) = delete /// Start the websocket connection async read loop - /* Begins the connection's async read loop. First any socket level + /** + * Begins the connection's async read loop. First any socket level * initialization will happen (TLS handshake, etc) then the handshake and * frame reads will start. * @@ -156,17 +159,21 @@ public: } /// Return current connection state - /* Visibility: public + /** + * Visibility: public * State: valid always * Concurrency: callable from anywhere + * + * @return Current connection state */ session::state::value get_state() const { - // TODO: ensure read of m_state is atomic + boost::lock_guard lock(m_lock); return m_state; } /// Detaches the connection from its endpoint - /* Called by the m_endpoint's destructor. In state DETACHED m_endpoint is + /** + * Called by the m_endpoint's destructor. In state DETACHED m_endpoint is * no longer avaliable. The connection may stick around if the end user * application needs to read state from it (ie close reasons, etc) but no * operations requring the endpoint can be performed. @@ -176,60 +183,16 @@ public: * Concurrency: safe as long as state is valid */ void detach() { - // TODO: ensure write of m_detached is atomic + boost::lock_guard lock(m_lock); m_detached = true; } - /// convenience overload for sending a one off message. - /* Creates a message, fills in payload, and queues a write as a message of - * type op. Default type is TEXT. - * - * Visibility: public - * State: Valid from OPEN, ignored otherwise - * Concurrency: callable from any thread - */ - void send(const std::string& payload, frame::opcode::value op = frame::opcode::TEXT) { - // TODO: ensure read of m_state is atomic - if (m_state != session::state::OPEN) {return;} - - websocketpp::message::data::ptr msg = get_control_message2(); - - if (!msg) { - throw exception("Endpoint send queue is full",error::SEND_QUEUE_FULL); - } - if (op != frame::opcode::TEXT && op != frame::opcode::BINARY) { - throw exception("opcode must be either TEXT or BINARY",error::GENERIC); - } - - msg->reset(op); - msg->set_payload(payload); - send(msg); - } - - /// Send message - /* Prepares (if necessary) and sends the given message - * - * Visibility: public - * State: Valid from OPEN, ignored otherwise - * Concurrency: callable from any thread - */ - void send(message::data_ptr msg) { - // TODO: need to make sure access to m_state is atomic - if (m_state != session::state::OPEN) {return;} - - m_processor->prepare_frame(msg); - - m_endpoint.endpoint_base::m_io_service.post( - m_strand.wrap(boost::bind( - &type::write_message, - type::shared_from_this(), - msg - )) - ); - } + void send(const std::string& payload, frame::opcode::value op = frame::opcode::TEXT); + void send(message::data_ptr msg); /// Close connection - /* Closes the websocket connection with the given status code and reason. + /** + * Closes the websocket connection with the given status code and reason. * From state OPEN a clean connection close is initiated. From any other * state the socket will be closed and the connection cleaned up. * @@ -239,9 +202,15 @@ public: * Visibility: public * State: Valid from OPEN, ignored otherwise * Concurrency: callable from any thread + * + * @param code Close code to send + * @param reason Close reason to send */ void close(close::status::value code, const std::string& reason = "") { - // TODO: need to make sure access to m_state is atomic + boost::lock_guard lock(m_lock); + + if (m_detached) {return;} + if (m_state == session::state::CONNECTING) { m_endpoint.endpoint_base::m_io_service.post( m_strand.wrap(boost::bind( @@ -266,7 +235,8 @@ public: } /// Send Ping - /* Initiates a ping with the given payload. + /** + * Initiates a ping with the given payload. * * There is no feedback directly from ping. Feedback will be provided via * the on_pong or on_pong_timeout callbacks. @@ -274,9 +244,12 @@ public: * Visibility: public * State: Valid from OPEN, ignored otherwise * Concurrency: callable from any thread + * + * @param payload Payload to be used for the ping */ void ping(const std::string& payload) { - // TODO: need to make sure access to m_state is atomic + boost::lock_guard lock(m_lock); + if (m_state != session::state::OPEN) {return;} if (m_detached) {return;} @@ -297,16 +270,20 @@ public: } /// Send Pong - /* Initiates a pong with the given payload. + /** + * Initiates a pong with the given payload. * * There is no feedback from pong. * * Visibility: public * State: Valid from OPEN, ignored otherwise * Concurrency: callable from any thread + * + * @param payload Payload to be used for the pong */ void pong(const std::string& payload) { - // TODO: need to make sure access to m_state is atomic + boost::lock_guard lock(m_lock); + if (m_state != session::state::OPEN) {return;} if (m_detached) {return;} @@ -327,47 +304,212 @@ public: } /// Return send buffer size (payload bytes) - /* Initiates a pong with the given payload. - * - * There is no feedback from pong. - * + /** * Visibility: public * State: Valid from any state. * Concurrency: callable from any thread + * + * @return The current number of bytes in the outgoing send buffer. */ uint64_t buffered_amount() const { - // TODO: ensure m_write_buffer is accessed atomically + boost::lock_guard lock(m_lock); + return m_write_buffer; } - // Valid for CLOSED state + /// Get Local Close Code + /** + * Returns the close code that WebSocket++ believes to be the reason the connection + * closed. This value may include special values otherwise not allowed on the wire + * such as 1006 for abnormal closure or 1015 for TLS handshake not performed. + * + * Visibility: public + * State: Valid from CLOSED, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @return Close code supplied by WebSocket++ + */ close::status::value get_local_close_code() const { + boost::lock_guard lock(m_lock); + + if (m_state != session::state::CLOSED) { + throw exception("get_local_close_code called from state other than CLOSED", + error::INVALID_STATE); + } + return m_local_close_code; } - utf8_string get_local_close_reason() const { + + /// Get Local Close Reason + /** + * Returns the close reason that WebSocket++ believes to be the reason the connection + * closed. This is almost certainly the value passed to the `close()` method. + * + * Visibility: public + * State: Valid from CLOSED, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @return Close reason supplied by WebSocket++ + */ + std::string get_local_close_reason() const { + boost::lock_guard lock(m_lock); + + if (m_state != session::state::CLOSED) { + throw exception("get_local_close_reason called from state other than CLOSED", + error::INVALID_STATE); + } + return m_local_close_reason; } + + /// Get Remote Close Code + /** + * Returns the close reason that was received over the wire from the remote peer. + * This method may return values that are invalid on the wire such as 1005/No close + * code received, 1006 abnormal closure, or 1015 Bad TLS handshake. + * + * Visibility: public + * State: Valid from CLOSED, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @return Close code supplied by remote endpoint + */ close::status::value get_remote_close_code() const { + boost::lock_guard lock(m_lock); + + if (m_state != session::state::CLOSED) { + throw exception("get_remote_close_code called from state other than CLOSED", + error::INVALID_STATE); + } + return m_remote_close_code; } - utf8_string get_remote_close_reason() const { + + /// Get Remote Close Reason + /** + * Returns the close reason that was received over the wire from the remote peer. + * + * Visibility: public + * State: Valid from CLOSED, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @return Close reason supplied by remote endpoint + */ + std::string get_remote_close_reason() const { + boost::lock_guard lock(m_lock); + + if (m_state != session::state::CLOSED) { + throw exception("get_remote_close_reason called from state other than CLOSED", + error::INVALID_STATE); + } + return m_remote_close_reason; } + + /// Get failed_by_me + /** + * Returns whether or not the connection ending sequence was initiated by this + * endpoint. Will return true when this endpoint chooses to close normally or when it + * discovers an error and chooses to close the connection (either forcibly or not). + * Will return false when the close handshake was initiated by the remote endpoint or + * if the TCP connection was dropped or broken prematurely. + * + * Visibility: public + * State: Valid from CLOSED, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @return Whether or not the connection ending sequence was initiated by this endpoint + */ bool get_failed_by_me() const { + boost::lock_guard lock(m_lock); + + if (m_state != session::state::CLOSED) { + throw exception("get_failed_by_me called from state other than CLOSED", + error::INVALID_STATE); + } + return m_failed_by_me; } + + /// Get dropped_by_me + /** + * Returns whether or not the TCP connection was dropped by this endpoint. + * + * Visibility: public + * State: Valid from CLOSED, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @return whether or not the TCP connection was dropped by this endpoint. + */ bool get_dropped_by_me() const { + boost::lock_guard lock(m_lock); + + if (m_state != session::state::CLOSED) { + throw exception("get_dropped_by_me called from state other than CLOSED", + error::INVALID_STATE); + } + return m_dropped_by_me; } + + /// Get closed_by_me + /** + * Returns whether or not the WebSocket closing handshake was initiated by this + * endpoint. + * + * Visibility: public + * State: Valid from CLOSED, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @return Whether or not closing handshake was initiated by this endpoint + */ bool get_closed_by_me() const { + boost::lock_guard lock(m_lock); + + if (m_state != session::state::CLOSED) { + throw exception("get_closed_by_me called from state other than CLOSED", + error::INVALID_STATE); + } + return m_closed_by_me; } - // flow control interface + /// Get get_data_message + /** + * Returns a pointer to an outgoing message buffer. If there are no outgoing messages + * available, a NO_OUTGOING_MESSAGES exception is thrown. This happens when the + * endpoint has exhausted its resources dedicated to buffering outgoing messages. To + * deal with this error either increase available outgoing resources or throttle back + * the rate and size of outgoing messages. + * + * Visibility: public + * State: Valid from OPEN, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @return A pointer to the message buffer + */ message::data_ptr get_data_message() { - return m_endpoint.get_data_message(); + boost::lock_guard lock(m_lock); + + if (m_detached) { + throw exception("Endpoint was destroyed",error::ENDPOINT_UNAVAILABLE); + } + + if (m_state != session::state::OPEN) { + throw exception("get_data_message called from state other than OPEN", + error::INVALID_STATE); + } + + message::data_ptr msg = m_endpoint.get_data_message(); + + if (!msg) { + throw exception("No outgoing messages available",error::NO_OUTGOING_MESSAGES); + } else { + return msg; + } } + message::data_ptr get_control_message2() { return m_endpoint.get_control_message(); } @@ -376,23 +518,104 @@ public: return m_control_message; } - - // stuff about switching handlers on the fly - // TODO: organize more + /// Set connection handler + /** + * Sets the handler that will process callbacks for this connection. The switch is + * processed asynchronously so set_handler will return immediately. The existing + * handler will receive an on_unload callback immediately before the switch. After + * on_unload returns the original handler will not receive any more callbacks from + * this connection. The new handler will receive an on_load callback immediately after + * the switch and before any other callbacks are processed. + * + * Visibility: public + * State: Valid from any state + * Concurrency: callable from any thread + * + * @param new_handler the new handler to set + */ void set_handler(handler_ptr new_handler) { + boost::lock_guard lock(m_lock); + if (m_detached) {return;} if (!new_handler) { - m_endpoint.elog().at(log::elevel::FATAL) - << "Tried to switch to a NULL handler." << log::endl; + elog().at(log::elevel::FATAL) << "Tried to switch to a NULL handler." + << log::endl; terminate(true); return; } - handler_ptr old_handler = get_handler(); + m_endpoint.endpoint_base::m_io_service.post( + m_strand.wrap(boost::bind( + &type::set_handler_internal, + type::shared_from_this(), + new_handler + )) + ); + } + + /// Set connection read threshold + /** + * Set the read threshold for this connection. See endpoint::set_read_threshold for + * more information about the read threshold. + * + * Visibility: public + * State: valid always + * Concurrency: callable from anywhere + * + * @param val Size of the threshold in bytes + */ + void set_read_threshold(size_t val) { + boost::lock_guard lock(m_lock); - old_handler->on_unload(type::shared_from_this(),new_handler); - m_handler = new_handler; - new_handler->on_load(type::shared_from_this(),old_handler); + m_read_threshold = val; + } + + /// Get connection read threshold + /** + * Returns the connection read threshold. See set_read_threshold for more information + * about the read threshold. + * + * Visibility: public + * State: valid always + * Concurrency: callable from anywhere + * + * @return Size of the threshold in bytes + */ + size_t get_read_threshold() const { + boost::lock_guard lock(m_lock); + + return m_read_threshold; + } + + /// Set connection silent close setting + /** + * See endpoint::set_silent_close for more information. + * + * Visibility: public + * State: valid always + * Concurrency: callable from anywhere + * + * @param val New silent close value + */ + void set_silent_close(bool val) { + boost::lock_guard lock(m_lock); + + m_silent_close = val; + } + + /// Get connection silent close setting + /** + * Visibility: public + * State: valid always + * Concurrency: callable from anywhere + * + * @return Current silent close value + * @see set_silent_close() + */ + bool get_silent_close() const { + boost::lock_guard lock(m_lock); + + return m_silent_close; } // TODO: deprecated. will change to get_rng? @@ -400,14 +623,77 @@ public: return 0; } + /// Returns a reference to the endpoint's access logger. + /** + * Visibility: public + * State: Valid from any state + * Concurrency: callable from any thread + * + * @return A reference to the endpoint's access logger + */ typename endpoint::alogger_type& alog() { + if (m_detached) { + throw exception("Endpoint was destroyed",error::ENDPOINT_UNAVAILABLE); + } + return m_endpoint.alog(); } + + /// Returns a reference to the endpoint's error logger. + /** + * Visibility: public + * State: Valid from any state + * Concurrency: callable from any thread + * + * @return A reference to the endpoint's error logger + */ + typename endpoint::elogger_type& elog() { + if (m_detached) { + throw exception("Endpoint was destroyed",error::ENDPOINT_UNAVAILABLE); + } + + return m_endpoint.elog(); + } + + /// Returns a pointer to the endpoint's handler. + /** + * Visibility: public + * State: Valid from any state + * Concurrency: callable from any thread + * + * @return A pointer to the endpoint's default handler + */ + handler_ptr get_handler() { + boost::lock_guard lock(m_lock); + return m_handler; + } + + /// Returns a reference to the connections strand object. + /** + * Visibility: public + * State: Valid from any state + * Concurrency: callable from any thread + * + * @return A reference to the connection's strand object + */ + boost::asio::strand& get_strand() { + boost::lock_guard lock(m_lock); + return m_strand; + } public: //protected: TODO: figure out why VCPP2010 doesn't like protected here + + /// Socket initialization callback + /* This is the async return point for initializing the socket policy. After this point + * the socket is open and ready. + * + * Visibility: protected + * State: Valid only once per connection during the initialization sequence. + * Concurrency: Must be called within m_strand + */ void handle_socket_init(const boost::system::error_code& error) { if (error) { - m_endpoint.elog().at(log::elevel::RERROR) + elog().at(log::elevel::RERROR) << "Socket initialization failed, error code: " << error << log::endl; this->terminate(false); @@ -417,15 +703,33 @@ public: role_type::async_init(); } public: + /// ASIO callback for async_read of more frame data + /** + * Callback after ASIO has read some data that needs to be sent to a frame processor + * + * TODO: think about how receiving a very large buffer would affect concurrency due to + * that handler running for an unusually long period of time? Is a maximum size + * necessary on m_buf? + * TODO: think about how terminate here works with the locks and concurrency + * + * Visibility: protected + * State: valid for states OPEN and CLOSING, ignored otherwise + * Concurrency: must be called via strand. Only one async_read should be outstanding + * at a time. Should only be called from inside handle_read_frame or by + * the role's init method. + */ void handle_read_frame(const boost::system::error_code& error) { + boost::lock_guard lock(m_lock); + // check if state changed while we were waiting for a read. if (m_state == session::state::CLOSED) { return; } + if (m_state == session::state::CONNECTING) { return; } if (error) { if (error == boost::asio::error::eof) { // got unexpected EOF // TODO: log error - m_endpoint.elog().at(log::elevel::RERROR) + elog().at(log::elevel::RERROR) << "Remote connection dropped unexpectedly" << log::endl; terminate(false); } else if (error == boost::asio::error::operation_aborted) { @@ -433,14 +737,14 @@ public: // all connections on this io_service) // TODO: log error - m_endpoint.elog().at(log::elevel::RERROR) + elog().at(log::elevel::RERROR) << "Terminating due to abort: " << error << log::endl; terminate(true); } else { // Other unexpected error // TODO: log error - m_endpoint.elog().at(log::elevel::RERROR) + elog().at(log::elevel::RERROR) << "Terminating due to unknown error: " << error << log::endl; terminate(false); @@ -469,18 +773,18 @@ public: switch(e.code()) { case processor::error::PROTOCOL_VIOLATION: - send_close(close::status::PROTOCOL_ERROR, e.what()); + send_close(close::status::PROTOCOL_ERROR,e.what()); break; case processor::error::PAYLOAD_VIOLATION: - send_close(close::status::INVALID_PAYLOAD, e.what()); + send_close(close::status::INVALID_PAYLOAD,e.what()); break; case processor::error::INTERNAL_ENDPOINT_ERROR: - send_close(close::status::INTERNAL_ENDPOINT_ERROR, e.what()); + send_close(close::status::INTERNAL_ENDPOINT_ERROR,e.what()); break; case processor::error::SOFT_ERROR: continue; case processor::error::MESSAGE_TOO_BIG: - send_close(close::status::MESSAGE_TOO_BIG, e.what()); + send_close(close::status::MESSAGE_TOO_BIG,e.what()); break; case processor::error::OUT_OF_MESSAGES: // we need to wait for a message to be returned by the @@ -491,13 +795,9 @@ public: return; default: // Fatal error, forcibly end connection immediately. - m_endpoint.elog().at(log::elevel::DEVEL) - << "Dropping TCP due to unrecoverable exception: " - << e.code() - << " (" - << e.what() - << ")" - << log::endl; + elog().at(log::elevel::DEVEL) + << "Dropping TCP due to unrecoverable exception: " << e.code() + << " (" << e.what() << ")" << log::endl; terminate(true); } break; @@ -509,43 +809,78 @@ public: if (m_state != session::state::CLOSED && m_processor->get_bytes_needed() > 0) { // TODO: read timeout timer? - // TODO: make a configuration option for this. - // bytes_to_get = 1 will result in the fastest failing on bad UTF8 - // bytes_to_get = bytes_needed will result in less processing overhead - //size_t bytes_to_get = static_cast(m_processor->get_bytes_needed()); - size_t bytes_to_get = 1; - boost::asio::async_read( socket_type::get_socket(), m_buf, - boost::asio::transfer_at_least(bytes_to_get), - boost::bind( + boost::asio::transfer_at_least(std::min( + m_read_threshold, + static_cast(m_processor->get_bytes_needed()) + )), + m_strand.wrap(boost::bind( &type::handle_read_frame, type::shared_from_this(), boost::asio::placeholders::error - ) + )) ); } } public: //protected: TODO: figure out why VCPP2010 doesn't like protected here - void process_data(message::data_ptr msg) { - get_handler()->on_message(type::shared_from_this(),msg); + /// Internal Implementation for set_handler + /** + * Visibility: protected + * State: Valid for all states + * Concurrency: Must be called within m_strand + * + * @param new_handler The handler to switch to + */ + void set_handler_internal(handler_ptr new_handler) { + boost::lock_guard lock(m_lock); + + if (!new_handler) { + elog().at(log::elevel::FATAL) + << "Tried to switch to a NULL handler." << log::endl; + // TODO: unserialized call to terminate? + terminate(true); + return; + } + + handler_ptr old_handler = m_handler; + + old_handler->on_unload(type::shared_from_this(),new_handler); + m_handler = new_handler; + new_handler->on_load(type::shared_from_this(),old_handler); } + /// Dispatch a data message + /** + * Visibility: private + * State: no state checking, should only be called within handle_read_frame + * Concurrency: Must be called within m_stranded method + */ + void process_data(message::data_ptr msg) { + m_handler->on_message(type::shared_from_this(),msg); + } + + /// Dispatch or handle a control message + /** + * Visibility: private + * State: no state checking, should only be called within handle_read_frame + * Concurrency: Must be called within m_stranded method + */ void process_control(message::control_ptr msg) { bool response; switch (msg->get_opcode()) { case frame::opcode::PING: - response = get_handler()->on_ping(type::shared_from_this(), - msg->get_payload()); + response = m_handler->on_ping(type::shared_from_this(), + msg->get_payload()); if (response) { pong(msg->get_payload()); } break; case frame::opcode::PONG: - get_handler()->on_pong(type::shared_from_this(), - msg->get_payload()); + m_handler->on_pong(type::shared_from_this(), + msg->get_payload()); // TODO: disable ping response timer break; @@ -557,15 +892,13 @@ public: if (m_state == session::state::OPEN) { // other end is initiating - m_endpoint.elog().at(log::elevel::DEVEL) - << "sending close ack" << log::endl; + elog().at(log::elevel::DEVEL) << "sending close ack" << log::endl; // TODO: send_close_ack(); } else if (m_state == session::state::CLOSING) { // ack of our close - m_endpoint.elog().at(log::elevel::DEVEL) - << "got close ack" << log::endl; + elog().at(log::elevel::DEVEL) << "got close ack" << log::endl; terminate(false); // TODO: start terminate timer (if client) @@ -579,30 +912,35 @@ public: } /// Send a close frame - /* Initiates a close handshake by sending a close frame with the given code + /** + * Initiates a close handshake by sending a close frame with the given code * and reason. * * Visibility: protected * State: Valid for OPEN, ignored otherwise. * Concurrency: Must be called within m_strand + * + * @param code The code to send + * @param reason The reason to send */ void send_close(close::status::value code, const std::string& reason) { + boost::lock_guard lock(m_lock); + if (m_detached) {return;} if (m_state != session::state::OPEN) { - m_endpoint.elog().at(log::elevel::WARN) - << "Tried to disconnect a session that wasn't open" - << log::endl; + elog().at(log::elevel::WARN) + << "Tried to disconnect a session that wasn't open" << log::endl; return; } if (close::status::invalid(code)) { - m_endpoint.elog().at(log::elevel::WARN) + elog().at(log::elevel::WARN) << "Tried to close a connection with invalid close code: " << code << log::endl; return; } else if (close::status::reserved(code)) { - m_endpoint.elog().at(log::elevel::WARN) + elog().at(log::elevel::WARN) << "Tried to close a connection with reserved close code: " << code << log::endl; return; @@ -621,8 +959,13 @@ public: )) ); - m_local_close_code = code; - m_local_close_reason = reason; + if (m_silent_close) { + m_local_close_code = close::status::NO_STATUS; + m_local_close_reason = ""; + } else { + m_local_close_code = code; + m_local_close_reason = reason; + } // TODO: optimize control messages and handle case where endpoint is // out of messages @@ -630,9 +973,8 @@ public: if (!msg) { // server is out of resources, close connection. - m_endpoint.elog().at(log::elevel::RERROR) - << "Server has run out of message buffers." - << log::endl; + elog().at(log::elevel::RERROR) + << "Server has run out of message buffers." << log::endl; terminate(true); return; } @@ -655,12 +997,18 @@ public: // m_write_state = INTURRUPT; } - // send an acknowledgement close frame + /// send an acknowledgement close frame + /** + * Visibility: private + * State: no state checking, should only be called within process_control + * Concurrency: Must be called within m_stranded method + */ void send_close_ack() { - // TODO: state should be OPEN - // echo close value unless there is a good reason not to. - if (m_remote_close_code == close::status::NO_STATUS) { + if (m_silent_close) { + m_local_close_code = close::status::NO_STATUS; + m_local_close_reason = ""; + } else if (m_remote_close_code == close::status::NO_STATUS) { m_local_close_code = close::status::NORMAL; m_local_close_reason = ""; } else if (m_remote_close_code == close::status::ABNORMAL_CLOSE) { @@ -692,17 +1040,14 @@ public: if (!msg) { // server is out of resources, close connection. - m_endpoint.elog().at(log::elevel::RERROR) - << "Server has run out of message buffers." - << log::endl; + elog().at(log::elevel::RERROR) + << "Server has run out of message buffers." << log::endl; terminate(true); return; } msg->reset(frame::opcode::CLOSE); - m_processor->prepare_close_frame(msg, - m_local_close_code, - m_local_close_reason); + m_processor->prepare_close_frame(msg,m_local_close_code,m_local_close_reason); m_endpoint.endpoint_base::m_io_service.post( m_strand.wrap(boost::bind( @@ -711,14 +1056,19 @@ public: msg )) ); - //m_write_state = INTURRUPT; + //m_write_state = INTURRUPT; } - // must be called async using ioservice.post + /// Push message to write queue and start writer if it was idle + /** + * Visibility: protected (called only by asio dispatcher) + * State: Valid from OPEN and CLOSING, ignored otherwise + * Concurrency: Must be called within m_stranded method + */ void write_message(message::data_ptr msg) { - if (m_write_state == INTURRUPT) { - return; - } + boost::lock_guard lock(m_lock); + if (m_state != session::state::OPEN && m_state != session::state::CLOSING) {return;} + if (m_write_state == INTURRUPT) {return;} m_write_buffer += msg->get_payload().size(); m_write_queue.push(msg); @@ -726,6 +1076,12 @@ public: write(); } + /// Begin async write of next message in list + /** + * Visibility: private + * State: Valid only as called from write_message or handle_write + * Concurrency: Must be called within m_stranded method + */ void write() { switch (m_write_state) { case IDLE: @@ -769,18 +1125,24 @@ public: // if we are in an inturrupted state and had nothing else to write // it is safe to terminate the connection. if (m_write_state == INTURRUPT) { - m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) + alog().at(log::alevel::DEBUG_CLOSE) << "Exit after inturrupt" << log::endl; terminate(false); } } } + /// async write callback + /** + * Visibility: protected (called only by asio dispatcher) + * State: Valid from OPEN and CLOSING, ignored otherwise + * Concurrency: Must be called within m_stranded method + */ void handle_write(const boost::system::error_code& error) { if (error) { if (error == boost::asio::error::operation_aborted) { // previous write was aborted - m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) + alog().at(log::alevel::DEBUG_CLOSE) << "handle_write was called with operation_aborted error" << log::endl; } else { @@ -790,8 +1152,10 @@ public: } } + boost::lock_guard lock(m_lock); + if (m_write_queue.size() == 0) { - m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) + alog().at(log::alevel::DEBUG_CLOSE) << "handle_write called with empty queue" << log::endl; return; } @@ -808,16 +1172,19 @@ public: } if (code != frame::opcode::CLOSE) { - write(); + // only continue next write if the connection is still open + if (m_state == session::state::OPEN || m_state == session::state::CLOSING) { + write(); + } } else { - m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) + alog().at(log::alevel::DEBUG_CLOSE) << "Exit after writing close frame" << log::endl; terminate(false); } } /// Ends the connection by cleaning up based on current state - /* Terminate will review the outstanding resources and close each + /** Terminate will review the outstanding resources and close each * appropriately. Attached handlers will recieve an on_fail or on_close call * * TODO: should we protect against long running handlers? @@ -825,8 +1192,11 @@ public: * Visibility: protected * State: Valid from any state except CLOSED. * Concurrency: Must be called from within m_strand + * + * @param failed_by_me Whether or not to mark the connection as failed by me */ void terminate(bool failed_by_me) { + boost::lock_guard lock(m_lock); // If state is closed it either means terminate was called twice or // something other than this library called it. In either case running // it will only cause problems @@ -850,17 +1220,16 @@ public: m_state = session::state::CLOSED; if (old_state == session::state::CONNECTING) { - get_handler()->on_fail(type::shared_from_this()); + m_handler->on_fail(type::shared_from_this()); } else if (old_state == session::state::OPEN || old_state == session::state::CLOSING) { - get_handler()->on_close(type::shared_from_this()); + m_handler->on_close(type::shared_from_this()); } if (!m_detached) { log_close_result(); - } - + } } // finally remove this connection from the endpoint's list. This will @@ -873,31 +1242,30 @@ public: // this is called when an async asio call encounters an error void log_error(std::string msg,const boost::system::error_code& e) { - m_endpoint.elog().at(log::elevel::RERROR) - << msg << "(" << e << ")" << log::endl; + elog().at(log::elevel::RERROR) << msg << "(" << e << ")" << log::endl; } void log_close_result() { - m_endpoint.alog().at(log::alevel::DISCONNECT) - //<< "Disconnect " << (m_was_clean ? "Clean" : "Unclean") - << "Disconnect " - << " close local:[" << m_local_close_code - << (m_local_close_reason == "" ? "" : ","+m_local_close_reason) - << "] remote:[" << m_remote_close_code - << (m_remote_close_reason == "" ? "" : ","+m_remote_close_reason) << "]" - << log::endl; + alog().at(log::alevel::DISCONNECT) + //<< "Disconnect " << (m_was_clean ? "Clean" : "Unclean") + << "Disconnect " + << " close local:[" << m_local_close_code + << (m_local_close_reason == "" ? "" : ","+m_local_close_reason) + << "] remote:[" << m_remote_close_code + << (m_remote_close_reason == "" ? "" : ","+m_remote_close_reason) << "]" + << log::endl; } void fail_on_expire(const boost::system::error_code& error) { if (error) { if (error != boost::asio::error::operation_aborted) { - m_endpoint.elog().at(log::elevel::DEVEL) - << "fail_on_expire timer ended in unknown error" << log::endl; + elog().at(log::elevel::DEVEL) + << "fail_on_expire timer ended in unknown error" << log::endl; terminate(false); } return; } - m_endpoint.elog().at(log::elevel::DEVEL) + elog().at(log::elevel::DEVEL) << "fail_on_expire timer expired" << log::endl; terminate(true); } @@ -905,22 +1273,19 @@ public: boost::asio::streambuf& buffer() { return m_buf; } - - handler_ptr get_handler() { - return m_handler; - } - - boost::asio::strand& get_strand() { - return m_strand; - } public: //protected: TODO: figure out why VCPP2010 doesn't like protected here endpoint_type& m_endpoint; - handler_ptr m_handler; + + // Overridable connection specific settings + handler_ptr m_handler; // object to dispatch callbacks to + size_t m_read_threshold; // maximum number of bytes to fetch in + // a single async read. + bool m_silent_close; // suppresses the return of detailed + // close codes. // Network resources boost::asio::streambuf m_buf; - boost::asio::deadline_timer m_timer; // WebSocket connection state @@ -969,7 +1334,68 @@ struct connection_traits< connection > { typedef role< type > role_type; typedef socket< type > socket_type; }; + +/// convenience overload for sending a one off message. +/** + * Creates a message, fills in payload, and queues a write as a message of + * type op. Default type is TEXT. + * + * Visibility: public + * State: Valid from OPEN, ignored otherwise + * Concurrency: callable from any thread + * + * @param payload Payload to write_state + * @param op opcode to send the message as + */ +template class role,template class socket> +void +connection::send(const std::string& payload,frame::opcode::value op) +{ + { + boost::lock_guard lock(m_lock); + if (m_state != session::state::OPEN) {return;} + } + websocketpp::message::data::ptr msg = get_control_message2(); + + if (!msg) { + throw exception("Endpoint send queue is full",error::SEND_QUEUE_FULL); + } + if (op != frame::opcode::TEXT && op != frame::opcode::BINARY) { + throw exception("opcode must be either TEXT or BINARY",error::GENERIC); + } + + msg->reset(op); + msg->set_payload(payload); + send(msg); +} + +/// Send message +/** + * Prepares (if necessary) and sends the given message + * + * Visibility: public + * State: Valid from OPEN, ignored otherwise + * Concurrency: callable from any thread + * + * @param msg A pointer to a data message buffer to send. + */ +template class role,template class socket> +void connection::send(message::data_ptr msg) { + boost::lock_guard lock(m_lock); + if (m_state != session::state::OPEN) {return;} + + m_processor->prepare_frame(msg); + + m_endpoint.endpoint_base::m_io_service.post( + m_strand.wrap(boost::bind( + &type::write_message, + type::shared_from_this(), + msg + )) + ); +} + } // namespace websocketpp #endif // WEBSOCKETPP_CONNECTION_HPP