begins work to track broadcast message acks

This commit is contained in:
Peter Thorson
2011-12-14 15:09:00 -06:00
parent 9e14fba2f7
commit 3e9dc3f9df
3 changed files with 149 additions and 34 deletions

View File

@@ -19,6 +19,7 @@ var data2 = [], total_points = 60;
var stale_data = null;
var message_history = [];
var ack_history = [];
var client_history = [];
@@ -64,6 +65,7 @@ function connect() {
document.getElementById("messages_per_sec").innerHTML = foo.messages+"/s";
document.getElementById("bytes_per_sec").innerHTML = format_data(foo.bytes)+"/s";
document.getElementById("messages_sent").innerHTML = foo.messages_sent;
document.getElementById("messages_acked").innerHTML = foo.messages_acked;
document.getElementById("bytes_sent").innerHTML = format_data(foo.bytes_sent);
document.getElementById("admin_connections").innerHTML = foo.admin_connections;
@@ -82,6 +84,11 @@ function connect() {
if (message_history.length > total_points) {
message_history = message_history.slice(message_history.length-total_points);
}
/*ack_history.push([foo.timestamp,foo.messages_acked]);
if (message_history.length > total_points) {
message_history = message_history.slice(message_history.length-total_points);
}*/
} else {
document.getElementById("messages").innerHTML += "Unrecognized Server Command.<br />";
}
@@ -97,6 +104,7 @@ function clear_hud() {
document.getElementById("bytes_per_sec").innerHTML = "N/A";
document.getElementById("messages_sent").innerHTML = "N/A";
document.getElementById("bytes_sent").innerHTML = "N/A";
document.getElementById("messages_acked").innerHTML = "N/A";
}
function disconnect() {
@@ -238,6 +246,7 @@ body,html {
<div id="client_history" style="width:320px;height:200px;"></div>
<h3>Messages</h3>
<div>Messages Sent: <span id="messages_sent">N/A</span><br /></div>
<div>Messages Acked: <span id="messages_acked">N/A</span><br /></div>
<div>Messages Rate: <span id="messages_per_sec">N/A</span><br /></div>
<div id="message_history" style="width:320px;height:200px;"></div>
<h3>Data</h3>

View File

@@ -28,6 +28,7 @@
#include "../../src/endpoint.hpp"
#include "../../src/roles/server.hpp"
#include "../../src/sockets/ssl.hpp"
#include "../../src/md5/md5.hpp"
#include <boost/date_time/posix_time/posix_time.hpp>
@@ -128,39 +129,89 @@ public:
void on_message(connection_ptr connection,websocketpp::message::data_ptr msg) {
typename std::set<connection_ptr>::iterator it;
// broadcast to clients
for (it = m_connections.begin(); it != m_connections.end(); it++) {
m_messages++;
m_data += msg->get_payload().size();
(*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY));
}
// broadcast to admins
std::stringstream foo;
foo << "{\"type\":\"message\",\"value\":\"";
if (msg->get_opcode() == websocketpp::frame::opcode::BINARY) {
foo << "[Binary Message, length: " << msg->get_payload().size() << "]";
} else {
if (msg->get_payload().size() > 126) {
foo << "[UTF8 Message, length: " << msg->get_payload().size() << "]";
} else {
foo << msg->get_payload();
if (msg->get_payload().substr(0,27) == "{\"type\":\"acks\",\"messages\":[") {
//std::cout << msg->get_payload() << std::endl;
// process a
//
//{"type":"acks","messages":[{"e3458d0aceff8b70a3e5c0afec632881":38},{"e3458d0aceff8b70a3e5c0afec632881":38}]}
std::string::size_type start = 27;
std::string::size_type end = msg->get_payload().find(",",start);
size_t count;
m_messages_acked = 0;
while (end != std::string::npos) {
if (end-start < 38) {
// error, not the input we were expecting
continue;
} else {
count = atol(msg->get_payload().substr(start+36,end-2).c_str());
if (count == 0) {
// error parsing number
continue;
}
}
m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
start = end+1;
end = msg->get_payload().find(",",start);
}
}
foo << "\"}";
for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
m_messages++;
m_data += msg->get_payload().size();
(*it)->send(foo.str(),false);
}
end = msg->get_payload().size();
// get the last value
if (end-start < 38) {
// error, not the input we were expecting
} else {
count = atol(msg->get_payload().substr(start+36,end-4).c_str());
if (count == 0) {
// error parsing number
}
}
m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
m_messages_acked += count;
} else {
// broadcast to clients
for (it = m_connections.begin(); it != m_connections.end(); it++) {
std::string hash = websocketpp::md5_hash_hex(msg->get_payload());
//std::cout << "sending message: (" << hash.size() << ") " << hash << std::endl;
m_messages++;
m_data += msg->get_payload().size();
(*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY));
}
// broadcast to admins
std::stringstream foo;
foo << "{\"type\":\"message\",\"value\":\"";
if (msg->get_opcode() == websocketpp::frame::opcode::BINARY) {
foo << "[Binary Message, length: " << msg->get_payload().size() << "]";
} else {
if (msg->get_payload().size() > 126) {
foo << "[UTF8 Message, length: " << msg->get_payload().size() << "]";
} else {
foo << msg->get_payload();
}
}
foo << "\"}";
for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
//m_messages++;
//m_data += msg->get_payload().size();
(*it)->send(foo.str(),false);
}
}
connection->recycle(msg);
}
void http(connection_ptr connection) {
std::stringstream foo;
@@ -205,6 +256,7 @@ public:
<< ",\"messages\":" << m_messages
<< ",\"bytes\":" << m_data
<< ",\"messages_sent\":" << m_messages_sent
<< ",\"messages_acked\":" << m_messages_acked
<< ",\"bytes_sent\":" << m_data_sent
<< ",\"connections\":" << m_connections.size()
<< ",\"admin_connections\":" << m_admin_connections.size()
@@ -242,6 +294,9 @@ private:
boost::posix_time::ptime m_epoch;
boost::posix_time::ptime m_last_time;
size_t m_messages_acked;
std::map<std::string,size_t> m_ack_stats;
std::set<connection_ptr> m_connections;
std::set<connection_ptr> m_admin_connections;
};

View File

@@ -27,6 +27,7 @@
#include "../../src/endpoint.hpp"
#include "../../src/roles/client.hpp"
#include "../../src/md5/md5.hpp"
#include <boost/thread.hpp>
@@ -45,8 +46,24 @@ public:
typedef echo_client_handler type;
typedef plain_endpoint_type::connection_ptr connection_ptr;
void on_message(connection_ptr connection,websocketpp::message::data_ptr msg) {
/*if (connection->get_resource() == "/getCaseCount") {
void on_open(connection_ptr connection) {
//std::cout << "on_open: " << std::endl;
if (!m_timer) {
m_timer.reset(new boost::asio::deadline_timer(connection->get_io_service(),boost::posix_time::seconds(0)));
m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
}
}
void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) {
// SHA1 or MD5 the message and
//std::cout << "got message: " << websocketpp::md5_hash_hex(msg->get_payload()) << std::endl;
m_total++;
m_msg_stats[websocketpp::md5_hash_hex(msg->get_payload())]++;
/*if (connection->get_resource() == "/getCaseCount") {
std::cout << "Detected " << msg->get_payload() << " test cases." << std::endl;
m_case_count = atoi(msg->get_payload().c_str());
} else {
@@ -64,15 +81,49 @@ public:
std::cout << "connection failed" << std::endl;
}
int m_case_count;
void on_timer(connection_ptr connection,const boost::system::error_code& error) {
if (error) {
std::cout << "on_timer error" << std::endl;
return;
}
//std::cout << "on_timer: " << m_total << std::endl;
std::stringstream foo;
foo << "{\"type\":\"acks\",\"messages\":[";
std::map<std::string,size_t>::iterator it;
std::map<std::string,size_t>::iterator last = m_msg_stats.end();
if (m_msg_stats.size() > 0) {
last--;
}
for (it = m_msg_stats.begin(); it != m_msg_stats.end(); it++) {
foo << "{\"" << (*it).first << "\":" << (*it).second << "}" << (it != last ? "," : "");
}
foo << "]}";
//std::cout << "Sending " << foo.str() << std::endl;
connection->send(foo.str(),false);
m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
}
void on_close(connection_ptr connection) {
m_timer->cancel();
}
size_t m_total;
std::map<std::string,size_t> m_msg_stats;
boost::shared_ptr<boost::asio::deadline_timer> m_timer;
};
int main(int argc, char* argv[]) {
std::string uri = "ws://localhost:9002/";
int num_batches = 1;
int batch_size = 50;
int batch_size = 1;
if (argc != 4) {
std::cout << "Usage: `echo_client test_url num_batches batch_size`" << std::endl;