From 8a5f84dd993be71dd00688236698edc67eced715 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Tue, 6 Mar 2012 20:23:23 -0600 Subject: [PATCH] merges Tobias' statistics changes --- examples/wsperf/case.hpp | 129 +++++++++++++++++++++++------------- examples/wsperf/generic.hpp | 24 ++++--- examples/wsperf/request.hpp | 45 ++++++++----- examples/wsperf/wsperf.cpp | 2 +- 4 files changed, 127 insertions(+), 73 deletions(-) diff --git a/examples/wsperf/case.hpp b/examples/wsperf/case.hpp index cf1fe0520e..2022b4d2ca 100644 --- a/examples/wsperf/case.hpp +++ b/examples/wsperf/case.hpp @@ -82,85 +82,117 @@ public: } void end(connection_ptr con) { - uint64_t avg = 0; + /*uint64_t avg = 0; std::vector avgs(10); double squaresum = 0; uint64_t stddev = 0; int64_t total = 0; + double seconds = 0;*/ + + std::vector avgs; + avgs.resize(m_quantile_count, 0); + + std::vector quantiles; + quantiles.resize(m_quantile_count, 0); + + double avg = 0; + double stddev = 0; + double total = 0; double seconds = 0; // TODO: handle weird sizes and error conditions better - if (m_end.size() > 10) { - uint64_t last = m_start.time_since_epoch().count(); + if (m_end.size() > m_quantile_count) { + boost::chrono::steady_clock::time_point last = m_start; + + // convert RTTs to microsecs + // + std::vector::iterator it; - for (it = m_end.begin(); it != m_end.end(); it++) { - //(*it) -= m_start; - m_times.push_back(it->time_since_epoch().count() - last); - last = it->time_since_epoch().count(); + for (it = m_end.begin(); it != m_end.end(); ++it) { + boost::chrono::nanoseconds dur = *it - last; + m_times.push_back(static_cast (dur.count()) / 1000.); + last = *it; } - std::sort(m_times.begin(),m_times.end()); + std::sort(m_times.begin(), m_times.end()); - for (uint64_t i = 0; i < m_times.size(); i++) { + size_t samples_per_quantile = m_times.size() / m_quantile_count; + + // quantiles + for (size_t i = 0; i < m_quantile_count; ++i) { + quantiles[i] = m_times[((i + 1) * samples_per_quantile) - 1]; + } + + // total average and quantile averages + for (size_t i = 0; i < m_times.size(); ++i) { avg += m_times[i]; - avgs[i/(m_times.size()/10)] += m_times[i]; - squaresum += pow(double(m_times[i]),2); + avgs[i / samples_per_quantile] + += m_times[i] / static_cast(samples_per_quantile); } - avg /= m_times.size(); - for (uint64_t i = 0; i < m_times.size(); i++) { - stddev += pow(m_times[i]-avg,2); - } - stddev /= m_times.size(); - stddev = sqrt(stddev); + avg /= static_cast (m_times.size()); - for (int i = 0; i < 10; i++) { - avgs[i] /= (m_times.size()/10); + // standard dev corrected for estimation from sample + for (size_t i = 0; i < m_times.size(); ++i) { + stddev += (m_times[i] - avg) * (m_times[i] - avg); } + + // Bessel's correction + stddev /= static_cast (m_times.size() - 1); + stddev = std::sqrt(stddev); } else { m_times.push_back(0); } - total = m_end[m_end.size()-1].time_since_epoch().count() - m_start.time_since_epoch().count(); - seconds = double(total)/1000000000.0; - - double kbps = (double(m_bytes)/1000.0)/seconds; + boost::chrono::nanoseconds total_dur = m_end[m_end.size()-1] - m_start; + total = static_cast (total_dur.count()) / 1000.; // microsec + seconds = total / 10000000.; std::stringstream s; + std::string outcome; switch (m_pass) { case FAIL: - s << "{\"name\":\"" << m_name << "\",\"result\":\"fail\",\"min\":" << m_times[0] << ",\"max\":" << m_times[m_times.size()-1] << ",\"median\":" << m_times[(m_times.size()-1)/2] << ",\"avg\":" << avg << ",\"stddev\":" << stddev << ",\"sqsum\":" << squaresum << ",\"total\":" << total << ",\"KBps\":" << kbps << ",\"quantiles\":["; - - for (int i = 0; i < 10; i++) { - s << (i > 0 ? "," : "") << avgs[i]; - } - - s << "]}"; + outcome = "fail"; break; - case PASS: - s << "{\"name\":\"" << m_name << "\",\"result\":\"pass\",\"min\":" << m_times[0] << ",\"max\":" << m_times[m_times.size()-1] << ",\"median\":" << m_times[(m_times.size()-1)/2] << ",\"avg\":" << avg << ",\"stddev\":" << stddev << ",\"sqsum\":" << squaresum << ",\"total\":" << total << ",\"KBps\":" << kbps << ",\"quantiles\":["; - - for (int i = 0; i < 10; i++) { - s << (i > 0 ? "," : "") << avgs[i]; - } - - s << "]}"; + case PASS: + outcome = "pass"; break; case TIME_OUT: - s << "{\"name\":\"" << m_name << "\",\"result\":\"time_out\",\"min\":" << m_times[0] << ",\"max\":" << m_times[m_times.size()-1] << ",\"median\":" << m_times[(m_times.size()-1)/2] << ",\"avg\":" << avg << ",\"stddev\":" << stddev << ",\"sqsum\":" << squaresum << ",\"total\":" << total << ",\"KBps\":" << kbps << ",\"quantiles\":["; - - for (int i = 0; i < 10; i++) { - s << (i > 0 ? "," : "") << avgs[i]; - } - - s << "]}"; + outcome = "time_out"; break; case RUNNING: throw case_exception("end() called from RUNNING state"); break; } + s << "{\"result\":\"" << outcome + << "\",\"min\":" << m_times[0] + << ",\"max\":" << m_times[m_times.size()-1] + << ",\"median\":" << m_times[(m_times.size()-1)/2] + << ",\"avg\":" << avg + << ",\"stddev\":" << stddev + << ",\"total\":" << total + << ",\"bytes\":" << m_bytes + << ",\"quantiles\":["; + + for (int i = 0; i < m_quantile_count; i++) { + s << (i > 0 ? "," : ""); + s << "["; + s << avgs[i] << "," << quantiles[i]; + s << "]"; + } + s << "]"; + + if (m_rtts) { + s << ",\"rtts\":["; + for (size_t i = 0; i < m_times.size(); i++) { + s << (i > 0 ? "," : "") << m_times[i]; + } + s << "]"; + }; + s << "}"; + m_data = s.str(); con->close(websocketpp::close::status::NORMAL,""); @@ -252,6 +284,9 @@ public: const std::string& get_token() const { return m_token; } + const std::string& get_uri() const { + return m_uri; + } protected: enum status { FAIL = 0, @@ -263,6 +298,8 @@ protected: std::string m_name; std::string m_uri; std::string m_token; + size_t m_quantile_count; + bool m_rtts; std::string m_data; status m_pass; @@ -270,7 +307,7 @@ protected: boost::chrono::steady_clock::time_point m_start; std::vector m_end; - std::vector m_times; + std::vector m_times; uint64_t m_bytes; }; diff --git a/examples/wsperf/generic.hpp b/examples/wsperf/generic.hpp index 313be131fd..84470ca865 100644 --- a/examples/wsperf/generic.hpp +++ b/examples/wsperf/generic.hpp @@ -57,18 +57,24 @@ public: {} explicit message_test(wscmd::cmd& cmd) { - // message_test:uri=[string]; ws://localhost:9000 - // size=[interger]; 4096 - // count=[integer]; 1000 - // timeout=[integer]; 10000 - // binary=[bool]; true/false - // sync=[bool]; true/false - // correctness=[string]; exact/length - // token=[string]; foo + // message_test:uri=[string]; ws://localhost:9000 + // token=[string]; foo + // quantile_count=[integer]; 10 + // rtts=[bool]; true/false + // size=[interger]; 4096 + // count=[integer]; 1000 + // timeout=[integer]; 10000 + // binary=[bool]; true/false + // sync=[bool]; true/false + // correctness=[string]; exact/length + // generic stuff m_uri = extract_string(cmd,"uri"); m_token = extract_string(cmd,"token"); + m_quantile_count = extract_number(cmd,"quantile_count"); + m_rtts = extract_bool(cmd,"rtts"); + // specific to message_test m_message_count = extract_number(cmd,"size"); m_message_size = extract_number(cmd,"count"); m_timeout = extract_number(cmd,"timeout"); @@ -121,7 +127,7 @@ public: m_timer->cancel(); m_msg.reset(); m_pass = FAIL; - std::cout << "foo" << std::endl; + this->end(con); } diff --git a/examples/wsperf/request.hpp b/examples/wsperf/request.hpp index 6f5eef14b5..c9c5815318 100644 --- a/examples/wsperf/request.hpp +++ b/examples/wsperf/request.hpp @@ -83,36 +83,47 @@ struct request { if (command.command == "message_test") { test = case_handler_ptr(new message_test(command)); token = test->get_token(); + uri = test->get_uri(); } else { writer->write(prepare_response("error","Invalid Command")); return; } + + writer->write(prepare_response("test_start","")); + + client e(test); + + e.alog().unset_level(websocketpp::log::alevel::ALL); + e.elog().unset_level(websocketpp::log::elevel::ALL); + + e.elog().set_level(websocketpp::log::elevel::RERROR); + e.elog().set_level(websocketpp::log::elevel::FATAL); + + e.connect(uri); + e.run(); + + writer->write(prepare_response_object("test_data",test->get_data())); + + writer->write(prepare_response("test_complete","")); } catch (case_exception& e) { writer->write(prepare_response("error",e.what())); return; + } catch (websocketpp::exception& e) { + writer->write(prepare_response("error",e.what())); + return; + } catch (websocketpp::uri_exception& e) { + writer->write(prepare_response("error",e.what())); + return; } - - writer->write(prepare_response("test_start","")); - - client e(test); - - e.alog().unset_level(websocketpp::log::alevel::ALL); - e.elog().unset_level(websocketpp::log::elevel::ALL); - - e.elog().set_level(websocketpp::log::elevel::RERROR); - e.elog().set_level(websocketpp::log::elevel::FATAL); - - e.connect(uri); - e.run(); - - writer->write(prepare_response("test_start",test->get_data())); - - writer->write(prepare_response("test_complete","")); } std::string prepare_response(std::string type,std::string data) { return "{\"type\":\"" + type + "\",\"token\":\"" + token + "\",\"data\":\"" + data + "\"}"; } + + std::string prepare_response_object(std::string type,std::string data) { + return "{\"type\":\"" + type + "\",\"token\":\"" + token + "\",\"data\":" + data + "}"; + } }; // The coordinator is a simple wrapper around an STL queue. add_request inserts diff --git a/examples/wsperf/wsperf.cpp b/examples/wsperf/wsperf.cpp index 971e01f209..1f020a4e4f 100644 --- a/examples/wsperf/wsperf.cpp +++ b/examples/wsperf/wsperf.cpp @@ -54,7 +54,7 @@ using websocketpp::server; // based on hardware concurrency available and expected load and // job length. int main(int argc, char* argv[]) { - unsigned short port = 9003; + unsigned short port = 9050; unsigned short num_threads = 2; try {