adds client mode and program options to wsperf

This commit is contained in:
Peter Thorson
2012-03-10 09:10:01 -06:00
parent 0360e6669a
commit 7042091a61
6 changed files with 243 additions and 54 deletions

3
.gitignore vendored
View File

@@ -47,4 +47,5 @@ examples/wsperf/wsperf
.sconsign.dblite
build/
build/
examples/wsperf/wsperf_client

View File

@@ -18,13 +18,16 @@ else
endif
ifeq ($(SHARED), 1)
LDFLAGS := $(LDFLAGS) -lwebsocketpp $(darwinlibs) -lboost_system -lboost_thread -lboost_random -lboost_regex -lboost_date_time -boost_chrono
LDFLAGS := $(LDFLAGS) -lwebsocketpp $(darwinlibs) -lboost_system -lboost_thread -lboost_random -lboost_regex -lboost_date_time -boost_chrono -lboost_program_options
else
LDFLAGS := $(LDFLAGS) $(WEBSOCKETPP_PATH)/libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a $(BOOST_LIB_PATH)/libboost_random.a $(BOOST_LIB_PATH)/libboost_chrono.a $(darwinlibs)
LDFLAGS := $(LDFLAGS) $(WEBSOCKETPP_PATH)/libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a $(BOOST_LIB_PATH)/libboost_random.a $(BOOST_LIB_PATH)/libboost_chrono.a $(BOOST_LIB_PATH)/libboost_program_options.a $(darwinlibs)
endif
wsperf: wsperf.o
$(CXX) $(CFLAGS) $^ -o $@ $(LDFLAGS)
wsperf_client: wsperf_client.o
$(CXX) $(CFLAGS) $^ -o $@ $(LDFLAGS)
%.o: %.cpp
$(CXX) -c $(CFLAGS) -o $@ $^
@@ -33,4 +36,4 @@ wsperf: wsperf.o
#
.PHONY: clean
clean:
rm -f *.o wsperf
rm -f *.o wsperf wsperf_client

View File

@@ -43,6 +43,11 @@ using websocketpp::server;
namespace wsperf {
enum request_type {
PERF_TEST = 0,
END_WORKER = 1
};
class writer {
public:
virtual void write(std::string msg) = 0;
@@ -69,6 +74,7 @@ private:
struct request {
writer_ptr writer;
request_type type;
std::string req; // The raw request
std::string token; // Parsed test token. Return in all results
@@ -163,10 +169,37 @@ public:
void on_message(connection_ptr con,message_ptr msg) {
request r;
r.type = PERF_TEST;
r.writer = writer_ptr(new ws_writer<server>(con));
r.req = msg->get_payload();
m_coordinator.add_request(r);
}
void on_fail(connection_ptr con) {
std::cout << "A connection from a command client failed." << std::endl;
}
private:
request_coordinator& m_coordinator;
};
// The WebSocket++ handler in this case reads numbers from connections and packs
// connection pointer + number into a request struct and passes it off to the
// coordinator.
class concurrent_client_handler : public client::handler {
public:
concurrent_client_handler(request_coordinator& c) : m_coordinator(c) {}
void on_message(connection_ptr con,message_ptr msg) {
request r;
r.type = PERF_TEST;
r.writer = writer_ptr(new ws_writer<client>(con));
r.req = msg->get_payload();
m_coordinator.add_request(r);
}
void on_fail(connection_ptr con) {
std::cout << "The connection to the commanding server failed." << std::endl;
}
private:
request_coordinator& m_coordinator;
};
@@ -182,7 +215,11 @@ void process_requests(request_coordinator* coordinator) {
while (1) {
coordinator->get_request(r);
r.process();
if (r.type == PERF_TEST) {
r.process();
} else {
break;
}
}
}

View File

@@ -0,0 +1,17 @@
##
## wsperf config file
##
## MODE (choose one)
server = 1
#client = 1
## GLOBAL OPTIONS
num_threads = 2
## OPTIONS FOR SERVER
port = 9501
## OPTIONS FOR CLIENT
## uri of command server
#uri = "ws://localhost:9005"

View File

@@ -27,73 +27,200 @@
#include "request.hpp"
#include "../../src/roles/client.hpp"
#include "../../src/websocketpp.hpp"
#include <boost/program_options.hpp>
#include <boost/thread/thread.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/thread/condition_variable.hpp>
#include <cstring>
#include <sstream>
#include <fstream>
using websocketpp::server;
// This default will only work on unix systems.
// Windows systems should set this as a compile flag to an appropriate value
#ifndef WSPERF_CONFIG
#define WSPERF_CONFIG "~/.wsperf"
#endif
// concurrent server takes two arguments. A port to bind to and a number of
// worker threads to create. The thread count must be an integer greater than
// or equal to zero.
//
// num_threads=0 Standard non-threaded WebSocket++ mode. Handlers will block
// i/o operations and other handlers.
// num_threads=1 One thread processes requests serially the other handles i/o
// This allows new connections and requests to be made while the
// processing thread is busy, but does allow long jobs to
// monopolize the processor increasing request latency.
// num_threads>1 Multiple processing threads will work on the single queue of
// requests provided by the i/o thread. This enables out of order
// completion of requests. The number of threads can be tuned
// based on hardware concurrency available and expected load and
// job length.
int main(int argc, char* argv[]) {
unsigned short port = 9050;
unsigned short num_threads = 2;
static const std::string user_agent = "wsperf/0.2.0dev WebSocket++/0.2.0dev";
using websocketpp::client;
namespace po = boost::program_options;
int start_server(po::variables_map& vm) {
unsigned short port = vm["port"].as<unsigned short>();
unsigned int num_threads = vm["num_threads"].as<unsigned int>();
std::list< boost::shared_ptr<boost::thread> > threads;
wsperf::request_coordinator rc;
server::handler::ptr h;
if (num_threads == 0) {
std::cout << "bad thread number" << std::endl;
return 1;
} else {
h = server::handler::ptr(new wsperf::concurrent_server_handler(rc));
}
server endpoint(h);
endpoint.alog().unset_level(websocketpp::log::alevel::ALL);
endpoint.elog().unset_level(websocketpp::log::elevel::ALL);
endpoint.elog().set_level(websocketpp::log::elevel::RERROR);
endpoint.elog().set_level(websocketpp::log::elevel::FATAL);
for (unsigned int i = 0; i < num_threads; i++) {
threads.push_back(boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&wsperf::process_requests, &rc))));
}
std::cout << "Starting wsperf server on port " << port << " with " << num_threads << " processing threads." << std::endl;
endpoint.listen(port);
return 0;
}
int start_client(po::variables_map& vm) {
if (!vm.count("uri")) {
std::cout << "client mode requires uri" << std::endl;
return 1;
}
std::string uri = vm["uri"].as<std::string>();
unsigned int num_threads = vm["num_threads"].as<unsigned int>();
// Start wsperf
std::list< boost::shared_ptr<boost::thread> > threads;
std::list< boost::shared_ptr<boost::thread> >::iterator thread_it;
wsperf::request_coordinator rc;
client::handler::ptr h;
if (num_threads == 0) {
std::cout << "bad thread number" << std::endl;
return 1;
} else {
h = client::handler::ptr(new wsperf::concurrent_client_handler(rc));
}
client endpoint(h);
endpoint.alog().unset_level(websocketpp::log::alevel::ALL);
endpoint.elog().unset_level(websocketpp::log::elevel::ALL);
endpoint.alog().set_level(websocketpp::log::alevel::CONNECT);
endpoint.elog().set_level(websocketpp::log::elevel::RERROR);
endpoint.elog().set_level(websocketpp::log::elevel::FATAL);
for (unsigned int i = 0; i < num_threads; i++) {
threads.push_back(boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&wsperf::process_requests, &rc))));
}
std::cout << "Starting wsperf client connecting to " << uri << " with " << num_threads << " processing threads." << std::endl;
client::connection_ptr con = endpoint.get_connection(uri);
con->add_request_header("User Agent",user_agent);
con->add_subprotocol("wsperf");
endpoint.connect(con);
// This will block until there is an error or the websocket closes
endpoint.run();
// Add a "stop work" request for each outstanding worker thread
for (thread_it = threads.begin(); thread_it != threads.end(); ++thread_it) {
wsperf::request r;
r.type = wsperf::END_WORKER;
rc.add_request(r);
}
// Wait for worker threads to finish quitting.
for (thread_it = threads.begin(); thread_it != threads.end(); ++thread_it) {
(*thread_it)->join();
}
return 0;
}
std::string env_mapper(std::string input) {
if (input == "WSPERF_CONFIG") {
return "config";
}
return "";
}
int main(int argc, char* argv[]) {
try {
std::list< boost::shared_ptr<boost::thread> > threads;
std::string config_file;
if (argc == 2) {
std::stringstream buffer(argv[1]);
buffer >> port;
// Read and Process Command Line Options
po::options_description generic("Generic");
generic.add_options()
("help", "produce this help message")
("version,v", po::value<int>()->implicit_value(1), "Print version information")
("config", po::value<std::string>(&config_file)->default_value(WSPERF_CONFIG),
"Configuration file to use.")
;
po::options_description config("Configuration");
config.add_options()
("server,s", po::value<int>()->implicit_value(1), "Run in server mode")
("client,c", po::value<int>()->implicit_value(1), "Run in client mode")
("port,p", po::value<unsigned short>()->default_value(9050), "Port to listen on in server mode")
("uri,u", po::value<std::string>(), "URI to connect to in client mode")
("num_threads", po::value<unsigned int>()->default_value(2), "Number of worker threads to use")
;
/*po::options_description environment("Environment");
environment.add_options()
("config", po::value<std::string>(&config_file)->default_value("~/.wsperf"),
"Configuration file to use.")
;*/
po::options_description cmdline_options;
cmdline_options.add(generic).add(config);
po::options_description config_file_options;
config_file_options.add(config);
po::options_description visible("Allowed options");
visible.add(generic).add(config);
po::variables_map vm;
po::store(po::parse_command_line(argc, argv, cmdline_options), vm);
//po::store(po::parse_environment(environment,env_mapper), vm);
po::notify(vm);
std::ifstream ifs(config_file.c_str());
if (ifs) {
store(parse_config_file(ifs, config_file_options), vm);
notify(vm);
}
if (argc == 3) {
std::stringstream buffer(argv[2]);
buffer >> num_threads;
}
wsperf::request_coordinator rc;
server::handler::ptr h;
if (num_threads == 0) {
std::cout << "bad thread number" << std::endl;
if (vm.count("help")) {
std::cout << cmdline_options << std::endl;
return 1;
}
if (vm.count("version")) {
std::cout << user_agent << std::endl;
return 1;
}
if (vm.count("server") && vm["server"].as<int>() == 1) {
return start_server(vm);
} else if (vm.count("client") && vm["client"].as<int>() == 1) {
return start_client(vm);
} else {
h = server::handler::ptr(new wsperf::concurrent_server_handler(rc));
std::cout << cmdline_options << std::endl;
return 1;
}
server echo_endpoint(h);
echo_endpoint.alog().unset_level(websocketpp::log::alevel::ALL);
echo_endpoint.elog().unset_level(websocketpp::log::elevel::ALL);
echo_endpoint.elog().set_level(websocketpp::log::elevel::RERROR);
echo_endpoint.elog().set_level(websocketpp::log::elevel::FATAL);
for (int i = 0; i < num_threads; i++) {
threads.push_back(boost::shared_ptr<boost::thread>(new boost::thread(boost::bind(&wsperf::process_requests, &rc))));
}
std::cout << "Starting wsperf server on port " << port << " with " << num_threads << " processing threads." << std::endl;
echo_endpoint.listen(port);
} catch (std::exception& e) {
std::cerr << "Exception: " << e.what() << std::endl;
}

View File

@@ -373,6 +373,8 @@
B6E56D7E150644A3007E1707 /* request.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = request.cpp; path = examples/wsperf/request.cpp; sourceTree = "<group>"; };
B6E56D7F150644A3007E1707 /* request.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = request.hpp; path = examples/wsperf/request.hpp; sourceTree = "<group>"; };
B6E56D821508EACA007E1707 /* SConstruct */ = {isa = PBXFileReference; lastKnownFileType = text; path = SConstruct; sourceTree = "<group>"; };
B6E56D84150B887A007E1707 /* SConscript */ = {isa = PBXFileReference; lastKnownFileType = text; name = SConscript; path = examples/wsperf/SConscript; sourceTree = "<group>"; };
B6E56D85150B9021007E1707 /* wsperf.cfg */ = {isa = PBXFileReference; lastKnownFileType = text; name = wsperf.cfg; path = examples/wsperf/wsperf.cfg; sourceTree = "<group>"; };
B6E7E7731505532E00394909 /* wsperf */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = wsperf; sourceTree = BUILT_PRODUCTS_DIR; };
B6E7E78A150553D000394909 /* libboost_chrono.dylib */ = {isa = PBXFileReference; lastKnownFileType = "compiled.mach-o.dylib"; name = libboost_chrono.dylib; path = usr/local/lib/libboost_chrono.dylib; sourceTree = SDKROOT; };
B6FE8CE2144DE17F00B32547 /* readme.txt */ = {isa = PBXFileReference; lastKnownFileType = text; path = readme.txt; sourceTree = "<group>"; };
@@ -901,7 +903,9 @@
B6E56D6414FEFC43007E1707 /* wsperf */ = {
isa = PBXGroup;
children = (
B6E56D84150B887A007E1707 /* SConscript */,
B6E56D6614FEFC54007E1707 /* Makefile */,
B6E56D85150B9021007E1707 /* wsperf.cfg */,
B6E56D6714FEFC54007E1707 /* wsperf_commander.html */,
B6E56D6814FEFC54007E1707 /* wsperf.cpp */,
B6E56D7E150644A3007E1707 /* request.cpp */,