merges Tobias' statistics changes

This commit is contained in:
Peter Thorson
2012-03-06 20:23:23 -06:00
parent c3026e4e93
commit 8a5f84dd99
4 changed files with 127 additions and 73 deletions

View File

@@ -82,85 +82,117 @@ public:
}
void end(connection_ptr con) {
uint64_t avg = 0;
/*uint64_t avg = 0;
std::vector<uint64_t> avgs(10);
double squaresum = 0;
uint64_t stddev = 0;
int64_t total = 0;
double seconds = 0;*/
std::vector<double> avgs;
avgs.resize(m_quantile_count, 0);
std::vector<double> quantiles;
quantiles.resize(m_quantile_count, 0);
double avg = 0;
double stddev = 0;
double total = 0;
double seconds = 0;
// TODO: handle weird sizes and error conditions better
if (m_end.size() > 10) {
uint64_t last = m_start.time_since_epoch().count();
if (m_end.size() > m_quantile_count) {
boost::chrono::steady_clock::time_point last = m_start;
// convert RTTs to microsecs
//
std::vector<boost::chrono::steady_clock::time_point>::iterator it;
for (it = m_end.begin(); it != m_end.end(); it++) {
//(*it) -= m_start;
m_times.push_back(it->time_since_epoch().count() - last);
last = it->time_since_epoch().count();
for (it = m_end.begin(); it != m_end.end(); ++it) {
boost::chrono::nanoseconds dur = *it - last;
m_times.push_back(static_cast<double> (dur.count()) / 1000.);
last = *it;
}
std::sort(m_times.begin(),m_times.end());
std::sort(m_times.begin(), m_times.end());
for (uint64_t i = 0; i < m_times.size(); i++) {
size_t samples_per_quantile = m_times.size() / m_quantile_count;
// quantiles
for (size_t i = 0; i < m_quantile_count; ++i) {
quantiles[i] = m_times[((i + 1) * samples_per_quantile) - 1];
}
// total average and quantile averages
for (size_t i = 0; i < m_times.size(); ++i) {
avg += m_times[i];
avgs[i/(m_times.size()/10)] += m_times[i];
squaresum += pow(double(m_times[i]),2);
avgs[i / samples_per_quantile]
+= m_times[i] / static_cast<double>(samples_per_quantile);
}
avg /= m_times.size();
for (uint64_t i = 0; i < m_times.size(); i++) {
stddev += pow(m_times[i]-avg,2);
}
stddev /= m_times.size();
stddev = sqrt(stddev);
avg /= static_cast<double> (m_times.size());
for (int i = 0; i < 10; i++) {
avgs[i] /= (m_times.size()/10);
// standard dev corrected for estimation from sample
for (size_t i = 0; i < m_times.size(); ++i) {
stddev += (m_times[i] - avg) * (m_times[i] - avg);
}
// Bessel's correction
stddev /= static_cast<double> (m_times.size() - 1);
stddev = std::sqrt(stddev);
} else {
m_times.push_back(0);
}
total = m_end[m_end.size()-1].time_since_epoch().count() - m_start.time_since_epoch().count();
seconds = double(total)/1000000000.0;
double kbps = (double(m_bytes)/1000.0)/seconds;
boost::chrono::nanoseconds total_dur = m_end[m_end.size()-1] - m_start;
total = static_cast<double> (total_dur.count()) / 1000.; // microsec
seconds = total / 10000000.;
std::stringstream s;
std::string outcome;
switch (m_pass) {
case FAIL:
s << "{\"name\":\"" << m_name << "\",\"result\":\"fail\",\"min\":" << m_times[0] << ",\"max\":" << m_times[m_times.size()-1] << ",\"median\":" << m_times[(m_times.size()-1)/2] << ",\"avg\":" << avg << ",\"stddev\":" << stddev << ",\"sqsum\":" << squaresum << ",\"total\":" << total << ",\"KBps\":" << kbps << ",\"quantiles\":[";
for (int i = 0; i < 10; i++) {
s << (i > 0 ? "," : "") << avgs[i];
}
s << "]}";
outcome = "fail";
break;
case PASS:
s << "{\"name\":\"" << m_name << "\",\"result\":\"pass\",\"min\":" << m_times[0] << ",\"max\":" << m_times[m_times.size()-1] << ",\"median\":" << m_times[(m_times.size()-1)/2] << ",\"avg\":" << avg << ",\"stddev\":" << stddev << ",\"sqsum\":" << squaresum << ",\"total\":" << total << ",\"KBps\":" << kbps << ",\"quantiles\":[";
for (int i = 0; i < 10; i++) {
s << (i > 0 ? "," : "") << avgs[i];
}
s << "]}";
case PASS:
outcome = "pass";
break;
case TIME_OUT:
s << "{\"name\":\"" << m_name << "\",\"result\":\"time_out\",\"min\":" << m_times[0] << ",\"max\":" << m_times[m_times.size()-1] << ",\"median\":" << m_times[(m_times.size()-1)/2] << ",\"avg\":" << avg << ",\"stddev\":" << stddev << ",\"sqsum\":" << squaresum << ",\"total\":" << total << ",\"KBps\":" << kbps << ",\"quantiles\":[";
for (int i = 0; i < 10; i++) {
s << (i > 0 ? "," : "") << avgs[i];
}
s << "]}";
outcome = "time_out";
break;
case RUNNING:
throw case_exception("end() called from RUNNING state");
break;
}
s << "{\"result\":\"" << outcome
<< "\",\"min\":" << m_times[0]
<< ",\"max\":" << m_times[m_times.size()-1]
<< ",\"median\":" << m_times[(m_times.size()-1)/2]
<< ",\"avg\":" << avg
<< ",\"stddev\":" << stddev
<< ",\"total\":" << total
<< ",\"bytes\":" << m_bytes
<< ",\"quantiles\":[";
for (int i = 0; i < m_quantile_count; i++) {
s << (i > 0 ? "," : "");
s << "[";
s << avgs[i] << "," << quantiles[i];
s << "]";
}
s << "]";
if (m_rtts) {
s << ",\"rtts\":[";
for (size_t i = 0; i < m_times.size(); i++) {
s << (i > 0 ? "," : "") << m_times[i];
}
s << "]";
};
s << "}";
m_data = s.str();
con->close(websocketpp::close::status::NORMAL,"");
@@ -252,6 +284,9 @@ public:
const std::string& get_token() const {
return m_token;
}
const std::string& get_uri() const {
return m_uri;
}
protected:
enum status {
FAIL = 0,
@@ -263,6 +298,8 @@ protected:
std::string m_name;
std::string m_uri;
std::string m_token;
size_t m_quantile_count;
bool m_rtts;
std::string m_data;
status m_pass;
@@ -270,7 +307,7 @@ protected:
boost::chrono::steady_clock::time_point m_start;
std::vector<boost::chrono::steady_clock::time_point> m_end;
std::vector<uint64_t> m_times;
std::vector<double> m_times;
uint64_t m_bytes;
};

View File

@@ -57,18 +57,24 @@ public:
{}
explicit message_test(wscmd::cmd& cmd) {
// message_test:uri=[string]; ws://localhost:9000
// size=[interger]; 4096
// count=[integer]; 1000
// timeout=[integer]; 10000
// binary=[bool]; true/false
// sync=[bool]; true/false
// correctness=[string]; exact/length
// token=[string]; foo
// message_test:uri=[string]; ws://localhost:9000
// token=[string]; foo
// quantile_count=[integer]; 10
// rtts=[bool]; true/false
// size=[interger]; 4096
// count=[integer]; 1000
// timeout=[integer]; 10000
// binary=[bool]; true/false
// sync=[bool]; true/false
// correctness=[string]; exact/length
// generic stuff
m_uri = extract_string(cmd,"uri");
m_token = extract_string(cmd,"token");
m_quantile_count = extract_number<size_t>(cmd,"quantile_count");
m_rtts = extract_bool(cmd,"rtts");
// specific to message_test
m_message_count = extract_number<uint64_t>(cmd,"size");
m_message_size = extract_number<uint64_t>(cmd,"count");
m_timeout = extract_number<uint64_t>(cmd,"timeout");
@@ -121,7 +127,7 @@ public:
m_timer->cancel();
m_msg.reset();
m_pass = FAIL;
std::cout << "foo" << std::endl;
this->end(con);
}

View File

@@ -83,36 +83,47 @@ struct request {
if (command.command == "message_test") {
test = case_handler_ptr(new message_test(command));
token = test->get_token();
uri = test->get_uri();
} else {
writer->write(prepare_response("error","Invalid Command"));
return;
}
writer->write(prepare_response("test_start",""));
client e(test);
e.alog().unset_level(websocketpp::log::alevel::ALL);
e.elog().unset_level(websocketpp::log::elevel::ALL);
e.elog().set_level(websocketpp::log::elevel::RERROR);
e.elog().set_level(websocketpp::log::elevel::FATAL);
e.connect(uri);
e.run();
writer->write(prepare_response_object("test_data",test->get_data()));
writer->write(prepare_response("test_complete",""));
} catch (case_exception& e) {
writer->write(prepare_response("error",e.what()));
return;
} catch (websocketpp::exception& e) {
writer->write(prepare_response("error",e.what()));
return;
} catch (websocketpp::uri_exception& e) {
writer->write(prepare_response("error",e.what()));
return;
}
writer->write(prepare_response("test_start",""));
client e(test);
e.alog().unset_level(websocketpp::log::alevel::ALL);
e.elog().unset_level(websocketpp::log::elevel::ALL);
e.elog().set_level(websocketpp::log::elevel::RERROR);
e.elog().set_level(websocketpp::log::elevel::FATAL);
e.connect(uri);
e.run();
writer->write(prepare_response("test_start",test->get_data()));
writer->write(prepare_response("test_complete",""));
}
std::string prepare_response(std::string type,std::string data) {
return "{\"type\":\"" + type + "\",\"token\":\"" + token + "\",\"data\":\"" + data + "\"}";
}
std::string prepare_response_object(std::string type,std::string data) {
return "{\"type\":\"" + type + "\",\"token\":\"" + token + "\",\"data\":" + data + "}";
}
};
// The coordinator is a simple wrapper around an STL queue. add_request inserts

View File

@@ -54,7 +54,7 @@ using websocketpp::server;
// based on hardware concurrency available and expected load and
// job length.
int main(int argc, char* argv[]) {
unsigned short port = 9003;
unsigned short port = 9050;
unsigned short num_threads = 2;
try {