major work on the broadcast admin and stress client

This commit is contained in:
Peter Thorson
2011-12-11 14:05:08 -06:00
parent e7d3879df4
commit ebf0fa614c
79 changed files with 19994 additions and 16 deletions

View File

@@ -29,6 +29,8 @@
#include "../../src/roles/server.hpp"
#include "../../src/sockets/ssl.hpp"
#include <boost/date_time/posix_time/posix_time.hpp>
#include <cstring>
#include <set>
@@ -45,6 +47,12 @@ class broadcast_server_handler : public endpoint_type::handler {
public:
typedef broadcast_server_handler<endpoint_type> type;
typedef typename endpoint_type::connection_ptr connection_ptr;
broadcast_server_handler()
: m_epoch(boost::posix_time::time_from_string("1970-01-01 00:00:00.000")) {
m_messages = 0;
m_data = 0;
}
std::string get_password() const {
return "test";
@@ -72,7 +80,12 @@ public:
}
void on_open(connection_ptr connection) {
if (!m_timer) {
m_timer.reset(new boost::asio::deadline_timer(connection->get_io_service(),boost::posix_time::seconds(0)));
m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
m_timer->async_wait(boost::bind(&type::on_timer,this,boost::asio::placeholders::error));
m_last_time = boost::posix_time::microsec_clock::local_time();
}
if (connection->get_resource() == "/admin") {
m_admin_connections.insert(connection);
@@ -83,7 +96,10 @@ public:
typename std::set<connection_ptr>::iterator it;
std::stringstream foo;
foo << "{\"type\":\"con\",\"value\":" << m_connections.size() << "}";
foo << "{\"type\":\"con\""
<< ",\"timestamp\":" << get_ms()
<< ",\"value\":" << m_connections.size()
<< "}";
for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
(*it)->send(foo.str(),false);
@@ -98,8 +114,11 @@ public:
typename std::set<connection_ptr>::iterator it;
std::stringstream foo;
foo << "{\"type\":\"con\",\"value\":" << m_connections.size() << "}";
foo << "{\"type\":\"con\""
<< ",\"timestamp\":" << get_ms()
<< ",\"value\":" << m_connections.size()
<< "}";
for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
(*it)->send(foo.str(),false);
}
@@ -109,8 +128,12 @@ public:
void on_message(connection_ptr connection,websocketpp::message::data_ptr msg) {
typename std::set<connection_ptr>::iterator it;
// broadcast to clients
for (it = m_connections.begin(); it != m_connections.end(); it++) {
m_messages++;
m_data += msg->get_payload().size();
(*it)->send(msg->get_payload(),(msg->get_opcode() == websocketpp::frame::opcode::BINARY));
}
@@ -119,14 +142,20 @@ public:
foo << "{\"type\":\"message\",\"value\":\"";
if (msg->get_opcode() == websocketpp::frame::opcode::BINARY) {
foo << "[Binary Message length: " << msg->get_payload().size() << "]";
foo << "[Binary Message, length: " << msg->get_payload().size() << "]";
} else {
foo << msg->get_payload();
if (msg->get_payload().size() > 126) {
foo << "[UTF8 Message, length: " << msg->get_payload().size() << "]";
} else {
foo << msg->get_payload();
}
}
foo << "\"}";
for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
m_messages++;
m_data += msg->get_payload().size();
(*it)->send(foo.str(),false);
}
@@ -141,12 +170,80 @@ public:
connection->set_body(foo.str());
}
long get_ms() {
boost::posix_time::ptime now = boost::posix_time::microsec_clock::local_time();
boost::posix_time::time_period period(m_epoch,now);
return period.length().total_milliseconds();
}
void on_timer(const boost::system::error_code& error) {
// there is new data. This is the first time that there is no new data
//if (m_messages != m_messages_cache || m_data != m_data_cache) {
//boost::posix_time::time_period period(m_last_time,now);
//m_last_time = now;
long milli_seconds = get_ms();
//double seconds = milli_seconds/1000.0;
m_messages_cache = m_messages;
m_data_cache = m_data;
m_messages_sent += m_messages;
m_data_sent += m_data;
//std::cout << "m: " << m_messages
// << " milli: " << milli_seconds
// << std::endl;
std::stringstream foo;
foo << "{\"type\":\"stats\""
<< ",\"timestamp\":" << milli_seconds
<< ",\"messages\":" << m_messages
<< ",\"bytes\":" << m_data
<< ",\"messages_sent\":" << m_messages_sent
<< ",\"bytes_sent\":" << m_data_sent
<< ",\"connections\":" << m_connections.size()
<< ",\"admin_connections\":" << m_admin_connections.size()
<< "}";
//<< ((m_messages_cache * seconds)*1000) << ",\"data\":"
//<< ((m_data_cache * seconds)*1000) << ",\"messages_sent\":"
//<< m_messages_sent <<",\"data_sent\":" << m_data_sent << "}";
typename std::set<connection_ptr>::iterator it;
for (it = m_admin_connections.begin(); it != m_admin_connections.end(); it++) {
(*it)->send(foo.str(),false);
}
m_messages = 0;
m_data = 0;
//}
m_timer->expires_from_now(boost::posix_time::milliseconds(1000));
m_timer->async_wait(boost::bind(&type::on_timer,this,boost::asio::placeholders::error));
}
void on_fail(connection_ptr connection) {
std::cout << "connection failed" << std::endl;
}
private:
std::set<connection_ptr> m_connections;
std::set<connection_ptr> m_admin_connections;
unsigned int m_messages;
size_t m_data;
unsigned int m_messages_cache;
size_t m_data_cache;
unsigned int m_messages_sent;
size_t m_data_sent;
boost::shared_ptr<boost::asio::deadline_timer> m_timer;
boost::posix_time::ptime m_epoch;
boost::posix_time::ptime m_last_time;
std::set<connection_ptr> m_connections;
std::set<connection_ptr> m_admin_connections;
};
int main(int argc, char* argv[]) {