diff --git a/examples/broadcast_server_tls/broadcast_admin.html b/examples/broadcast_server_tls/broadcast_admin.html
index abc21d0056..d63c7521cc 100644
--- a/examples/broadcast_server_tls/broadcast_admin.html
+++ b/examples/broadcast_server_tls/broadcast_admin.html
@@ -19,6 +19,7 @@ var data2 = [], total_points = 60;
var stale_data = null;
var message_history = [];
+var ack_history = [];
var client_history = [];
@@ -64,6 +65,7 @@ function connect() {
document.getElementById("messages_per_sec").innerHTML = foo.messages+"/s";
document.getElementById("bytes_per_sec").innerHTML = format_data(foo.bytes)+"/s";
document.getElementById("messages_sent").innerHTML = foo.messages_sent;
+ document.getElementById("messages_acked").innerHTML = foo.messages_acked;
document.getElementById("bytes_sent").innerHTML = format_data(foo.bytes_sent);
document.getElementById("admin_connections").innerHTML = foo.admin_connections;
@@ -82,6 +84,11 @@ function connect() {
if (message_history.length > total_points) {
message_history = message_history.slice(message_history.length-total_points);
}
+
+ /*ack_history.push([foo.timestamp,foo.messages_acked]);
+ if (message_history.length > total_points) {
+ message_history = message_history.slice(message_history.length-total_points);
+ }*/
} else {
document.getElementById("messages").innerHTML += "Unrecognized Server Command.
";
}
@@ -97,6 +104,7 @@ function clear_hud() {
document.getElementById("bytes_per_sec").innerHTML = "N/A";
document.getElementById("messages_sent").innerHTML = "N/A";
document.getElementById("bytes_sent").innerHTML = "N/A";
+ document.getElementById("messages_acked").innerHTML = "N/A";
}
function disconnect() {
@@ -238,6 +246,7 @@ body,html {
Messages
Messages Sent: N/A
+ Messages Acked: N/A
Messages Rate: N/A
Data
diff --git a/examples/broadcast_server_tls/broadcast_server_tls.cpp b/examples/broadcast_server_tls/broadcast_server_tls.cpp
index 0df71c0532..308235de5a 100644
--- a/examples/broadcast_server_tls/broadcast_server_tls.cpp
+++ b/examples/broadcast_server_tls/broadcast_server_tls.cpp
@@ -28,6 +28,7 @@
#include "../../src/endpoint.hpp"
#include "../../src/roles/server.hpp"
#include "../../src/sockets/ssl.hpp"
+#include "../../src/md5/md5.hpp"
#include
@@ -128,39 +129,89 @@ public:
void on_message(connection_ptr connection,websocketpp::message::data_ptr msg) {
typename std::set::iterator it;
-
-
- // broadcast to clients
- for (it = m_connections.begin(); it != m_connections.end(); it++) {
- m_messages++;
- m_data += msg->get_payload().size();
- (*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY));
- }
-
- // broadcast to admins
- std::stringstream foo;
- foo << "{\"type\":\"message\",\"value\":\"";
-
- if (msg->get_opcode() == websocketpp::frame::opcode::BINARY) {
- foo << "[Binary Message, length: " << msg->get_payload().size() << "]";
- } else {
- if (msg->get_payload().size() > 126) {
- foo << "[UTF8 Message, length: " << msg->get_payload().size() << "]";
- } else {
- foo << msg->get_payload();
+ if (msg->get_payload().substr(0,27) == "{\"type\":\"acks\",\"messages\":[") {
+ //std::cout << msg->get_payload() << std::endl;
+ // process a
+ //
+
+ //{"type":"acks","messages":[{"e3458d0aceff8b70a3e5c0afec632881":38},{"e3458d0aceff8b70a3e5c0afec632881":38}]}
+
+ std::string::size_type start = 27;
+ std::string::size_type end = msg->get_payload().find(",",start);
+ size_t count;
+
+ m_messages_acked = 0;
+
+ while (end != std::string::npos) {
+ if (end-start < 38) {
+ // error, not the input we were expecting
+ continue;
+ } else {
+ count = atol(msg->get_payload().substr(start+36,end-2).c_str());
+ if (count == 0) {
+ // error parsing number
+ continue;
+ }
+ }
+
+ m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
+
+ start = end+1;
+ end = msg->get_payload().find(",",start);
}
- }
-
- foo << "\"}";
-
- for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
- m_messages++;
- m_data += msg->get_payload().size();
- (*it)->send(foo.str(),false);
- }
+
+ end = msg->get_payload().size();
+
+ // get the last value
+ if (end-start < 38) {
+ // error, not the input we were expecting
+ } else {
+ count = atol(msg->get_payload().substr(start+36,end-4).c_str());
+ if (count == 0) {
+ // error parsing number
+ }
+ }
+
+ m_ack_stats[msg->get_payload().substr(start+2,32)] = count;
+ m_messages_acked += count;
+ } else {
+ // broadcast to clients
+ for (it = m_connections.begin(); it != m_connections.end(); it++) {
+ std::string hash = websocketpp::md5_hash_hex(msg->get_payload());
+
+ //std::cout << "sending message: (" << hash.size() << ") " << hash << std::endl;
+ m_messages++;
+ m_data += msg->get_payload().size();
+ (*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY));
+ }
+
+ // broadcast to admins
+ std::stringstream foo;
+ foo << "{\"type\":\"message\",\"value\":\"";
+
+ if (msg->get_opcode() == websocketpp::frame::opcode::BINARY) {
+ foo << "[Binary Message, length: " << msg->get_payload().size() << "]";
+ } else {
+ if (msg->get_payload().size() > 126) {
+ foo << "[UTF8 Message, length: " << msg->get_payload().size() << "]";
+ } else {
+ foo << msg->get_payload();
+ }
+ }
+
+ foo << "\"}";
+
+ for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
+ //m_messages++;
+ //m_data += msg->get_payload().size();
+ (*it)->send(foo.str(),false);
+ }
+ }
connection->recycle(msg);
}
+
+
void http(connection_ptr connection) {
std::stringstream foo;
@@ -205,6 +256,7 @@ public:
<< ",\"messages\":" << m_messages
<< ",\"bytes\":" << m_data
<< ",\"messages_sent\":" << m_messages_sent
+ << ",\"messages_acked\":" << m_messages_acked
<< ",\"bytes_sent\":" << m_data_sent
<< ",\"connections\":" << m_connections.size()
<< ",\"admin_connections\":" << m_admin_connections.size()
@@ -242,6 +294,9 @@ private:
boost::posix_time::ptime m_epoch;
boost::posix_time::ptime m_last_time;
+ size_t m_messages_acked;
+ std::map m_ack_stats;
+
std::set m_connections;
std::set m_admin_connections;
};
diff --git a/examples/stress_client/stress_client.cpp b/examples/stress_client/stress_client.cpp
index 5b568a5c8a..7d098e84ff 100644
--- a/examples/stress_client/stress_client.cpp
+++ b/examples/stress_client/stress_client.cpp
@@ -27,6 +27,7 @@
#include "../../src/endpoint.hpp"
#include "../../src/roles/client.hpp"
+#include "../../src/md5/md5.hpp"
#include
@@ -45,8 +46,24 @@ public:
typedef echo_client_handler type;
typedef plain_endpoint_type::connection_ptr connection_ptr;
- void on_message(connection_ptr connection,websocketpp::message::data_ptr msg) {
- /*if (connection->get_resource() == "/getCaseCount") {
+ void on_open(connection_ptr connection) {
+ //std::cout << "on_open: " << std::endl;
+ if (!m_timer) {
+ m_timer.reset(new boost::asio::deadline_timer(connection->get_io_service(),boost::posix_time::seconds(0)));
+ m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
+ m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
+ }
+ }
+
+ void on_message(connection_ptr connection, websocketpp::message::data_ptr msg) {
+ // SHA1 or MD5 the message and
+
+ //std::cout << "got message: " << websocketpp::md5_hash_hex(msg->get_payload()) << std::endl;
+
+ m_total++;
+ m_msg_stats[websocketpp::md5_hash_hex(msg->get_payload())]++;
+
+ /*if (connection->get_resource() == "/getCaseCount") {
std::cout << "Detected " << msg->get_payload() << " test cases." << std::endl;
m_case_count = atoi(msg->get_payload().c_str());
} else {
@@ -64,15 +81,49 @@ public:
std::cout << "connection failed" << std::endl;
}
-
- int m_case_count;
+ void on_timer(connection_ptr connection,const boost::system::error_code& error) {
+ if (error) {
+ std::cout << "on_timer error" << std::endl;
+ return;
+ }
+
+ //std::cout << "on_timer: " << m_total << std::endl;
+ std::stringstream foo;
+ foo << "{\"type\":\"acks\",\"messages\":[";
+
+ std::map::iterator it;
+ std::map::iterator last = m_msg_stats.end();
+ if (m_msg_stats.size() > 0) {
+ last--;
+ }
+
+ for (it = m_msg_stats.begin(); it != m_msg_stats.end(); it++) {
+ foo << "{\"" << (*it).first << "\":" << (*it).second << "}" << (it != last ? "," : "");
+ }
+ foo << "]}";
+
+ //std::cout << "Sending " << foo.str() << std::endl;
+
+ connection->send(foo.str(),false);
+
+ m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
+ m_timer->async_wait(boost::bind(&type::on_timer,this,connection,boost::asio::placeholders::error));
+ }
+
+ void on_close(connection_ptr connection) {
+ m_timer->cancel();
+ }
+
+ size_t m_total;
+ std::map m_msg_stats;
+ boost::shared_ptr m_timer;
};
int main(int argc, char* argv[]) {
std::string uri = "ws://localhost:9002/";
int num_batches = 1;
- int batch_size = 50;
+ int batch_size = 1;
if (argc != 4) {
std::cout << "Usage: `echo_client test_url num_batches batch_size`" << std::endl;