From 9348aa4c217a730ef364abe22208e9b2204d9da4 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Wed, 21 Mar 2012 22:37:04 -0600 Subject: [PATCH] adds thread pool support to server --- src/common.hpp | 3 +- src/roles/server.hpp | 66 +++++++++++++++++++++++++++++--------------- 2 files changed, 46 insertions(+), 23 deletions(-) diff --git a/src/common.hpp b/src/common.hpp index e121cf025b..a2ff5b7713 100644 --- a/src/common.hpp +++ b/src/common.hpp @@ -59,9 +59,10 @@ namespace websocketpp { const uint64_t DEFAULT_MAX_MESSAGE_SIZE = 0xFFFFFF; // ~16MB const size_t DEFAULT_READ_THRESHOLD = 1; // 512 would be a more sane value for this - const bool DEFAULT_SILENT_CLOSE = false; // true + const size_t MAX_THREAD_POOL_SIZE = 64; + const uint16_t DEFAULT_PORT = 80; const uint16_t DEFAULT_SECURE_PORT = 443; diff --git a/src/roles/server.hpp b/src/roles/server.hpp index 3c294162ef..d7609fbf79 100644 --- a/src/roles/server.hpp +++ b/src/roles/server.hpp @@ -39,6 +39,7 @@ #include #include #include +#include #include #include @@ -194,17 +195,17 @@ public: m_state(IDLE), m_timer(m,boost::posix_time::seconds(0)) {} - void listen(uint16_t port); - void listen(const boost::asio::ip::tcp::endpoint& e); + void listen(uint16_t port, size_t n = 1); + void listen(const boost::asio::ip::tcp::endpoint& e, size_t num_threads = 1); // uses internal resolver - void listen(const std::string &host, const std::string &service); + void listen(const std::string &host, const std::string &service, size_t n = 1); template - void listen(const InternetProtocol &internet_protocol, uint16_t port) { + void listen(const InternetProtocol &internet_protocol, uint16_t port, size_t n = 1) { m_endpoint.alog().at(log::alevel::DEVEL) << "role::server listening on port " << port << log::endl; boost::asio::ip::tcp::endpoint e(internet_protocol, port); - listen(e); + listen(e,n); } protected: bool is_server() { @@ -236,33 +237,54 @@ private: }; template -void server::listen(const boost::asio::ip::tcp::endpoint& e) { - boost::unique_lock lock(m_endpoint.m_lock); +void server::listen(const boost::asio::ip::tcp::endpoint& e,size_t num_threads) { + { + boost::unique_lock lock(m_endpoint.m_lock); + + if (m_state != IDLE) { + throw exception("listen called from invalid state."); + } + + m_acceptor.open(e.protocol()); + m_acceptor.set_option(boost::asio::socket_base::reuse_address(true)); + m_acceptor.bind(e); + m_acceptor.listen(); - if (m_state != IDLE) { - throw exception("listen called from invalid state."); + this->start_accept(); } - m_acceptor.open(e.protocol()); - m_acceptor.set_option(boost::asio::socket_base::reuse_address(true)); - m_acceptor.bind(e); - m_acceptor.listen(); - - this->start_accept(); - - lock.unlock(); - m_endpoint.run_internal(); + if (num_threads == 1) { + m_endpoint.run_internal(); + } else if (num_threads > 1 && num_threads <= MAX_THREAD_POOL_SIZE) { + std::vector< boost::shared_ptr > threads; + + for (std::size_t i = 0; i < num_threads; ++i) { + boost::shared_ptr thread( + new boost::thread(boost::bind( + &endpoint_type::run_internal, + &m_endpoint + )) + ); + threads.push_back(thread); + } + + for (std::size_t i = 0; i < threads.size(); ++i) { + threads[i]->join(); + } + } else { + throw exception("listen called with invalid num_threads value"); + } } // server Implimentation // TODO: provide a way to stop/reset the server endpoint template -void server::listen(uint16_t port) { - listen(boost::asio::ip::tcp::v6(), port); +void server::listen(uint16_t port, size_t n) { + listen(boost::asio::ip::tcp::v6(), port, n); } template -void server::listen(const std::string &host, const std::string &service) { +void server::listen(const std::string &host, const std::string &service, size_t n) { boost::asio::ip::tcp::resolver resolver(m_io_service); boost::asio::ip::tcp::resolver::query query(host, service); boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query); @@ -270,7 +292,7 @@ void server::listen(const std::string &host, const std::string &servic if (endpoint_iterator == end) { throw std::invalid_argument("Can't resolve host/service to listen"); } - listen(*endpoint_iterator); + listen(*endpoint_iterator,n); } template