From b2d698d3ca3b0212b20a26624a315fdc54944d1f Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Thu, 6 Mar 2014 11:59:38 -0600 Subject: [PATCH 01/10] include hdl in message actions This is not strictly necessary for the broadcast server example. As this example is commonly used as a starting point for new programs the lack of hdl on message action is a source of surprising behavior. --- examples/broadcast_server/broadcast_server.cpp | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/examples/broadcast_server/broadcast_server.cpp b/examples/broadcast_server/broadcast_server.cpp index 29c18b6aa0..bdb01fe444 100644 --- a/examples/broadcast_server/broadcast_server.cpp +++ b/examples/broadcast_server/broadcast_server.cpp @@ -34,7 +34,8 @@ enum action_type { struct action { action(action_type t, connection_hdl h) : type(t), hdl(h) {} - action(action_type t, server::message_ptr m) : type(t), msg(m) {} + action(action_type t, connection_hdl h, server::message_ptr m) + : type(t), hdl(h), msg(m) {} action_type type; websocketpp::connection_hdl hdl; @@ -92,7 +93,7 @@ public: // queue message up for sending by processing thread unique_lock lock(m_action_lock); //std::cout << "on_message" << std::endl; - m_actions.push(action(MESSAGE,msg)); + m_actions.push(action(MESSAGE,hdl,msg)); lock.unlock(); m_action_cond.notify_one(); } From edb26d7721bd8c8c95f5f29807f274fa5afd7c4e Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Thu, 6 Mar 2014 19:01:11 -0600 Subject: [PATCH 02/10] listen errors are now reported to the caller --- changelog.md | 6 ++--- websocketpp/transport/asio/endpoint.hpp | 31 ++++++++++++++++--------- 2 files changed, 23 insertions(+), 14 deletions(-) diff --git a/changelog.md b/changelog.md index 01b142554f..dd6e580ff2 100644 --- a/changelog.md +++ b/changelog.md @@ -12,7 +12,7 @@ HEAD will not read more data from their socket, allowing TCP flow control to work without blocking the main thread. - Feature: Adds the ability to specify whether or not to use the `SO_REUSEADDR` - TCP socket option. The default for this value has been changed from `true` to + TCP socket option. The default for this value has been changed from `true` to `false`. - Feature: Adds the ability to specify a maximum message size. - Feature: Adds `close::status::get_string(...)` method to look up a human @@ -30,8 +30,8 @@ HEAD short reads and quasi-expected socket shutdown related errors will no longer be reported as unclean WebSocket shutdowns to the application. Information about them will remain in the info error channel for debugging purposes. -- Improvement: `start_accept` errors are now reported to the caller either via - an exception or an ec parameter. +- Improvement: `start_accept` and `listen` errors are now reported to the caller + either via an exception or an ec parameter. - Bug: Fix some cases of calls to empty lib::function objects. - Bug: Fix memory leak of connection objects due to cached handlers holding on to reference counted pointers. #310 Thank you otaras for reporting. diff --git a/websocketpp/transport/asio/endpoint.hpp b/websocketpp/transport/asio/endpoint.hpp index 21bb610e57..a7bf8a36f9 100644 --- a/websocketpp/transport/asio/endpoint.hpp +++ b/websocketpp/transport/asio/endpoint.hpp @@ -123,7 +123,7 @@ public: : m_io_service(src.m_io_service) , m_external_io_service(src.m_external_io_service) , m_acceptor(src.m_acceptor) - , m_listen_backlog(0) + , m_listen_backlog(boost::asio::socket_base::max_connections) , m_reuse_addr(src.m_reuse_addr) , m_state(src.m_state) { @@ -145,7 +145,7 @@ public: rhs.m_io_service = NULL; rhs.m_external_io_service = false; rhs.m_acceptor = NULL; - rhs.m_listen_backlog = 0; + rhs.m_listen_backlog = boost::asio::socket_base::max_connections; rhs.m_state = UNINITIALIZED; } return *this; @@ -344,16 +344,25 @@ public: m_alog->write(log::alevel::devel,"asio::listen"); - m_acceptor->open(ep.protocol()); - m_acceptor->set_option(boost::asio::socket_base::reuse_address(m_reuse_addr)); - m_acceptor->bind(ep); - if (m_listen_backlog == 0) { - m_acceptor->listen(); - } else { - m_acceptor->listen(m_listen_backlog); + boost::system::error_code bec; + + m_acceptor->open(ep.protocol(),bec); + if (!bec) { + m_acceptor->set_option(boost::asio::socket_base::reuse_address(m_reuse_addr),bec); + } + if (!bec) { + m_acceptor->bind(ep,bec); + } + if (!bec) { + m_acceptor->listen(m_listen_backlog,bec); + } + if (bec) { + log_err(log::elevel::info,"asio listen",bec); + ec = make_error_code(error::pass_through); + } else { + m_state = LISTENING; + ec = lib::error_code(); } - m_state = LISTENING; - ec = lib::error_code(); } /// Set up endpoint for listening manually From 0f3a36a63053c24951071110f4e9cc6d2c3212cb Mon Sep 17 00:00:00 2001 From: Jeff Mitchell Date: Wed, 12 Mar 2014 10:46:33 -0400 Subject: [PATCH 03/10] Add define to allow disabling threading entirely, for when using an iostream-based client. --- websocketpp/config/core_client.hpp | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/websocketpp/config/core_client.hpp b/websocketpp/config/core_client.hpp index 51cfa85c79..1b2fae44aa 100644 --- a/websocketpp/config/core_client.hpp +++ b/websocketpp/config/core_client.hpp @@ -34,7 +34,11 @@ #include // Concurrency +#ifndef _WEBSOCKETPP_NO_THREADING_ #include +#else +#include +#endif // Transport #include @@ -68,7 +72,11 @@ struct core_client { typedef core_client type; // Concurrency policy +#ifndef _WEBSOCKETPP_NO_THREADING_ typedef websocketpp::concurrency::basic concurrency_type; +#else + typedef websocketpp::concurrency::none concurrency_type; +#endif // HTTP Parser Policies typedef http::parser::request request_type; From 77495d40765ecb0a60aa6bf66e2746653ca5ef46 Mon Sep 17 00:00:00 2001 From: Dominik Schmidt Date: Fri, 14 Mar 2014 16:54:18 +0100 Subject: [PATCH 04/10] Use _WIN32 instead of WIN32 preprocessor directive and fix warnings --- websocketpp/common/network.hpp | 2 +- websocketpp/common/platforms.hpp | 2 +- websocketpp/common/stdint.hpp | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/websocketpp/common/network.hpp b/websocketpp/common/network.hpp index 6a01f140cb..9ba8b538bc 100644 --- a/websocketpp/common/network.hpp +++ b/websocketpp/common/network.hpp @@ -29,7 +29,7 @@ #define WEBSOCKETPP_COMMON_NETWORK_HPP // For ntohs and htons -#if defined(WIN32) +#if defined(_WIN32) #include #else //#include diff --git a/websocketpp/common/platforms.hpp b/websocketpp/common/platforms.hpp index 4e031aa345..1196847b5a 100644 --- a/websocketpp/common/platforms.hpp +++ b/websocketpp/common/platforms.hpp @@ -33,7 +33,7 @@ * don't fit somewhere else better. */ -#if defined(WIN32) && !defined(NOMINMAX) +#if defined(_WIN32) && !defined(NOMINMAX) // don't define min and max macros that conflict with std::min and std::max #define NOMINMAX #endif diff --git a/websocketpp/common/stdint.hpp b/websocketpp/common/stdint.hpp index 233e36f5bc..a8ca1e3c52 100644 --- a/websocketpp/common/stdint.hpp +++ b/websocketpp/common/stdint.hpp @@ -32,7 +32,7 @@ #define __STDC_LIMIT_MACROS 1 #endif -#if WIN32 && (_MSC_VER < 1600) +#if defined (_WIN32) && defined (_MSC_VER) && (_MSC_VER < 1600) #include using boost::int8_t; From e154d955ecb6052c84b6994d48d563565900ab56 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Sun, 16 Mar 2014 09:26:51 -0500 Subject: [PATCH 05/10] Use __cplusplus header to enable C++11 language features --- websocketpp/common/cpp11.hpp | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/websocketpp/common/cpp11.hpp b/websocketpp/common/cpp11.hpp index 7c1ac43388..4b980efdd3 100644 --- a/websocketpp/common/cpp11.hpp +++ b/websocketpp/common/cpp11.hpp @@ -41,9 +41,11 @@ #endif -#ifdef _WEBSOCKETPP_CPP11_STL_ - // This flag indicates that all of the C++11 language features are available - // to us. +#if defined(_WEBSOCKETPP_CPP11_STL_) || __cplusplus >= 201103L + // _WEBSOCKETPP_CPP11_STL_ is a flag from the build system that forces + // WebSocket++ into C++11 mode. __cplusplus is a define set by the compiler + // if it has full support for C++11 language features. If either are set use + // C++11 language features #ifndef _WEBSOCKETPP_NOEXCEPT_TOKEN_ #define _WEBSOCKETPP_NOEXCEPT_TOKEN_ noexcept #endif From e2b7a4b9f1210ae1f8974f9f4d9eb81a70591612 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Sun, 16 Mar 2014 09:59:34 -0500 Subject: [PATCH 06/10] fix whitespace --- websocketpp/uri.hpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/websocketpp/uri.hpp b/websocketpp/uri.hpp index 07ed32ebf2..b0895d581b 100644 --- a/websocketpp/uri.hpp +++ b/websocketpp/uri.hpp @@ -290,10 +290,10 @@ public: * @return query portion of the URI. */ std::string get_query() const { - std::size_t found = m_resource.find('?'); - if (found != std::string::npos) { - return m_resource.substr(found + 1); - } else { + std::size_t found = m_resource.find('?'); + if (found != std::string::npos) { + return m_resource.substr(found + 1); + } else { return ""; } } From cd534ad18e062e39af0ce969d74aa0b39e384794 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Mon, 24 Mar 2014 08:04:23 -0500 Subject: [PATCH 07/10] implements batch sending rather than writing a single message per trip through the underlying transport, writes are batched and sent as a group. This drastically improves the system call / application code ratio and tcp packet utilization rates when sending lots of small messages --- websocketpp/connection.hpp | 6 +- websocketpp/impl/connection_impl.hpp | 99 +++++++++++++++++++--------- 2 files changed, 70 insertions(+), 35 deletions(-) diff --git a/websocketpp/connection.hpp b/websocketpp/connection.hpp index a3de8459ee..e2ebcd6ab8 100644 --- a/websocketpp/connection.hpp +++ b/websocketpp/connection.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Peter Thorson. All rights reserved. + * Copyright (c) 2014, Peter Thorson. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -1451,9 +1451,9 @@ private: */ std::vector m_send_buffer; - /// a pointer to hold on to the current message being written to keep it + /// a list of pointers to hold on to the messages being written to keep them /// from going out of scope before the write is complete. - message_ptr m_current_msg; + std::vector m_current_msgs; /// True if there is currently an outstanding transport write /** diff --git a/websocketpp/impl/connection_impl.hpp b/websocketpp/impl/connection_impl.hpp index 41260c6192..a2b689762e 100644 --- a/websocketpp/impl/connection_impl.hpp +++ b/websocketpp/impl/connection_impl.hpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2013, Peter Thorson. All rights reserved. + * Copyright (c) 2014, Peter Thorson. All rights reserved. * * Redistribution and use in source and binary forms, with or without * modification, are permitted provided that the following conditions are met: @@ -1562,43 +1562,77 @@ void connection::write_frame() { return; } - // Get the next message in the queue. This will return an empty - // message if the queue was empty. - m_current_msg = write_pop(); - - if (!m_current_msg) { - return; + // pull off all the messages that are ready to write. + // stop if we get a message marked terminal + message_ptr next_message = write_pop(); + while (next_message) { + m_current_msgs.push_back(next_message); + if (!next_message->get_terminal()) { + next_message = write_pop(); + } else { + next_message = message_ptr(); + } + } + + if (m_current_msgs.empty()) { + // there was nothing to send + return; + } else { + // At this point we own the next messages to be sent and are + // responsible for holding the write flag until they are + // successfully sent or there is some error + m_write_flag = true; } - - // At this point we own the next message to be sent and are - // responsible for holding the write flag until it is successfully - // sent or there is some error - m_write_flag = true; } - std::string const & header = m_current_msg->get_header(); - std::string const & payload = m_current_msg->get_payload(); - - m_send_buffer.push_back(transport::buffer(header.c_str(),header.size())); - m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size())); + typename std::vector::iterator it; + for (it = m_current_msgs.begin(); it != m_current_msgs.end(); ++it) { + std::string const & header = (*it)->get_header(); + std::string const & payload = (*it)->get_payload(); + m_send_buffer.push_back(transport::buffer(header.c_str(),header.size())); + m_send_buffer.push_back(transport::buffer(payload.c_str(),payload.size())); + } + // Print detailed send stats if those log levels are enabled if (m_alog.static_test(log::alevel::frame_header)) { if (m_alog.dynamic_test(log::alevel::frame_header)) { - std::stringstream s; - s << "Dispatching write with " << header.size() - << " header bytes and " << payload.size() - << " payload bytes" << std::endl; - m_alog.write(log::alevel::frame_header,s.str()); - m_alog.write(log::alevel::frame_header,"Header: "+utility::to_hex(header)); - } - } - if (m_alog.static_test(log::alevel::frame_payload)) { - if (m_alog.dynamic_test(log::alevel::frame_payload)) { - m_alog.write(log::alevel::frame_payload,"Payload: "+utility::to_hex(payload)); - } - } + std::stringstream general,header,payload; + + general << "Dispatching write containing " << m_current_msgs.size() + <<" message(s) containing "; + header << "Header Bytes: \n"; + payload << "Payload Bytes: \n"; + + size_t hbytes = 0; + size_t pbytes = 0; + + for (size_t i = 0; i < m_current_msgs.size(); i++) { + hbytes += m_current_msgs[i]->get_header().size(); + pbytes += m_current_msgs[i]->get_payload().size(); + + header << "[" << i << "] (" + << m_current_msgs[i]->get_header().size() << ") " + << utility::to_hex(m_current_msgs[i]->get_header()) << "\n"; + + if (m_alog.static_test(log::alevel::frame_payload)) { + if (m_alog.dynamic_test(log::alevel::frame_payload)) { + payload << "[" << i << "] (" + << m_current_msgs[i]->get_payload().size() << ") " + << utility::to_hex(m_current_msgs[i]->get_payload()) + << "\n"; + } + } + } + + general << hbytes << " header bytes and " << pbytes << " payload bytes"; + + m_alog.write(log::alevel::frame_header,general.str()); + m_alog.write(log::alevel::frame_header,header.str()); + m_alog.write(log::alevel::frame_payload,payload.str()); + } + } transport_con_type::async_write( m_send_buffer, @@ -1613,10 +1647,11 @@ void connection::handle_write_frame(lib::error_code const & ec) m_alog.write(log::alevel::devel,"connection handle_write_frame"); } - bool terminal = m_current_msg->get_terminal(); + bool terminal = m_current_msgs.back()->get_terminal(); m_send_buffer.clear(); - m_current_msg.reset(); + m_current_msgs.clear(); + // TODO: recycle instead of deleting if (ec) { log_err(log::elevel::fatal,"handle_write_frame",ec); From 19a3713b298b6f7c3f5ff7de3212d445857f15b6 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Mon, 24 Mar 2014 08:07:40 -0500 Subject: [PATCH 08/10] use libc++ for non-c++11 branches on mac os x --- SConstruct | 1 + 1 file changed, 1 insertion(+) diff --git a/SConstruct b/SConstruct index fb8df135a0..5bd61c172f 100644 --- a/SConstruct +++ b/SConstruct @@ -146,6 +146,7 @@ if env_cpp11['CXX'].startswith('g++'): print "C++11 build environment is not supported on this version of G++" elif env_cpp11['CXX'].startswith('clang++'): print "C++11 build environment enabled" + env.Append(CXXFLANGS = ['-stdlib=libc++'],LINKFLAGS=['-stdlib=libc++']) env_cpp11.Append(WSPP_CPP11_ENABLED = "true",CXXFLAGS = ['-std=c++0x','-stdlib=libc++'],LINKFLAGS = ['-stdlib=libc++'],TOOLSET = ['clang++'],CPPDEFINES = ['_WEBSOCKETPP_CPP11_STL_']) # look for optional second boostroot compiled with clang's libc++ STL library From b358e2cfe8232893f6fd08c648481b7d16304967 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Tue, 25 Mar 2014 05:49:15 -0500 Subject: [PATCH 09/10] update changelog --- changelog.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/changelog.md b/changelog.md index dd6e580ff2..25d1014463 100644 --- a/changelog.md +++ b/changelog.md @@ -32,6 +32,8 @@ HEAD about them will remain in the info error channel for debugging purposes. - Improvement: `start_accept` and `listen` errors are now reported to the caller either via an exception or an ec parameter. +- Improvement: Outgoing writes are now batched for improved message throughput + and reduced system call and TCP frame overhead. - Bug: Fix some cases of calls to empty lib::function objects. - Bug: Fix memory leak of connection objects due to cached handlers holding on to reference counted pointers. #310 Thank you otaras for reporting. From 002d2d000788a49dea166663f6560e9fd59994ba Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Tue, 25 Mar 2014 06:15:35 -0500 Subject: [PATCH 10/10] force the offset to be calculated first to avoid confusing MSVC's checked iterators references #342 Thank you tmoers for reporting --- websocketpp/http/impl/request.hpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/websocketpp/http/impl/request.hpp b/websocketpp/http/impl/request.hpp index 19f91d853a..4557df4b75 100644 --- a/websocketpp/http/impl/request.hpp +++ b/websocketpp/http/impl/request.hpp @@ -127,7 +127,7 @@ inline size_t request::consume(const char *buf, size_t len) { } } - begin = end+sizeof(header_delimiter)-1; + begin = end+(sizeof(header_delimiter)-1); } }