diff --git a/examples/broadcast_server_tls/broadcast_admin.html b/examples/broadcast_server_tls/broadcast_admin.html
index d63c7521cc..6024cc175c 100644
--- a/examples/broadcast_server_tls/broadcast_admin.html
+++ b/examples/broadcast_server_tls/broadcast_admin.html
@@ -22,6 +22,7 @@ var message_history = [];
var ack_history = [];
var client_history = [];
+var msgs = {};
function connect() {
url = document.getElementById("server_url").value;
@@ -62,11 +63,36 @@ function connect() {
} else if (foo.type == "con") {
document.getElementById("connected_clients").innerHTML = foo.value;
} else if (foo.type == "stats") {
- document.getElementById("messages_per_sec").innerHTML = foo.messages+"/s";
+ console.log(foo);
+ for (var i in foo.messages) {
+ var hash = foo.messages[i].hash;
+
+ if (hash in msgs) {
+ msgs[hash]["sent"] += foo.messages[i].sent;
+ msgs[hash]["acked"] += foo.messages[i].acked;
+ msgs[hash]["time"] = foo.messages[i].time;
+ } else {
+ msgs[hash] = {"id":foo.messages[i].id,
+ "sent":foo.messages[i].sent,
+ "acked":foo.messages[i].acked,
+ "size":foo.messages[i].size,
+ "time":foo.messages[i].time}
+ }
+ }
+
+ var foo = "";
+
+ for (i in msgs) {
+ foo += "
| "+msgs[i].id+" | "+msgs[i].sent+" | "+msgs[i].acked+" | "+msgs[i].size+" | "+msgs[i].time+" |
";
+ }
+
+ $("#sent_messages").html(foo);
+
+ /*document.getElementById("messages_per_sec").innerHTML = foo.messages+"/s";
document.getElementById("bytes_per_sec").innerHTML = format_data(foo.bytes)+"/s";
document.getElementById("messages_sent").innerHTML = foo.messages_sent;
document.getElementById("messages_acked").innerHTML = foo.messages_acked;
- document.getElementById("bytes_sent").innerHTML = format_data(foo.bytes_sent);
+ document.getElementById("bytes_sent").innerHTML = format_data(foo.bytes_sent);*/
document.getElementById("admin_connections").innerHTML = foo.admin_connections;
@@ -75,7 +101,7 @@ function connect() {
client_history = client_history.slice(client_history.length-total_points);
}
- data2.push([foo.timestamp,foo.bytes]);
+ /*data2.push([foo.timestamp,foo.bytes]);
if (data2.length > total_points) {
data2 = data2.slice(data2.length-total_points);
}
@@ -83,7 +109,7 @@ function connect() {
message_history.push([foo.timestamp,foo.messages]);
if (message_history.length > total_points) {
message_history = message_history.slice(message_history.length-total_points);
- }
+ }*/
/*ack_history.push([foo.timestamp,foo.messages_acked]);
if (message_history.length > total_points) {
@@ -249,6 +275,17 @@ body,html {
Messages Acked: N/A
Messages Rate: N/A
+
+
+
+
+ | id | sent | acked | size | time |
+
+
+
+
+
+
Data
Bytes Sent: N/A
Data Rate: N/A
diff --git a/examples/broadcast_server_tls/broadcast_server_tls.cpp b/examples/broadcast_server_tls/broadcast_server_tls.cpp
index 308235de5a..c8e222da90 100644
--- a/examples/broadcast_server_tls/broadcast_server_tls.cpp
+++ b/examples/broadcast_server_tls/broadcast_server_tls.cpp
@@ -37,6 +37,18 @@
#include
+struct msg {
+ int id;
+ size_t sent;
+ size_t acked;
+ size_t size;
+ uint64_t time;
+
+ std::string hash;
+ boost::posix_time::ptime time_sent;
+
+};
+
typedef websocketpp::endpoint plain_endpoint_type;
typedef plain_endpoint_type::handler_ptr plain_handler_ptr;
@@ -50,9 +62,11 @@ public:
typedef typename endpoint_type::connection_ptr connection_ptr;
broadcast_server_handler()
- : m_epoch(boost::posix_time::time_from_string("1970-01-01 00:00:00.000")) {
- m_messages = 0;
- m_data = 0;
+ : m_epoch(boost::posix_time::time_from_string("1970-01-01 00:00:00.000")),
+ m_nextid(0)
+ {
+ m_messages = 0;
+ m_data = 0;
}
std::string get_password() const {
@@ -98,7 +112,7 @@ public:
std::stringstream foo;
foo << "{\"type\":\"con\""
- << ",\"timestamp\":" << get_ms()
+ << ",\"timestamp\":" << get_ms(m_epoch)
<< ",\"value\":" << m_connections.size()
<< "}";
@@ -116,7 +130,7 @@ public:
std::stringstream foo;
foo << "{\"type\":\"con\""
- << ",\"timestamp\":" << get_ms()
+ << ",\"timestamp\":" << get_ms(m_epoch)
<< ",\"value\":" << m_connections.size()
<< "}";
@@ -130,6 +144,7 @@ public:
typename std::set::iterator it;
if (msg->get_payload().substr(0,27) == "{\"type\":\"acks\",\"messages\":[") {
+ //std::cout << "got ack" << std::endl;
//std::cout << msg->get_payload() << std::endl;
// process a
//
@@ -154,8 +169,7 @@ public:
}
}
- m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
-
+
start = end+1;
end = msg->get_payload().find(",",start);
}
@@ -172,18 +186,35 @@ public:
}
}
+ struct msg& m(m_msgs[msg->get_payload().substr(start+2,32)]);
+
+ m.acked += count;
+
+ if (m.acked == m.sent) {
+ m.time = get_ms(m.time_sent);
+ }
+
m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
m_messages_acked += count;
} else {
+ std::string hash = websocketpp::md5_hash_hex(msg->get_payload());
+ struct msg& new_msg(m_msgs[hash]);
+
+ new_msg.id = m_nextid++;
+ new_msg.hash = hash;
+ new_msg.size = msg->get_payload().size();
+ new_msg.time_sent = boost::posix_time::microsec_clock::local_time();
+ new_msg.time = 0;
+
// broadcast to clients
for (it = m_connections.begin(); it != m_connections.end(); it++) {
- std::string hash = websocketpp::md5_hash_hex(msg->get_payload());
-
//std::cout << "sending message: (" << hash.size() << ") " << hash << std::endl;
m_messages++;
m_data += msg->get_payload().size();
(*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY));
}
+ new_msg.sent = m_connections.size();
+ new_msg.acked = 0;
// broadcast to admins
std::stringstream foo;
@@ -202,16 +233,12 @@ public:
foo << "\"}";
for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
- //m_messages++;
- //m_data += msg->get_payload().size();
(*it)->send(foo.str(),false);
}
}
connection->recycle(msg);
}
-
-
void http(connection_ptr connection) {
std::stringstream foo;
@@ -221,9 +248,9 @@ public:
connection->set_body(foo.str());
}
- long get_ms() {
+ long get_ms(boost::posix_time::ptime s) {
boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
- boost::posix_time::time_period period(m_epoch,now);
+ boost::posix_time::time_period period(s,now);
return period.length().total_milliseconds();
}
@@ -235,32 +262,72 @@ public:
//boost::posix_time::time_period period(m_last_time,now);
//m_last_time = now;
+
+
+ /*{
+ type: stats
+ connections: int
+ admin_connections: int
+ messages: [
+ {
+ id: int
+ hash: string
+ sent: int
+ acked: int
+
+ }
+ ]
+ }*/
+
- long milli_seconds = get_ms();
+ long milli_seconds = get_ms(m_epoch);
//double seconds = milli_seconds/1000.0;
- m_messages_cache = m_messages;
- m_data_cache = m_data;
+ //m_messages_cache = m_messages;
+ //m_data_cache = m_data;
- m_messages_sent += m_messages;
- m_data_sent += m_data;
+ //m_messages_sent += m_messages;
+ //m_data_sent += m_data;
//std::cout << "m: " << m_messages
// << " milli: " << milli_seconds
// << std::endl;
+ /*
+ << ",\"messages\":" << m_messages
+ << ",\"bytes\":" << m_data
+ << ",\"messages_sent\":" << m_messages_sent
+ << ",\"messages_acked\":" << m_messages_acked
+ << ",\"bytes_sent\":" << m_data_sent
+ */
+
std::stringstream foo;
foo << "{\"type\":\"stats\""
<< ",\"timestamp\":" << milli_seconds
- << ",\"messages\":" << m_messages
- << ",\"bytes\":" << m_data
- << ",\"messages_sent\":" << m_messages_sent
- << ",\"messages_acked\":" << m_messages_acked
- << ",\"bytes_sent\":" << m_data_sent
<< ",\"connections\":" << m_connections.size()
<< ",\"admin_connections\":" << m_admin_connections.size()
- << "}";
+ << ",\"messages\":[";
+
+ std::map::iterator msg_it;
+ std::map::iterator last = m_msgs.end();
+ if (m_msgs.size() > 0) {
+ last--;
+ }
+
+ for (msg_it = m_msgs.begin(); msg_it != m_msgs.end(); msg_it++) {
+ foo << "{\"id\":" << (*msg_it).second.id
+ << ",\"hash\":\"" << (*msg_it).second.hash << "\""
+ << ",\"sent\":" << (*msg_it).second.sent
+ << ",\"acked\":" << (*msg_it).second.acked
+ << ",\"size\":" << (*msg_it).second.size
+ << ",\"time\":" << (*msg_it).second.time
+ << "}";
+ }
+
+ foo << "]}";
+
+ m_msgs.clear();
//<< ((m_messages_cache * seconds)*1000) << ",\"data\":"
//<< ((m_data_cache * seconds)*1000) << ",\"messages_sent\":"
@@ -272,8 +339,8 @@ public:
(*it)->send(foo.str(),false);
}
- m_messages = 0;
- m_data = 0;
+ //m_messages = 0;
+ //m_data = 0;
//}
m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
@@ -297,6 +364,9 @@ private:
size_t m_messages_acked;
std::map m_ack_stats;
+ int m_nextid;
+ std::map m_msgs;
+
std::set m_connections;
std::set m_admin_connections;
};
diff --git a/examples/stress_client/stress_client.cpp b/examples/stress_client/stress_client.cpp
index 7d098e84ff..5c35e34f89 100644
--- a/examples/stress_client/stress_client.cpp
+++ b/examples/stress_client/stress_client.cpp
@@ -47,36 +47,18 @@ public:
typedef plain_endpoint_type::connection_ptr connection_ptr;
void on_open(connection_ptr connection) {
- //std::cout << "on_open: " << std::endl;
if (!m_timer) {
m_timer.reset(new boost::asio::deadline_timer(connection->get_io_service(),boost::posix_time::seconds(0)));
- m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
+ m_timer->expires_from_now(boost::posix_time::milliseconds(250));
m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
}
}
- void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) {
- // SHA1 or MD5 the message and
-
- //std::cout << "got message: " << websocketpp::md5_hash_hex(msg->get_payload()) << std::endl;
-
- m_total++;
+ void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) {
m_msg_stats[websocketpp::md5_hash_hex(msg->get_payload())]++;
-
- /*if (connection->get_resource() == "/getCaseCount") {
- std::cout << "Detected " << msg->get_payload() << " test cases." << std::endl;
- m_case_count = atoi(msg->get_payload().c_str());
- } else {
- connection->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY));
- }*/
-
connection->recycle(msg);
}
- void http(connection_ptr connection) {
- //connection->set_body("HTTP Response!!");
- }
-
void on_fail(connection_ptr connection) {
std::cout << "connection failed" << std::endl;
}
@@ -87,9 +69,23 @@ public:
return;
}
- //std::cout << "on_timer: " << m_total << std::endl;
- std::stringstream foo;
- foo << "{\"type\":\"acks\",\"messages\":[";
+ send_stats_update(connection);
+
+ m_timer->expires_from_now(boost::posix_time::milliseconds(250));
+ m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
+ }
+
+ void on_close(connection_ptr connection) {
+ m_timer->cancel();
+ }
+private:
+ void send_stats_update(connection_ptr connection) {
+ if (m_msg_stats.empty()) {
+ return;
+ }
+
+ std::stringstream msg;
+ msg << "{\"type\":\"acks\",\"messages\":[";
std::map::iterator it;
std::map::iterator last = m_msg_stats.end();
@@ -98,25 +94,16 @@ public:
}
for (it = m_msg_stats.begin(); it != m_msg_stats.end(); it++) {
- foo << "{\"" << (*it).first << "\":" << (*it).second << "}" << (it != last ? "," : "");
+ msg << "{\"" << (*it).first << "\":" << (*it).second << "}" << (it != last ? "," : "");
}
- foo << "]}";
+ msg << "]}";
- //std::cout << "Sending " << foo.str() << std::endl;
-
- connection->send(foo.str(),false);
-
- m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
- m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
+ connection->send(msg.str(),false);
+ m_msg_stats.clear();
}
- void on_close(connection_ptr connection) {
- m_timer->cancel();
- }
-
- size_t m_total;
- std::map m_msg_stats;
- boost::shared_ptr m_timer;
+ std::map m_msg_stats;
+ boost::shared_ptr m_timer;
};
@@ -179,7 +166,6 @@ int main(int argc, char* argv[]) {
std::set connections;
-
connections.insert(endpoint.connect(uri));
boost::thread t(boost::bind(&plain_endpoint_type::run, &endpoint));
@@ -195,18 +181,8 @@ int main(int argc, char* argv[]) {
std::cout << "complete" << std::endl;
- //endpoint.run();
-
- /*char line[512];
- while (std::cin.getline(line, 512)) {
- std::iterator
-
- c->send(line);
- }*/
-
t.join();
-
std::cout << "done" << std::endl;
} catch (std::exception& e) {
diff --git a/src/md5/md5.h b/src/md5/md5.h
index d76ddcf2cf..3f4d426025 100644
--- a/src/md5/md5.h
+++ b/src/md5/md5.h
@@ -60,6 +60,8 @@
* efficiently on either one than if ARCH_IS_BIG_ENDIAN is defined.
*/
+#include
+
typedef unsigned char md5_byte_t; /* 8-bit byte */
typedef unsigned int md5_word_t; /* 32-bit word */