adds thread pool support to server

This commit is contained in:
Peter Thorson
2012-03-21 22:37:04 -06:00
parent 6ca0b49ae0
commit 9348aa4c21
2 changed files with 46 additions and 23 deletions

View File

@@ -59,9 +59,10 @@ namespace websocketpp {
const uint64_t DEFAULT_MAX_MESSAGE_SIZE = 0xFFFFFF; // ~16MB
const size_t DEFAULT_READ_THRESHOLD = 1; // 512 would be a more sane value for this
const bool DEFAULT_SILENT_CLOSE = false; // true
const size_t MAX_THREAD_POOL_SIZE = 64;
const uint16_t DEFAULT_PORT = 80;
const uint16_t DEFAULT_SECURE_PORT = 443;

View File

@@ -39,6 +39,7 @@
#include <boost/bind.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/scoped_ptr.hpp>
#include <boost/thread.hpp>
#include <boost/thread/recursive_mutex.hpp>
#include <iostream>
@@ -194,17 +195,17 @@ public:
m_state(IDLE),
m_timer(m,boost::posix_time::seconds(0)) {}
void listen(uint16_t port);
void listen(const boost::asio::ip::tcp::endpoint& e);
void listen(uint16_t port, size_t n = 1);
void listen(const boost::asio::ip::tcp::endpoint& e, size_t num_threads = 1);
// uses internal resolver
void listen(const std::string &host, const std::string &service);
void listen(const std::string &host, const std::string &service, size_t n = 1);
template <typename InternetProtocol>
void listen(const InternetProtocol &internet_protocol, uint16_t port) {
void listen(const InternetProtocol &internet_protocol, uint16_t port, size_t n = 1) {
m_endpoint.alog().at(log::alevel::DEVEL)
<< "role::server listening on port " << port << log::endl;
boost::asio::ip::tcp::endpoint e(internet_protocol, port);
listen(e);
listen(e,n);
}
protected:
bool is_server() {
@@ -236,33 +237,54 @@ private:
};
template <class endpoint>
void server<endpoint>::listen(const boost::asio::ip::tcp::endpoint& e) {
boost::unique_lock<boost::recursive_mutex> lock(m_endpoint.m_lock);
void server<endpoint>::listen(const boost::asio::ip::tcp::endpoint& e,size_t num_threads) {
{
boost::unique_lock<boost::recursive_mutex> lock(m_endpoint.m_lock);
if (m_state != IDLE) {
throw exception("listen called from invalid state.");
}
m_acceptor.open(e.protocol());
m_acceptor.set_option(boost::asio::socket_base::reuse_address(true));
m_acceptor.bind(e);
m_acceptor.listen();
if (m_state != IDLE) {
throw exception("listen called from invalid state.");
this->start_accept();
}
m_acceptor.open(e.protocol());
m_acceptor.set_option(boost::asio::socket_base::reuse_address(true));
m_acceptor.bind(e);
m_acceptor.listen();
this->start_accept();
lock.unlock();
m_endpoint.run_internal();
if (num_threads == 1) {
m_endpoint.run_internal();
} else if (num_threads > 1 && num_threads <= MAX_THREAD_POOL_SIZE) {
std::vector< boost::shared_ptr<boost::thread> > threads;
for (std::size_t i = 0; i < num_threads; ++i) {
boost::shared_ptr<boost::thread> thread(
new boost::thread(boost::bind(
&endpoint_type::run_internal,
&m_endpoint
))
);
threads.push_back(thread);
}
for (std::size_t i = 0; i < threads.size(); ++i) {
threads[i]->join();
}
} else {
throw exception("listen called with invalid num_threads value");
}
}
// server<endpoint> Implimentation
// TODO: provide a way to stop/reset the server endpoint
template <class endpoint>
void server<endpoint>::listen(uint16_t port) {
listen(boost::asio::ip::tcp::v6(), port);
void server<endpoint>::listen(uint16_t port, size_t n) {
listen(boost::asio::ip::tcp::v6(), port, n);
}
template <class endpoint>
void server<endpoint>::listen(const std::string &host, const std::string &service) {
void server<endpoint>::listen(const std::string &host, const std::string &service, size_t n) {
boost::asio::ip::tcp::resolver resolver(m_io_service);
boost::asio::ip::tcp::resolver::query query(host, service);
boost::asio::ip::tcp::resolver::iterator endpoint_iterator = resolver.resolve(query);
@@ -270,7 +292,7 @@ void server<endpoint>::listen(const std::string &host, const std::string &servic
if (endpoint_iterator == end) {
throw std::invalid_argument("Can't resolve host/service to listen");
}
listen(*endpoint_iterator);
listen(*endpoint_iterator,n);
}
template <class endpoint>