concurrency review part 1

This commit is contained in:
Peter Thorson
2012-03-12 10:03:54 -05:00
parent a3001a4c7c
commit f32ddff367
5 changed files with 239 additions and 91 deletions

View File

@@ -71,8 +71,7 @@ namespace websocketpp {
CONNECTING = 0,
OPEN = 1,
CLOSING = 2,
CLOSED = 3,
DETACHED = 4,
CLOSED = 3
};
}
}

View File

@@ -63,7 +63,8 @@ template <
class connection
: public role< connection<endpoint,role,socket> >,
public socket< connection<endpoint,role,socket> >,
public boost::enable_shared_from_this< connection<endpoint,role,socket> >
public boost::enable_shared_from_this< connection<endpoint,role,socket> >,
boost::noncopyable
{
public:
typedef connection_traits< connection<endpoint,role,socket> > traits;
@@ -110,46 +111,84 @@ public:
m_write_buffer(0),
m_write_state(IDLE),
m_remote_close_code(close::status::ABNORMAL_CLOSE),
m_read_state(READING)
m_read_state(READING),
m_strand(e.endpoint_base::m_io_service),
m_detached(false)
{
socket_type::init();
// This should go away
m_control_message = message::control_ptr(new message::control());
}
// SHOULD BE PROTECTED
/// Destroy the connection
connection() {
if (m_state != session::state::CLOSED) {
terminate(true);
}
}
// copy/assignment constructors require C++11
// boost::noncopyable is being used in the meantime.
// connection(connection const&) = delete;
// connection& operator=(connection const&) = delete
/// Start the websocket connection async read loop
/* Begins the connection's async read loop. First any socket level
* initialization will happen (TLS handshake, etc) then the handshake and
* frame reads will start.
*
* Visibility: protected
* State: Should only be called once by the endpoint.
* Concurrency: safe as long as state is valid
*/
void start() {
// initialize the socket.
socket_type::async_init(
boost::bind(
m_strand.wrap(boost::bind(
&type::handle_socket_init,
type::shared_from_this(),
boost::asio::placeholders::error
)
))
);
}
// END PROTECTED
// Valid always
/// Return current connection state
/* Visibility: public
* State: valid always
* Concurrency: callable from anywhere
*/
session::state::value get_state() const {
// TODO: ensure read of m_state is atomic
return m_state;
}
/// Signals to the connection that its endpoint is going away.
/* The connection should not use any references to the endpoint after this
/// Detaches the connection from its endpoint
/* Called by the m_endpoint's destructor. In state DETACHED m_endpoint is
* no longer avaliable. The connection may stick around if the end user
* application needs to read state from it (ie close reasons, etc) but no
* operations requring the endpoint can be performed.
*
* Visibility: protected
* State: Should only be called once by the endpoint.
* Concurrency: safe as long as state is valid
*/
void detach() {
// TODO: lock connection state
m_state = session::state::DETACHED;
// TODO: ensure write of m_detached is atomic
m_detached = true;
}
// Valid for OPEN state
/// convenience overload for sending a one off text message.
/// convenience overload for sending a one off message.
/* Creates a message, fills in payload, and queues a write as a message of
* type op. Default type is TEXT.
*
* Visibility: public
* State: Valid from OPEN, ignored otherwise
* Concurrency: callable from any thread
*/
void send(const std::string& payload, frame::opcode::value op = frame::opcode::TEXT) {
// TODO: do we need a lock?
if (m_state != session::state::OPEN) {
return;
}
// TODO: ensure read of m_state is atomic
if (m_state != session::state::OPEN) {return;}
websocketpp::message::data::ptr msg = get_control_message2();
@@ -164,42 +203,138 @@ public:
msg->set_payload(payload);
send(msg);
}
/// Send message
/* Prepares (if necessary) and sends the given message
*
* Visibility: public
* State: Valid from OPEN, ignored otherwise
* Concurrency: callable from any thread
*/
void send(message::data_ptr msg) {
// TODO: do we need a lock?
if (m_state != session::state::OPEN) {
return;
}
// TODO: need to make sure access to m_state is atomic
if (m_state != session::state::OPEN) {return;}
m_processor->prepare_frame(msg);
write_message(msg);
m_endpoint.endpoint_base::m_io_service.post(
m_strand.wrap(boost::bind(
&type::write_message,
type::shared_from_this(),
msg
))
);
}
// TODO: overloads without code or reason?
/// Close connection
/* Closes the websocket connection with the given status code and reason.
* From state OPEN a clean connection close is initiated. From any other
* state the socket will be closed and the connection cleaned up.
*
* There is no feedback directly from close. Feedback will be provided via
* the on_fail or on_close callbacks.
*
* Visibility: public
* State: Valid from OPEN, ignored otherwise
* Concurrency: callable from any thread
*/
void close(close::status::value code, const std::string& reason = "") {
// TODO: need to make sure access to m_state is atomic
if (m_state == session::state::CONNECTING) {
terminate(true);
m_endpoint.endpoint_base::m_io_service.post(
m_strand.wrap(boost::bind(
&type::terminate,
type::shared_from_this(),
true
))
);
} else if (m_state == session::state::OPEN) {
m_endpoint.endpoint_base::m_io_service.post(
m_strand.wrap(boost::bind(
&type::send_close,
type::shared_from_this(),
code,
reason
))
);
} else {
send_close(code, reason);
// in CLOSING state we are already closing, nothing to do
// in CLOSED state we are already closed, nothing to do
}
}
void ping(const std::string& payload) {
// TODO: do we need a lock?
if (m_state != session::state::OPEN) {
return;
}
send_ping(payload);
}
void pong(const std::string& payload) {
// TODO: do we need a lock?
if (m_state != session::state::OPEN) {
return;
}
send_pong(payload);
}
/// Send Ping
/* Initiates a ping with the given payload.
*
* There is no feedback directly from ping. Feedback will be provided via
* the on_pong or on_pong_timeout callbacks.
*
* Visibility: public
* State: Valid from OPEN, ignored otherwise
* Concurrency: callable from any thread
*/
void ping(const std::string& payload) {
// TODO: need to make sure access to m_state is atomic
if (m_state != session::state::OPEN) {return;}
if (m_detached) {return;}
// TODO: optimize control messages and handle case where
// endpoint is out of messages
message::data_ptr control = get_control_message2();
control->reset(frame::opcode::PING);
control->set_payload(payload);
m_processor->prepare_frame(control);
m_endpoint.endpoint_base::m_io_service.post(
m_strand.wrap(boost::bind(
&type::write_message,
type::shared_from_this(),
control
))
);
}
/// Send Pong
/* Initiates a pong with the given payload.
*
* There is no feedback from pong.
*
* Visibility: public
* State: Valid from OPEN, ignored otherwise
* Concurrency: callable from any thread
*/
void pong(const std::string& payload) {
// TODO: need to make sure access to m_state is atomic
if (m_state != session::state::OPEN) {return;}
if (m_detached) {return;}
// TODO: optimize control messages and handle case where
// endpoint is out of messages
message::data_ptr control = get_control_message2();
control->reset(frame::opcode::PONG);
control->set_payload(payload);
m_processor->prepare_frame(control);
m_endpoint.endpoint_base::m_io_service.post(
m_strand.wrap(boost::bind(
&type::write_message,
type::shared_from_this(),
control
))
);
}
/// Return send buffer size (payload bytes)
/* Initiates a pong with the given payload.
*
* There is no feedback from pong.
*
* Visibility: public
* State: Valid from any state.
* Concurrency: callable from any thread
*/
uint64_t buffered_amount() const {
// TODO: ensure m_write_buffer is accessed atomically
return m_write_buffer;
}
@@ -398,7 +533,7 @@ public:
response = get_handler()->on_ping(type::shared_from_this(),
msg->get_payload());
if (response) {
send_pong(msg->get_payload());
pong(msg->get_payload());
}
break;
case frame::opcode::PONG:
@@ -436,7 +571,17 @@ public:
}
}
/// Send a close frame
/* Initiates a close handshake by sending a close frame with the given code
* and reason.
*
* Visibility: protected
* State: Valid for OPEN, ignored otherwise.
* Concurrency: Must be called within m_strand
*/
void send_close(close::status::value code, const std::string& reason) {
if (m_detached) {return;}
if (m_state != session::state::OPEN) {
m_endpoint.elog().at(log::elevel::WARN)
<< "Tried to disconnect a session that wasn't open"
@@ -462,11 +607,11 @@ public:
m_timer.expires_from_now(boost::posix_time::milliseconds(1000));
m_timer.async_wait(
boost::bind(
m_strand.wrap(boost::bind(
&type::fail_on_expire,
type::shared_from_this(),
boost::asio::placeholders::error
)
))
);
m_local_close_code = code;
@@ -544,26 +689,6 @@ public:
m_write_state = INTURRUPT;
}
void send_ping(const std::string& payload) {
// TODO: optimize control messages and handle case where
// endpoint is out of messages
message::data_ptr control = get_control_message2();
control->reset(frame::opcode::PING);
control->set_payload(payload);
m_processor->prepare_frame(control);
write_message(control);
}
void send_pong(const std::string& payload) {
// TODO: optimize control messages and handle case where
// endpoint is out of messages
message::data_ptr control = get_control_message2();
control->reset(frame::opcode::PONG);
control->set_payload(payload);
m_processor->prepare_frame(control);
write_message(control);
}
void write_message(message::data_ptr msg) {
if (m_write_state == INTURRUPT) {
return;
@@ -612,11 +737,11 @@ public:
socket_type::get_socket(),
m_write_buf,
//m_write_queue.front()->get_buffer(),
boost::bind(
m_strand.wrap(boost::bind(
&type::handle_write,
type::shared_from_this(),
boost::asio::placeholders::error
)
))
);
} else {
// if we are in an inturrupted state and had nothing else to write
@@ -660,23 +785,27 @@ public:
write();
}
// terminate cleans up a connection and removes it from the endpoint's
// connection list.
/// Ends the connection by cleaning up based on current state
/* Terminate will review the outstanding resources and close each
* appropriately. Attached handlers will recieve an on_fail or on_close call
*
* TODO: should we protect against long running handlers?
*
* Visibility: protected
* State: Valid from any state except CLOSED.
* Concurrency: Must be called from within m_strand
*/
void terminate(bool failed_by_me) {
m_endpoint.alog().at(log::alevel::DEBUG_CLOSE)
<< "terminate called" << log::endl;
if (m_state == session::state::CLOSED) {
// shouldn't be here
}
// If state is closed it either means terminate was called twice or
// something other than this library called it. In either case running
// it will only cause problems
if (m_state == session::state::CLOSED) {return;}
// TODO: ensure any other timers are accounted for here.
// cancel the close timeout
m_timer.cancel();
// If this was a websocket connection notify the application handler
// about the close using either on_fail or on_close
// version -1 is an HTTP (non-websocket) connection.
if (role_type::get_version() != -1) {
// TODO: note, calling shutdown on the ssl socket for an HTTP
// connection seems to cause shutdown to block for a very long time.
@@ -692,16 +821,23 @@ public:
if (old_state == session::state::CONNECTING) {
get_handler()->on_fail(type::shared_from_this());
} else if (old_state == session::state::OPEN ||
old_state == session::state::CLOSING) {
old_state == session::state::CLOSING)
{
get_handler()->on_close(type::shared_from_this());
} else {
// if we were already closed something is wrong
}
log_close_result();
if (!m_detached) {
log_close_result();
}
}
// finally remove this connection from the endpoint's list. This will
// remove the last shared pointer to the connection held by WS++.
m_endpoint.remove_connection(type::shared_from_this());
// remove the last shared pointer to the connection held by WS++. If we
// are DETACHED this has already been done and can't be done again.
if (!m_detached) {
m_endpoint.remove_connection(type::shared_from_this());
}
}
// this is called when an async asio call encounters an error
@@ -742,6 +878,10 @@ public:
handler_ptr get_handler() {
return m_handler;
}
boost::asio::strand& get_strand() {
return m_strand;
}
public:
//protected: TODO: figure out why VCPP2010 doesn't like protected here
endpoint_type& m_endpoint;
@@ -779,6 +919,8 @@ public:
// concurrency support
boost::recursive_mutex m_lock;
boost::asio::strand m_strand;
bool m_detached; // TODO: this should be atomic
};
// connection related types that it and its policy classes need.

View File

@@ -73,7 +73,7 @@ class endpoint
: public endpoint_base,
public role< endpoint<role,socket,logger> >,
public socket< endpoint<role,socket,logger> >,
boost::noncopyable
boost::noncopyable
{
public:
/// Type of the traits class that stores endpoint related types.
@@ -137,7 +137,7 @@ public:
m_pool->set_callback(boost::bind(&type::on_new_message,this));
}
/// Destroy and endpoint
/// Destroy an endpoint
~endpoint() {
// Tell the memory pool we don't want to be notified about newly freed
// messages any more (because we wont be here)
@@ -146,6 +146,8 @@ public:
// Detach any connections that are still alive at this point
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
typename std::set<connection_ptr>::iterator it;
while (!m_connections.empty()) {
remove_connection(*m_connections.begin());
}
@@ -319,7 +321,7 @@ protected:
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
// TODO: is this safe to use?
//con->detach();
con->detach();
m_connections.erase(con);

View File

@@ -208,14 +208,18 @@ public:
// Optional
virtual bool on_ping(connection_ptr con,std::string) {return true;}
virtual void on_pong(connection_ptr con,std::string) {}
virtual void on_pong_timeout(connection_ptr con,std::string) {}
};
client (boost::asio::io_service& m)
: m_endpoint(static_cast< endpoint_type& >(*this)),
m_io_service(m),
m_gen(m_rng,boost::random::uniform_int_distribution<>(std::numeric_limits<int32_t>::min(),
std::numeric_limits<int32_t>::max())) {}
m_gen(m_rng,
boost::random::uniform_int_distribution<>(
std::numeric_limits<int32_t>::min(),
std::numeric_limits<int32_t>::max()
)) {}
connection_ptr connect(const std::string& u);

View File

@@ -180,6 +180,7 @@ public:
virtual bool on_ping(connection_ptr con,std::string) {return true;}
virtual void on_pong(connection_ptr con,std::string) {}
virtual void on_pong_timeout(connection_ptr con,std::string) {}
virtual void http(connection_ptr con) {}
};