From 4393a2562b277e1d8aae8110582fe82bc53fbe71 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Sat, 25 Jan 2014 18:46:44 -0600 Subject: [PATCH] Add the ability to pause reading on a connection --- changelog.md | 3 ++ test/transport/integration.cpp | 37 ++++++++++++++++-- websocketpp/connection.hpp | 47 +++++++++++++++++++++-- websocketpp/endpoint.hpp | 37 ++++++++++++++++++ websocketpp/impl/connection_impl.hpp | 57 +++++++++++++++++++++++----- websocketpp/impl/endpoint_impl.hpp | 55 +++++++++++++++++++++------ 6 files changed, 209 insertions(+), 27 deletions(-) diff --git a/changelog.md b/changelog.md index cf75cd6b47..e74efd1877 100644 --- a/changelog.md +++ b/changelog.md @@ -8,6 +8,9 @@ HEAD for code. #298 - Feature: Adds a compile time switch to asio transport config to disable certain multithreading features (some locks, asio strands) +- Feature: Adds the ability to pause reading on a connection. Paused connections will not + read more data from their socket, allowing TCP flow control to work without blocking + the main thread. - Improvement: Open, close, and pong timeouts can be disabled entirely by setting their duration to 0. - Improvement: Numerous performance improvements. Including: tuned default diff --git a/test/transport/integration.cpp b/test/transport/integration.cpp index d422aabfe8..0185a4d7d0 100644 --- a/test/transport/integration.cpp +++ b/test/transport/integration.cpp @@ -499,7 +499,38 @@ BOOST_AUTO_TEST_CASE( client_is_perpetual ) { } BOOST_AUTO_TEST_CASE( client_failed_connection ) { - client c; + client c; + + run_time_limited_client(c,"http://localhost:9005", 5, false); +} + + +BOOST_AUTO_TEST_CASE( pause_reading ) { + iostream_server s; + std::string handshake = "GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 13\r\nSec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n\r\n"; + char buffer[2] = { char(0x81), char(0x80) }; + + // suppress output (it needs a place to go to avoid error but we don't care what it is) + std::stringstream null_output; + s.register_ostream(&null_output); + + iostream_server::connection_ptr con = s.get_connection(); + con->start(); + + // read handshake, should work + BOOST_CHECK_EQUAL( con->read_some(handshake.data(), handshake.length()), handshake.length()); + + // pause reading and try again. The first read should work, the second should return 0 + // the first read was queued already after the handshake so it will go through because + // reading wasn't paused when it was queued. The byte it reads wont be enough to + // complete the frame so another read will be requested. This one wont actually happen + // because the connection is paused now. + con->pause_reading(); + BOOST_CHECK_EQUAL( con->read_some(buffer, 1), 1); + BOOST_CHECK_EQUAL( con->read_some(buffer+1, 1), 0); + // resume reading and try again. Should work this time because the resume should have + // re-queued a read. + con->resume_reading(); + BOOST_CHECK_EQUAL( con->read_some(buffer+1, 1), 1); +} - run_time_limited_client(c,"http://localhost:9005", 5, false); -} \ No newline at end of file diff --git a/websocketpp/connection.hpp b/websocketpp/connection.hpp index c8be36331b..f555cfaabe 100644 --- a/websocketpp/connection.hpp +++ b/websocketpp/connection.hpp @@ -302,6 +302,7 @@ public: , m_msg_manager(new con_msg_manager_type()) , m_send_buffer_size(0) , m_write_flag(false) + , m_read_flag(true) , m_is_server(is_server) , m_alog(alog) , m_elog(elog) @@ -615,9 +616,46 @@ public: * @return An error code */ lib::error_code interrupt(); - + /// Transport inturrupt callback void handle_interrupt(); + + /// Pause reading of new data + /** + * Signals to the connection to halt reading of new data. While reading is paused, + * the connection will stop reading from its associated socket. In turn this will + * result in TCP based flow control kicking in and slowing data flow from the remote + * endpoint. + * + * This is useful for applications that push new requests to a queue to be processed + * by another thread and need a way to signal when their request queue is full without + * blocking the network processing thread. + * + * Use `resume_reading()` to resume. + * + * If supported by the transport this is done asynchronously. As such reading may not + * stop until the current read operation completes. Typically you can expect to + * receive no more bytes after initiating a read pause than the size of the read + * buffer. + * + * If reading is paused for this connection already nothing is changed. + */ + lib::error_code pause_reading(); + + /// Pause reading callback + void handle_pause_reading(); + + /// Resume reading of new data + /** + * Signals to the connection to resume reading of new data after it was paused by + * `pause_reading()`. + * + * If reading is not paused for this connection already nothing is changed. + */ + lib::error_code resume_reading(); + + /// Resume reading callback + void handle_resume_reading(); /// Send a ping /** @@ -1092,8 +1130,8 @@ public: void handle_open_handshake_timeout(lib::error_code const & ec); void handle_close_handshake_timeout(lib::error_code const & ec); - void handle_read_frame(lib::error_code const & ec, - size_t bytes_transferred); + void handle_read_frame(lib::error_code const & ec, size_t bytes_transferred); + void read_frame(); /// Get array of WebSocket protocol versions that this connection supports. const std::vector& get_supported_versions() const; @@ -1380,6 +1418,9 @@ private: */ bool m_write_flag; + /// True if this connection is presently reading new data + bool m_read_flag; + // connection data request_type m_request; response_type m_response; diff --git a/websocketpp/endpoint.hpp b/websocketpp/endpoint.hpp index 2df806a343..567ca6b543 100644 --- a/websocketpp/endpoint.hpp +++ b/websocketpp/endpoint.hpp @@ -363,6 +363,43 @@ public: void interrupt(connection_hdl hdl, lib::error_code & ec); void interrupt(connection_hdl hdl); + /// Pause reading of new data (exception free) + /** + * Signals to the connection to halt reading of new data. While reading is paused, + * the connection will stop reading from its associated socket. In turn this will + * result in TCP based flow control kicking in and slowing data flow from the remote + * endpoint. + * + * This is useful for applications that push new requests to a queue to be processed + * by another thread and need a way to signal when their request queue is full without + * blocking the network processing thread. + * + * Use `resume_reading()` to resume. + * + * If supported by the transport this is done asynchronously. As such reading may not + * stop until the current read operation completes. Typically you can expect to + * receive no more bytes after initiating a read pause than the size of the read + * buffer. + * + * If reading is paused for this connection already nothing is changed. + */ + void pause_reading(connection_hdl hdl, lib::error_code & ec); + + /// Pause reading of new data + void pause_reading(connection_hdl hdl); + + /// Resume reading of new data (exception free) + /** + * Signals to the connection to resume reading of new data after it was paused by + * `pause_reading()`. + * + * If reading is not paused for this connection already nothing is changed. + */ + void resume_reading(connection_hdl hdl, lib::error_code & ec); + + /// Resume reading of new data + void resume_reading(connection_hdl hdl); + /// Create a message and add it to the outgoing send queue (exception free) /** * Convenience method to send a message given a payload string and an opcode diff --git a/websocketpp/impl/connection_impl.hpp b/websocketpp/impl/connection_impl.hpp index 0449e74661..6191b9e765 100644 --- a/websocketpp/impl/connection_impl.hpp +++ b/websocketpp/impl/connection_impl.hpp @@ -323,6 +323,41 @@ void connection::handle_interrupt() { } } +template +lib::error_code connection::pause_reading() { + m_alog.write(log::alevel::devel,"connection connection::pause_reading"); + return transport_con_type::dispatch( + lib::bind( + &type::handle_pause_reading, + type::get_shared() + ) + ); +} + +/// Pause reading handler. Not safe to call directly +template +void connection::handle_pause_reading() { + m_alog.write(log::alevel::devel,"connection connection::handle_pause_reading"); + m_read_flag = false; +} + +template +lib::error_code connection::resume_reading() { + m_alog.write(log::alevel::devel,"connection connection::resume_reading"); + return transport_con_type::dispatch( + lib::bind( + &type::handle_resume_reading, + type::get_shared() + ) + ); +} + +/// Resume reading helper method. Not safe to call directly +template +void connection::handle_resume_reading() { + m_read_flag = true; + read_frame(); +} @@ -839,9 +874,9 @@ void connection::handle_read_frame(const lib::error_code& ec, } } if (ec == transport::error::tls_short_read) { - m_elog.write(log::elevel::rerror,"got TLS short read, killing connection for now"); - this->terminate(ec); - return; + m_elog.write(log::elevel::rerror,"got TLS short read, killing connection for now"); + this->terminate(ec); + return; } std::stringstream s; @@ -933,6 +968,16 @@ void connection::handle_read_frame(const lib::error_code& ec, } } + read_frame(); +} + +/// Issue a new transport read unless reading is paused. +template +void connection::read_frame() { + if (!m_read_flag) { + return; + } + transport_con_type::async_read_at_least( // std::min wont work with undefined static const values. // TODO: is there a more elegant way to do this? @@ -944,12 +989,6 @@ void connection::handle_read_frame(const lib::error_code& ec, 1, m_buf, config::connection_read_buffer_size, - /*lib::bind( - &type::handle_read_frame, - type::get_shared(), - lib::placeholders::_1, - lib::placeholders::_2 - )*/ m_handle_read_frame ); } diff --git a/websocketpp/impl/endpoint_impl.hpp b/websocketpp/impl/endpoint_impl.hpp index 0dd14c5d13..3101ec8bdd 100644 --- a/websocketpp/impl/endpoint_impl.hpp +++ b/websocketpp/impl/endpoint_impl.hpp @@ -87,8 +87,7 @@ endpoint::create_connection() { } template -void endpoint::interrupt(connection_hdl hdl, - lib::error_code & ec) +void endpoint::interrupt(connection_hdl hdl, lib::error_code & ec) { connection_ptr con = get_con_from_hdl(hdl,ec); if (ec) {return;} @@ -106,8 +105,42 @@ void endpoint::interrupt(connection_hdl hdl) { } template -void endpoint::send(connection_hdl hdl, std::string const & - payload, frame::opcode::value op, lib::error_code & ec) +void endpoint::pause_reading(connection_hdl hdl, lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + + ec = con->pause_reading(); +} + +template +void endpoint::pause_reading(connection_hdl hdl) { + lib::error_code ec; + pause_reading(hdl,ec); + if (ec) { throw ec; } +} + +template +void endpoint::resume_reading(connection_hdl hdl, lib::error_code & ec) +{ + connection_ptr con = get_con_from_hdl(hdl,ec); + if (ec) {return;} + + ec = con->resume_reading(); +} + +template +void endpoint::resume_reading(connection_hdl hdl) { + lib::error_code ec; + resume_reading(hdl,ec); + if (ec) { throw ec; } +} + + + +template +void endpoint::send(connection_hdl hdl, std::string const & payload, + frame::opcode::value op, lib::error_code & ec) { connection_ptr con = get_con_from_hdl(hdl,ec); if (ec) {return;} @@ -116,8 +149,8 @@ void endpoint::send(connection_hdl hdl, std::string const & } template -void endpoint::send(connection_hdl hdl, std::string const & - payload, frame::opcode::value op) +void endpoint::send(connection_hdl hdl, std::string const & payload, + frame::opcode::value op) { lib::error_code ec; send(hdl,payload,op,ec); @@ -187,8 +220,7 @@ void endpoint::ping(connection_hdl hdl, std::string const & } template -void endpoint::ping(connection_hdl hdl, std::string const & - payload) +void endpoint::ping(connection_hdl hdl, std::string const & payload) { lib::error_code ec; ping(hdl,payload,ec); @@ -196,8 +228,8 @@ void endpoint::ping(connection_hdl hdl, std::string const & } template -void endpoint::pong(connection_hdl hdl, std::string const & - payload, lib::error_code & ec) +void endpoint::pong(connection_hdl hdl, std::string const & payload, + lib::error_code & ec) { connection_ptr con = get_con_from_hdl(hdl,ec); if (ec) {return;} @@ -205,8 +237,7 @@ void endpoint::pong(connection_hdl hdl, std::string const & } template -void endpoint::pong(connection_hdl hdl, std::string const & - payload) +void endpoint::pong(connection_hdl hdl, std::string const & payload) { lib::error_code ec; pong(hdl,payload,ec);