From 3405a91e56e42e3efd871ba5b1f5bd5eff8317d0 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Sat, 17 Dec 2011 08:35:37 -0600 Subject: [PATCH] begins work on wscommand processing --- .../broadcast_server_tls/broadcast_admin.html | 33 +++++-- .../broadcast_server_tls.cpp | 96 ++++++++++++++++++- src/connection.hpp | 4 +- src/roles/server.hpp | 4 +- 4 files changed, 124 insertions(+), 13 deletions(-) diff --git a/examples/broadcast_server_tls/broadcast_admin.html b/examples/broadcast_server_tls/broadcast_admin.html index 0d18cd6155..4363118c91 100644 --- a/examples/broadcast_server_tls/broadcast_admin.html +++ b/examples/broadcast_server_tls/broadcast_admin.html @@ -59,14 +59,24 @@ function connect() { if (foo.type == "message") { if (options.console_enabled) { document.getElementById("messages").innerHTML += "Broadcasted Message: "+foo.value+"
"; + } + } else if (foo.type == "error") { + if (options.console_enabled) { + document.getElementById("messages").innerHTML += "Command Error: "+foo.value+"
"; } } else if (foo.type == "con") { document.getElementById("connected_clients").innerHTML = foo.value; } else if (foo.type == "stats") { + var msg_delta = 0; + var data_delta = 0; + for (var i in foo.messages) { var hash = foo.messages[i].hash; if (hash in msgs) { + msg_delta += foo.messages[i].acked-msgs[hash]["acked"]; + data_delta += msg_delta*foo.messages[i].size; + msgs[hash]["sent"] = foo.messages[i].sent; msgs[hash]["acked"] = foo.messages[i].acked; msgs[hash]["time"] = foo.messages[i].time; @@ -76,6 +86,8 @@ function connect() { "acked":foo.messages[i].acked, "size":foo.messages[i].size, "time":foo.messages[i].time} + msg_delta += foo.messages[i].acked; + data_delta += msg_delta*foo.messages[i].size; } } @@ -103,20 +115,15 @@ function connect() { } - /*data2.push([foo.timestamp,foo.bytes]); + data2.push([foo.timestamp,data_delta]); if (data2.length > total_points) { data2 = data2.slice(data2.length-total_points); } - message_history.push([foo.timestamp,foo.messages]); + message_history.push([foo.timestamp,msg_delta]); if (message_history.length > total_points) { message_history = message_history.slice(message_history.length-total_points); - }*/ - - /*ack_history.push([foo.timestamp,foo.messages_acked]); - if (message_history.length > total_points) { - message_history = message_history.slice(message_history.length-total_points); - }*/ + } } else { document.getElementById("messages").innerHTML += "Unrecognized Server Command.
"; } @@ -157,6 +164,16 @@ function send() { document.getElementById("msg").value = ""; } +function send_command(command,args) { + var cmd = command+":"; + + ws.send(cmd); +} + +function send_test_message(size,type) { + ws.send((new Array(size)).join("*")); +} + function format_data(val) { if (val > 1000000) return (val / 1000000).toFixed(2) + " MB"; diff --git a/examples/broadcast_server_tls/broadcast_server_tls.cpp b/examples/broadcast_server_tls/broadcast_server_tls.cpp index 6ea4cab6a2..f4b3c444d4 100644 --- a/examples/broadcast_server_tls/broadcast_server_tls.cpp +++ b/examples/broadcast_server_tls/broadcast_server_tls.cpp @@ -143,6 +143,69 @@ public: void on_message(connection_ptr connection,websocketpp::message::data_ptr msg) { typename std::set::iterator it; + + // command structure + // command:arg1=val1;arg2=val2;arg3=val3; + + // commands + // ack: messages to ack + // example: `ack:e3458d0aceff8b70a3e5c0afec632881=38;e3458d0aceff8b70a3e5c0afec632881=42;` + + // send: [vals] + // message; opcode=X; payload="X" + // frame; [fuzzer stuff] + + // close:code=1000;reason=msg; + // (instructs the opposite end to close with given optional code/msg) + + const std::string &m(msg->get_payload()); + + std::string::size_type start; + std::string::size_type end; + + start = m.find(":",0); + + if (start != std::string::npos) { + std::string command = m.substr(0,start); + + // parse args + std::map args; + + start++; // skip the colon + end = m.find(";",start); + + // find all semicolons + while (end != std::string::npos) { + std::string arg; + std::string val; + + std::string::size_type sep = m.find("=",start); + + if (sep != std::string::npos) { + arg = m.substr(start,sep-start); + val = m.substr(sep+1,end-sep-1); + } else { + arg = m.substr(start,end-start); + val = ""; + } + + args[arg] = val; + + start = end+1; + end = m.find(";",start); + } + + if (command == "close") { + handle_close(connection,args); + } else if (command == "send") { + handle_send(connection,args); + } else { + command_error(connection,"Unrecognized command: "+command); + } + } else { + command_error(connection,"Invalid command syntax"); + } + if (msg->get_payload().substr(0,27) == "{\"type\":\"acks\",\"messages\":[") { //std::cout << "got ack" << std::endl; //std::cout << msg->get_payload() << std::endl; @@ -265,7 +328,38 @@ public: connection->recycle(msg); } + + void command_error(connection_ptr connection,const std::string msg) { + connection->send("{\"type\":\"error\",\"value\":\""+msg+"\"}"); + } + // in order to keep parsing this command language as simple as possible + // the following values must be escaped (with \) if they are to appear + // literally in string arguments: :,;=\ + + // close: [reason; code=1000; msg=X], [all] + // (instructs the opposite end to close with given optional code/msg) + void handle_close(connection_ptr connection, + const std::map args) + { + if (args.size() == 0) { + typename std::set::iterator it; + + for (it = m_connections.begin(); it != m_connections.end(); it++) { + + (*it)->close(websocketpp::close::status::NORMAL); + } + } else { + command_error(connection,"close arguments not supported"); + } + } + + void handle_send(connection_ptr connection, + const std::map args) + { + + } + void http(connection_ptr connection) { std::stringstream foo; @@ -450,7 +544,7 @@ int main(int argc, char* argv[]) { plain_handler_ptr h(new broadcast_server_handler()); plain_endpoint_type e(h); - e.alog().unset_level(websocketpp::log::alevel::ALL); + e.alog().set_level(websocketpp::log::alevel::ALL); e.elog().set_level(websocketpp::log::elevel::ALL); std::cout << "Starting WebSocket broadcast server on port " << port << std::endl; diff --git a/src/connection.hpp b/src/connection.hpp index fa2c4ad13b..eadf37f3bc 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -166,8 +166,8 @@ public: type::shared_from_this(), msg)); } - void close(close::status::value code, const utf8_string& reason) { - // TODO: + void close(close::status::value code, const utf8_string& reason = "") { + // TODO: overloads without code or reason? send_close(code, reason); } void ping(const binary_string& payload) { diff --git a/src/roles/server.hpp b/src/roles/server.hpp index 9acfcab889..5ba7774020 100644 --- a/src/roles/server.hpp +++ b/src/roles/server.hpp @@ -352,7 +352,7 @@ void server::connection::handle_read_request( } // TODO: is there a way to short circuit this or something? - //m_endpoint.alog().at(log::alevel::DEBUG_HANDSHAKE) << m_request.raw() << log::endl; + m_endpoint.alog().at(log::alevel::DEBUG_HANDSHAKE) << m_request.raw() << log::endl; std::string h = m_request.header("Upgrade"); if (boost::ifind_first(h,"websocket")) { @@ -539,7 +539,7 @@ void server::connection::log_open_result() { << ep << " " << (m_version == -1 ? "" : version.str()) << (get_request_header("User-Agent") == "" ? "NULL" : get_request_header("User-Agent")) - << " " << m_uri->get_resource() << " " << m_response.get_status_code() + << " " << (m_uri ? m_uri->get_resource() : "uri is NULL") << " " << m_response.get_status_code() << log::endl; }