From 7042091a614b460ef75d0e3ca8dfa1ea8333f1ef Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Sat, 10 Mar 2012 09:10:01 -0600 Subject: [PATCH] adds client mode and program options to wsperf --- .gitignore | 3 +- examples/wsperf/Makefile | 9 +- examples/wsperf/request.hpp | 39 ++++- examples/wsperf/wsperf.cfg | 17 ++ examples/wsperf/wsperf.cpp | 225 ++++++++++++++++++++------ websocketpp.xcodeproj/project.pbxproj | 4 + 6 files changed, 243 insertions(+), 54 deletions(-) create mode 100644 examples/wsperf/wsperf.cfg diff --git a/.gitignore b/.gitignore index 14928e0e84..1e6f1faab8 100644 --- a/.gitignore +++ b/.gitignore @@ -47,4 +47,5 @@ examples/wsperf/wsperf .sconsign.dblite -build/ \ No newline at end of file +build/ +examples/wsperf/wsperf_client diff --git a/examples/wsperf/Makefile b/examples/wsperf/Makefile index 95e330108a..08e8356fb7 100644 --- a/examples/wsperf/Makefile +++ b/examples/wsperf/Makefile @@ -18,13 +18,16 @@ else endif ifeq ($(SHARED), 1) - LDFLAGS := $(LDFLAGS) -lwebsocketpp $(darwinlibs) -lboost_system -lboost_thread -lboost_random -lboost_regex -lboost_date_time -boost_chrono + LDFLAGS := $(LDFLAGS) -lwebsocketpp $(darwinlibs) -lboost_system -lboost_thread -lboost_random -lboost_regex -lboost_date_time -boost_chrono -lboost_program_options else - LDFLAGS := $(LDFLAGS) $(WEBSOCKETPP_PATH)/libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a $(BOOST_LIB_PATH)/libboost_random.a $(BOOST_LIB_PATH)/libboost_chrono.a $(darwinlibs) + LDFLAGS := $(LDFLAGS) $(WEBSOCKETPP_PATH)/libwebsocketpp.a $(BOOST_LIB_PATH)/libboost_system.a $(BOOST_LIB_PATH)/libboost_date_time.a $(BOOST_LIB_PATH)/libboost_regex.a $(BOOST_LIB_PATH)/libboost_thread.a $(BOOST_LIB_PATH)/libboost_random.a $(BOOST_LIB_PATH)/libboost_chrono.a $(BOOST_LIB_PATH)/libboost_program_options.a $(darwinlibs) endif wsperf: wsperf.o $(CXX) $(CFLAGS) $^ -o $@ $(LDFLAGS) + +wsperf_client: wsperf_client.o + $(CXX) $(CFLAGS) $^ -o $@ $(LDFLAGS) %.o: %.cpp $(CXX) -c $(CFLAGS) -o $@ $^ @@ -33,4 +36,4 @@ wsperf: wsperf.o # .PHONY: clean clean: - rm -f *.o wsperf + rm -f *.o wsperf wsperf_client diff --git a/examples/wsperf/request.hpp b/examples/wsperf/request.hpp index f1484cae54..c0c8bc664a 100644 --- a/examples/wsperf/request.hpp +++ b/examples/wsperf/request.hpp @@ -43,6 +43,11 @@ using websocketpp::server; namespace wsperf { +enum request_type { + PERF_TEST = 0, + END_WORKER = 1 +}; + class writer { public: virtual void write(std::string msg) = 0; @@ -69,6 +74,7 @@ private: struct request { writer_ptr writer; + request_type type; std::string req; // The raw request std::string token; // Parsed test token. Return in all results @@ -163,10 +169,37 @@ public: void on_message(connection_ptr con,message_ptr msg) { request r; + r.type = PERF_TEST; r.writer = writer_ptr(new ws_writer(con)); r.req = msg->get_payload(); m_coordinator.add_request(r); } + + void on_fail(connection_ptr con) { + std::cout << "A connection from a command client failed." << std::endl; + } +private: + request_coordinator& m_coordinator; +}; + +// The WebSocket++ handler in this case reads numbers from connections and packs +// connection pointer + number into a request struct and passes it off to the +// coordinator. +class concurrent_client_handler : public client::handler { +public: + concurrent_client_handler(request_coordinator& c) : m_coordinator(c) {} + + void on_message(connection_ptr con,message_ptr msg) { + request r; + r.type = PERF_TEST; + r.writer = writer_ptr(new ws_writer(con)); + r.req = msg->get_payload(); + m_coordinator.add_request(r); + } + + void on_fail(connection_ptr con) { + std::cout << "The connection to the commanding server failed." << std::endl; + } private: request_coordinator& m_coordinator; }; @@ -182,7 +215,11 @@ void process_requests(request_coordinator* coordinator) { while (1) { coordinator->get_request(r); - r.process(); + if (r.type == PERF_TEST) { + r.process(); + } else { + break; + } } } diff --git a/examples/wsperf/wsperf.cfg b/examples/wsperf/wsperf.cfg new file mode 100644 index 0000000000..c44b728d97 --- /dev/null +++ b/examples/wsperf/wsperf.cfg @@ -0,0 +1,17 @@ +## +## wsperf config file +## + +## MODE (choose one) +server = 1 +#client = 1 + +## GLOBAL OPTIONS +num_threads = 2 + +## OPTIONS FOR SERVER +port = 9501 + +## OPTIONS FOR CLIENT +## uri of command server +#uri = "ws://localhost:9005" \ No newline at end of file diff --git a/examples/wsperf/wsperf.cpp b/examples/wsperf/wsperf.cpp index 1f020a4e4f..838b1f1d47 100644 --- a/examples/wsperf/wsperf.cpp +++ b/examples/wsperf/wsperf.cpp @@ -27,73 +27,200 @@ #include "request.hpp" +#include "../../src/roles/client.hpp" #include "../../src/websocketpp.hpp" +#include #include #include #include #include #include +#include -using websocketpp::server; +// This default will only work on unix systems. +// Windows systems should set this as a compile flag to an appropriate value +#ifndef WSPERF_CONFIG + #define WSPERF_CONFIG "~/.wsperf" +#endif -// concurrent server takes two arguments. A port to bind to and a number of -// worker threads to create. The thread count must be an integer greater than -// or equal to zero. -// -// num_threads=0 Standard non-threaded WebSocket++ mode. Handlers will block -// i/o operations and other handlers. -// num_threads=1 One thread processes requests serially the other handles i/o -// This allows new connections and requests to be made while the -// processing thread is busy, but does allow long jobs to -// monopolize the processor increasing request latency. -// num_threads>1 Multiple processing threads will work on the single queue of -// requests provided by the i/o thread. This enables out of order -// completion of requests. The number of threads can be tuned -// based on hardware concurrency available and expected load and -// job length. -int main(int argc, char* argv[]) { - unsigned short port = 9050; - unsigned short num_threads = 2; +static const std::string user_agent = "wsperf/0.2.0dev WebSocket++/0.2.0dev"; + +using websocketpp::client; +namespace po = boost::program_options; + +int start_server(po::variables_map& vm) { + unsigned short port = vm["port"].as(); + unsigned int num_threads = vm["num_threads"].as(); + std::list< boost::shared_ptr > threads; + + wsperf::request_coordinator rc; + + server::handler::ptr h; + if (num_threads == 0) { + std::cout << "bad thread number" << std::endl; + return 1; + } else { + h = server::handler::ptr(new wsperf::concurrent_server_handler(rc)); + } + + server endpoint(h); + + endpoint.alog().unset_level(websocketpp::log::alevel::ALL); + endpoint.elog().unset_level(websocketpp::log::elevel::ALL); + + endpoint.elog().set_level(websocketpp::log::elevel::RERROR); + endpoint.elog().set_level(websocketpp::log::elevel::FATAL); + + for (unsigned int i = 0; i < num_threads; i++) { + threads.push_back(boost::shared_ptr(new boost::thread(boost::bind(&wsperf::process_requests, &rc)))); + } + + std::cout << "Starting wsperf server on port " << port << " with " << num_threads << " processing threads." << std::endl; + endpoint.listen(port); + + return 0; +} + +int start_client(po::variables_map& vm) { + if (!vm.count("uri")) { + std::cout << "client mode requires uri" << std::endl; + return 1; + } + + std::string uri = vm["uri"].as(); + unsigned int num_threads = vm["num_threads"].as(); + + // Start wsperf + std::list< boost::shared_ptr > threads; + std::list< boost::shared_ptr >::iterator thread_it; + + wsperf::request_coordinator rc; + + client::handler::ptr h; + if (num_threads == 0) { + std::cout << "bad thread number" << std::endl; + return 1; + } else { + h = client::handler::ptr(new wsperf::concurrent_client_handler(rc)); + } + + client endpoint(h); + + endpoint.alog().unset_level(websocketpp::log::alevel::ALL); + endpoint.elog().unset_level(websocketpp::log::elevel::ALL); + + endpoint.alog().set_level(websocketpp::log::alevel::CONNECT); + + endpoint.elog().set_level(websocketpp::log::elevel::RERROR); + endpoint.elog().set_level(websocketpp::log::elevel::FATAL); + + for (unsigned int i = 0; i < num_threads; i++) { + threads.push_back(boost::shared_ptr(new boost::thread(boost::bind(&wsperf::process_requests, &rc)))); + } + + std::cout << "Starting wsperf client connecting to " << uri << " with " << num_threads << " processing threads." << std::endl; + client::connection_ptr con = endpoint.get_connection(uri); + + con->add_request_header("User Agent",user_agent); + con->add_subprotocol("wsperf"); + + endpoint.connect(con); + + // This will block until there is an error or the websocket closes + endpoint.run(); + + // Add a "stop work" request for each outstanding worker thread + for (thread_it = threads.begin(); thread_it != threads.end(); ++thread_it) { + wsperf::request r; + r.type = wsperf::END_WORKER; + rc.add_request(r); + } + + // Wait for worker threads to finish quitting. + for (thread_it = threads.begin(); thread_it != threads.end(); ++thread_it) { + (*thread_it)->join(); + } + + return 0; +} + +std::string env_mapper(std::string input) { + if (input == "WSPERF_CONFIG") { + return "config"; + } + return ""; +} + +int main(int argc, char* argv[]) { try { - std::list< boost::shared_ptr > threads; + std::string config_file; - if (argc == 2) { - std::stringstream buffer(argv[1]); - buffer >> port; + // Read and Process Command Line Options + po::options_description generic("Generic"); + generic.add_options() + ("help", "produce this help message") + ("version,v", po::value()->implicit_value(1), "Print version information") + ("config", po::value(&config_file)->default_value(WSPERF_CONFIG), + "Configuration file to use.") + ; + + po::options_description config("Configuration"); + config.add_options() + ("server,s", po::value()->implicit_value(1), "Run in server mode") + ("client,c", po::value()->implicit_value(1), "Run in client mode") + ("port,p", po::value()->default_value(9050), "Port to listen on in server mode") + ("uri,u", po::value(), "URI to connect to in client mode") + ("num_threads", po::value()->default_value(2), "Number of worker threads to use") + ; + + /*po::options_description environment("Environment"); + environment.add_options() + ("config", po::value(&config_file)->default_value("~/.wsperf"), + "Configuration file to use.") + ;*/ + + po::options_description cmdline_options; + cmdline_options.add(generic).add(config); + + po::options_description config_file_options; + config_file_options.add(config); + + po::options_description visible("Allowed options"); + visible.add(generic).add(config); + + + po::variables_map vm; + po::store(po::parse_command_line(argc, argv, cmdline_options), vm); + //po::store(po::parse_environment(environment,env_mapper), vm); + po::notify(vm); + + std::ifstream ifs(config_file.c_str()); + if (ifs) { + store(parse_config_file(ifs, config_file_options), vm); + notify(vm); } - if (argc == 3) { - std::stringstream buffer(argv[2]); - buffer >> num_threads; - } - - wsperf::request_coordinator rc; - - server::handler::ptr h; - if (num_threads == 0) { - std::cout << "bad thread number" << std::endl; + if (vm.count("help")) { + std::cout << cmdline_options << std::endl; return 1; + } + + if (vm.count("version")) { + std::cout << user_agent << std::endl; + return 1; + } + + if (vm.count("server") && vm["server"].as() == 1) { + return start_server(vm); + } else if (vm.count("client") && vm["client"].as() == 1) { + return start_client(vm); } else { - h = server::handler::ptr(new wsperf::concurrent_server_handler(rc)); + std::cout << cmdline_options << std::endl; + return 1; } - - server echo_endpoint(h); - - echo_endpoint.alog().unset_level(websocketpp::log::alevel::ALL); - echo_endpoint.elog().unset_level(websocketpp::log::elevel::ALL); - - echo_endpoint.elog().set_level(websocketpp::log::elevel::RERROR); - echo_endpoint.elog().set_level(websocketpp::log::elevel::FATAL); - - for (int i = 0; i < num_threads; i++) { - threads.push_back(boost::shared_ptr(new boost::thread(boost::bind(&wsperf::process_requests, &rc)))); - } - - std::cout << "Starting wsperf server on port " << port << " with " << num_threads << " processing threads." << std::endl; - echo_endpoint.listen(port); } catch (std::exception& e) { std::cerr << "Exception: " << e.what() << std::endl; } diff --git a/websocketpp.xcodeproj/project.pbxproj b/websocketpp.xcodeproj/project.pbxproj index c2c49f7059..9ae1d7fe98 100644 --- a/websocketpp.xcodeproj/project.pbxproj +++ b/websocketpp.xcodeproj/project.pbxproj @@ -373,6 +373,8 @@ B6E56D7E150644A3007E1707 /* request.cpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.cpp; name = request.cpp; path = examples/wsperf/request.cpp; sourceTree = ""; }; B6E56D7F150644A3007E1707 /* request.hpp */ = {isa = PBXFileReference; fileEncoding = 4; lastKnownFileType = sourcecode.cpp.h; name = request.hpp; path = examples/wsperf/request.hpp; sourceTree = ""; }; B6E56D821508EACA007E1707 /* SConstruct */ = {isa = PBXFileReference; lastKnownFileType = text; path = SConstruct; sourceTree = ""; }; + B6E56D84150B887A007E1707 /* SConscript */ = {isa = PBXFileReference; lastKnownFileType = text; name = SConscript; path = examples/wsperf/SConscript; sourceTree = ""; }; + B6E56D85150B9021007E1707 /* wsperf.cfg */ = {isa = PBXFileReference; lastKnownFileType = text; name = wsperf.cfg; path = examples/wsperf/wsperf.cfg; sourceTree = ""; }; B6E7E7731505532E00394909 /* wsperf */ = {isa = PBXFileReference; explicitFileType = "compiled.mach-o.executable"; includeInIndex = 0; path = wsperf; sourceTree = BUILT_PRODUCTS_DIR; }; B6E7E78A150553D000394909 /* libboost_chrono.dylib */ = {isa = PBXFileReference; lastKnownFileType = "compiled.mach-o.dylib"; name = libboost_chrono.dylib; path = usr/local/lib/libboost_chrono.dylib; sourceTree = SDKROOT; }; B6FE8CE2144DE17F00B32547 /* readme.txt */ = {isa = PBXFileReference; lastKnownFileType = text; path = readme.txt; sourceTree = ""; }; @@ -901,7 +903,9 @@ B6E56D6414FEFC43007E1707 /* wsperf */ = { isa = PBXGroup; children = ( + B6E56D84150B887A007E1707 /* SConscript */, B6E56D6614FEFC54007E1707 /* Makefile */, + B6E56D85150B9021007E1707 /* wsperf.cfg */, B6E56D6714FEFC54007E1707 /* wsperf_commander.html */, B6E56D6814FEFC54007E1707 /* wsperf.cpp */, B6E56D7E150644A3007E1707 /* request.cpp */,