work on acknowledging broadcasted messages

This commit is contained in:
Peter Thorson
2011-12-14 19:55:10 -06:00
parent 3e9dc3f9df
commit 10b28259f5
4 changed files with 166 additions and 81 deletions

View File

@@ -22,6 +22,7 @@ var message_history = [];
var ack_history = [];
var client_history = [];
var msgs = {};
function connect() {
url = document.getElementById("server_url").value;
@@ -62,11 +63,36 @@ function connect() {
} else if (foo.type == "con") {
document.getElementById("connected_clients").innerHTML = foo.value;
} else if (foo.type == "stats") {
document.getElementById("messages_per_sec").innerHTML = foo.messages+"/s";
console.log(foo);
for (var i in foo.messages) {
var hash = foo.messages[i].hash;
if (hash in msgs) {
msgs[hash]["sent"] += foo.messages[i].sent;
msgs[hash]["acked"] += foo.messages[i].acked;
msgs[hash]["time"] = foo.messages[i].time;
} else {
msgs[hash] = {"id":foo.messages[i].id,
"sent":foo.messages[i].sent,
"acked":foo.messages[i].acked,
"size":foo.messages[i].size,
"time":foo.messages[i].time}
}
}
var foo = "";
for (i in msgs) {
foo += "<tr><td>"+msgs[i].id+"</td><td>"+msgs[i].sent+"</td><td>"+msgs[i].acked+"</td><td>"+msgs[i].size+"</td><td>"+msgs[i].time+"</td></tr>";
}
$("#sent_messages").html(foo);
/*document.getElementById("messages_per_sec").innerHTML = foo.messages+"/s";
document.getElementById("bytes_per_sec").innerHTML = format_data(foo.bytes)+"/s";
document.getElementById("messages_sent").innerHTML = foo.messages_sent;
document.getElementById("messages_acked").innerHTML = foo.messages_acked;
document.getElementById("bytes_sent").innerHTML = format_data(foo.bytes_sent);
document.getElementById("bytes_sent").innerHTML = format_data(foo.bytes_sent);*/
document.getElementById("admin_connections").innerHTML = foo.admin_connections;
@@ -75,7 +101,7 @@ function connect() {
client_history = client_history.slice(client_history.length-total_points);
}
data2.push([foo.timestamp,foo.bytes]);
/*data2.push([foo.timestamp,foo.bytes]);
if (data2.length > total_points) {
data2 = data2.slice(data2.length-total_points);
}
@@ -83,7 +109,7 @@ function connect() {
message_history.push([foo.timestamp,foo.messages]);
if (message_history.length > total_points) {
message_history = message_history.slice(message_history.length-total_points);
}
}*/
/*ack_history.push([foo.timestamp,foo.messages_acked]);
if (message_history.length > total_points) {
@@ -249,6 +275,17 @@ body,html {
<div>Messages Acked: <span id="messages_acked">N/A</span><br /></div>
<div>Messages Rate: <span id="messages_per_sec">N/A</span><br /></div>
<div id="message_history" style="width:320px;height:200px;"></div>
<div>
<table>
<thead>
<tr><th>id</th><th>sent</th><th>acked</th><th>size</th><th>time</th></tr>
</thead>
<tbody id="sent_messages">
</tbody>
</table>
</div>
<h3>Data</h3>
<div>Bytes Sent: <span id="bytes_sent">N/A</span><br /></div>
<div>Data Rate: <span id="bytes_per_sec">N/A</span><br /></div>

View File

@@ -37,6 +37,18 @@
#include <sys/resource.h>
struct msg {
int id;
size_t sent;
size_t acked;
size_t size;
uint64_t time;
std::string hash;
boost::posix_time::ptime time_sent;
};
typedef websocketpp::endpoint<websocketpp::role::server,websocketpp::socket::plain> plain_endpoint_type;
typedef plain_endpoint_type::handler_ptr plain_handler_ptr;
@@ -50,9 +62,11 @@ public:
typedef typename endpoint_type::connection_ptr connection_ptr;
broadcast_server_handler()
: m_epoch(boost::posix_time::time_from_string("1970-01-01 00:00:00.000")) {
m_messages = 0;
m_data = 0;
: m_epoch(boost::posix_time::time_from_string("1970-01-01 00:00:00.000")),
m_nextid(0)
{
m_messages = 0;
m_data = 0;
}
std::string get_password() const {
@@ -98,7 +112,7 @@ public:
std::stringstream foo;
foo << "{\"type\":\"con\""
<< ",\"timestamp\":" << get_ms()
<< ",\"timestamp\":" << get_ms(m_epoch)
<< ",\"value\":" << m_connections.size()
<< "}";
@@ -116,7 +130,7 @@ public:
std::stringstream foo;
foo << "{\"type\":\"con\""
<< ",\"timestamp\":" << get_ms()
<< ",\"timestamp\":" << get_ms(m_epoch)
<< ",\"value\":" << m_connections.size()
<< "}";
@@ -130,6 +144,7 @@ public:
typename std::set<connection_ptr>::iterator it;
if (msg->get_payload().substr(0,27) == "{\"type\":\"acks\",\"messages\":[") {
//std::cout << "got ack" << std::endl;
//std::cout << msg->get_payload() << std::endl;
// process a
//
@@ -154,8 +169,7 @@ public:
}
}
m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
start = end+1;
end = msg->get_payload().find(",",start);
}
@@ -172,18 +186,35 @@ public:
}
}
struct msg& m(m_msgs[msg->get_payload().substr(start+2,32)]);
m.acked += count;
if (m.acked == m.sent) {
m.time = get_ms(m.time_sent);
}
m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
m_messages_acked += count;
} else {
std::string hash = websocketpp::md5_hash_hex(msg->get_payload());
struct msg& new_msg(m_msgs[hash]);
new_msg.id = m_nextid++;
new_msg.hash = hash;
new_msg.size = msg->get_payload().size();
new_msg.time_sent = boost::posix_time::microsec_clock::local_time();
new_msg.time = 0;
// broadcast to clients
for (it = m_connections.begin(); it != m_connections.end(); it++) {
std::string hash = websocketpp::md5_hash_hex(msg->get_payload());
//std::cout << "sending message: (" << hash.size() << ") " << hash << std::endl;
m_messages++;
m_data += msg->get_payload().size();
(*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY));
}
new_msg.sent = m_connections.size();
new_msg.acked = 0;
// broadcast to admins
std::stringstream foo;
@@ -202,16 +233,12 @@ public:
foo << "\"}";
for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
//m_messages++;
//m_data += msg->get_payload().size();
(*it)->send(foo.str(),false);
}
}
connection->recycle(msg);
}
void http(connection_ptr connection) {
std::stringstream foo;
@@ -221,9 +248,9 @@ public:
connection->set_body(foo.str());
}
long get_ms() {
long get_ms(boost::posix_time::ptime s) {
boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
boost::posix_time::time_period period(m_epoch,now);
boost::posix_time::time_period period(s,now);
return period.length().total_milliseconds();
}
@@ -235,32 +262,72 @@ public:
//boost::posix_time::time_period period(m_last_time,now);
//m_last_time = now;
/*{
type: stats
connections: int
admin_connections: int
messages: [
{
id: int
hash: string
sent: int
acked: int
}
]
}*/
long milli_seconds = get_ms();
long milli_seconds = get_ms(m_epoch);
//double seconds = milli_seconds/1000.0;
m_messages_cache = m_messages;
m_data_cache = m_data;
//m_messages_cache = m_messages;
//m_data_cache = m_data;
m_messages_sent += m_messages;
m_data_sent += m_data;
//m_messages_sent += m_messages;
//m_data_sent += m_data;
//std::cout << "m: " << m_messages
// << " milli: " << milli_seconds
// << std::endl;
/*
<< ",\"messages\":" << m_messages
<< ",\"bytes\":" << m_data
<< ",\"messages_sent\":" << m_messages_sent
<< ",\"messages_acked\":" << m_messages_acked
<< ",\"bytes_sent\":" << m_data_sent
*/
std::stringstream foo;
foo << "{\"type\":\"stats\""
<< ",\"timestamp\":" << milli_seconds
<< ",\"messages\":" << m_messages
<< ",\"bytes\":" << m_data
<< ",\"messages_sent\":" << m_messages_sent
<< ",\"messages_acked\":" << m_messages_acked
<< ",\"bytes_sent\":" << m_data_sent
<< ",\"connections\":" << m_connections.size()
<< ",\"admin_connections\":" << m_admin_connections.size()
<< "}";
<< ",\"messages\":[";
std::map<std::string,struct msg>::iterator msg_it;
std::map<std::string,struct msg>::iterator last = m_msgs.end();
if (m_msgs.size() > 0) {
last--;
}
for (msg_it = m_msgs.begin(); msg_it != m_msgs.end(); msg_it++) {
foo << "{\"id\":" << (*msg_it).second.id
<< ",\"hash\":\"" << (*msg_it).second.hash << "\""
<< ",\"sent\":" << (*msg_it).second.sent
<< ",\"acked\":" << (*msg_it).second.acked
<< ",\"size\":" << (*msg_it).second.size
<< ",\"time\":" << (*msg_it).second.time
<< "}";
}
foo << "]}";
m_msgs.clear();
//<< ((m_messages_cache * seconds)*1000) << ",\"data\":"
//<< ((m_data_cache * seconds)*1000) << ",\"messages_sent\":"
@@ -272,8 +339,8 @@ public:
(*it)->send(foo.str(),false);
}
m_messages = 0;
m_data = 0;
//m_messages = 0;
//m_data = 0;
//}
m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
@@ -297,6 +364,9 @@ private:
size_t m_messages_acked;
std::map<std::string,size_t> m_ack_stats;
int m_nextid;
std::map<std::string,struct msg> m_msgs;
std::set<connection_ptr> m_connections;
std::set<connection_ptr> m_admin_connections;
};

View File

@@ -47,36 +47,18 @@ public:
typedef plain_endpoint_type::connection_ptr connection_ptr;
void on_open(connection_ptr connection) {
//std::cout << "on_open: " << std::endl;
if (!m_timer) {
m_timer.reset(new boost::asio::deadline_timer(connection->get_io_service(),boost::posix_time::seconds(0)));
m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
m_timer->expires_from_now(boost::posix_time::milliseconds(250));
m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
}
}
void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) {
// SHA1 or MD5 the message and
//std::cout << "got message: " << websocketpp::md5_hash_hex(msg->get_payload()) << std::endl;
m_total++;
void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) {
m_msg_stats[websocketpp::md5_hash_hex(msg->get_payload())]++;
/*if (connection->get_resource() == "/getCaseCount") {
std::cout << "Detected " << msg->get_payload() << " test cases." << std::endl;
m_case_count = atoi(msg->get_payload().c_str());
} else {
connection->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY));
}*/
connection->recycle(msg);
}
void http(connection_ptr connection) {
//connection->set_body("HTTP Response!!");
}
void on_fail(connection_ptr connection) {
std::cout << "connection failed" << std::endl;
}
@@ -87,9 +69,23 @@ public:
return;
}
//std::cout << "on_timer: " << m_total << std::endl;
std::stringstream foo;
foo << "{\"type\":\"acks\",\"messages\":[";
send_stats_update(connection);
m_timer->expires_from_now(boost::posix_time::milliseconds(250));
m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
}
void on_close(connection_ptr connection) {
m_timer->cancel();
}
private:
void send_stats_update(connection_ptr connection) {
if (m_msg_stats.empty()) {
return;
}
std::stringstream msg;
msg << "{\"type\":\"acks\",\"messages\":[";
std::map<std::string,size_t>::iterator it;
std::map<std::string,size_t>::iterator last = m_msg_stats.end();
@@ -98,25 +94,16 @@ public:
}
for (it = m_msg_stats.begin(); it != m_msg_stats.end(); it++) {
foo << "{\"" << (*it).first << "\":" << (*it).second << "}" << (it != last ? "," : "");
msg << "{\"" << (*it).first << "\":" << (*it).second << "}" << (it != last ? "," : "");
}
foo << "]}";
msg << "]}";
//std::cout << "Sending " << foo.str() << std::endl;
connection->send(foo.str(),false);
m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
connection->send(msg.str(),false);
m_msg_stats.clear();
}
void on_close(connection_ptr connection) {
m_timer->cancel();
}
size_t m_total;
std::map<std::string,size_t> m_msg_stats;
boost::shared_ptr<boost::asio::deadline_timer> m_timer;
std::map<std::string,size_t> m_msg_stats;
boost::shared_ptr<boost::asio::deadline_timer> m_timer;
};
@@ -179,7 +166,6 @@ int main(int argc, char* argv[]) {
std::set<connection_ptr> connections;
connections.insert(endpoint.connect(uri));
boost::thread t(boost::bind(&plain_endpoint_type::run, &endpoint));
@@ -195,18 +181,8 @@ int main(int argc, char* argv[]) {
std::cout << "complete" << std::endl;
//endpoint.run();
/*char line[512];
while (std::cin.getline(line, 512)) {
std::iterator
c->send(line);
}*/
t.join();
std::cout << "done" << std::endl;
} catch (std::exception& e) {

View File

@@ -60,6 +60,8 @@
* efficiently on either one than if ARCH_IS_BIG_ENDIAN is defined.
*/
#include <stddef.h>
typedef unsigned char md5_byte_t; /* 8-bit byte */
typedef unsigned int md5_word_t; /* 32-bit word */