preliminary work on endpoint/connection concurrency support fixes #63

This commit is contained in:
Peter Thorson
2012-02-03 09:25:56 -06:00
parent 30455f985f
commit 5065ff2615
7 changed files with 65 additions and 11 deletions

View File

@@ -33,7 +33,7 @@ objects = network_utilities.o sha1.o base64.o md5.o uri.o hybi_header.o data.o
BOOST_LIB_PATH ?= /usr/local/lib
BOOST_INCLUDE_PATH ?= /usr/local/include
libs = -L$(BOOST_LIB_PATH) -lboost_system -lboost_date_time -lboost_regex -lboost_random -lboost_program_options
libs = -L$(BOOST_LIB_PATH) -lboost_system -lboost_date_time -lboost_regex -lboost_random -lboost_program_options -lboost_thread
//libs_static = $(BOOST_PATH)/boost_system.a $(BOOST_PATH)/boost_regex.a

View File

@@ -10,7 +10,7 @@ SHARED ?= "1"
ifeq ($(SHARED), 1)
LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lcrypto -lssl -lpthread -lwebsocketpp
else
LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a -lcrypto -lssl -lpthread
LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a -lcrypto -lssl -lpthread
endif
broadcast_server: broadcast_server_tls.o

View File

@@ -9,9 +9,9 @@ CXX ?= c++
SHARED ?= "1"
ifeq ($(SHARED), 1)
LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lboost_program_options -lpthread -lwebsocketpp
LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lboost_program_options -lboost_thread -lpthread -lwebsocketpp
else
LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a -lpthread
LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a -lpthread
endif
echo_server: echo_server.o

View File

@@ -8,9 +8,9 @@ CXX ?= c++
SHARED ?= "1"
ifeq ($(SHARED), 1)
LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lcrypto -lssl -lpthread -lwebsocketpp
LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lboost_thread -lcrypto -lssl -lpthread -lwebsocketpp
else
LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a -lcrypto -lssl -lpthread
LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a -lcrypto -lssl -lpthread
endif
echo_server_tls: echo_server_tls.o

View File

@@ -745,6 +745,9 @@ protected:
// Read queue
read_state m_read_state;
message::control_ptr m_control_message;
// concurrency support
boost::recursive_mutex m_lock;
};
// connection related types that it and its policy classes need.

View File

@@ -34,6 +34,7 @@
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <boost/utility.hpp>
#include <iostream>
@@ -180,6 +181,8 @@ public:
* be NULL.
*/
void set_handler(handler_ptr new_handler) {
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
if (!new_handler) {
elog().at(log::elevel::FATAL)
<< "Tried to switch to a NULL handler." << log::endl;
@@ -203,6 +206,8 @@ public:
void close_all(close::status::value code = close::status::GOING_AWAY,
const std::string& reason = "")
{
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
alog().at(log::alevel::ENDPOINT)
<< "Endpoint received signal to close all connections cleanly with code "
<< code << " and reason " << reason << log::endl;
@@ -237,7 +242,9 @@ public:
void stop(bool clean = true,
close::status::value code = close::status::GOING_AWAY,
const std::string& reason = "")
{
{
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
if (clean) {
alog().at(log::alevel::ENDPOINT)
<< "Endpoint is stopping cleanly" << log::endl;
@@ -268,6 +275,8 @@ protected:
* shared pointer if one could not be created.
*/
connection_ptr create_connection() {
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
if (m_state == STOPPING || m_state == STOPPED) {
return connection_ptr();
}
@@ -293,6 +302,8 @@ protected:
* @param con A shared pointer to a connection created by this endpoint.
*/
void remove_connection(connection_ptr con) {
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
m_connections.erase(con);
alog().at(log::alevel::DEVEL) << "Connection removed: count is now: "
@@ -314,11 +325,13 @@ protected:
}
/// Gets a shared pointer to a read/write data message.
// TODO: thread safety
message::data::ptr get_data_message() {
return m_pool->get();
}
/// Gets a shared pointer to a read/write control message.
// TODO: thread safety
message::data::ptr get_control_message() {
return m_pool_control->get();
}
@@ -326,6 +339,8 @@ protected:
/// Asks the endpoint to restart this connection's handle_read_frame loop
/// when there are avaliable data messages.
void wait(connection_ptr con) {
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
m_read_waiting.push(con);
alog().at(log::alevel::DEVEL) << "connection " << con << " is waiting. " << m_read_waiting.size() << log::endl;
}
@@ -333,6 +348,8 @@ protected:
/// Message pool callback indicating that there is a free data message
/// avaliable. Causes one waiting connection to get restarted.
void on_new_message() {
boost::lock_guard<boost::recursive_mutex> lock(get_lock());
if (!m_read_waiting.empty()) {
connection_ptr next = m_read_waiting.front();
@@ -344,6 +361,10 @@ protected:
}
}
boost::recursive_mutex& get_lock() {
return m_lock;
}
private:
enum state {
IDLE = 0,
@@ -362,6 +383,9 @@ private:
message::pool<message::data>::ptr m_pool;
message::pool<message::data>::ptr m_pool_control;
std::queue<connection_ptr> m_read_waiting;
// concurrency support
boost::recursive_mutex m_lock;
};
/// traits class that allows looking up relevant endpoint types by the fully

View File

@@ -39,6 +39,7 @@
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <iostream>
#include <stdexcept>
@@ -183,6 +184,7 @@ public:
// this constructor, which also requires a port. This port number can be
// ignored, as it is always overwriten later by the listen() member func
m_acceptor(m),
m_state(IDLE),
m_timer(m,boost::posix_time::seconds(0)) {}
void listen(uint16_t port);
@@ -221,13 +223,20 @@ private:
endpoint_type& m_endpoint;
boost::asio::io_service& m_io_service;
boost::asio::ip::tcp::acceptor m_acceptor;
state m_state;
boost::asio::deadline_timer m_timer;
};
template <class endpoint>
void server<endpoint>::listen(const boost::asio::ip::tcp::endpoint& e) {
m_acceptor.open(e.protocol());
boost::lock_guard<boost::recursive_mutex> lock(m_endpoint.get_lock());
if (m_state != IDLE) {
throw exception("listen called from invalid state.");
}
m_acceptor.open(e.protocol());
m_acceptor.set_option(boost::asio::socket_base::reuse_address(true));
m_acceptor.bind(e);
m_acceptor.listen();
@@ -237,7 +246,6 @@ void server<endpoint>::listen(const boost::asio::ip::tcp::endpoint& e) {
}
// server<endpoint> Implimentation
// TODO: protect listen from being called twice or in the wrong state.
// TODO: provide a way to stop/reset the server endpoint
template <class endpoint>
void server<endpoint>::listen(uint16_t port) {
@@ -258,6 +266,8 @@ void server<endpoint>::listen(const std::string &host, const std::string &servic
template <class endpoint>
void server<endpoint>::start_accept() {
boost::lock_guard<boost::recursive_mutex> lock(m_endpoint.get_lock());
connection_ptr con = m_endpoint.create_connection();
if (con == connection_ptr()) {
@@ -267,7 +277,7 @@ void server<endpoint>::start_accept() {
<< log::endl;
return;
}
m_acceptor.async_accept(
con->get_raw_socket(),
boost::bind(
@@ -286,6 +296,8 @@ template <class endpoint>
void server<endpoint>::handle_accept(connection_ptr con,
const boost::system::error_code& error)
{
boost::lock_guard<boost::recursive_mutex> lock(m_endpoint.get_lock());
if (error) {
if (error == boost::system::errc::too_many_files_open) {
m_endpoint.elog().at(log::elevel::ERROR)
@@ -316,6 +328,8 @@ template <class connection_type>
void server<endpoint>::connection<connection_type>::select_subprotocol(
const std::string& value)
{
// TODO: should this be locked?
std::vector<std::string>::iterator it;
it = std::find(m_requested_subprotocols.begin(),
@@ -334,6 +348,8 @@ template <class connection_type>
void server<endpoint>::connection<connection_type>::select_extension(
const std::string& value)
{
// TODO: should this be locked?
if (value == "") {
return;
}
@@ -357,6 +373,8 @@ template <class connection_type>
void server<endpoint>::connection<connection_type>::set_body(
const std::string& value)
{
// TODO: should this be locked?
if (m_connection.m_version != -1) {
// TODO: throw exception
throw std::invalid_argument("set_body called from invalid state");
@@ -365,10 +383,15 @@ void server<endpoint>::connection<connection_type>::set_body(
m_response.set_body(value);
}
/// initiates an async read for an HTTP header
/**
* Thread Safety: locks connection
*/
template <class endpoint>
template <class connection_type>
void server<endpoint>::connection<connection_type>::async_init() {
boost::lock_guard<boost::recursive_mutex> lock(m_connection.m_lock);
boost::asio::async_read_until(
m_connection.get_socket(),
m_connection.buffer(),
@@ -382,6 +405,10 @@ void server<endpoint>::connection<connection_type>::async_init() {
);
}
/// processes the response from an async read for an HTTP header
/**
* Thread Safety: async asio calls are not thread safe
*/
template <class endpoint>
template <class connection_type>
void server<endpoint>::connection<connection_type>::handle_read_request(