From 3e9dc3f9df8c5edb221edf66e877079d0e5862ed Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Wed, 14 Dec 2011 15:09:00 -0600 Subject: [PATCH] begins work to track broadcast message acks --- .../broadcast_server_tls/broadcast_admin.html | 9 ++ .../broadcast_server_tls.cpp | 113 +++++++++++++----- examples/stress_client/stress_client.cpp | 61 +++++++++- 3 files changed, 149 insertions(+), 34 deletions(-) diff --git a/examples/broadcast_server_tls/broadcast_admin.html b/examples/broadcast_server_tls/broadcast_admin.html index abc21d0056..d63c7521cc 100644 --- a/examples/broadcast_server_tls/broadcast_admin.html +++ b/examples/broadcast_server_tls/broadcast_admin.html @@ -19,6 +19,7 @@ var data2 = [], total_points = 60; var stale_data = null; var message_history = []; +var ack_history = []; var client_history = []; @@ -64,6 +65,7 @@ function connect() { document.getElementById("messages_per_sec").innerHTML = foo.messages+"/s"; document.getElementById("bytes_per_sec").innerHTML = format_data(foo.bytes)+"/s"; document.getElementById("messages_sent").innerHTML = foo.messages_sent; + document.getElementById("messages_acked").innerHTML = foo.messages_acked; document.getElementById("bytes_sent").innerHTML = format_data(foo.bytes_sent); document.getElementById("admin_connections").innerHTML = foo.admin_connections; @@ -82,6 +84,11 @@ function connect() { if (message_history.length > total_points) { message_history = message_history.slice(message_history.length-total_points); } + + /*ack_history.push([foo.timestamp,foo.messages_acked]); + if (message_history.length > total_points) { + message_history = message_history.slice(message_history.length-total_points); + }*/ } else { document.getElementById("messages").innerHTML += "Unrecognized Server Command.
"; } @@ -97,6 +104,7 @@ function clear_hud() { document.getElementById("bytes_per_sec").innerHTML = "N/A"; document.getElementById("messages_sent").innerHTML = "N/A"; document.getElementById("bytes_sent").innerHTML = "N/A"; + document.getElementById("messages_acked").innerHTML = "N/A"; } function disconnect() { @@ -238,6 +246,7 @@ body,html {

Messages

Messages Sent: N/A
+
Messages Acked: N/A
Messages Rate: N/A

Data

diff --git a/examples/broadcast_server_tls/broadcast_server_tls.cpp b/examples/broadcast_server_tls/broadcast_server_tls.cpp index 0df71c0532..308235de5a 100644 --- a/examples/broadcast_server_tls/broadcast_server_tls.cpp +++ b/examples/broadcast_server_tls/broadcast_server_tls.cpp @@ -28,6 +28,7 @@ #include "../../src/endpoint.hpp" #include "../../src/roles/server.hpp" #include "../../src/sockets/ssl.hpp" +#include "../../src/md5/md5.hpp" #include @@ -128,39 +129,89 @@ public: void on_message(connection_ptr connection,websocketpp::message::data_ptr msg) { typename std::set::iterator it; - - - // broadcast to clients - for (it = m_connections.begin(); it != m_connections.end(); it++) { - m_messages++; - m_data += msg->get_payload().size(); - (*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY)); - } - - // broadcast to admins - std::stringstream foo; - foo << "{\"type\":\"message\",\"value\":\""; - - if (msg->get_opcode() == websocketpp::frame::opcode::BINARY) { - foo << "[Binary Message, length: " << msg->get_payload().size() << "]"; - } else { - if (msg->get_payload().size() > 126) { - foo << "[UTF8 Message, length: " << msg->get_payload().size() << "]"; - } else { - foo << msg->get_payload(); + if (msg->get_payload().substr(0,27) == "{\"type\":\"acks\",\"messages\":[") { + //std::cout << msg->get_payload() << std::endl; + // process a + // + + //{"type":"acks","messages":[{"e3458d0aceff8b70a3e5c0afec632881":38},{"e3458d0aceff8b70a3e5c0afec632881":38}]} + + std::string::size_type start = 27; + std::string::size_type end = msg->get_payload().find(",",start); + size_t count; + + m_messages_acked = 0; + + while (end != std::string::npos) { + if (end-start < 38) { + // error, not the input we were expecting + continue; + } else { + count = atol(msg->get_payload().substr(start+36,end-2).c_str()); + if (count == 0) { + // error parsing number + continue; + } + } + + m_ack_stats[msg->get_payload().substr(start+2,32)] = count; + + start = end+1; + end = msg->get_payload().find(",",start); } - } - - foo << "\"}"; - - for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) { - m_messages++; - m_data += msg->get_payload().size(); - (*it)->send(foo.str(),false); - } + + end = msg->get_payload().size(); + + // get the last value + if (end-start < 38) { + // error, not the input we were expecting + } else { + count = atol(msg->get_payload().substr(start+36,end-4).c_str()); + if (count == 0) { + // error parsing number + } + } + + m_ack_stats[msg->get_payload().substr(start+2,32)] = count; + m_messages_acked += count; + } else { + // broadcast to clients + for (it = m_connections.begin(); it != m_connections.end(); it++) { + std::string hash = websocketpp::md5_hash_hex(msg->get_payload()); + + //std::cout << "sending message: (" << hash.size() << ") " << hash << std::endl; + m_messages++; + m_data += msg->get_payload().size(); + (*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY)); + } + + // broadcast to admins + std::stringstream foo; + foo << "{\"type\":\"message\",\"value\":\""; + + if (msg->get_opcode() == websocketpp::frame::opcode::BINARY) { + foo << "[Binary Message, length: " << msg->get_payload().size() << "]"; + } else { + if (msg->get_payload().size() > 126) { + foo << "[UTF8 Message, length: " << msg->get_payload().size() << "]"; + } else { + foo << msg->get_payload(); + } + } + + foo << "\"}"; + + for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) { + //m_messages++; + //m_data += msg->get_payload().size(); + (*it)->send(foo.str(),false); + } + } connection->recycle(msg); } + + void http(connection_ptr connection) { std::stringstream foo; @@ -205,6 +256,7 @@ public: << ",\"messages\":" << m_messages << ",\"bytes\":" << m_data << ",\"messages_sent\":" << m_messages_sent + << ",\"messages_acked\":" << m_messages_acked << ",\"bytes_sent\":" << m_data_sent << ",\"connections\":" << m_connections.size() << ",\"admin_connections\":" << m_admin_connections.size() @@ -242,6 +294,9 @@ private: boost::posix_time::ptime m_epoch; boost::posix_time::ptime m_last_time; + size_t m_messages_acked; + std::map m_ack_stats; + std::set m_connections; std::set m_admin_connections; }; diff --git a/examples/stress_client/stress_client.cpp b/examples/stress_client/stress_client.cpp index 5b568a5c8a..7d098e84ff 100644 --- a/examples/stress_client/stress_client.cpp +++ b/examples/stress_client/stress_client.cpp @@ -27,6 +27,7 @@ #include "../../src/endpoint.hpp" #include "../../src/roles/client.hpp" +#include "../../src/md5/md5.hpp" #include @@ -45,8 +46,24 @@ public: typedef echo_client_handler type; typedef plain_endpoint_type::connection_ptr connection_ptr; - void on_message(connection_ptr connection,websocketpp::message::data_ptr msg) { - /*if (connection->get_resource() == "/getCaseCount") { + void on_open(connection_ptr connection) { + //std::cout << "on_open: " << std::endl; + if (!m_timer) { + m_timer.reset(new boost::asio::deadline_timer(connection->get_io_service(),boost::posix_time::seconds(0))); + m_timer->expires_from_now(boost::posix_time::milliseconds(1000)); + m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error)); + } + } + + void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) { + // SHA1 or MD5 the message and + + //std::cout << "got message: " << websocketpp::md5_hash_hex(msg->get_payload()) << std::endl; + + m_total++; + m_msg_stats[websocketpp::md5_hash_hex(msg->get_payload())]++; + + /*if (connection->get_resource() == "/getCaseCount") { std::cout << "Detected " << msg->get_payload() << " test cases." << std::endl; m_case_count = atoi(msg->get_payload().c_str()); } else { @@ -64,15 +81,49 @@ public: std::cout << "connection failed" << std::endl; } - - int m_case_count; + void on_timer(connection_ptr connection,const boost::system::error_code& error) { + if (error) { + std::cout << "on_timer error" << std::endl; + return; + } + + //std::cout << "on_timer: " << m_total << std::endl; + std::stringstream foo; + foo << "{\"type\":\"acks\",\"messages\":["; + + std::map::iterator it; + std::map::iterator last = m_msg_stats.end(); + if (m_msg_stats.size() > 0) { + last--; + } + + for (it = m_msg_stats.begin(); it != m_msg_stats.end(); it++) { + foo << "{\"" << (*it).first << "\":" << (*it).second << "}" << (it != last ? "," : ""); + } + foo << "]}"; + + //std::cout << "Sending " << foo.str() << std::endl; + + connection->send(foo.str(),false); + + m_timer->expires_from_now(boost::posix_time::milliseconds(1000)); + m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error)); + } + + void on_close(connection_ptr connection) { + m_timer->cancel(); + } + + size_t m_total; + std::map m_msg_stats; + boost::shared_ptr m_timer; }; int main(int argc, char* argv[]) { std::string uri = "ws://localhost:9002/"; int num_batches = 1; - int batch_size = 50; + int batch_size = 1; if (argc != 4) { std::cout << "Usage: `echo_client test_url num_batches batch_size`" << std::endl;