From 5065ff26157e90072a940c37396245fcec9eb09a Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Fri, 3 Feb 2012 09:25:56 -0600 Subject: [PATCH] preliminary work on endpoint/connection concurrency support fixes #63 --- Makefile | 2 +- examples/broadcast_server_tls/Makefile | 2 +- examples/echo_server/Makefile | 4 +-- examples/echo_server_tls/Makefile | 4 +-- src/connection.hpp | 3 +++ src/endpoint.hpp | 26 ++++++++++++++++++- src/roles/server.hpp | 35 +++++++++++++++++++++++--- 7 files changed, 65 insertions(+), 11 deletions(-) diff --git a/Makefile b/Makefile index 7ad4159ddb..edb9065692 100644 --- a/Makefile +++ b/Makefile @@ -33,7 +33,7 @@ objects = network_utilities.o sha1.o base64.o md5.o uri.o hybi_header.o data.o BOOST_LIB_PATH ?= /usr/local/lib BOOST_INCLUDE_PATH ?= /usr/local/include -libs = -L$(BOOST_LIB_PATH) -lboost_system -lboost_date_time -lboost_regex -lboost_random -lboost_program_options +libs = -L$(BOOST_LIB_PATH) -lboost_system -lboost_date_time -lboost_regex -lboost_random -lboost_program_options -lboost_thread //libs_static = $(BOOST_PATH)/boost_system.a $(BOOST_PATH)/boost_regex.a diff --git a/examples/broadcast_server_tls/Makefile b/examples/broadcast_server_tls/Makefile index baaa441c7c..5dc913f159 100644 --- a/examples/broadcast_server_tls/Makefile +++ b/examples/broadcast_server_tls/Makefile @@ -10,7 +10,7 @@ SHARED ?= "1" ifeq ($(SHARED), 1) LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lcrypto -lssl -lpthread -lwebsocketpp else - LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a -lcrypto -lssl -lpthread + LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a -lcrypto -lssl -lpthread endif broadcast_server: broadcast_server_tls.o diff --git a/examples/echo_server/Makefile b/examples/echo_server/Makefile index 5d34ad2e4b..b2ba630bdb 100644 --- a/examples/echo_server/Makefile +++ b/examples/echo_server/Makefile @@ -9,9 +9,9 @@ CXX ?= c++ SHARED ?= "1" ifeq ($(SHARED), 1) - LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lboost_program_options -lpthread -lwebsocketpp + LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lboost_program_options -lboost_thread -lpthread -lwebsocketpp else - LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a -lpthread + LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a -lpthread endif echo_server: echo_server.o diff --git a/examples/echo_server_tls/Makefile b/examples/echo_server_tls/Makefile index 4593ef8b1d..c239aee506 100644 --- a/examples/echo_server_tls/Makefile +++ b/examples/echo_server_tls/Makefile @@ -8,9 +8,9 @@ CXX ?= c++ SHARED ?= "1" ifeq ($(SHARED), 1) - LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lcrypto -lssl -lpthread -lwebsocketpp + LDFLAGS := $(LDFLAGS) -lboost_system -lboost_date_time -lboost_thread -lcrypto -lssl -lpthread -lwebsocketpp else - LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a -lcrypto -lssl -lpthread + LDFLAGS := $(LDFLAGS) ../../libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a -lcrypto -lssl -lpthread endif echo_server_tls: echo_server_tls.o diff --git a/src/connection.hpp b/src/connection.hpp index 04d6b6a4fb..014dc149ee 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -745,6 +745,9 @@ protected: // Read queue read_state m_read_state; message::control_ptr m_control_message; + + // concurrency support + boost::recursive_mutex m_lock; }; // connection related types that it and its policy classes need. diff --git a/src/endpoint.hpp b/src/endpoint.hpp index bd6aac5a64..48f0e354b4 100644 --- a/src/endpoint.hpp +++ b/src/endpoint.hpp @@ -34,6 +34,7 @@ #include #include +#include #include #include @@ -180,6 +181,8 @@ public: * be NULL. */ void set_handler(handler_ptr new_handler) { + boost::lock_guard lock(get_lock()); + if (!new_handler) { elog().at(log::elevel::FATAL) << "Tried to switch to a NULL handler." << log::endl; @@ -203,6 +206,8 @@ public: void close_all(close::status::value code = close::status::GOING_AWAY, const std::string& reason = "") { + boost::lock_guard lock(get_lock()); + alog().at(log::alevel::ENDPOINT) << "Endpoint received signal to close all connections cleanly with code " << code << " and reason " << reason << log::endl; @@ -237,7 +242,9 @@ public: void stop(bool clean = true, close::status::value code = close::status::GOING_AWAY, const std::string& reason = "") - { + { + boost::lock_guard lock(get_lock()); + if (clean) { alog().at(log::alevel::ENDPOINT) << "Endpoint is stopping cleanly" << log::endl; @@ -268,6 +275,8 @@ protected: * shared pointer if one could not be created. */ connection_ptr create_connection() { + boost::lock_guard lock(get_lock()); + if (m_state == STOPPING || m_state == STOPPED) { return connection_ptr(); } @@ -293,6 +302,8 @@ protected: * @param con A shared pointer to a connection created by this endpoint. */ void remove_connection(connection_ptr con) { + boost::lock_guard lock(get_lock()); + m_connections.erase(con); alog().at(log::alevel::DEVEL) << "Connection removed: count is now: " @@ -314,11 +325,13 @@ protected: } /// Gets a shared pointer to a read/write data message. + // TODO: thread safety message::data::ptr get_data_message() { return m_pool->get(); } /// Gets a shared pointer to a read/write control message. + // TODO: thread safety message::data::ptr get_control_message() { return m_pool_control->get(); } @@ -326,6 +339,8 @@ protected: /// Asks the endpoint to restart this connection's handle_read_frame loop /// when there are avaliable data messages. void wait(connection_ptr con) { + boost::lock_guard lock(get_lock()); + m_read_waiting.push(con); alog().at(log::alevel::DEVEL) << "connection " << con << " is waiting. " << m_read_waiting.size() << log::endl; } @@ -333,6 +348,8 @@ protected: /// Message pool callback indicating that there is a free data message /// avaliable. Causes one waiting connection to get restarted. void on_new_message() { + boost::lock_guard lock(get_lock()); + if (!m_read_waiting.empty()) { connection_ptr next = m_read_waiting.front(); @@ -344,6 +361,10 @@ protected: } } + + boost::recursive_mutex& get_lock() { + return m_lock; + } private: enum state { IDLE = 0, @@ -362,6 +383,9 @@ private: message::pool::ptr m_pool; message::pool::ptr m_pool_control; std::queue m_read_waiting; + + // concurrency support + boost::recursive_mutex m_lock; }; /// traits class that allows looking up relevant endpoint types by the fully diff --git a/src/roles/server.hpp b/src/roles/server.hpp index 2f9e641a0a..9bbaedbd07 100644 --- a/src/roles/server.hpp +++ b/src/roles/server.hpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -183,6 +184,7 @@ public: // this constructor, which also requires a port. This port number can be // ignored, as it is always overwriten later by the listen() member func m_acceptor(m), + m_state(IDLE), m_timer(m,boost::posix_time::seconds(0)) {} void listen(uint16_t port); @@ -221,13 +223,20 @@ private: endpoint_type& m_endpoint; boost::asio::io_service& m_io_service; boost::asio::ip::tcp::acceptor m_acceptor; + state m_state; boost::asio::deadline_timer m_timer; }; template void server::listen(const boost::asio::ip::tcp::endpoint& e) { - m_acceptor.open(e.protocol()); + boost::lock_guard lock(m_endpoint.get_lock()); + + if (m_state != IDLE) { + throw exception("listen called from invalid state."); + } + + m_acceptor.open(e.protocol()); m_acceptor.set_option(boost::asio::socket_base::reuse_address(true)); m_acceptor.bind(e); m_acceptor.listen(); @@ -237,7 +246,6 @@ void server::listen(const boost::asio::ip::tcp::endpoint& e) { } // server Implimentation -// TODO: protect listen from being called twice or in the wrong state. // TODO: provide a way to stop/reset the server endpoint template void server::listen(uint16_t port) { @@ -258,6 +266,8 @@ void server::listen(const std::string &host, const std::string &servic template void server::start_accept() { + boost::lock_guard lock(m_endpoint.get_lock()); + connection_ptr con = m_endpoint.create_connection(); if (con == connection_ptr()) { @@ -267,7 +277,7 @@ void server::start_accept() { << log::endl; return; } - + m_acceptor.async_accept( con->get_raw_socket(), boost::bind( @@ -286,6 +296,8 @@ template void server::handle_accept(connection_ptr con, const boost::system::error_code& error) { + boost::lock_guard lock(m_endpoint.get_lock()); + if (error) { if (error == boost::system::errc::too_many_files_open) { m_endpoint.elog().at(log::elevel::ERROR) @@ -316,6 +328,8 @@ template void server::connection::select_subprotocol( const std::string& value) { + // TODO: should this be locked? + std::vector::iterator it; it = std::find(m_requested_subprotocols.begin(), @@ -334,6 +348,8 @@ template void server::connection::select_extension( const std::string& value) { + // TODO: should this be locked? + if (value == "") { return; } @@ -357,6 +373,8 @@ template void server::connection::set_body( const std::string& value) { + // TODO: should this be locked? + if (m_connection.m_version != -1) { // TODO: throw exception throw std::invalid_argument("set_body called from invalid state"); @@ -365,10 +383,15 @@ void server::connection::set_body( m_response.set_body(value); } - +/// initiates an async read for an HTTP header +/** + * Thread Safety: locks connection + */ template template void server::connection::async_init() { + boost::lock_guard lock(m_connection.m_lock); + boost::asio::async_read_until( m_connection.get_socket(), m_connection.buffer(), @@ -382,6 +405,10 @@ void server::connection::async_init() { ); } +/// processes the response from an async read for an HTTP header +/** + * Thread Safety: async asio calls are not thread safe + */ template template void server::connection::handle_read_request(