From 10b28259f58f18eb917f34dfde461d4504a34988 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Wed, 14 Dec 2011 19:55:10 -0600 Subject: [PATCH] work on acknowledging broadcasted messages --- .../broadcast_server_tls/broadcast_admin.html | 45 ++++++- .../broadcast_server_tls.cpp | 126 ++++++++++++++---- examples/stress_client/stress_client.cpp | 74 ++++------ src/md5/md5.h | 2 + 4 files changed, 166 insertions(+), 81 deletions(-) diff --git a/examples/broadcast_server_tls/broadcast_admin.html b/examples/broadcast_server_tls/broadcast_admin.html index d63c7521cc..6024cc175c 100644 --- a/examples/broadcast_server_tls/broadcast_admin.html +++ b/examples/broadcast_server_tls/broadcast_admin.html @@ -22,6 +22,7 @@ var message_history = []; var ack_history = []; var client_history = []; +var msgs = {}; function connect() { url = document.getElementById("server_url").value; @@ -62,11 +63,36 @@ function connect() { } else if (foo.type == "con") { document.getElementById("connected_clients").innerHTML = foo.value; } else if (foo.type == "stats") { - document.getElementById("messages_per_sec").innerHTML = foo.messages+"/s"; + console.log(foo); + for (var i in foo.messages) { + var hash = foo.messages[i].hash; + + if (hash in msgs) { + msgs[hash]["sent"] += foo.messages[i].sent; + msgs[hash]["acked"] += foo.messages[i].acked; + msgs[hash]["time"] = foo.messages[i].time; + } else { + msgs[hash] = {"id":foo.messages[i].id, + "sent":foo.messages[i].sent, + "acked":foo.messages[i].acked, + "size":foo.messages[i].size, + "time":foo.messages[i].time} + } + } + + var foo = ""; + + for (i in msgs) { + foo += ""+msgs[i].id+""+msgs[i].sent+""+msgs[i].acked+""+msgs[i].size+""+msgs[i].time+""; + } + + $("#sent_messages").html(foo); + + /*document.getElementById("messages_per_sec").innerHTML = foo.messages+"/s"; document.getElementById("bytes_per_sec").innerHTML = format_data(foo.bytes)+"/s"; document.getElementById("messages_sent").innerHTML = foo.messages_sent; document.getElementById("messages_acked").innerHTML = foo.messages_acked; - document.getElementById("bytes_sent").innerHTML = format_data(foo.bytes_sent); + document.getElementById("bytes_sent").innerHTML = format_data(foo.bytes_sent);*/ document.getElementById("admin_connections").innerHTML = foo.admin_connections; @@ -75,7 +101,7 @@ function connect() { client_history = client_history.slice(client_history.length-total_points); } - data2.push([foo.timestamp,foo.bytes]); + /*data2.push([foo.timestamp,foo.bytes]); if (data2.length > total_points) { data2 = data2.slice(data2.length-total_points); } @@ -83,7 +109,7 @@ function connect() { message_history.push([foo.timestamp,foo.messages]); if (message_history.length > total_points) { message_history = message_history.slice(message_history.length-total_points); - } + }*/ /*ack_history.push([foo.timestamp,foo.messages_acked]); if (message_history.length > total_points) { @@ -249,6 +275,17 @@ body,html {
Messages Acked: N/A
Messages Rate: N/A
+ +
+ + + + + + +
idsentackedsizetime
+
+

Data

Bytes Sent: N/A
Data Rate: N/A
diff --git a/examples/broadcast_server_tls/broadcast_server_tls.cpp b/examples/broadcast_server_tls/broadcast_server_tls.cpp index 308235de5a..c8e222da90 100644 --- a/examples/broadcast_server_tls/broadcast_server_tls.cpp +++ b/examples/broadcast_server_tls/broadcast_server_tls.cpp @@ -37,6 +37,18 @@ #include +struct msg { + int id; + size_t sent; + size_t acked; + size_t size; + uint64_t time; + + std::string hash; + boost::posix_time::ptime time_sent; + +}; + typedef websocketpp::endpoint plain_endpoint_type; typedef plain_endpoint_type::handler_ptr plain_handler_ptr; @@ -50,9 +62,11 @@ public: typedef typename endpoint_type::connection_ptr connection_ptr; broadcast_server_handler() - : m_epoch(boost::posix_time::time_from_string("1970-01-01 00:00:00.000")) { - m_messages = 0; - m_data = 0; + : m_epoch(boost::posix_time::time_from_string("1970-01-01 00:00:00.000")), + m_nextid(0) + { + m_messages = 0; + m_data = 0; } std::string get_password() const { @@ -98,7 +112,7 @@ public: std::stringstream foo; foo << "{\"type\":\"con\"" - << ",\"timestamp\":" << get_ms() + << ",\"timestamp\":" << get_ms(m_epoch) << ",\"value\":" << m_connections.size() << "}"; @@ -116,7 +130,7 @@ public: std::stringstream foo; foo << "{\"type\":\"con\"" - << ",\"timestamp\":" << get_ms() + << ",\"timestamp\":" << get_ms(m_epoch) << ",\"value\":" << m_connections.size() << "}"; @@ -130,6 +144,7 @@ public: typename std::set::iterator it; if (msg->get_payload().substr(0,27) == "{\"type\":\"acks\",\"messages\":[") { + //std::cout << "got ack" << std::endl; //std::cout << msg->get_payload() << std::endl; // process a // @@ -154,8 +169,7 @@ public: } } - m_ack_stats[msg->get_payload().substr(start+2,32)] = count; - + start = end+1; end = msg->get_payload().find(",",start); } @@ -172,18 +186,35 @@ public: } } + struct msg& m(m_msgs[msg->get_payload().substr(start+2,32)]); + + m.acked += count; + + if (m.acked == m.sent) { + m.time = get_ms(m.time_sent); + } + m_ack_stats[msg->get_payload().substr(start+2,32)] = count; m_messages_acked += count; } else { + std::string hash = websocketpp::md5_hash_hex(msg->get_payload()); + struct msg& new_msg(m_msgs[hash]); + + new_msg.id = m_nextid++; + new_msg.hash = hash; + new_msg.size = msg->get_payload().size(); + new_msg.time_sent = boost::posix_time::microsec_clock::local_time(); + new_msg.time = 0; + // broadcast to clients for (it = m_connections.begin(); it != m_connections.end(); it++) { - std::string hash = websocketpp::md5_hash_hex(msg->get_payload()); - //std::cout << "sending message: (" << hash.size() << ") " << hash << std::endl; m_messages++; m_data += msg->get_payload().size(); (*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY)); } + new_msg.sent = m_connections.size(); + new_msg.acked = 0; // broadcast to admins std::stringstream foo; @@ -202,16 +233,12 @@ public: foo << "\"}"; for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) { - //m_messages++; - //m_data += msg->get_payload().size(); (*it)->send(foo.str(),false); } } connection->recycle(msg); } - - void http(connection_ptr connection) { std::stringstream foo; @@ -221,9 +248,9 @@ public: connection->set_body(foo.str()); } - long get_ms() { + long get_ms(boost::posix_time::ptime s) { boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time(); - boost::posix_time::time_period period(m_epoch,now); + boost::posix_time::time_period period(s,now); return period.length().total_milliseconds(); } @@ -235,32 +262,72 @@ public: //boost::posix_time::time_period period(m_last_time,now); //m_last_time = now; + + + /*{ + type: stats + connections: int + admin_connections: int + messages: [ + { + id: int + hash: string + sent: int + acked: int + + } + ] + }*/ + - long milli_seconds = get_ms(); + long milli_seconds = get_ms(m_epoch); //double seconds = milli_seconds/1000.0; - m_messages_cache = m_messages; - m_data_cache = m_data; + //m_messages_cache = m_messages; + //m_data_cache = m_data; - m_messages_sent += m_messages; - m_data_sent += m_data; + //m_messages_sent += m_messages; + //m_data_sent += m_data; //std::cout << "m: " << m_messages // << " milli: " << milli_seconds // << std::endl; + /* + << ",\"messages\":" << m_messages + << ",\"bytes\":" << m_data + << ",\"messages_sent\":" << m_messages_sent + << ",\"messages_acked\":" << m_messages_acked + << ",\"bytes_sent\":" << m_data_sent + */ + std::stringstream foo; foo << "{\"type\":\"stats\"" << ",\"timestamp\":" << milli_seconds - << ",\"messages\":" << m_messages - << ",\"bytes\":" << m_data - << ",\"messages_sent\":" << m_messages_sent - << ",\"messages_acked\":" << m_messages_acked - << ",\"bytes_sent\":" << m_data_sent << ",\"connections\":" << m_connections.size() << ",\"admin_connections\":" << m_admin_connections.size() - << "}"; + << ",\"messages\":["; + + std::map::iterator msg_it; + std::map::iterator last = m_msgs.end(); + if (m_msgs.size() > 0) { + last--; + } + + for (msg_it = m_msgs.begin(); msg_it != m_msgs.end(); msg_it++) { + foo << "{\"id\":" << (*msg_it).second.id + << ",\"hash\":\"" << (*msg_it).second.hash << "\"" + << ",\"sent\":" << (*msg_it).second.sent + << ",\"acked\":" << (*msg_it).second.acked + << ",\"size\":" << (*msg_it).second.size + << ",\"time\":" << (*msg_it).second.time + << "}"; + } + + foo << "]}"; + + m_msgs.clear(); //<< ((m_messages_cache * seconds)*1000) << ",\"data\":" //<< ((m_data_cache * seconds)*1000) << ",\"messages_sent\":" @@ -272,8 +339,8 @@ public: (*it)->send(foo.str(),false); } - m_messages = 0; - m_data = 0; + //m_messages = 0; + //m_data = 0; //} m_timer->expires_from_now(boost::posix_time::milliseconds(1000)); @@ -297,6 +364,9 @@ private: size_t m_messages_acked; std::map m_ack_stats; + int m_nextid; + std::map m_msgs; + std::set m_connections; std::set m_admin_connections; }; diff --git a/examples/stress_client/stress_client.cpp b/examples/stress_client/stress_client.cpp index 7d098e84ff..5c35e34f89 100644 --- a/examples/stress_client/stress_client.cpp +++ b/examples/stress_client/stress_client.cpp @@ -47,36 +47,18 @@ public: typedef plain_endpoint_type::connection_ptr connection_ptr; void on_open(connection_ptr connection) { - //std::cout << "on_open: " << std::endl; if (!m_timer) { m_timer.reset(new boost::asio::deadline_timer(connection->get_io_service(),boost::posix_time::seconds(0))); - m_timer->expires_from_now(boost::posix_time::milliseconds(1000)); + m_timer->expires_from_now(boost::posix_time::milliseconds(250)); m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error)); } } - void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) { - // SHA1 or MD5 the message and - - //std::cout << "got message: " << websocketpp::md5_hash_hex(msg->get_payload()) << std::endl; - - m_total++; + void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) { m_msg_stats[websocketpp::md5_hash_hex(msg->get_payload())]++; - - /*if (connection->get_resource() == "/getCaseCount") { - std::cout << "Detected " << msg->get_payload() << " test cases." << std::endl; - m_case_count = atoi(msg->get_payload().c_str()); - } else { - connection->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY)); - }*/ - connection->recycle(msg); } - void http(connection_ptr connection) { - //connection->set_body("HTTP Response!!"); - } - void on_fail(connection_ptr connection) { std::cout << "connection failed" << std::endl; } @@ -87,9 +69,23 @@ public: return; } - //std::cout << "on_timer: " << m_total << std::endl; - std::stringstream foo; - foo << "{\"type\":\"acks\",\"messages\":["; + send_stats_update(connection); + + m_timer->expires_from_now(boost::posix_time::milliseconds(250)); + m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error)); + } + + void on_close(connection_ptr connection) { + m_timer->cancel(); + } +private: + void send_stats_update(connection_ptr connection) { + if (m_msg_stats.empty()) { + return; + } + + std::stringstream msg; + msg << "{\"type\":\"acks\",\"messages\":["; std::map::iterator it; std::map::iterator last = m_msg_stats.end(); @@ -98,25 +94,16 @@ public: } for (it = m_msg_stats.begin(); it != m_msg_stats.end(); it++) { - foo << "{\"" << (*it).first << "\":" << (*it).second << "}" << (it != last ? "," : ""); + msg << "{\"" << (*it).first << "\":" << (*it).second << "}" << (it != last ? "," : ""); } - foo << "]}"; + msg << "]}"; - //std::cout << "Sending " << foo.str() << std::endl; - - connection->send(foo.str(),false); - - m_timer->expires_from_now(boost::posix_time::milliseconds(1000)); - m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error)); + connection->send(msg.str(),false); + m_msg_stats.clear(); } - void on_close(connection_ptr connection) { - m_timer->cancel(); - } - - size_t m_total; - std::map m_msg_stats; - boost::shared_ptr m_timer; + std::map m_msg_stats; + boost::shared_ptr m_timer; }; @@ -179,7 +166,6 @@ int main(int argc, char* argv[]) { std::set connections; - connections.insert(endpoint.connect(uri)); boost::thread t(boost::bind(&plain_endpoint_type::run, &endpoint)); @@ -195,18 +181,8 @@ int main(int argc, char* argv[]) { std::cout << "complete" << std::endl; - //endpoint.run(); - - /*char line[512]; - while (std::cin.getline(line, 512)) { - std::iterator - - c->send(line); - }*/ - t.join(); - std::cout << "done" << std::endl; } catch (std::exception& e) { diff --git a/src/md5/md5.h b/src/md5/md5.h index d76ddcf2cf..3f4d426025 100644 --- a/src/md5/md5.h +++ b/src/md5/md5.h @@ -60,6 +60,8 @@ * efficiently on either one than if ARCH_IS_BIG_ENDIAN is defined. */ +#include + typedef unsigned char md5_byte_t; /* 8-bit byte */ typedef unsigned int md5_word_t; /* 32-bit word */