updates broadcast_server to use common/thread rather than hardcoding boost thread

This commit is contained in:
Peter Thorson
2013-05-16 10:31:02 -05:00
parent 4474868372
commit f988315ea8
2 changed files with 28 additions and 15 deletions

View File

@@ -14,10 +14,10 @@ prgs = []
# if a C++11 environment is avaliable build using that, otherwise use boost
if env_cpp11.has_key('WSPP_CPP11_ENABLED'):
ALL_LIBS = boostlibs(['system','thread'],env_cpp11) + [platform_libs] + [polyfill_libs]
ALL_LIBS = boostlibs(['system'],env_cpp11) + [platform_libs] + [polyfill_libs]
prgs += env_cpp11.Program('broadcast_server', ["broadcast_server.cpp"], LIBS = ALL_LIBS)
else:
ALL_LIBS = boostlibs(['system','regex'],env) + [platform_libs] + [polyfill_libs]
ALL_LIBS = boostlibs(['system','thread','regex'],env) + [platform_libs] + [polyfill_libs]
prgs += env.Program('broadcast_server', ["broadcast_server.cpp"], LIBS = ALL_LIBS)
Return('prgs')

View File

@@ -4,9 +4,10 @@
#include <iostream>
#include <boost/thread.hpp>
/*#include <boost/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <boost/thread/condition_variable.hpp>*/
#include <websocketpp/common/thread.hpp>
typedef websocketpp::server<websocketpp::config::asio> server;
@@ -15,6 +16,11 @@ using websocketpp::lib::placeholders::_1;
using websocketpp::lib::placeholders::_2;
using websocketpp::lib::bind;
using websocketpp::lib::thread;
using websocketpp::lib::mutex;
using websocketpp::lib::unique_lock;
using websocketpp::lib::condition_variable;
/* on_open insert connection_hdl into channel
* on_close remove connection_hdl from channel
* on_message queue send to all channels
@@ -67,7 +73,7 @@ public:
}
void on_open(connection_hdl hdl) {
boost::unique_lock<boost::mutex> lock(m_action_lock);
unique_lock<mutex> lock(m_action_lock);
//std::cout << "on_open" << std::endl;
m_actions.push(action(SUBSCRIBE,hdl));
lock.unlock();
@@ -75,7 +81,7 @@ public:
}
void on_close(connection_hdl hdl) {
boost::unique_lock<boost::mutex> lock(m_action_lock);
unique_lock<mutex> lock(m_action_lock);
//std::cout << "on_close" << std::endl;
m_actions.push(action(UNSUBSCRIBE,hdl));
lock.unlock();
@@ -84,7 +90,7 @@ public:
void on_message(connection_hdl hdl, server::message_ptr msg) {
// queue message up for sending by processing thread
boost::unique_lock<boost::mutex> lock(m_action_lock);
unique_lock<mutex> lock(m_action_lock);
//std::cout << "on_message" << std::endl;
m_actions.push(action(MESSAGE,msg));
lock.unlock();
@@ -93,7 +99,7 @@ public:
void process_messages() {
while(1) {
boost::unique_lock<boost::mutex> lock(m_action_lock);
unique_lock<mutex> lock(m_action_lock);
while(m_actions.empty()) {
m_action_cond.wait(lock);
@@ -105,13 +111,13 @@ public:
lock.unlock();
if (a.type == SUBSCRIBE) {
boost::unique_lock<boost::mutex> lock(m_connection_lock);
unique_lock<mutex> lock(m_connection_lock);
m_connections.insert(a.hdl);
} else if (a.type == UNSUBSCRIBE) {
boost::unique_lock<boost::mutex> lock(m_connection_lock);
unique_lock<mutex> lock(m_connection_lock);
m_connections.erase(a.hdl);
} else if (a.type == MESSAGE) {
boost::unique_lock<boost::mutex> lock(m_connection_lock);
unique_lock<mutex> lock(m_connection_lock);
con_list::iterator it;
for (it = m_connections.begin(); it != m_connections.end(); ++it) {
@@ -129,17 +135,24 @@ private:
con_list m_connections;
std::queue<action> m_actions;
boost::mutex m_action_lock;
boost::mutex m_connection_lock;
boost::condition_variable m_action_cond;
mutex m_action_lock;
mutex m_connection_lock;
condition_variable m_action_cond;
};
int main() {
try {
broadcast_server server;
// Start a thread to run the processing loop
boost::thread(bind(&broadcast_server::process_messages,&server));
thread t(bind(&broadcast_server::process_messages,&server));
// Run the asio loop with the main thread
server.run(9002);
t.join();
} catch (std::exception & e) {
std::cout << e.what() << std::endl;
}
}