From 61be3c182c167e6d69fbad7111fe1e1f9146b60c Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Thu, 7 Jun 2012 13:44:06 -0500 Subject: [PATCH] stress_test now supports message sending & stats --- examples/wsperf/request.cpp | 53 ++++++-- examples/wsperf/stress_aggregate.cpp | 4 +- examples/wsperf/stress_aggregate.hpp | 2 +- examples/wsperf/stress_handler.cpp | 183 ++++++++++++++++++--------- examples/wsperf/stress_handler.hpp | 33 ++++- examples/wsperf/wsperf.cpp | 4 +- 6 files changed, 203 insertions(+), 76 deletions(-) diff --git a/examples/wsperf/request.cpp b/examples/wsperf/request.cpp index 55e330488a..db798d14d0 100644 --- a/examples/wsperf/request.cpp +++ b/examples/wsperf/request.cpp @@ -28,6 +28,7 @@ #include "request.hpp" #include "stress_aggregate.hpp" +#include #include using wsperf::request; @@ -36,6 +37,7 @@ void request::process(unsigned int id) { case_handler_ptr test; stress_handler_ptr shandler; std::string uri; + size_t connections_opened = 0; size_t connection_count; wscmd::cmd command = wscmd::parse(req); @@ -48,6 +50,7 @@ void request::process(unsigned int id) { } else if (command.command == "stress_test") { shandler = stress_handler_ptr(new stress_aggregate(command)); + // todo make sure this isn't 0 if(!wscmd::extract_number(command, "connection_count",connection_count)) { connection_count = 1; } @@ -92,6 +95,14 @@ void request::process(unsigned int id) { } else if (command.command == "stress_test") { client e(shandler); + e.alog().unset_level(websocketpp::log::alevel::ALL); + e.elog().unset_level(websocketpp::log::elevel::ALL); + + /*client::connection_ptr con; + con = e.get_connection(uri); + shandler->on_connect(con); + e.connect(con);*/ + boost::thread t(boost::bind(&client::run, &e, true)); size_t handshake_delay; @@ -99,37 +110,53 @@ void request::process(unsigned int id) { handshake_delay = 10; } - // create connections + // open n connections for (size_t i = 0; i < connection_count; i++) { client::connection_ptr con; - con = e.get_connection(uri); - shandler->on_connect(con); - e.connect(con); - + boost::this_thread::sleep(boost::posix_time::milliseconds(handshake_delay)); } - for (;;) { - // send update - writer->write(prepare_response_object("test_data",shandler->get_data())); + // start sending messages + shandler->start_message_test(); + + e.end_perpetual(); + + t.join(); + + std::cout << "writing data" << std::endl; + writer->write(prepare_response_object("test_data",shandler->get_data())); + + /*for (;;) { + // tell the handler to perform its event loop - // check for too few connections + bool quit = false; - bool quit = shandler->maintenance(); + if (connections_opened == connection_count) { + std::cout << "maintenance loop" << std::endl; + quit = shandler->maintenance(); + } // check for done-ness if (quit) { + // send update to command + std::cout << "writing data" << std::endl; + writer->write(prepare_response_object("test_data",shandler->get_data())); break; } + // unless we know we have something to do, sleep for a bit. + if (connections_opened < connection_count) { + continue; + } boost::this_thread::sleep(boost::posix_time::milliseconds(1000)); - } - e.end_perpetual(); - t.join(); + }*/ + //e.end_perpetual(); + } writer->write(prepare_response("test_complete","")); diff --git a/examples/wsperf/stress_aggregate.cpp b/examples/wsperf/stress_aggregate.cpp index 765cb949cd..ea4bab8b3d 100644 --- a/examples/wsperf/stress_aggregate.cpp +++ b/examples/wsperf/stress_aggregate.cpp @@ -65,12 +65,12 @@ void stress_aggregate::start(connection_ptr con) { } -void stress_aggregate::on_message(connection_ptr con,websocketpp::message::data_ptr msg) { +/*void stress_aggregate::on_message(connection_ptr con,websocketpp::message::data_ptr msg) { std::string hash = websocketpp::md5_hash_hex(msg->get_payload()); boost::lock_guard lock(m_lock); m_msg_stats[hash]++; -} +}*/ /*std::string stress_aggregate::get_data() const { std::stringstream data; diff --git a/examples/wsperf/stress_aggregate.hpp b/examples/wsperf/stress_aggregate.hpp index ccc990371a..3778c8c443 100644 --- a/examples/wsperf/stress_aggregate.hpp +++ b/examples/wsperf/stress_aggregate.hpp @@ -40,7 +40,7 @@ public: /// Construct a stress test from a wscmd command explicit stress_aggregate(wscmd::cmd& cmd); - void on_message(connection_ptr con,websocketpp::message::data_ptr msg); + //void on_message(connection_ptr con,websocketpp::message::data_ptr msg); void start(connection_ptr con); void end(); diff --git a/examples/wsperf/stress_handler.cpp b/examples/wsperf/stress_handler.cpp index cf49111ee6..1cb89d993a 100644 --- a/examples/wsperf/stress_handler.cpp +++ b/examples/wsperf/stress_handler.cpp @@ -64,6 +64,7 @@ stress_handler::stress_handler(wscmd::cmd& cmd) , m_failed_connections(0) , m_next_con_id(0) , m_init(boost::chrono::steady_clock::now()) + , m_next_msg_id(0) , m_con_sync(false) { if (!wscmd::extract_number(cmd,"msg_count",m_msg_count)) { @@ -112,14 +113,6 @@ stress_handler::stress_handler(wscmd::cmd& cmd) // TODO: choose random number between 0 and max_dur m_con_duration = max_dur; } - - std::cout << "con_duration: " << m_con_duration - << "msg_count: " << m_msg_count - << "msg_size: " << m_msg_size - << "m_con_sync: " << m_con_sync - << "m_con_lifetime: " << m_con_lifetime - << "m_msg_mode: " << m_msg_mode - << std::endl; } void stress_handler::on_connect(connection_ptr con) { @@ -127,18 +120,76 @@ void stress_handler::on_connect(connection_ptr con) { m_con_data[con] = con_data(m_next_con_id++, m_init); m_con_data[con].start = boost::chrono::steady_clock::now(); - m_dirty.push_back(con); } void stress_handler::on_handshake_init(connection_ptr con) { boost::lock_guard lock(m_lock); m_con_data[con].tcp_established = boost::chrono::steady_clock::now(); - m_dirty.push_back(con); // TODO: log close reason? } +void stress_handler::start_message_test() { + m_msg.reset(new std::string()); + m_msg->assign(m_msg_size,'*'); + + // for each connection send the first message + std::map::iterator it; + for (it = m_con_data.begin(); it != m_con_data.end(); it++) { + connection_ptr con = (*it).first; + con_data& data = (*it).second; + + /*data.msg = con->get_data_message(); + std::string msg; + msg.assign(m_msg_size,'*'); + + data.msg->set_payload(msg); + + data.msg->reset(websocketpp::frame::opcode::BINARY);*/ + + boost::lock_guard lock(m_lock); + + msg_data foo; + + foo.msg_id = m_next_msg_id++; + foo.send_time = boost::chrono::steady_clock::now(); + data.messages.push_back(foo); + + con->send(*m_msg); + } +} + +void stress_handler::on_message(connection_ptr con,websocketpp::message::data_ptr msg) { + time_point mark = boost::chrono::steady_clock::now(); + + boost::lock_guard lock(m_lock); + + std::map::iterator element = m_con_data.find(con); + + if (element == m_con_data.end()) { + std::cout << "Bad Bad Bad" << std::endl; + return; + } + + con_data& data = (*element).second; + + data.messages.back().recv_time = mark; + + + if (data.messages.size() < m_msg_count) { + msg_data foo; + + foo.msg_id = m_next_msg_id++; + foo.send_time = boost::chrono::steady_clock::now(); + data.messages.push_back(foo); + + con->send(*m_msg); + } else { + close(con); + } +} + void stress_handler::on_open(connection_ptr con) { { boost::lock_guard lock(m_lock); @@ -152,7 +203,6 @@ void stress_handler::on_open(connection_ptr con) { m_con_data[con].on_open = boost::chrono::steady_clock::now(); m_con_data[con].status = "Open"; - m_dirty.push_back(con); } start(con); @@ -165,7 +215,6 @@ void stress_handler::on_close(connection_ptr con) { m_con_data[con].on_close = boost::chrono::steady_clock::now(); m_con_data[con].status = "Closed"; - m_dirty.push_back(con); // TODO: log close reason? } @@ -177,19 +226,19 @@ void stress_handler::on_fail(connection_ptr con) { m_con_data[con].on_fail = boost::chrono::steady_clock::now(); m_con_data[con].status = "Failed"; - m_dirty.push_back(con); // TODO: log failure reason } -void stress_handler::start(connection_ptr con) {} +void stress_handler::start(connection_ptr con) { + //close(con); +} void stress_handler::close(connection_ptr con) { //boost::lock_guard lock(m_lock); m_con_data[con].close_sent = boost::chrono::steady_clock::now(); m_con_data[con].status = "Closing"; - m_dirty.push_back(con); con->close(websocketpp::close::status::NORMAL); // TODO: log close reason? @@ -198,10 +247,12 @@ void stress_handler::close(connection_ptr con) { std::string stress_handler::get_data() const { std::stringstream data; + data << "{"; { boost::lock_guard lock(m_lock); + data << "\"current_connections\":" << m_current_connections; data << ",\"max_connections\":" << m_max_connections; data << ",\"total_connections\":" << m_total_connections; @@ -211,20 +262,11 @@ std::string stress_handler::get_data() const { // for each item in m_dirty std::string sep = ""; - std::list::const_iterator it; - for (it = m_dirty.begin(); it != m_dirty.end(); it++) { - std::map::const_iterator element; - - element = m_con_data.find(*it); - - if (element == m_con_data.end()) { - continue; - } - - data << sep << element->second.print(); + std::map::const_iterator it; + for (it = m_con_data.begin(); it != m_con_data.end(); it++) { + data << sep << (*it).second.print(); sep = ","; } - m_dirty.clear(); data << "]"; } @@ -235,45 +277,29 @@ std::string stress_handler::get_data() const { } bool stress_handler::maintenance() { - std::list to_process; + std::cout << "locking..." << std::endl; + boost::lock_guard lock(m_lock); - { - boost::lock_guard lock(m_lock); - - bool quit = true; - - std::map::iterator it; - for (it = m_con_data.begin(); it != m_con_data.end(); it++) { - to_process.push_back((*it).first); - if ((*it).first->get_state() != websocketpp::session::state::CLOSED) { - quit = false; - } - } - - if (quit) { - return true; - } - } + bool quit = true; time_point now = boost::chrono::steady_clock::now(); - std::list::iterator it; - for (it = to_process.begin(); it != to_process.end(); it++) { - connection_ptr con = (*it); - std::map::iterator element; - - boost::lock_guard lock(m_lock); - - element = m_con_data.find(con); - - if (element == m_con_data.end()) { - continue; + std::cout << "found " << m_con_data.size() << " connections" << std::endl; + + std::map::iterator it; + for (it = m_con_data.begin(); it != m_con_data.end(); it++) { + if ((*it).first->get_state() != websocketpp::session::state::CLOSED) { + quit = false; } - con_data& data = element->second; + connection_ptr con = (*it).first; + con_data& data = (*it).second; + + std::cout << "processing " << data.id << "..."; // check the connection state if (con->get_state() != websocketpp::session::state::OPEN) { + std::cout << "ignored" << std::endl; continue; } @@ -283,7 +309,50 @@ bool stress_handler::maintenance() { if (milliseconds > m_con_duration) { close(con); } + std::cout << "closed" << std::endl; } + if (quit) { + return true; + } + + + /*std::cout << "found " << to_process.size() << " connections" << std::endl; + + + + + std::list::iterator it2; + for (it2 = to_process.begin(); it2 != to_process.end(); it++) { + connection_ptr con = (*it2); + std::map::iterator element; + + + + element = m_con_data.find(con); + + if (element == m_con_data.end()) { + continue; + } + + con_data& data = element->second; + + + + // check the connection state + if (con->get_state() != websocketpp::session::state::OPEN) { + std::cout << "ignored" << std::endl; + continue; + } + + boost::chrono::nanoseconds dur = now - data.on_open; + size_t milliseconds = static_cast(dur.count()) / 1000000; + + if (milliseconds > m_con_duration) { + close(con); + } + std::cout << "closed" << std::endl; + }*/ + return false; } \ No newline at end of file diff --git a/examples/wsperf/stress_handler.hpp b/examples/wsperf/stress_handler.hpp index e13344e1c3..7fc4f98277 100644 --- a/examples/wsperf/stress_handler.hpp +++ b/examples/wsperf/stress_handler.hpp @@ -60,6 +60,14 @@ namespace msg_mode { }; } +struct msg_data { + typedef boost::chrono::steady_clock::time_point time_point; + + size_t msg_id; + time_point send_time; + time_point recv_time; + //size_t payload_len; +}; struct con_data { typedef boost::chrono::steady_clock::time_point time_point; @@ -91,6 +99,19 @@ struct con_data { o << ",\"fail\":" << get_rel_microseconds(on_fail); o << ",\"close_sent\":" << get_rel_microseconds(close_sent); o << ",\"close\":" << get_rel_microseconds(on_close); + + o << ",\"messages\":["; + std::string sep = ""; + std::vector::const_iterator it; + for (it = messages.begin(); it != messages.end(); it++) { + o << sep << "[" + << get_rel_microseconds((*it).send_time) << "," + << get_rel_microseconds((*it).recv_time) + << "]"; + sep = ","; + } + + o << "]"; o << "}"; @@ -111,8 +132,12 @@ struct con_data { time_point close_sent; time_point on_close; std::string status; + std::vector messages; + //stress_handler::message_ptr msg; }; + + class stress_handler : public client::handler { public: typedef stress_handler type; @@ -124,6 +149,7 @@ public: void on_connect(connection_ptr con); + void on_message(connection_ptr con,websocketpp::message::data_ptr msg); void on_handshake_init(connection_ptr con); void on_open(connection_ptr con); @@ -133,9 +159,11 @@ public: void start(connection_ptr con); void close(connection_ptr con); void end(); - + std::string get_data() const; virtual bool maintenance(); + + void start_message_test(); protected: size_t m_current_connections; size_t m_max_connections; @@ -153,6 +181,9 @@ protected: size_t m_timeout; boost::shared_ptr m_timer; + size_t m_next_msg_id; + boost::shared_ptr m_msg; + // test settings pulled from the command con_lifetime::value m_con_lifetime; size_t m_con_duration; diff --git a/examples/wsperf/wsperf.cpp b/examples/wsperf/wsperf.cpp index eb978e9755..fca6889f0e 100644 --- a/examples/wsperf/wsperf.cpp +++ b/examples/wsperf/wsperf.cpp @@ -191,7 +191,7 @@ int start_client(po::variables_map& vm) { int main(int argc, char* argv[]) { try { // 12288 is max OS X limit without changing kernal settings - /*const rlim_t ideal_size = 10000; + const rlim_t ideal_size = 10000; rlim_t old_size; rlim_t old_max; @@ -223,7 +223,7 @@ int main(int argc, char* argv[]) { std::cout << "Failed. This server will be limited to " << old_size << " concurrent connections. Error code: " << errno << " system max: " << old_max << std::endl; } } - }*/ + } std::string config_file;