diff --git a/src/roles/client.hpp b/src/roles/client.hpp index 2e1eafb46d..e2d8526eb2 100644 --- a/src/roles/client.hpp +++ b/src/roles/client.hpp @@ -150,25 +150,13 @@ public: write_request(); } - - void write_request(); void handle_write_request(const boost::system::error_code& error); void read_response(); void handle_read_response(const boost::system::error_code& error, - std::size_t bytes_transferred); + std::size_t bytes_transferred); void log_open_result(); - - // retry once - bool retry() { - m_retry++; - if (m_retry > 1) { - return false; - } else { - return true; - } - } private: endpoint& m_endpoint; connection_type& m_connection; @@ -184,14 +172,13 @@ public: std::string m_handshake_key; http::parser::request m_request; http::parser::response m_response; - - int m_retry; }; // types typedef client type; typedef endpoint endpoint_type; + typedef typename endpoint_traits::connection_type connection_type; typedef typename endpoint_traits::connection_ptr connection_ptr; typedef typename endpoint_traits::handler_ptr handler_ptr; @@ -221,35 +208,19 @@ public: std::numeric_limits::max() )) {} - connection_ptr connect(const std::string& u); - connection_ptr get_connection(const std::string& u); + + connection_ptr connect(const std::string& u); connection_ptr connect(connection_ptr con); - // TODO: add a `perpetual` option - // TODO: error handling for being called in alternate states - // TODO: run should only be callable from `STOPPED` state - void run() { - m_io_service.run(); - } - - // TODO: error handling for being called in alternate states - // TODO: reset should only be callable from `RUNNING` state and should - // clean up existing connections. - void reset() { - m_io_service.reset(); - } - + void run(bool perpetual = false); + void end_perpetual(); + void reset(); protected: - bool is_server() const { - return false; - } - int32_t rand() { - return m_gen(); - } + bool is_server() const {return false;} + int32_t rand() {return m_gen();} private: - void handle_connect(connection_ptr con, - const boost::system::error_code& error); + void handle_connect(connection_ptr con, const boost::system::error_code& error); endpoint_type& m_endpoint; boost::asio::io_service& m_io_service; @@ -259,18 +230,108 @@ private: boost::random::random_device&, boost::random::uniform_int_distribution<> > m_gen; + + boost::shared_ptr m_idle_worker; }; // client implimentation +/// Start the client ASIO loop +/** + * Calls run on the endpoint's io_service. This method will block until the io_service + * run method returns. This method may only be called when the endpoint is in the IDLE + * state. Endpoints start in the idle state and can be returned to the IDLE state by + * calling reset. `run` has a perpetual flag (default is false) that indicates whether + * or not it should return after all connections have been made. + * + * Important note: Calling run with perpetual = false on a client endpoint will return + * immediately unless you have already called connect() at least once. To get around + * this either queue up all connections you want to make before calling run or call + * run with perpetual in another thread. + * + * Visibility: public + * State: Valid from IDLE, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @param perpetual whether or not to run the endpoint in perpetual mode + * @exception websocketpp::exception with code error::INVALID_STATE if called from a state other than IDLE + */ template -typename endpoint_traits::connection_ptr -client::connect(const std::string& u) { - connection_ptr con = get_connection(u); - connect(con); - return con; +void client::run(bool perpetual) { + { + boost::lock_guard lock(m_endpoint.m_lock); + + if (m_endpoint.m_state != endpoint::IDLE) { + throw exception("client::run called from invalid state",error::INVALID_STATE); + } + + if (perpetual) { + m_idle_worker = boost::shared_ptr( + new boost::asio::io_service::work(m_io_service) + ); + } + + m_endpoint.m_state = endpoint::RUNNING; + } + m_io_service.run(); + m_endpoint.m_state = endpoint::STOPPED; } +/// End the idle work loop that keeps the io_service active +/** + * Calling end_perpetual on a client endpoint that was started in perpetual mode (via + * run(true), will stop the idle work object that prevents the run method from + * returning even when there is no work for it to do. Use if you want to gracefully + * stop the endpoint. Use stop() to forcibly stop the endpoint. + * + * Visibility: public + * State: Valid from RUNNING, ignored otherwise + * Concurrency: callable from any thread + */ +template +void client::end_perpetual() { + if (m_idle_worker) { + m_idle_worker.reset(); + } +} + +/// Reset a stopped endpoint. +/** + * Resets an endpoint that was stopped by stop() or whose run() method exited due to + * running out of work. reset() should not be called while the endpoint is running. + * Use stop() and/or end_perpetual() first and then reset once one of those methods + * has fully stopped the endpoint. + * + * Visibility: public + * State: Valid from STOPPED, an exception is thrown otherwise + * Concurrency: callable from any thread + */ +template +void client::reset() { + boost::lock_guard lock(m_endpoint.m_lock); + + if (m_endpoint.m_state != endpoint::STOPPED) { + throw exception("client::reset called from invalid state",error::INVALID_STATE); + } + + m_io_service.reset(); + + m_endpoint.m_state = endpoint::IDLE; +} + +/// Returns a new connection +/** + * Creates and returns a pointer to a new connection to the given URI suitable for passing + * to connect(). This method allows applying connection specific settings before + * performing the connection. + * + * Visibility: public + * State: Valid from IDLE or RUNNING, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @param u The URI that this connection will connect to. + * @return The pointer to the new connection + */ template typename endpoint_traits::connection_ptr client::get_connection(const std::string& u) { @@ -285,8 +346,8 @@ client::get_connection(const std::string& u) { connection_ptr con = m_endpoint.create_connection(); if (!con) { - throw websocketpp::exception("Endpoint is unavailable.", - websocketpp::error::ENDPOINT_UNAVAILABLE); + throw websocketpp::exception("get_connection called from invalid state", + websocketpp::error::INVALID_STATE); } con->set_uri(location); @@ -297,6 +358,17 @@ client::get_connection(const std::string& u) { } } +/// Begin the connect process for the given connection. +/** + * Initiates the async connect request for connection con. + * + * Visibility: public + * State: Valid from IDLE or RUNNING, an exception is thrown otherwise + * Concurrency: callable from any thread + * + * @param con A pointer to the connection to connect + * @return The pointer to con + */ template typename endpoint_traits::connection_ptr client::connect(connection_ptr con) { @@ -322,6 +394,13 @@ client::connect(connection_ptr con) { return con; } +/// Convenience method, equivalent to connect(get_connection(u)) +template +typename endpoint_traits::connection_ptr +client::connect(const std::string& u) { + return connect(get_connection(u)); +} + template void client::handle_connect(connection_ptr con, const boost::system::error_code& error) @@ -375,6 +454,7 @@ void client::handle_connect(connection_ptr con, template template void client::connection::write_request() { + boost::lock_guard lock(m_connection.m_lock); // async write to handle_write m_request.set_method("GET"); @@ -434,26 +514,27 @@ void client::connection::write_request() { m_connection.get_socket(), //boost::asio::buffer(raw), buffer, - boost::bind( + m_connection.get_strand().wrap(boost::bind( &type::handle_write_request, m_connection.shared_from_this(), boost::asio::placeholders::error - ) + )) ); } template template void client::connection::handle_write_request( - const boost::system::error_code& error) + const boost::system::error_code& error) { if (error) { + // TODO: detached state? - - m_endpoint.elog().at(log::elevel::RERROR) << "Error writing WebSocket request. code: " << error << log::endl; + m_endpoint.elog().at(log::elevel::RERROR) + << "Error writing WebSocket request. code: " + << error << log::endl; m_connection.terminate(false); return; - } read_response(); @@ -466,12 +547,12 @@ void client::connection::read_response() { m_connection.get_socket(), m_connection.buffer(), "\r\n\r\n", - boost::bind( + m_connection.get_strand().wrap(boost::bind( &type::handle_read_response, m_connection.shared_from_this(), boost::asio::placeholders::error, boost::asio::placeholders::bytes_transferred - ) + )) ); } @@ -480,8 +561,13 @@ template void client::connection::handle_read_response ( const boost::system::error_code& error, std::size_t bytes_transferred) { + boost::lock_guard lock(m_connection.m_lock); + + // detached check? + if (error) { - m_endpoint.elog().at(log::elevel::RERROR) << "Error reading HTTP request. code: " << error << log::endl; + m_endpoint.elog().at(log::elevel::RERROR) << "Error reading HTTP request. code: " + << error << log::endl; m_connection.terminate(false); return; } @@ -492,10 +578,12 @@ void client::connection::handle_read_response ( if (!m_response.parse_complete(request)) { // not a valid HTTP response // TODO: this should be a client error - throw http::exception("Could not parse server response.",http::status_code::BAD_REQUEST); + throw http::exception("Could not parse server response.", + http::status_code::BAD_REQUEST); } - m_endpoint.alog().at(log::alevel::DEBUG_HANDSHAKE) << m_response.raw() << log::endl; + m_endpoint.alog().at(log::alevel::DEBUG_HANDSHAKE) << m_response.raw() + << log::endl; // error checking if (m_response.get_status_code() != http::status_code::SWITCHING_PROTOCOLS) { @@ -558,9 +646,17 @@ void client::connection::handle_read_response ( m_connection.m_state = session::state::OPEN; - m_endpoint.get_handler()->on_open(m_connection.shared_from_this()); + m_connection.get_handler()->on_open(m_connection.shared_from_this()); - m_connection.handle_read_frame(boost::system::error_code()); + get_io_service().post( + m_connection.m_strand.wrap(boost::bind( + &connection_type::handle_read_frame, + m_connection.shared_from_this(), + boost::system::error_code() + )) + ); + + //m_connection.handle_read_frame(boost::system::error_code()); } catch (const http::exception& e) { m_endpoint.elog().at(log::elevel::RERROR) << "Error processing server handshake. Server HTTP response: "