diff --git a/examples/concurrent_server/concurrent_server b/examples/concurrent_server/concurrent_server new file mode 100755 index 0000000000..b85d15b65c Binary files /dev/null and b/examples/concurrent_server/concurrent_server differ diff --git a/examples/concurrent_server/concurrent_server.cpp b/examples/concurrent_server/concurrent_server.cpp index 6373c981e6..9a072230bb 100644 --- a/examples/concurrent_server/concurrent_server.cpp +++ b/examples/concurrent_server/concurrent_server.cpp @@ -32,14 +32,34 @@ #include #include +#include using websocketpp::server; +// A request encapsulates all of the information necesssary to perform a request +// the coordinator will fill in this information from the websocket connection +// and add it to the processing queue. Sleeping in this example is a placeholder +// for any long serial task. struct request { server::handler::connection_ptr con; int value; + + void process() { + std::stringstream response; + response << "Sleeping for " << value << " milliseconds!"; + con->send(response.str()); + + boost::this_thread::sleep(boost::posix_time::milliseconds(value)); + + response.str(""); + response << "Done sleeping for " << value << " milliseconds!"; + con->send(response.str()); + } }; +// The coordinator is a simple wrapper around an STL queue. add_request inserts +// a new request. get_request returns the next available request and blocks +// (using condition variables) in the case that the queue is empty. class request_coordinator { public: void add_request(const request& r) { @@ -65,6 +85,9 @@ private: boost::condition_variable m_cond; }; +// The WebSocket++ handler in this case reads numbers from connections and packs +// connection pointer + number into a request struct and passes it off to the +// coordinator. class concurrent_server_handler : public server::handler { public: concurrent_server_handler(request_coordinator& c) : m_coordinator(c) {} @@ -85,8 +108,25 @@ private: request_coordinator& m_coordinator; }; +class server_handler : public server::handler { +public: + void on_message(connection_ptr con,message_ptr msg) { + int value = atoi(msg->get_payload().c_str()); + + if (value == 0) { + con->send("Invalid sleep value."); + } else { + request r; + r.con = con; + r.value = value; + r.process(); + } + } +}; - +// process_requests is the body function for a processing thread. It loops +// forever reading requests, processing them serially, then reading another +// request. void process_requests(request_coordinator* coordinator); void process_requests(request_coordinator* coordinator) { @@ -95,34 +135,49 @@ void process_requests(request_coordinator* coordinator) { while (1) { coordinator->get_request(r); - std::stringstream response; - response << "Sleeping for " << r.value << " milliseconds!"; - r.con->send(response.str()); - - boost::this_thread::sleep(boost::posix_time::milliseconds(r.value)); - - response.str(""); - response << "Done sleeping for " << r.value << " milliseconds!"; - r.con->send(response.str()); + r.process(); } } +// concurrent server takes two arguments. A port to bind to and a number of +// worker threads to create. The thread count must be an integer greater than +// or equal to zero. +// +// num_threads=0 Standard non-threaded WebSocket++ mode. Handlers will block +// i/o operations and other handlers. +// num_threads=1 One thread processes requests serially the other handles i/o +// This allows new connections and requests to be made while the +// processing thread is busy, but does allow long jobs to +// monopolize the processor increasing request latency. +// num_threads>1 Multiple processing threads will work on the single queue of +// requests provided by the i/o thread. This enables out of order +// completion of requests. The number of threads can be tuned +// based on hardware concurrency available and expected load and +// job length. int main(int argc, char* argv[]) { unsigned short port = 9002; - - if (argc == 2) { - port = atoi(argv[1]); - - if (port == 0) { - std::cout << "Unable to parse port input " << argv[1] << std::endl; - return 1; - } - } + unsigned short num_threads = 2; - try { + try { + if (argc == 2) { + std::stringstream buffer(argv[1]); + buffer >> port; + } + + if (argc == 3) { + std::stringstream buffer(argv[2]); + buffer >> num_threads; + } + request_coordinator rc; - server::handler::ptr h(new concurrent_server_handler(rc)); + server::handler::ptr h; + if (num_threads == 0) { + h = server::handler::ptr(new server_handler()); + } else { + h = server::handler::ptr(new concurrent_server_handler(rc)); + } + server echo_endpoint(h); echo_endpoint.alog().unset_level(websocketpp::log::alevel::ALL); @@ -131,10 +186,13 @@ int main(int argc, char* argv[]) { echo_endpoint.elog().set_level(websocketpp::log::elevel::ERROR); echo_endpoint.elog().set_level(websocketpp::log::elevel::FATAL); - boost::thread t1(boost::bind(&process_requests, &rc)); - boost::thread t2(boost::bind(&process_requests, &rc)); + std::list > threads; - std::cout << "Starting WebSocket sleep server on port " << port << std::endl; + for (int i = 0; i < num_threads; i++) { + threads.push_back(boost::shared_ptr(new boost::thread(boost::bind(&process_requests, &rc)))); + } + + std::cout << "Starting WebSocket sleep server on port " << port << " with " << num_threads << " processing threads." << std::endl; echo_endpoint.listen(port); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << std::endl;