adds timer support to asio endpoints, impliments dns resolve timer references #226

This commit is contained in:
Peter Thorson
2013-05-08 08:52:52 -05:00
parent b408ab876e
commit d85ea32851
2 changed files with 224 additions and 148 deletions

View File

@@ -80,9 +80,9 @@ public:
/// Type of a pointer to the ASIO io_service being used
typedef boost::asio::io_service* io_service_ptr;
/// Type of a pointer to the ASIO timer class
typedef lib::shared_ptr<boost::asio::deadline_timer> timer_ptr;
/// Type of a pointer to the ASIO timer class
typedef lib::shared_ptr<boost::asio::deadline_timer> timer_ptr;
// generate and manage our own io_service
explicit connection(bool is_server, alog_type& alog, elog_type& elog)
: m_is_server(is_server)
@@ -136,8 +136,8 @@ public:
* @param ec A status value
*/
void set_proxy(const std::string & uri, lib::error_code & ec) {
// TODO: return errors for illegal URIs here?
// TODO: should https urls be illegal for the moment?
// TODO: return errors for illegal URIs here?
// TODO: should https urls be illegal for the moment?
m_proxy = uri;
m_proxy_data.reset(new proxy_data());
ec = lib::error_code();
@@ -145,9 +145,9 @@ public:
/// Set the proxy to connect through (exception)
void set_proxy(const std::string & uri) {
lib::error_code ec;
set_proxy(uri,ec);
if (ec) { throw ec; }
lib::error_code ec;
set_proxy(uri,ec);
if (ec) { throw ec; }
}
/// Set the basic auth credentials to use (exception free)
@@ -164,26 +164,26 @@ public:
* @param ec A status value
*/
void set_proxy_basic_auth(const std::string & username, const
std::string & password, lib::error_code & ec)
std::string & password, lib::error_code & ec)
{
if (!m_proxy_data) {
ec = make_error_code(websocketpp::error::invalid_state);
return;
ec = make_error_code(websocketpp::error::invalid_state);
return;
}
// TODO: username can't contain ':'
std::string val = "Basic "+base64_encode(username + ":" + password);
m_proxy_data->req.replace_header("Proxy-Authorization",val);
ec = lib::error_code();
// TODO: username can't contain ':'
std::string val = "Basic "+base64_encode(username + ":" + password);
m_proxy_data->req.replace_header("Proxy-Authorization",val);
ec = lib::error_code();
}
/// Set the basic auth credentials to use (exception)
void set_proxy_basic_auth(const std::string & username, const
std::string & password)
std::string & password)
{
lib::error_code ec;
set_proxy_basic_auth(username,password,ec);
if (ec) { throw ec; }
lib::error_code ec;
set_proxy_basic_auth(username,password,ec);
if (ec) { throw ec; }
}
/// Set the proxy timeout duration (exception free)
@@ -198,8 +198,8 @@ public:
*/
void set_proxy_timeout(long duration, lib::error_code & ec) {
if (!m_proxy_data) {
ec = make_error_code(websocketpp::error::invalid_state);
return;
ec = make_error_code(websocketpp::error::invalid_state);
return;
}
m_proxy_data->timeout_proxy = duration;
@@ -208,9 +208,9 @@ public:
/// Set the proxy timeout duration (exception)
void set_proxy_timeout(long duration) {
lib::error_code ec;
set_proxy_timeout(duration,ec);
if (ec) { throw ec; }
lib::error_code ec;
set_proxy_timeout(duration,ec);
if (ec) { throw ec; }
}
const std::string & get_proxy() const {
@@ -253,7 +253,8 @@ public:
*/
lib::error_code proxy_init(const std::string & authority) {
if (!m_proxy_data) {
return websocketpp::error::make_error_code(websocketpp::error::invalid_state);
return websocketpp::error::make_error_code(
websocketpp::error::invalid_state);
}
m_proxy_data->req.set_version("HTTP/1.1");
m_proxy_data->req.set_method("CONNECT");
@@ -263,28 +264,28 @@ public:
return lib::error_code();
}
/// Call back a function after a period of time.
/**
* Sets a timer that calls back a function after the specified period of
* milliseconds. Returns a handle that can be used to cancel the timer.
* A cancelled timer will return the error code error::operation_aborted
* A timer that expired will return no error.
*
* @param duration Length of time to wait in milliseconds
*
* @param callback The function to call back when the timer has expired
*
* @return A handle that can be used to cancel the timer if it is no longer
* needed.
*/
timer_ptr set_timer(long duration, timer_handler callback) {
timer_ptr new_timer(
new boost::asio::deadline_timer(
*m_io_service,
boost::posix_time::milliseconds(duration)
)
);
/// Call back a function after a period of time.
/**
* Sets a timer that calls back a function after the specified period of
* milliseconds. Returns a handle that can be used to cancel the timer.
* A cancelled timer will return the error code error::operation_aborted
* A timer that expired will return no error.
*
* @param duration Length of time to wait in milliseconds
*
* @param callback The function to call back when the timer has expired
*
* @return A handle that can be used to cancel the timer if it is no longer
* needed.
*/
timer_ptr set_timer(long duration, timer_handler callback) {
timer_ptr new_timer(
new boost::asio::deadline_timer(
*m_io_service,
boost::posix_time::milliseconds(duration)
)
);
new_timer->async_wait(
lib::bind(
@@ -297,34 +298,33 @@ public:
);
return new_timer;
}
/// Timer callback
/**
* The timer pointer is included to ensure the timer isn't destroyed until
* after it has expired.
*
* @param t Pointer to the timer in question
*
* @param callback The function to call back
*
* @param ec The status code
*/
void handle_timer(timer_ptr t, timer_handler callback, const
}
/// Timer callback
/**
* The timer pointer is included to ensure the timer isn't destroyed until
* after it has expired.
*
* @param t Pointer to the timer in question
*
* @param callback The function to call back
*
* @param ec The status code
*/
void handle_timer(timer_ptr t, timer_handler callback, const
boost::system::error_code& ec)
{
if (ec) {
if (ec) {
if (ec == boost::asio::error::operation_aborted) {
callback(make_error_code(transport::error::operation_aborted));
callback(make_error_code(transport::error::operation_aborted));
} else {
m_elog.write(log::elevel::info,
"asio handle_timer error: "+ec.message());
callback(make_error_code(error::pass_through));
log_err(log::elevel::info,"asio handle_timer",ec);
callback(make_error_code(error::pass_through));
}
} else {
callback(lib::error_code());
}
}
}
protected:
/// Initialize transport for reading
/**
@@ -371,7 +371,8 @@ protected:
callback(ec);
}
// If we have a proxy set issue a proxy connect, otherwise skip to post_init
// If we have a proxy set issue a proxy connect, otherwise skip to
// post_init
if (!m_proxy.empty()) {
proxy_write(callback);
} else {
@@ -420,19 +421,19 @@ protected:
m_proxy_data->write_buf.size()));
m_alog.write(log::alevel::devel,m_proxy_data->write_buf);
// Set a timer so we don't wait forever for the proxy to respond
m_proxy_data->timer = this->set_timer(
m_proxy_data->timeout_proxy,
lib::bind(
&type::handle_proxy_timeout,
this,
callback,
lib::placeholders::_1
)
);
// Send proxy request
// Set a timer so we don't wait forever for the proxy to respond
m_proxy_data->timer = this->set_timer(
m_proxy_data->timeout_proxy,
lib::bind(
&type::handle_proxy_timeout,
this,
callback,
lib::placeholders::_1
)
);
// Send proxy request
boost::asio::async_write(
socket_con_type::get_next_layer(),
m_bufs,
@@ -446,16 +447,18 @@ protected:
}
void handle_proxy_timeout(init_handler callback, const lib::error_code & ec) {
if (ec == transport::error::operation_aborted) {
m_alog.write(log::alevel::devel,"asio handle_proxy_write timer cancelled");
return;
} else if (ec) {
m_alog.write(log::alevel::devel,"asio handle_proxy_write timer error: "+ec.message());
callback(ec);
} else {
m_alog.write(log::alevel::devel,"asio handle_proxy_write timer expired");
callback(make_error_code(transport::error::timeout));
}
if (ec == transport::error::operation_aborted) {
m_alog.write(log::alevel::devel,
"asio handle_proxy_write timer cancelled");
return;
} else if (ec) {
log_err(log::elevel::devel,"asio handle_proxy_write",ec);
callback(ec);
} else {
m_alog.write(log::alevel::devel,
"asio handle_proxy_write timer expired");
callback(make_error_code(transport::error::timeout));
}
}
void handle_proxy_write(init_handler callback, const
@@ -468,8 +471,7 @@ protected:
m_bufs.clear();
if (ec) {
m_elog.write(log::elevel::info,
"asio handle_proxy_write error: "+ec.message());
log_err(log::elevel::info,"asio handle_proxy_write",ec);
m_proxy_data->timer->cancel();
callback(make_error_code(error::pass_through));
} else {
@@ -622,13 +624,7 @@ protected:
handler(make_error_code(transport::error::tls_short_read),
bytes_transferred);
} else {
// other error that we cannot translate into a WebSocket++
// transport error. Use pass through and print an info warning
// with the original error.
std::stringstream s;
s << "asio async_read_at_least error: "
<< ec << " (" << ec.message() << ")";
m_elog.write(log::elevel::info,s.str());
log_err(log::elevel::info,"asio async_read_at_least",ec);
handler(make_error_code(transport::error::pass_through),
bytes_transferred);
}
@@ -673,10 +669,7 @@ protected:
{
m_bufs.clear();
if (ec) {
std::stringstream s;
s << "asio async_write error: " << ec
<< " (" << ec.message() << ")";
m_elog.write(log::elevel::info,s.str());
log_err(log::elevel::info,"asio async_write",ec);
handler(make_error_code(transport::error::pass_through));
} else {
handler(lib::error_code());
@@ -739,24 +732,29 @@ protected:
}
if (ec) {
std::stringstream s;
s << "asio async_shutdown error: " << ec
<< " (" << ec.message() << ")";
m_elog.write(log::elevel::info,s.str());
log_err(log::elevel::info,"asio async_shutdown",ec);
h(make_error_code(transport::error::pass_through));
} else {
h(lib::error_code());
}
}
private:
/// Convenience method for logging the code and message for an error_code
std::string log_err(log::level l,const char * msg, lib::error_code & ec)
{
std::stringstream s;
s << msg << " error: " << ec << " (" << ec.message() << ")";
m_elog->write(l,s.str());
}
// static settings
const bool m_is_server;
alog_type& m_alog;
elog_type& m_elog;
struct proxy_data {
proxy_data() : timeout_proxy(config::timeout_proxy) {}
proxy_data() : timeout_proxy(config::timeout_proxy) {}
request_type req;
response_type res;
std::string write_buf;

View File

@@ -74,14 +74,16 @@ public:
/// Type of a shared pointer to the connection transport component
/// associated with this endpoint transport component
typedef typename transport_con_type::ptr transport_con_ptr;
/// Type of a pointer to the ASIO io_service being used
typedef boost::asio::io_service* io_service_ptr;
/// Type of a shared pointer to the acceptor being used
typedef lib::shared_ptr<boost::asio::ip::tcp::acceptor> acceptor_ptr;
/// Type of a shared pointer to the resolver being used
typedef lib::shared_ptr<boost::asio::ip::tcp::resolver> resolver_ptr;
/// Type of timer handle
typedef lib::shared_ptr<boost::asio::deadline_timer> timer_ptr;
// generate and manage our own io_service
explicit endpoint()
: m_external_io_service(false)
@@ -293,28 +295,68 @@ public:
listen(*endpoint_iterator);
}
typedef lib::shared_ptr<boost::asio::deadline_timer> timer_ptr;
timer_ptr set_timer(long duration, timer_handler handler) {
timer_ptr timer(new boost::asio::deadline_timer(*m_io_service));
timer->expires_from_now(boost::posix_time::milliseconds(duration));
timer->async_wait(lib::bind(&type::timer_handler, this, handler,
lib::placeholders::_1));
return timer;
}
void timer_handler(timer_handler h, const boost::system::error_code& ec) {
if (ec == boost::asio::error::operation_aborted) {
h(make_error_code(transport::error::operation_aborted));
} else if (ec) {
std::stringstream s;
s << "asio async_wait error: " << ec << " (" << ec.message() << ")";
m_elog->write(log::elevel::devel,s.str());
h(make_error_code(transport::error::pass_through));
/// Call back a function after a period of time.
/**
* Sets a timer that calls back a function after the specified period of
* milliseconds. Returns a handle that can be used to cancel the timer.
* A cancelled timer will return the error code error::operation_aborted
* A timer that expired will return no error.
*
* @param duration Length of time to wait in milliseconds
*
* @param callback The function to call back when the timer has expired
*
* @return A handle that can be used to cancel the timer if it is no longer
* needed.
*/
timer_ptr set_timer(long duration, timer_handler callback) {
timer_ptr new_timer(
new boost::asio::deadline_timer(
*m_io_service,
boost::posix_time::milliseconds(duration)
)
);
new_timer->async_wait(
lib::bind(
&type::handle_timer,
this,
new_timer,
callback,
lib::placeholders::_1
)
);
return new_timer;
}
/// Timer callback
/**
* The timer pointer is included to ensure the timer isn't destroyed until
* after it has expired.
*
* @param t Pointer to the timer in question
*
* @param callback The function to call back
*
* @param ec The status code
*/
void handle_timer(timer_ptr t, timer_handler callback, const
boost::system::error_code& ec)
{
if (ec) {
if (ec == boost::asio::error::operation_aborted) {
callback(make_error_code(transport::error::operation_aborted));
} else {
m_elog->write(log::elevel::info,
"asio handle_timer error: "+ec.message());
log_err(log::elevel::info,"asio handle_timer",ec);
callback(make_error_code(error::pass_through));
}
} else {
h(lib::error_code());
callback(lib::error_code());
}
}
}
boost::asio::io_service& get_io_service() {
return *m_io_service;
@@ -396,12 +438,25 @@ protected:
"starting async DNS resolve for "+host+":"+port);
}
timer_ptr dns_timer = tcon->set_timer(
config::timeout_dns,
lib::bind(
&type::handle_resolve_timeout,
this,
tcon,
dns_timer,
cb,
lib::placeholders::_1
)
);
m_resolver->async_resolve(
query,
lib::bind(
&type::handle_resolve,
this,
tcon,
dns_timer,
cb,
lib::placeholders::_1,
lib::placeholders::_2
@@ -409,28 +464,48 @@ protected:
);
}
void handle_resolve_timeout(transport_con_ptr tcon, timer_ptr dns_timer,
connect_handler callback, const lib::error_code & ec)
{
m_resolver->cancel();
if (ec == transport::error::operation_aborted) {
m_alog->write(log::alevel::devel,
"asio handle_resolve_timeout timer cancelled");
} else if (ec) {
log_err(log::elevel::devel,"asio handle_resolve_timeout",ec);
callback(tcon->get_handle(),ec);
} else {
m_alog->write(log::alevel::devel,
"asio handle_resolve_timeout timer expired");
callback(tcon->get_handle(),
make_error_code(transport::error::timeout));
}
}
void handle_resolve(transport_con_ptr tcon, connect_handler callback,
const boost::system::error_code& ec,
timer_ptr dns_timer, const boost::system::error_code& ec,
boost::asio::ip::tcp::resolver::iterator iterator)
{
dns_timer->cancel();
if (ec) {
//con->terminate();
// TODO: Better translation of errors at this point
std::stringstream s;
s << "asio async_resolve error:"
<< ec << " (" << ec.message() << ")";
m_elog->write(log::elevel::info,s.str());
if (ec == boost::asio::error::operation_aborted) {
m_alog->write(log::alevel::devel,
"asio handle_resolve resolve cancelled");
return;
}
log_err(log::elevel::info,"asio async_resolve",ec);
callback(tcon->get_handle(),make_error_code(error::pass_through));
return;
}
if (m_alog->static_test(log::alevel::devel)) {
std::stringstream s;
s << "Async DNS resolve successful. Results: ";
boost::asio::ip::tcp::resolver::iterator it, end;
for (it = iterator; it != end; ++it) {
s << (*it).endpoint() << " ";
}
@@ -456,12 +531,7 @@ protected:
const boost::system::error_code& ec)
{
if (ec) {
//con->terminate();
// TODO: Better translation of errors at this point
std::stringstream s;
s << "asio async_connect error: "
<< ec << " (" << ec.message() << ")";
m_elog->write(log::elevel::info,s.str());
log_err(log::elevel::info,"asio async_connect",ec);
callback(tcon->get_handle(),make_error_code(error::pass_through));
return;
}
@@ -506,6 +576,14 @@ protected:
return lib::error_code();
}
private:
/// Convenience method for logging the code and message for an error_code
std::string log_err(log::level l,const char * msg, lib::error_code & ec)
{
std::stringstream s;
s << msg << " error: " << ec << " (" << ec.message() << ")";
m_elog->write(l,s.str());
}
enum state {
UNINITIALIZED = 0,
READY = 1,