From 002e75891e1b38dcb913fcce378e106698cf50cd Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Mon, 4 Jun 2012 08:46:54 -0500 Subject: [PATCH] adds preliminary stress_test mode to wsperf --- examples/wsperf/request.cpp | 16 +++- examples/wsperf/stress_aggregate.cpp | 4 +- examples/wsperf/stress_aggregate.hpp | 3 + examples/wsperf/stress_handler.cpp | 114 ++++++++++++++++++++++++++- examples/wsperf/stress_handler.hpp | 69 +++++++++++++++- 5 files changed, 197 insertions(+), 9 deletions(-) diff --git a/examples/wsperf/request.cpp b/examples/wsperf/request.cpp index 7c94947a55..c20689bbad 100644 --- a/examples/wsperf/request.cpp +++ b/examples/wsperf/request.cpp @@ -96,17 +96,27 @@ void request::process(unsigned int id) { // create connections for (size_t i = 0; i < connection_count; i++) { - e.connect(uri); + client::connection_ptr con; + + con = e.get_connection(uri); + + shandler->on_connect(con); + + e.connect(con); } for (;;) { // send update writer->write(prepare_response_object("test_data",shandler->get_data())); + // check for too few connections + + shandler->maintenance(); + + // check for done-ness + sleep(1); } - - // loop over sending updates } writer->write(prepare_response("test_complete","")); diff --git a/examples/wsperf/stress_aggregate.cpp b/examples/wsperf/stress_aggregate.cpp index 82ea397dcc..765cb949cd 100644 --- a/examples/wsperf/stress_aggregate.cpp +++ b/examples/wsperf/stress_aggregate.cpp @@ -61,7 +61,9 @@ stress_aggregate::stress_aggregate(wscmd::cmd& cmd) : stress_handler(cmd) {} -void stress_aggregate::start(connection_ptr con) {} +void stress_aggregate::start(connection_ptr con) { + +} void stress_aggregate::on_message(connection_ptr con,websocketpp::message::data_ptr msg) { std::string hash = websocketpp::md5_hash_hex(msg->get_payload()); diff --git a/examples/wsperf/stress_aggregate.hpp b/examples/wsperf/stress_aggregate.hpp index fca6dcf99b..ccc990371a 100644 --- a/examples/wsperf/stress_aggregate.hpp +++ b/examples/wsperf/stress_aggregate.hpp @@ -36,6 +36,7 @@ class stress_aggregate : public stress_handler { public: typedef stress_aggregate type; + /// Construct a stress test from a wscmd command explicit stress_aggregate(wscmd::cmd& cmd); @@ -47,6 +48,8 @@ public: const std::string get_data() const; protected: std::map m_msg_stats; + + }; typedef boost::shared_ptr stress_aggregate_ptr; diff --git a/examples/wsperf/stress_handler.cpp b/examples/wsperf/stress_handler.cpp index 07be09405a..4a562a13c1 100644 --- a/examples/wsperf/stress_handler.cpp +++ b/examples/wsperf/stress_handler.cpp @@ -58,13 +58,32 @@ using wsperf::stress_handler; * primarily useful for debugging. */ stress_handler::stress_handler(wscmd::cmd& cmd) - : m_current_connections(0) - , m_max_connections(0) - , m_total_connections(0) - , m_failed_connections(0) + : m_current_connections(0) + , m_max_connections(0) + , m_total_connections(0) + , m_failed_connections(0) + , m_next_con_id(0) + , m_init(boost::chrono::steady_clock::now()) { } +void stress_handler::on_connect(connection_ptr con) { + boost::lock_guard lock(m_lock); + + m_con_data[con] = con_data(m_next_con_id++, m_init); + m_con_data[con].start = boost::chrono::steady_clock::now(); + m_dirty.push_back(con); +} + +void stress_handler::on_handshake_init(connection_ptr con) { + boost::lock_guard lock(m_lock); + + m_con_data[con].tcp_established = boost::chrono::steady_clock::now(); + m_dirty.push_back(con); + + // TODO: log close reason? +} + void stress_handler::on_open(connection_ptr con) { { boost::lock_guard lock(m_lock); @@ -75,6 +94,10 @@ void stress_handler::on_open(connection_ptr con) { if (m_current_connections > m_max_connections) { m_max_connections = m_current_connections; } + + m_con_data[con].on_open = boost::chrono::steady_clock::now(); + m_con_data[con].status = "Open"; + m_dirty.push_back(con); } start(con); @@ -85,6 +108,10 @@ void stress_handler::on_close(connection_ptr con) { m_current_connections--; + m_con_data[con].on_close = boost::chrono::steady_clock::now(); + m_con_data[con].status = "Closed"; + m_dirty.push_back(con); + // TODO: log close reason? } @@ -93,11 +120,26 @@ void stress_handler::on_fail(connection_ptr con) { m_failed_connections++; + m_con_data[con].on_fail = boost::chrono::steady_clock::now(); + m_con_data[con].status = "Failed"; + m_dirty.push_back(con); + // TODO: log failure reason } void stress_handler::start(connection_ptr con) {} +void stress_handler::close(connection_ptr con) { + //boost::lock_guard lock(m_lock); + + m_con_data[con].close_sent = boost::chrono::steady_clock::now(); + m_con_data[con].status = "Closing"; + m_dirty.push_back(con); + + con->close(websocketpp::close::status::NORMAL); + // TODO: log close reason? +} + std::string stress_handler::get_data() const { std::stringstream data; @@ -109,9 +151,73 @@ std::string stress_handler::get_data() const { data << ",\"max_connections\":" << m_max_connections; data << ",\"total_connections\":" << m_total_connections; data << ",\"failed_connections\":" << m_failed_connections; + + data << ",\"connection_data\":["; + + // for each item in m_dirty + std::string sep = ""; + std::list::const_iterator it; + for (it = m_dirty.begin(); it != m_dirty.end(); it++) { + std::map::const_iterator element; + + element = m_con_data.find(*it); + + if (element == m_con_data.end()) { + continue; + } + + data << sep << element->second.print(); + sep = ","; + } + m_dirty.clear(); + + data << "]"; } data << "}"; return data.str(); } + +void stress_handler::maintenance() { + std::list to_process; + + { + boost::lock_guard lock(m_lock); + + std::map::iterator it; + for (it = m_con_data.begin(); it != m_con_data.end(); it++) { + to_process.push_back((*it).first); + } + } + + time_point now = boost::chrono::steady_clock::now(); + + std::list::iterator it; + for (it = to_process.begin(); it != to_process.end(); it++) { + connection_ptr con = (*it); + std::map::iterator element; + + boost::lock_guard lock(m_lock); + + element = m_con_data.find(con); + + if (element == m_con_data.end()) { + continue; + } + + con_data& data = element->second; + + // check the connection state + if (con->get_state() != websocketpp::session::state::OPEN) { + continue; + } + + boost::chrono::nanoseconds dur = now - data.on_open; + size_t milliseconds = dur.count() / 1000000.; + + if (milliseconds > 5000) { + close(con); + } + } +} \ No newline at end of file diff --git a/examples/wsperf/stress_handler.hpp b/examples/wsperf/stress_handler.hpp index c93db854a1..bb44ccee71 100644 --- a/examples/wsperf/stress_handler.hpp +++ b/examples/wsperf/stress_handler.hpp @@ -42,27 +42,94 @@ using websocketpp::client; namespace wsperf { +struct con_data { + typedef boost::chrono::steady_clock::time_point time_point; + + con_data() {} + + con_data(size_t id,time_point init) + : id(id) + , init(init) + , start(init) + , tcp_established(init) + , on_open(init) + , on_fail(init) + , close_sent(init) + , on_close(init) + , status("Connecting") + { + } + + std::string print() const { + std::stringstream o; + + o << "{"; + o << "\"id\":" << id; + o << ",\"status\":\"" << status << "\""; + o << ",\"start\":" << get_rel_microseconds(start); + o << ",\"tcp\":" << get_rel_microseconds(tcp_established); + o << ",\"open\":" << get_rel_microseconds(on_open); + o << ",\"fail\":" << get_rel_microseconds(on_fail); + o << ",\"close_sent\":" << get_rel_microseconds(close_sent); + o << ",\"close\":" << get_rel_microseconds(on_close); + + o << "}"; + + return o.str(); + } + + double get_rel_microseconds(time_point t) const { + boost::chrono::nanoseconds dur = t - init; + return static_cast (dur.count()) / 1000.; + } + + size_t id; + time_point init; + time_point start; + time_point tcp_established; + time_point on_open; + time_point on_fail; + time_point close_sent; + time_point on_close; + std::string status; +}; + class stress_handler : public client::handler { public: typedef stress_handler type; + typedef boost::chrono::steady_clock::time_point time_point; + typedef std::map time_map; /// Construct a stress test from a wscmd command explicit stress_handler(wscmd::cmd& cmd); + void on_connect(connection_ptr con); + + + void on_handshake_init(connection_ptr con); void on_open(connection_ptr con); void on_close(connection_ptr con); void on_fail(connection_ptr con); void start(connection_ptr con); + void close(connection_ptr con); void end(); std::string get_data() const; + virtual void maintenance(); protected: size_t m_current_connections; size_t m_max_connections; size_t m_total_connections; size_t m_failed_connections; - + + size_t m_next_con_id; + time_point m_init; + + // connection related timestamps + std::map m_con_data; + mutable std::list m_dirty; + // Stats update timer size_t m_timeout; boost::shared_ptr m_timer;