Add the ability to pause reading on a connection

This commit is contained in:
Peter Thorson
2014-01-25 18:46:44 -06:00
parent 780f7683a4
commit 4393a2562b
6 changed files with 209 additions and 27 deletions

View File

@@ -8,6 +8,9 @@ HEAD
for code. #298
- Feature: Adds a compile time switch to asio transport config to disable
certain multithreading features (some locks, asio strands)
- Feature: Adds the ability to pause reading on a connection. Paused connections will not
read more data from their socket, allowing TCP flow control to work without blocking
the main thread.
- Improvement: Open, close, and pong timeouts can be disabled entirely by setting their
duration to 0.
- Improvement: Numerous performance improvements. Including: tuned default

View File

@@ -499,7 +499,38 @@ BOOST_AUTO_TEST_CASE( client_is_perpetual ) {
}
BOOST_AUTO_TEST_CASE( client_failed_connection ) {
client c;
client c;
run_time_limited_client(c,"http://localhost:9005", 5, false);
}
BOOST_AUTO_TEST_CASE( pause_reading ) {
iostream_server s;
std::string handshake = "GET / HTTP/1.1\r\nHost: www.example.com\r\nConnection: upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 13\r\nSec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n\r\n";
char buffer[2] = { char(0x81), char(0x80) };
// suppress output (it needs a place to go to avoid error but we don't care what it is)
std::stringstream null_output;
s.register_ostream(&null_output);
iostream_server::connection_ptr con = s.get_connection();
con->start();
// read handshake, should work
BOOST_CHECK_EQUAL( con->read_some(handshake.data(), handshake.length()), handshake.length());
// pause reading and try again. The first read should work, the second should return 0
// the first read was queued already after the handshake so it will go through because
// reading wasn't paused when it was queued. The byte it reads wont be enough to
// complete the frame so another read will be requested. This one wont actually happen
// because the connection is paused now.
con->pause_reading();
BOOST_CHECK_EQUAL( con->read_some(buffer, 1), 1);
BOOST_CHECK_EQUAL( con->read_some(buffer+1, 1), 0);
// resume reading and try again. Should work this time because the resume should have
// re-queued a read.
con->resume_reading();
BOOST_CHECK_EQUAL( con->read_some(buffer+1, 1), 1);
}
run_time_limited_client(c,"http://localhost:9005", 5, false);
}

View File

@@ -302,6 +302,7 @@ public:
, m_msg_manager(new con_msg_manager_type())
, m_send_buffer_size(0)
, m_write_flag(false)
, m_read_flag(true)
, m_is_server(is_server)
, m_alog(alog)
, m_elog(elog)
@@ -615,9 +616,46 @@ public:
* @return An error code
*/
lib::error_code interrupt();
/// Transport inturrupt callback
void handle_interrupt();
/// Pause reading of new data
/**
* Signals to the connection to halt reading of new data. While reading is paused,
* the connection will stop reading from its associated socket. In turn this will
* result in TCP based flow control kicking in and slowing data flow from the remote
* endpoint.
*
* This is useful for applications that push new requests to a queue to be processed
* by another thread and need a way to signal when their request queue is full without
* blocking the network processing thread.
*
* Use `resume_reading()` to resume.
*
* If supported by the transport this is done asynchronously. As such reading may not
* stop until the current read operation completes. Typically you can expect to
* receive no more bytes after initiating a read pause than the size of the read
* buffer.
*
* If reading is paused for this connection already nothing is changed.
*/
lib::error_code pause_reading();
/// Pause reading callback
void handle_pause_reading();
/// Resume reading of new data
/**
* Signals to the connection to resume reading of new data after it was paused by
* `pause_reading()`.
*
* If reading is not paused for this connection already nothing is changed.
*/
lib::error_code resume_reading();
/// Resume reading callback
void handle_resume_reading();
/// Send a ping
/**
@@ -1092,8 +1130,8 @@ public:
void handle_open_handshake_timeout(lib::error_code const & ec);
void handle_close_handshake_timeout(lib::error_code const & ec);
void handle_read_frame(lib::error_code const & ec,
size_t bytes_transferred);
void handle_read_frame(lib::error_code const & ec, size_t bytes_transferred);
void read_frame();
/// Get array of WebSocket protocol versions that this connection supports.
const std::vector<int>& get_supported_versions() const;
@@ -1380,6 +1418,9 @@ private:
*/
bool m_write_flag;
/// True if this connection is presently reading new data
bool m_read_flag;
// connection data
request_type m_request;
response_type m_response;

View File

@@ -363,6 +363,43 @@ public:
void interrupt(connection_hdl hdl, lib::error_code & ec);
void interrupt(connection_hdl hdl);
/// Pause reading of new data (exception free)
/**
* Signals to the connection to halt reading of new data. While reading is paused,
* the connection will stop reading from its associated socket. In turn this will
* result in TCP based flow control kicking in and slowing data flow from the remote
* endpoint.
*
* This is useful for applications that push new requests to a queue to be processed
* by another thread and need a way to signal when their request queue is full without
* blocking the network processing thread.
*
* Use `resume_reading()` to resume.
*
* If supported by the transport this is done asynchronously. As such reading may not
* stop until the current read operation completes. Typically you can expect to
* receive no more bytes after initiating a read pause than the size of the read
* buffer.
*
* If reading is paused for this connection already nothing is changed.
*/
void pause_reading(connection_hdl hdl, lib::error_code & ec);
/// Pause reading of new data
void pause_reading(connection_hdl hdl);
/// Resume reading of new data (exception free)
/**
* Signals to the connection to resume reading of new data after it was paused by
* `pause_reading()`.
*
* If reading is not paused for this connection already nothing is changed.
*/
void resume_reading(connection_hdl hdl, lib::error_code & ec);
/// Resume reading of new data
void resume_reading(connection_hdl hdl);
/// Create a message and add it to the outgoing send queue (exception free)
/**
* Convenience method to send a message given a payload string and an opcode

View File

@@ -323,6 +323,41 @@ void connection<config>::handle_interrupt() {
}
}
template <typename config>
lib::error_code connection<config>::pause_reading() {
m_alog.write(log::alevel::devel,"connection connection::pause_reading");
return transport_con_type::dispatch(
lib::bind(
&type::handle_pause_reading,
type::get_shared()
)
);
}
/// Pause reading handler. Not safe to call directly
template <typename config>
void connection<config>::handle_pause_reading() {
m_alog.write(log::alevel::devel,"connection connection::handle_pause_reading");
m_read_flag = false;
}
template <typename config>
lib::error_code connection<config>::resume_reading() {
m_alog.write(log::alevel::devel,"connection connection::resume_reading");
return transport_con_type::dispatch(
lib::bind(
&type::handle_resume_reading,
type::get_shared()
)
);
}
/// Resume reading helper method. Not safe to call directly
template <typename config>
void connection<config>::handle_resume_reading() {
m_read_flag = true;
read_frame();
}
@@ -839,9 +874,9 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
}
}
if (ec == transport::error::tls_short_read) {
m_elog.write(log::elevel::rerror,"got TLS short read, killing connection for now");
this->terminate(ec);
return;
m_elog.write(log::elevel::rerror,"got TLS short read, killing connection for now");
this->terminate(ec);
return;
}
std::stringstream s;
@@ -933,6 +968,16 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
}
}
read_frame();
}
/// Issue a new transport read unless reading is paused.
template <typename config>
void connection<config>::read_frame() {
if (!m_read_flag) {
return;
}
transport_con_type::async_read_at_least(
// std::min wont work with undefined static const values.
// TODO: is there a more elegant way to do this?
@@ -944,12 +989,6 @@ void connection<config>::handle_read_frame(const lib::error_code& ec,
1,
m_buf,
config::connection_read_buffer_size,
/*lib::bind(
&type::handle_read_frame,
type::get_shared(),
lib::placeholders::_1,
lib::placeholders::_2
)*/
m_handle_read_frame
);
}

View File

@@ -87,8 +87,7 @@ endpoint<connection,config>::create_connection() {
}
template <typename connection, typename config>
void endpoint<connection,config>::interrupt(connection_hdl hdl,
lib::error_code & ec)
void endpoint<connection,config>::interrupt(connection_hdl hdl, lib::error_code & ec)
{
connection_ptr con = get_con_from_hdl(hdl,ec);
if (ec) {return;}
@@ -106,8 +105,42 @@ void endpoint<connection,config>::interrupt(connection_hdl hdl) {
}
template <typename connection, typename config>
void endpoint<connection,config>::send(connection_hdl hdl, std::string const &
payload, frame::opcode::value op, lib::error_code & ec)
void endpoint<connection,config>::pause_reading(connection_hdl hdl, lib::error_code & ec)
{
connection_ptr con = get_con_from_hdl(hdl,ec);
if (ec) {return;}
ec = con->pause_reading();
}
template <typename connection, typename config>
void endpoint<connection,config>::pause_reading(connection_hdl hdl) {
lib::error_code ec;
pause_reading(hdl,ec);
if (ec) { throw ec; }
}
template <typename connection, typename config>
void endpoint<connection,config>::resume_reading(connection_hdl hdl, lib::error_code & ec)
{
connection_ptr con = get_con_from_hdl(hdl,ec);
if (ec) {return;}
ec = con->resume_reading();
}
template <typename connection, typename config>
void endpoint<connection,config>::resume_reading(connection_hdl hdl) {
lib::error_code ec;
resume_reading(hdl,ec);
if (ec) { throw ec; }
}
template <typename connection, typename config>
void endpoint<connection,config>::send(connection_hdl hdl, std::string const & payload,
frame::opcode::value op, lib::error_code & ec)
{
connection_ptr con = get_con_from_hdl(hdl,ec);
if (ec) {return;}
@@ -116,8 +149,8 @@ void endpoint<connection,config>::send(connection_hdl hdl, std::string const &
}
template <typename connection, typename config>
void endpoint<connection,config>::send(connection_hdl hdl, std::string const &
payload, frame::opcode::value op)
void endpoint<connection,config>::send(connection_hdl hdl, std::string const & payload,
frame::opcode::value op)
{
lib::error_code ec;
send(hdl,payload,op,ec);
@@ -187,8 +220,7 @@ void endpoint<connection,config>::ping(connection_hdl hdl, std::string const &
}
template <typename connection, typename config>
void endpoint<connection,config>::ping(connection_hdl hdl, std::string const &
payload)
void endpoint<connection,config>::ping(connection_hdl hdl, std::string const & payload)
{
lib::error_code ec;
ping(hdl,payload,ec);
@@ -196,8 +228,8 @@ void endpoint<connection,config>::ping(connection_hdl hdl, std::string const &
}
template <typename connection, typename config>
void endpoint<connection,config>::pong(connection_hdl hdl, std::string const &
payload, lib::error_code & ec)
void endpoint<connection,config>::pong(connection_hdl hdl, std::string const & payload,
lib::error_code & ec)
{
connection_ptr con = get_con_from_hdl(hdl,ec);
if (ec) {return;}
@@ -205,8 +237,7 @@ void endpoint<connection,config>::pong(connection_hdl hdl, std::string const &
}
template <typename connection, typename config>
void endpoint<connection,config>::pong(connection_hdl hdl, std::string const &
payload)
void endpoint<connection,config>::pong(connection_hdl hdl, std::string const & payload)
{
lib::error_code ec;
pong(hdl,payload,ec);