mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
additional work on the broadcast server example
This commit is contained in:
@@ -188,6 +188,9 @@ echo_server = SConscript('#/examples/echo_server/SConscript',variant_dir = build
|
||||
# echo_server_tls
|
||||
echo_server_tls = SConscript('#/examples/echo_server_tls/SConscript',variant_dir = builddir + 'echo_server_tls',duplicate = 0)
|
||||
|
||||
# broadcast_server
|
||||
broadcast_server = SConscript('#/examples/broadcast_server/SConscript',variant_dir = builddir + 'broadcast_server',duplicate = 0)
|
||||
#
|
||||
#wsperf = SConscript('#/examples/wsperf/SConscript',
|
||||
# variant_dir = builddir + 'wsperf',
|
||||
# duplicate = 0)
|
||||
|
||||
@@ -14,7 +14,7 @@ prgs = []
|
||||
|
||||
# if a C++11 environment is avaliable build using that, otherwise use boost
|
||||
if env_cpp11.has_key('WSPP_CPP11_ENABLED'):
|
||||
ALL_LIBS = boostlibs(['system'],env_cpp11) + [platform_libs] + [polyfill_libs]
|
||||
ALL_LIBS = boostlibs(['system','thread'],env_cpp11) + [platform_libs] + [polyfill_libs]
|
||||
prgs += env_cpp11.Program('broadcast_server', ["broadcast_server.cpp"], LIBS = ALL_LIBS)
|
||||
else:
|
||||
ALL_LIBS = boostlibs(['system','regex'],env) + [platform_libs] + [polyfill_libs]
|
||||
|
||||
@@ -4,8 +4,13 @@
|
||||
|
||||
#include <iostream>
|
||||
|
||||
#include <boost/thread.hpp>
|
||||
#include <boost/thread/mutex.hpp>
|
||||
#include <boost/thread/condition_variable.hpp>
|
||||
|
||||
typedef websocketpp::server<websocketpp::config::asio> server;
|
||||
|
||||
using websocketpp::connection_hdl;
|
||||
using websocketpp::lib::placeholders::_1;
|
||||
using websocketpp::lib::placeholders::_2;
|
||||
using websocketpp::lib::bind;
|
||||
@@ -15,6 +20,21 @@ using websocketpp::lib::bind;
|
||||
* on_message queue send to all channels
|
||||
*/
|
||||
|
||||
enum action_type {
|
||||
SUBSCRIBE,
|
||||
UNSUBSCRIBE,
|
||||
MESSAGE
|
||||
};
|
||||
|
||||
struct action {
|
||||
action(action_type t, connection_hdl h) : type(t), hdl(h) {}
|
||||
action(action_type t, server::message_ptr m) : type(t), msg(m) {}
|
||||
|
||||
action_type type;
|
||||
websocketpp::connection_hdl hdl;
|
||||
server::message_ptr msg;
|
||||
};
|
||||
|
||||
class broadcast_server {
|
||||
public:
|
||||
broadcast_server() {
|
||||
@@ -22,9 +42,9 @@ public:
|
||||
m_server.init_asio();
|
||||
|
||||
// Register handler callbacks
|
||||
m_server.set_open_handler(bind(&on_open,this,::_1));
|
||||
m_server.set_close_handler(bind(&on_message,this,::_1));
|
||||
m_server.set_message_handler(bind(&on_message,this,::_1,::_2));
|
||||
m_server.set_open_handler(bind(&broadcast_server::on_open,this,::_1));
|
||||
m_server.set_close_handler(bind(&broadcast_server::on_close,this,::_1));
|
||||
m_server.set_message_handler(bind(&broadcast_server::on_message,this,::_1,::_2));
|
||||
}
|
||||
|
||||
void run(uint16_t port) {
|
||||
@@ -32,11 +52,11 @@ public:
|
||||
m_server.listen(port);
|
||||
|
||||
// Start the server accept loop
|
||||
echo_server.start_accept();
|
||||
m_server.start_accept();
|
||||
|
||||
// Start the ASIO io_service run loop
|
||||
try {
|
||||
echo_server.run();
|
||||
m_server.run();
|
||||
} catch (const std::exception & e) {
|
||||
std::cout << e.what() << std::endl;
|
||||
} catch (websocketpp::lib::error_code e) {
|
||||
@@ -47,47 +67,72 @@ public:
|
||||
}
|
||||
|
||||
void on_open(connection_hdl hdl) {
|
||||
boost::unique_lock<boost::mutex> lock(m_lock);
|
||||
m_connections.insert(hdl);
|
||||
boost::unique_lock<boost::mutex> lock(m_action_lock);
|
||||
//std::cout << "on_open" << std::endl;
|
||||
m_actions.push(action(SUBSCRIBE,hdl));
|
||||
lock.unlock();
|
||||
m_action_cond.notify_one();
|
||||
}
|
||||
|
||||
void on_close(connection_hdl hdl) {
|
||||
boost::unique_lock<boost::mutex> lock(m_lock);
|
||||
m_connections.remove(hdl);
|
||||
boost::unique_lock<boost::mutex> lock(m_action_lock);
|
||||
//std::cout << "on_close" << std::endl;
|
||||
m_actions.push(action(UNSUBSCRIBE,hdl));
|
||||
lock.unlock();
|
||||
m_action_cond.notify_one();
|
||||
}
|
||||
|
||||
void on_message(connection_hdl hdl, server::message_ptr msg) {
|
||||
// queue message up for sending by processing thread
|
||||
boost::unique_lock<boost::mutex> lock(m_lock);
|
||||
m_message_queue.push(msg);
|
||||
boost::unique_lock<boost::mutex> lock(m_action_lock);
|
||||
//std::cout << "on_message" << std::endl;
|
||||
m_actions.push(action(MESSAGE,msg));
|
||||
lock.unlock();
|
||||
m_cond.notify_one();
|
||||
m_action_cond.notify_one();
|
||||
}
|
||||
|
||||
void process_messages() {
|
||||
while(1) {
|
||||
boost::unique_lock<boost::mutex> lock(m_lock);
|
||||
boost::unique_lock<boost::mutex> lock(m_action_lock);
|
||||
|
||||
while(m_message_queue.empty()) {
|
||||
m_cond.wait(m_lock);
|
||||
while(m_actions.empty()) {
|
||||
m_action_cond.wait(lock);
|
||||
}
|
||||
|
||||
message_ptr msg = m_message_queue.front();
|
||||
m_message_queue.pop();
|
||||
action a = m_actions.front();
|
||||
m_actions.pop();
|
||||
|
||||
lock.unlock();
|
||||
|
||||
|
||||
if (a.type == SUBSCRIBE) {
|
||||
boost::unique_lock<boost::mutex> lock(m_connection_lock);
|
||||
m_connections.insert(a.hdl);
|
||||
} else if (a.type == UNSUBSCRIBE) {
|
||||
boost::unique_lock<boost::mutex> lock(m_connection_lock);
|
||||
m_connections.erase(a.hdl);
|
||||
} else if (a.type == MESSAGE) {
|
||||
boost::unique_lock<boost::mutex> lock(m_connection_lock);
|
||||
|
||||
con_list::iterator it;
|
||||
for (it = m_connections.begin(); it != m_connections.end(); ++it) {
|
||||
m_server.send(*it,a.msg);
|
||||
}
|
||||
} else {
|
||||
// undefined.
|
||||
}
|
||||
}
|
||||
}
|
||||
private:
|
||||
server m_server;
|
||||
std::set<connection_hdl> m_connections;
|
||||
std::queue<server::message_ptr> m_message_queue;
|
||||
typedef std::set<connection_hdl,std::owner_less<connection_hdl>> con_list;
|
||||
|
||||
boost::mutex m_mutex;
|
||||
boost::condition_variable m_cond;
|
||||
}
|
||||
server m_server;
|
||||
con_list m_connections;
|
||||
std::queue<action> m_actions;
|
||||
|
||||
boost::mutex m_action_lock;
|
||||
boost::mutex m_connection_lock;
|
||||
boost::condition_variable m_action_cond;
|
||||
};
|
||||
|
||||
int main() {
|
||||
broadcast_server server;
|
||||
|
||||
@@ -233,6 +233,10 @@ void connection<config>::close(const close::status::value code,
|
||||
}
|
||||
}
|
||||
|
||||
/// Trigger the on_interrupt handler
|
||||
/**
|
||||
* This is thread safe if the transport is thread safe
|
||||
*/
|
||||
template <typename config>
|
||||
lib::error_code connection<config>::interrupt() {
|
||||
std::cout << "connection::interrupt" << std::endl;
|
||||
|
||||
Reference in New Issue
Block a user