updates endpoint concurrency features

This commit is contained in:
Peter Thorson
2013-01-20 08:56:11 -06:00
parent eb1603177f
commit 9a7631b2da
3 changed files with 76 additions and 31 deletions

View File

@@ -69,7 +69,7 @@ public:
typedef typename connection_type::message_handler message_handler;
/// Type of message pointers that this endpoint uses
typedef typename connection_type::message_ptr message_ptr;
// TODO: organize these
typedef typename connection_type::termination_handler termination_handler;
@@ -89,9 +89,10 @@ public:
*
* The default value for this version is WebSocket++/0.3.0dev
*
* @return A reference to the user agent string.
* @return The user agent string.
*/
const std::string& get_user_agent() const {
std::string get_user_agent() const {
scoped_lock_type guard(m_mutex);
return m_user_agent;
}
@@ -104,35 +105,73 @@ public:
*
* The default value for this version is WebSocket++/0.3.0dev
*
* @return A reference to the user agent string.
* @param ua The string to set the user agent to.
*/
void set_user_agent(const std::string& ua) {
scoped_lock_type guard(m_mutex);
m_user_agent = ua;
}
bool is_server() const {return m_is_server;}
/// Returns whether or not this endpoint is a server.
/**
* @return Whether or not this endpoint is a server
*/
bool is_server() const {
return m_is_server;
}
/*************************/
/* Set Handler functions */
/*************************/
void set_open_handler(open_handler h) {m_open_handler = h;}
void set_close_handler(close_handler h) {m_close_handler = h;}
void set_fail_handler(fail_handler h) {m_fail_handler = h;}
void set_ping_handler(ping_handler h) {m_ping_handler = h;}
void set_pong_handler(pong_handler h) {m_pong_handler = h;}
void set_open_handler(open_handler h) {
scoped_lock_type guard(m_mutex);
m_open_handler = h;
}
void set_close_handler(close_handler h) {
scoped_lock_type guard(m_mutex);
m_close_handler = h;
}
void set_fail_handler(fail_handler h) {
scoped_lock_type guard(m_mutex);
m_fail_handler = h;
}
void set_ping_handler(ping_handler h) {
scoped_lock_type guard(m_mutex);
m_ping_handler = h;
}
void set_pong_handler(pong_handler h) {
scoped_lock_type guard(m_mutex);
m_pong_handler = h;
}
void set_pong_timeout_handler(pong_timeout_handler h) {
scoped_lock_type guard(m_mutex);
m_pong_timeout_handler = h;
}
void set_interrupt_handler(interrupt_handler h) {m_interrupt_handler = h;}
void set_http_handler(http_handler h) {m_http_handler = h;}
void set_validate_handler(validate_handler h) {m_validate_handler = h;}
void set_message_handler(message_handler h) {m_message_handler = h;}
void set_interrupt_handler(interrupt_handler h) {
scoped_lock_type guard(m_mutex);
m_interrupt_handler = h;
}
void set_http_handler(http_handler h) {
scoped_lock_type guard(m_mutex);
m_http_handler = h;
}
void set_validate_handler(validate_handler h) {
scoped_lock_type guard(m_mutex);
m_validate_handler = h;
}
void set_message_handler(message_handler h) {
scoped_lock_type guard(m_mutex);
m_message_handler = h;
}
/*************************************/
/* Connection pass through functions */
/*************************************/
/**
* Is thread safe if transport is
*/
void interrupt(connection_hdl hdl, lib::error_code & ec) {
connection_ptr con = get_con_from_hdl(hdl);
if (!con) {
@@ -144,7 +183,10 @@ public:
ec = con->interrupt();
}
/**
* Is thread safe if transport is
*/
void interrupt(connection_hdl hdl) {
lib::error_code ec;
interrupt(hdl,ec);
@@ -196,16 +238,19 @@ protected:
/// Retrieves a connection_ptr from a connection_hdl
/**
* Converting a weak pointer to shared_ptr is not thread safe because the
* pointer could be deleted at any time.
*
* @param hdl The connection handle to translate
*
* @return the connection_ptr. May be NULL if the handle was invalid.
*/
connection_ptr get_con_from_hdl(connection_hdl hdl) {
return lib::static_pointer_cast<connection_type,void>(hdl.lock());
scoped_lock_type lock(m_mutex);
connection_ptr con = lib::static_pointer_cast<connection_type,void>(
hdl.lock());
return con;
}
// protected resources
mutex_type m_mutex;
private:
// dynamic settings
std::string m_user_agent;
@@ -228,7 +273,7 @@ private:
const bool m_is_server;
// endpoint state
mutex_type m_state_lock;
mutex_type m_mutex;
};
} // namespace websocketpp

View File

@@ -75,6 +75,8 @@ endpoint<connection,config>::create_connection() {
)
);
transport_type::init(con);
scoped_lock_type lock(m_mutex);
m_connections.insert(con);
@@ -88,7 +90,7 @@ template <typename connection, typename config>
void endpoint<connection,config>::remove_connection(connection_ptr con) {
std::cout << "remove_connection. New count: " << m_connections.size()-1
<< std::endl;
scoped_lock_type lock(m_state_lock);
scoped_lock_type lock(m_mutex);
// unregister the termination handler
con->set_termination_handler(termination_handler());

View File

@@ -236,16 +236,14 @@ protected:
void set_handle(connection_hdl hdl) {
m_connection_hdl = hdl;
}
/// Trigger the on_interrupt handler
/**
* This needs to be thread safe
*
* Might need a strand at some point?
*/
lib::error_code interrupt(inturrupt_handler handler) {
// strand post handle_inturrupt
/*m_strand->post(lib::bind(
&type::handle_inturrupt,
this,
handler
));*/
// would this work?
m_io_service->post(handler);
return lib::error_code();
}