mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
stress_test now supports message sending & stats
This commit is contained in:
@@ -28,6 +28,7 @@
|
||||
#include "request.hpp"
|
||||
#include "stress_aggregate.hpp"
|
||||
|
||||
#include <algorithm>
|
||||
#include <sstream>
|
||||
|
||||
using wsperf::request;
|
||||
@@ -36,6 +37,7 @@ void request::process(unsigned int id) {
|
||||
case_handler_ptr test;
|
||||
stress_handler_ptr shandler;
|
||||
std::string uri;
|
||||
size_t connections_opened = 0;
|
||||
size_t connection_count;
|
||||
|
||||
wscmd::cmd command = wscmd::parse(req);
|
||||
@@ -48,6 +50,7 @@ void request::process(unsigned int id) {
|
||||
} else if (command.command == "stress_test") {
|
||||
shandler = stress_handler_ptr(new stress_aggregate(command));
|
||||
|
||||
// todo make sure this isn't 0
|
||||
if(!wscmd::extract_number<size_t>(command, "connection_count",connection_count)) {
|
||||
connection_count = 1;
|
||||
}
|
||||
@@ -92,6 +95,14 @@ void request::process(unsigned int id) {
|
||||
} else if (command.command == "stress_test") {
|
||||
client e(shandler);
|
||||
|
||||
e.alog().unset_level(websocketpp::log::alevel::ALL);
|
||||
e.elog().unset_level(websocketpp::log::elevel::ALL);
|
||||
|
||||
/*client::connection_ptr con;
|
||||
con = e.get_connection(uri);
|
||||
shandler->on_connect(con);
|
||||
e.connect(con);*/
|
||||
|
||||
boost::thread t(boost::bind(&client::run, &e, true));
|
||||
|
||||
size_t handshake_delay;
|
||||
@@ -99,37 +110,53 @@ void request::process(unsigned int id) {
|
||||
handshake_delay = 10;
|
||||
}
|
||||
|
||||
// create connections
|
||||
// open n connections
|
||||
for (size_t i = 0; i < connection_count; i++) {
|
||||
client::connection_ptr con;
|
||||
|
||||
con = e.get_connection(uri);
|
||||
|
||||
shandler->on_connect(con);
|
||||
|
||||
e.connect(con);
|
||||
|
||||
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(handshake_delay));
|
||||
}
|
||||
|
||||
for (;;) {
|
||||
// send update
|
||||
writer->write(prepare_response_object("test_data",shandler->get_data()));
|
||||
// start sending messages
|
||||
shandler->start_message_test();
|
||||
|
||||
e.end_perpetual();
|
||||
|
||||
t.join();
|
||||
|
||||
std::cout << "writing data" << std::endl;
|
||||
writer->write(prepare_response_object("test_data",shandler->get_data()));
|
||||
|
||||
/*for (;;) {
|
||||
// tell the handler to perform its event loop
|
||||
|
||||
// check for too few connections
|
||||
bool quit = false;
|
||||
|
||||
bool quit = shandler->maintenance();
|
||||
if (connections_opened == connection_count) {
|
||||
std::cout << "maintenance loop" << std::endl;
|
||||
quit = shandler->maintenance();
|
||||
}
|
||||
|
||||
// check for done-ness
|
||||
if (quit) {
|
||||
// send update to command
|
||||
std::cout << "writing data" << std::endl;
|
||||
writer->write(prepare_response_object("test_data",shandler->get_data()));
|
||||
break;
|
||||
}
|
||||
|
||||
// unless we know we have something to do, sleep for a bit.
|
||||
if (connections_opened < connection_count) {
|
||||
continue;
|
||||
}
|
||||
|
||||
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
|
||||
}
|
||||
e.end_perpetual();
|
||||
t.join();
|
||||
}*/
|
||||
//e.end_perpetual();
|
||||
|
||||
}
|
||||
|
||||
writer->write(prepare_response("test_complete",""));
|
||||
|
||||
@@ -65,12 +65,12 @@ void stress_aggregate::start(connection_ptr con) {
|
||||
|
||||
}
|
||||
|
||||
void stress_aggregate::on_message(connection_ptr con,websocketpp::message::data_ptr msg) {
|
||||
/*void stress_aggregate::on_message(connection_ptr con,websocketpp::message::data_ptr msg) {
|
||||
std::string hash = websocketpp::md5_hash_hex(msg->get_payload());
|
||||
|
||||
boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
m_msg_stats[hash]++;
|
||||
}
|
||||
}*/
|
||||
|
||||
/*std::string stress_aggregate::get_data() const {
|
||||
std::stringstream data;
|
||||
|
||||
@@ -40,7 +40,7 @@ public:
|
||||
/// Construct a stress test from a wscmd command
|
||||
explicit stress_aggregate(wscmd::cmd& cmd);
|
||||
|
||||
void on_message(connection_ptr con,websocketpp::message::data_ptr msg);
|
||||
//void on_message(connection_ptr con,websocketpp::message::data_ptr msg);
|
||||
|
||||
void start(connection_ptr con);
|
||||
void end();
|
||||
|
||||
@@ -64,6 +64,7 @@ stress_handler::stress_handler(wscmd::cmd& cmd)
|
||||
, m_failed_connections(0)
|
||||
, m_next_con_id(0)
|
||||
, m_init(boost::chrono::steady_clock::now())
|
||||
, m_next_msg_id(0)
|
||||
, m_con_sync(false)
|
||||
{
|
||||
if (!wscmd::extract_number<size_t>(cmd,"msg_count",m_msg_count)) {
|
||||
@@ -112,14 +113,6 @@ stress_handler::stress_handler(wscmd::cmd& cmd)
|
||||
// TODO: choose random number between 0 and max_dur
|
||||
m_con_duration = max_dur;
|
||||
}
|
||||
|
||||
std::cout << "con_duration: " << m_con_duration
|
||||
<< "msg_count: " << m_msg_count
|
||||
<< "msg_size: " << m_msg_size
|
||||
<< "m_con_sync: " << m_con_sync
|
||||
<< "m_con_lifetime: " << m_con_lifetime
|
||||
<< "m_msg_mode: " << m_msg_mode
|
||||
<< std::endl;
|
||||
}
|
||||
|
||||
void stress_handler::on_connect(connection_ptr con) {
|
||||
@@ -127,18 +120,76 @@ void stress_handler::on_connect(connection_ptr con) {
|
||||
|
||||
m_con_data[con] = con_data(m_next_con_id++, m_init);
|
||||
m_con_data[con].start = boost::chrono::steady_clock::now();
|
||||
m_dirty.push_back(con);
|
||||
}
|
||||
|
||||
void stress_handler::on_handshake_init(connection_ptr con) {
|
||||
boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
|
||||
m_con_data[con].tcp_established = boost::chrono::steady_clock::now();
|
||||
m_dirty.push_back(con);
|
||||
|
||||
// TODO: log close reason?
|
||||
}
|
||||
|
||||
void stress_handler::start_message_test() {
|
||||
m_msg.reset(new std::string());
|
||||
m_msg->assign(m_msg_size,'*');
|
||||
|
||||
// for each connection send the first message
|
||||
std::map<connection_ptr,con_data>::iterator it;
|
||||
for (it = m_con_data.begin(); it != m_con_data.end(); it++) {
|
||||
connection_ptr con = (*it).first;
|
||||
con_data& data = (*it).second;
|
||||
|
||||
/*data.msg = con->get_data_message();
|
||||
std::string msg;
|
||||
msg.assign(m_msg_size,'*');
|
||||
|
||||
data.msg->set_payload(msg);
|
||||
|
||||
data.msg->reset(websocketpp::frame::opcode::BINARY);*/
|
||||
|
||||
boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
|
||||
msg_data foo;
|
||||
|
||||
foo.msg_id = m_next_msg_id++;
|
||||
foo.send_time = boost::chrono::steady_clock::now();
|
||||
data.messages.push_back(foo);
|
||||
|
||||
con->send(*m_msg);
|
||||
}
|
||||
}
|
||||
|
||||
void stress_handler::on_message(connection_ptr con,websocketpp::message::data_ptr msg) {
|
||||
time_point mark = boost::chrono::steady_clock::now();
|
||||
|
||||
boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
|
||||
std::map<connection_ptr,con_data>::iterator element = m_con_data.find(con);
|
||||
|
||||
if (element == m_con_data.end()) {
|
||||
std::cout << "Bad Bad Bad" << std::endl;
|
||||
return;
|
||||
}
|
||||
|
||||
con_data& data = (*element).second;
|
||||
|
||||
data.messages.back().recv_time = mark;
|
||||
|
||||
|
||||
if (data.messages.size() < m_msg_count) {
|
||||
msg_data foo;
|
||||
|
||||
foo.msg_id = m_next_msg_id++;
|
||||
foo.send_time = boost::chrono::steady_clock::now();
|
||||
data.messages.push_back(foo);
|
||||
|
||||
con->send(*m_msg);
|
||||
} else {
|
||||
close(con);
|
||||
}
|
||||
}
|
||||
|
||||
void stress_handler::on_open(connection_ptr con) {
|
||||
{
|
||||
boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
@@ -152,7 +203,6 @@ void stress_handler::on_open(connection_ptr con) {
|
||||
|
||||
m_con_data[con].on_open = boost::chrono::steady_clock::now();
|
||||
m_con_data[con].status = "Open";
|
||||
m_dirty.push_back(con);
|
||||
}
|
||||
|
||||
start(con);
|
||||
@@ -165,7 +215,6 @@ void stress_handler::on_close(connection_ptr con) {
|
||||
|
||||
m_con_data[con].on_close = boost::chrono::steady_clock::now();
|
||||
m_con_data[con].status = "Closed";
|
||||
m_dirty.push_back(con);
|
||||
|
||||
// TODO: log close reason?
|
||||
}
|
||||
@@ -177,19 +226,19 @@ void stress_handler::on_fail(connection_ptr con) {
|
||||
|
||||
m_con_data[con].on_fail = boost::chrono::steady_clock::now();
|
||||
m_con_data[con].status = "Failed";
|
||||
m_dirty.push_back(con);
|
||||
|
||||
// TODO: log failure reason
|
||||
}
|
||||
|
||||
void stress_handler::start(connection_ptr con) {}
|
||||
void stress_handler::start(connection_ptr con) {
|
||||
//close(con);
|
||||
}
|
||||
|
||||
void stress_handler::close(connection_ptr con) {
|
||||
//boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
|
||||
m_con_data[con].close_sent = boost::chrono::steady_clock::now();
|
||||
m_con_data[con].status = "Closing";
|
||||
m_dirty.push_back(con);
|
||||
|
||||
con->close(websocketpp::close::status::NORMAL);
|
||||
// TODO: log close reason?
|
||||
@@ -198,10 +247,12 @@ void stress_handler::close(connection_ptr con) {
|
||||
std::string stress_handler::get_data() const {
|
||||
std::stringstream data;
|
||||
|
||||
|
||||
data << "{";
|
||||
|
||||
{
|
||||
boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
|
||||
data << "\"current_connections\":" << m_current_connections;
|
||||
data << ",\"max_connections\":" << m_max_connections;
|
||||
data << ",\"total_connections\":" << m_total_connections;
|
||||
@@ -211,20 +262,11 @@ std::string stress_handler::get_data() const {
|
||||
|
||||
// for each item in m_dirty
|
||||
std::string sep = "";
|
||||
std::list<connection_ptr>::const_iterator it;
|
||||
for (it = m_dirty.begin(); it != m_dirty.end(); it++) {
|
||||
std::map<connection_ptr,con_data>::const_iterator element;
|
||||
|
||||
element = m_con_data.find(*it);
|
||||
|
||||
if (element == m_con_data.end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
data << sep << element->second.print();
|
||||
std::map<connection_ptr,con_data>::const_iterator it;
|
||||
for (it = m_con_data.begin(); it != m_con_data.end(); it++) {
|
||||
data << sep << (*it).second.print();
|
||||
sep = ",";
|
||||
}
|
||||
m_dirty.clear();
|
||||
|
||||
data << "]";
|
||||
}
|
||||
@@ -235,45 +277,29 @@ std::string stress_handler::get_data() const {
|
||||
}
|
||||
|
||||
bool stress_handler::maintenance() {
|
||||
std::list<connection_ptr> to_process;
|
||||
std::cout << "locking..." << std::endl;
|
||||
boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
|
||||
{
|
||||
boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
|
||||
bool quit = true;
|
||||
|
||||
std::map<connection_ptr,con_data>::iterator it;
|
||||
for (it = m_con_data.begin(); it != m_con_data.end(); it++) {
|
||||
to_process.push_back((*it).first);
|
||||
if ((*it).first->get_state() != websocketpp::session::state::CLOSED) {
|
||||
quit = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (quit) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
bool quit = true;
|
||||
|
||||
time_point now = boost::chrono::steady_clock::now();
|
||||
|
||||
std::list<connection_ptr>::iterator it;
|
||||
for (it = to_process.begin(); it != to_process.end(); it++) {
|
||||
connection_ptr con = (*it);
|
||||
std::map<connection_ptr,con_data>::iterator element;
|
||||
|
||||
boost::lock_guard<boost::mutex> lock(m_lock);
|
||||
|
||||
element = m_con_data.find(con);
|
||||
|
||||
if (element == m_con_data.end()) {
|
||||
continue;
|
||||
std::cout << "found " << m_con_data.size() << " connections" << std::endl;
|
||||
|
||||
std::map<connection_ptr,con_data>::iterator it;
|
||||
for (it = m_con_data.begin(); it != m_con_data.end(); it++) {
|
||||
if ((*it).first->get_state() != websocketpp::session::state::CLOSED) {
|
||||
quit = false;
|
||||
}
|
||||
|
||||
con_data& data = element->second;
|
||||
connection_ptr con = (*it).first;
|
||||
con_data& data = (*it).second;
|
||||
|
||||
std::cout << "processing " << data.id << "...";
|
||||
|
||||
// check the connection state
|
||||
if (con->get_state() != websocketpp::session::state::OPEN) {
|
||||
std::cout << "ignored" << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
@@ -283,7 +309,50 @@ bool stress_handler::maintenance() {
|
||||
if (milliseconds > m_con_duration) {
|
||||
close(con);
|
||||
}
|
||||
std::cout << "closed" << std::endl;
|
||||
}
|
||||
|
||||
if (quit) {
|
||||
return true;
|
||||
}
|
||||
|
||||
|
||||
/*std::cout << "found " << to_process.size() << " connections" << std::endl;
|
||||
|
||||
|
||||
|
||||
|
||||
std::list<connection_ptr>::iterator it2;
|
||||
for (it2 = to_process.begin(); it2 != to_process.end(); it++) {
|
||||
connection_ptr con = (*it2);
|
||||
std::map<connection_ptr,con_data>::iterator element;
|
||||
|
||||
|
||||
|
||||
element = m_con_data.find(con);
|
||||
|
||||
if (element == m_con_data.end()) {
|
||||
continue;
|
||||
}
|
||||
|
||||
con_data& data = element->second;
|
||||
|
||||
|
||||
|
||||
// check the connection state
|
||||
if (con->get_state() != websocketpp::session::state::OPEN) {
|
||||
std::cout << "ignored" << std::endl;
|
||||
continue;
|
||||
}
|
||||
|
||||
boost::chrono::nanoseconds dur = now - data.on_open;
|
||||
size_t milliseconds = static_cast<size_t>(dur.count()) / 1000000;
|
||||
|
||||
if (milliseconds > m_con_duration) {
|
||||
close(con);
|
||||
}
|
||||
std::cout << "closed" << std::endl;
|
||||
}*/
|
||||
|
||||
return false;
|
||||
}
|
||||
@@ -60,6 +60,14 @@ namespace msg_mode {
|
||||
};
|
||||
}
|
||||
|
||||
struct msg_data {
|
||||
typedef boost::chrono::steady_clock::time_point time_point;
|
||||
|
||||
size_t msg_id;
|
||||
time_point send_time;
|
||||
time_point recv_time;
|
||||
//size_t payload_len;
|
||||
};
|
||||
|
||||
struct con_data {
|
||||
typedef boost::chrono::steady_clock::time_point time_point;
|
||||
@@ -91,6 +99,19 @@ struct con_data {
|
||||
o << ",\"fail\":" << get_rel_microseconds(on_fail);
|
||||
o << ",\"close_sent\":" << get_rel_microseconds(close_sent);
|
||||
o << ",\"close\":" << get_rel_microseconds(on_close);
|
||||
|
||||
o << ",\"messages\":[";
|
||||
std::string sep = "";
|
||||
std::vector<msg_data>::const_iterator it;
|
||||
for (it = messages.begin(); it != messages.end(); it++) {
|
||||
o << sep << "["
|
||||
<< get_rel_microseconds((*it).send_time) << ","
|
||||
<< get_rel_microseconds((*it).recv_time)
|
||||
<< "]";
|
||||
sep = ",";
|
||||
}
|
||||
|
||||
o << "]";
|
||||
|
||||
o << "}";
|
||||
|
||||
@@ -111,8 +132,12 @@ struct con_data {
|
||||
time_point close_sent;
|
||||
time_point on_close;
|
||||
std::string status;
|
||||
std::vector<msg_data> messages;
|
||||
//stress_handler::message_ptr msg;
|
||||
};
|
||||
|
||||
|
||||
|
||||
class stress_handler : public client::handler {
|
||||
public:
|
||||
typedef stress_handler type;
|
||||
@@ -124,6 +149,7 @@ public:
|
||||
|
||||
void on_connect(connection_ptr con);
|
||||
|
||||
void on_message(connection_ptr con,websocketpp::message::data_ptr msg);
|
||||
|
||||
void on_handshake_init(connection_ptr con);
|
||||
void on_open(connection_ptr con);
|
||||
@@ -133,9 +159,11 @@ public:
|
||||
void start(connection_ptr con);
|
||||
void close(connection_ptr con);
|
||||
void end();
|
||||
|
||||
|
||||
std::string get_data() const;
|
||||
virtual bool maintenance();
|
||||
|
||||
void start_message_test();
|
||||
protected:
|
||||
size_t m_current_connections;
|
||||
size_t m_max_connections;
|
||||
@@ -153,6 +181,9 @@ protected:
|
||||
size_t m_timeout;
|
||||
boost::shared_ptr<boost::asio::deadline_timer> m_timer;
|
||||
|
||||
size_t m_next_msg_id;
|
||||
boost::shared_ptr<std::string> m_msg;
|
||||
|
||||
// test settings pulled from the command
|
||||
con_lifetime::value m_con_lifetime;
|
||||
size_t m_con_duration;
|
||||
|
||||
@@ -191,7 +191,7 @@ int start_client(po::variables_map& vm) {
|
||||
int main(int argc, char* argv[]) {
|
||||
try {
|
||||
// 12288 is max OS X limit without changing kernal settings
|
||||
/*const rlim_t ideal_size = 10000;
|
||||
const rlim_t ideal_size = 10000;
|
||||
rlim_t old_size;
|
||||
rlim_t old_max;
|
||||
|
||||
@@ -223,7 +223,7 @@ int main(int argc, char* argv[]) {
|
||||
std::cout << "Failed. This server will be limited to " << old_size << " concurrent connections. Error code: " << errno << " system max: " << old_max << std::endl;
|
||||
}
|
||||
}
|
||||
}*/
|
||||
}
|
||||
|
||||
std::string config_file;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user