From 18fef37837e5a3082409b3e046dbae30b7a580d0 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Sun, 20 Jan 2013 14:31:54 -0600 Subject: [PATCH] additional work on the broadcast server example --- SConstruct | 3 + examples/broadcast_server/SConscript | 2 +- .../broadcast_server/broadcast_server.cpp | 93 ++++++++++++++----- websocketpp/impl/connection_impl.hpp | 4 + 4 files changed, 77 insertions(+), 25 deletions(-) diff --git a/SConstruct b/SConstruct index 9a9d632e1a..aa9c8017e5 100644 --- a/SConstruct +++ b/SConstruct @@ -188,6 +188,9 @@ echo_server = SConscript('#/examples/echo_server/SConscript',variant_dir = build # echo_server_tls echo_server_tls = SConscript('#/examples/echo_server_tls/SConscript',variant_dir = builddir + 'echo_server_tls',duplicate = 0) +# broadcast_server +broadcast_server = SConscript('#/examples/broadcast_server/SConscript',variant_dir = builddir + 'broadcast_server',duplicate = 0) +# #wsperf = SConscript('#/examples/wsperf/SConscript', # variant_dir = builddir + 'wsperf', # duplicate = 0) diff --git a/examples/broadcast_server/SConscript b/examples/broadcast_server/SConscript index 5b25973de6..4ec5759dba 100644 --- a/examples/broadcast_server/SConscript +++ b/examples/broadcast_server/SConscript @@ -14,7 +14,7 @@ prgs = [] # if a C++11 environment is avaliable build using that, otherwise use boost if env_cpp11.has_key('WSPP_CPP11_ENABLED'): - ALL_LIBS = boostlibs(['system'],env_cpp11) + [platform_libs] + [polyfill_libs] + ALL_LIBS = boostlibs(['system','thread'],env_cpp11) + [platform_libs] + [polyfill_libs] prgs += env_cpp11.Program('broadcast_server', ["broadcast_server.cpp"], LIBS = ALL_LIBS) else: ALL_LIBS = boostlibs(['system','regex'],env) + [platform_libs] + [polyfill_libs] diff --git a/examples/broadcast_server/broadcast_server.cpp b/examples/broadcast_server/broadcast_server.cpp index 9eed749efe..871c79ecf1 100644 --- a/examples/broadcast_server/broadcast_server.cpp +++ b/examples/broadcast_server/broadcast_server.cpp @@ -4,8 +4,13 @@ #include +#include +#include +#include + typedef websocketpp::server server; +using websocketpp::connection_hdl; using websocketpp::lib::placeholders::_1; using websocketpp::lib::placeholders::_2; using websocketpp::lib::bind; @@ -15,6 +20,21 @@ using websocketpp::lib::bind; * on_message queue send to all channels */ +enum action_type { + SUBSCRIBE, + UNSUBSCRIBE, + MESSAGE +}; + +struct action { + action(action_type t, connection_hdl h) : type(t), hdl(h) {} + action(action_type t, server::message_ptr m) : type(t), msg(m) {} + + action_type type; + websocketpp::connection_hdl hdl; + server::message_ptr msg; +}; + class broadcast_server { public: broadcast_server() { @@ -22,9 +42,9 @@ public: m_server.init_asio(); // Register handler callbacks - m_server.set_open_handler(bind(&on_open,this,::_1)); - m_server.set_close_handler(bind(&on_message,this,::_1)); - m_server.set_message_handler(bind(&on_message,this,::_1,::_2)); + m_server.set_open_handler(bind(&broadcast_server::on_open,this,::_1)); + m_server.set_close_handler(bind(&broadcast_server::on_close,this,::_1)); + m_server.set_message_handler(bind(&broadcast_server::on_message,this,::_1,::_2)); } void run(uint16_t port) { @@ -32,11 +52,11 @@ public: m_server.listen(port); // Start the server accept loop - echo_server.start_accept(); + m_server.start_accept(); // Start the ASIO io_service run loop try { - echo_server.run(); + m_server.run(); } catch (const std::exception & e) { std::cout << e.what() << std::endl; } catch (websocketpp::lib::error_code e) { @@ -47,47 +67,72 @@ public: } void on_open(connection_hdl hdl) { - boost::unique_lock lock(m_lock); - m_connections.insert(hdl); + boost::unique_lock lock(m_action_lock); + //std::cout << "on_open" << std::endl; + m_actions.push(action(SUBSCRIBE,hdl)); + lock.unlock(); + m_action_cond.notify_one(); } void on_close(connection_hdl hdl) { - boost::unique_lock lock(m_lock); - m_connections.remove(hdl); + boost::unique_lock lock(m_action_lock); + //std::cout << "on_close" << std::endl; + m_actions.push(action(UNSUBSCRIBE,hdl)); + lock.unlock(); + m_action_cond.notify_one(); } void on_message(connection_hdl hdl, server::message_ptr msg) { // queue message up for sending by processing thread - boost::unique_lock lock(m_lock); - m_message_queue.push(msg); + boost::unique_lock lock(m_action_lock); + //std::cout << "on_message" << std::endl; + m_actions.push(action(MESSAGE,msg)); lock.unlock(); - m_cond.notify_one(); + m_action_cond.notify_one(); } void process_messages() { while(1) { - boost::unique_lock lock(m_lock); + boost::unique_lock lock(m_action_lock); - while(m_message_queue.empty()) { - m_cond.wait(m_lock); + while(m_actions.empty()) { + m_action_cond.wait(lock); } - message_ptr msg = m_message_queue.front(); - m_message_queue.pop(); + action a = m_actions.front(); + m_actions.pop(); lock.unlock(); - + if (a.type == SUBSCRIBE) { + boost::unique_lock lock(m_connection_lock); + m_connections.insert(a.hdl); + } else if (a.type == UNSUBSCRIBE) { + boost::unique_lock lock(m_connection_lock); + m_connections.erase(a.hdl); + } else if (a.type == MESSAGE) { + boost::unique_lock lock(m_connection_lock); + + con_list::iterator it; + for (it = m_connections.begin(); it != m_connections.end(); ++it) { + m_server.send(*it,a.msg); + } + } else { + // undefined. + } } } private: - server m_server; - std::set m_connections; - std::queue m_message_queue; + typedef std::set> con_list; - boost::mutex m_mutex; - boost::condition_variable m_cond; -} + server m_server; + con_list m_connections; + std::queue m_actions; + + boost::mutex m_action_lock; + boost::mutex m_connection_lock; + boost::condition_variable m_action_cond; +}; int main() { broadcast_server server; diff --git a/websocketpp/impl/connection_impl.hpp b/websocketpp/impl/connection_impl.hpp index dea846b48b..6876a45139 100644 --- a/websocketpp/impl/connection_impl.hpp +++ b/websocketpp/impl/connection_impl.hpp @@ -233,6 +233,10 @@ void connection::close(const close::status::value code, } } +/// Trigger the on_interrupt handler +/** + * This is thread safe if the transport is thread safe + */ template lib::error_code connection::interrupt() { std::cout << "connection::interrupt" << std::endl;