From 9bc3220b5e1ddbb7639105f8f3ee71da44332580 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Tue, 10 Apr 2012 08:25:46 -0500 Subject: [PATCH] message processor performance improvements --- src/messages/control.hpp | 52 ++++++++--------- src/messages/data.cpp | 104 ++++++++++++++++++--------------- src/messages/data.hpp | 43 ++++++++------ src/processors/hybi.hpp | 61 ++++++++++++++++--- src/processors/hybi_legacy.hpp | 17 +++++- 5 files changed, 174 insertions(+), 103 deletions(-) diff --git a/src/messages/control.hpp b/src/messages/control.hpp index 6dd6ac72b5..f3a9975c6c 100644 --- a/src/messages/control.hpp +++ b/src/messages/control.hpp @@ -34,6 +34,9 @@ #include "../processors/processor.hpp" #include "../websocket_frame.hpp" #include "../utf8_validator/utf8_validator.hpp" +#include "../processors/hybi_util.hpp" + +using websocketpp::processor::hybi_util::circshift_prepared_key; namespace websocketpp { namespace message { @@ -52,36 +55,31 @@ public: return m_payload; } - uint64_t process_payload(std::istream& input,uint64_t size) { - char c; - const uint64_t new_size = m_payload.size() + size; - uint64_t i; + void process_payload(char *input,uint64_t size) { + const size_t new_size = m_payload.size() + size; if (new_size > PAYLOAD_SIZE_MAX) { throw processor::exception("Message payload was too large.",processor::error::MESSAGE_TOO_BIG); } - i = 0; - while(input.good() && i < size) { - c = input.get(); + if (m_masked) { + // this retrieves ceiling of size / word size + size_t n = (size + sizeof(size_t) - 1) / sizeof(size_t); - if (!input.fail()) { - if (m_masking_index >= 0) { - c = c ^ m_masking_key.c[(m_masking_index++)%4]; - } - - m_payload.push_back(c); - i++; + // reinterpret the input as an array of word sized integers + size_t* data = reinterpret_cast(input); + + // unmask working buffer + for (int i = 0; i < n; i++) { + data[i] ^= m_prepared_key; } - if (input.bad()) { - throw processor::exception("istream read error 2", - processor::error::FATAL_ERROR); - } + // circshift working key + m_prepared_key = circshift_prepared_key(m_prepared_key, size%4); } - - // successfully read all bytes - return i; + + // copy working buffer into + m_payload.append(input, size); } void complete() { @@ -133,9 +131,9 @@ public: } void set_masking_key(int32_t key) { - //*reinterpret_cast(m_masking_key) = key; m_masking_key.i = key; - m_masking_index = (key == 0 ? -1 : 0); + m_prepared_key = processor::hybi_util::prepare_masking_key(m_masking_key); + m_masked = true; } private: uint16_t get_raw_close_code() const { @@ -154,6 +152,8 @@ private: static const uint64_t PAYLOAD_SIZE_INIT = 128; // 128B static const uint64_t PAYLOAD_SIZE_MAX = 128; // 128B + typedef websocketpp::processor::hybi_util::masking_key_type masking_key_type; + union masking_key { int32_t i; char c[4]; @@ -166,9 +166,9 @@ private: utf8_validator::validator m_validator; // Masking state - masking_key m_masking_key; - //unsigned char m_masking_key[4]; - int m_masking_index; + masking_key_type m_masking_key; + bool m_masked; + size_t m_prepared_key; // Message payload std::string m_payload; diff --git a/src/messages/data.cpp b/src/messages/data.cpp index c9e0be86a0..f408b1a06e 100644 --- a/src/messages/data.cpp +++ b/src/messages/data.cpp @@ -31,6 +31,7 @@ #include "../processors/hybi_header.hpp" using websocketpp::message::data; +using websocketpp::processor::hybi_util::circshift_prepared_key; data::data(data::pool_ptr p, size_t s) : m_prepared(false),m_index(s),m_ref_count(0),m_pool(p),m_live(false) { m_payload.reserve(PAYLOAD_SIZE_INIT); @@ -47,59 +48,63 @@ const std::string& data::get_header() const { return m_header; } -uint64_t data::process_payload(std::istream& input,uint64_t size) { - unsigned char c; - const uint64_t new_size = m_payload.size() + size; - uint64_t i; +// input must be a buffer with size divisible by the machine word_size and at +// least ceil(size/word_size)*word_size bytes long. +void data::process_payload(char *input, size_t size) { + //std::cout << "data message processing: " << size << " bytes" << std::endl; + + const size_t new_size = m_payload.size() + size; if (new_size > PAYLOAD_SIZE_MAX) { throw processor::exception("Message too big",processor::error::MESSAGE_TOO_BIG); } if (new_size > m_payload.capacity()) { - m_payload.reserve(std::max( - static_cast(new_size), static_cast(2*m_payload.capacity()) - )); + m_payload.reserve(std::max(new_size, 2*m_payload.capacity())); } - i = 0; - while(input.good() && i < size) { - c = input.get(); + if (m_masked) { + //std::cout << "message is masked" << std::endl; - if (!input.fail()) { - process_character(c); - i++; + //std::cout << "before: " << zsutil::to_hex(input, size) << std::endl; + + // this retrieves ceiling of size / word size + size_t n = (size + sizeof(size_t) - 1) / sizeof(size_t); + + // reinterpret the input as an array of word sized integers + size_t* data = reinterpret_cast(input); + + // unmask working buffer + for (int i = 0; i < n; i++) { + data[i] ^= m_prepared_key; } - if (input.bad()) { - throw processor::exception("istream read error 2", - processor::error::FATAL_ERROR); + //std::cout << "after: " << zsutil::to_hex(input, size) << std::endl; + + // circshift working key + //std::cout << "circshift by : " << size%4 << " bytes " << zsutil::to_hex(reinterpret_cast(&m_prepared_key),sizeof(size_t)); + m_prepared_key = circshift_prepared_key(m_prepared_key, size%4); + //std::cout << " to " << zsutil::to_hex(reinterpret_cast(&m_prepared_key),sizeof(size_t)) << std::endl; + } + + if (m_opcode == frame::opcode::TEXT) { + if (!m_validator.decode(input, input+size)) { + throw processor::exception("Invalid UTF8 data", + processor::error::PAYLOAD_VIOLATION); } } - // successfully read all bytes - return i; -} - -void data::process_character(unsigned char c) { - if (m_masking_index >= 0) { - c = c ^ m_masking_key.c[m_masking_index]; - m_masking_index = index_value((m_masking_index+1)%4); - } + // copy working buffer into + //std::cout << "before: " << m_payload.size() << std::endl; - if (m_opcode == frame::opcode::TEXT && - !m_validator.consume(static_cast((unsigned char)(c)))) - { - throw processor::exception("Invalid UTF8 data",processor::error::PAYLOAD_VIOLATION); - } + m_payload.append(input, size); - // add c to payload - m_payload.push_back(c); + //std::cout << "after: " << m_payload.size() << std::endl; } void data::reset(websocketpp::frame::opcode::value opcode) { m_opcode = opcode; - m_masking_index = M_NOT_MASKED; + m_masked = false; m_payload.clear(); m_validator.reset(); m_prepared = false; @@ -108,10 +113,10 @@ void data::reset(websocketpp::frame::opcode::value opcode) { void data::complete() { if (m_opcode == frame::opcode::TEXT) { if (!m_validator.complete()) { - throw processor::exception("Invalid UTF8 data",processor::error::PAYLOAD_VIOLATION); + throw processor::exception("Invalid UTF8 data", + processor::error::PAYLOAD_VIOLATION); } } - } void data::validate_payload() { @@ -128,7 +133,8 @@ void data::validate_payload() { void data::set_masking_key(int32_t key) { m_masking_key.i = key; - m_masking_index = (key == 0 ? M_MASK_KEY_ZERO : M_BYTE_0); + m_prepared_key = processor::hybi_util::prepare_masking_key(m_masking_key); + m_masked = true; } void data::set_prepared(bool b) { @@ -150,7 +156,7 @@ void data::append_payload(const std::string& payload) { m_payload.append(payload); } void data::mask() { - if (m_masking_index >= 0) { + if (m_masked && m_payload.size() > 0) { // By default WebSocket++ performs block masking/unmasking in a mannor that makes // some assumptions about the nature of the machine and STL library used. In // particular the assumption is either a 32 or 64 bit word size and an STL with @@ -162,9 +168,20 @@ void data::mask() { // To disable this optimization (for use with alternative STL implementations or // processors) define WEBSOCKETPP_STRICT_MASKING when compiling the library. This // will force the library to perform masking in single byte chunks. - #define WEBSOCKETPP_STRICT_MASKING + //#define WEBSOCKETPP_STRICT_MASKING - #ifndef WEBSOCKETPP_STRICT_MASKING + #ifdef WEBSOCKETPP_STRICT_MASKING + size_t len = m_payload.size(); + for (size_t i = 0; i < len; i++) { + m_payload[i] ^= m_masking_key.c[i%4]; + } + #else + // This should trigger a write to the string in case the STL + // implimentation is copy-on-write and hasn't been written to yet. + // Performing the masking will always require a copy of the string + // in this case to hold the masked version. + m_payload[0] = m_payload[0]; + size_t size = m_payload.size()/sizeof(size_t); size_t key = m_masking_key.i; if (sizeof(size_t) == 8) { @@ -178,15 +195,6 @@ void data::mask() { for (size_t i = size*sizeof(size_t); i < m_payload.size(); i++) { m_payload[i] ^= m_masking_key.c[i%4]; } - #else - size_t len = m_payload.size(); - for (size_t i = 0; i < len; i++) { - m_payload[i] ^= m_masking_key.c[i%4]; - } - /*for (std::string::iterator it = m_payload.begin(); it != m_payload.end(); it++) { - (*it) = *it ^ m_masking_key.c[m_masking_index]; - m_masking_index = index_value((m_masking_index+1)&3); - }*/ #endif } } diff --git a/src/messages/data.hpp b/src/messages/data.hpp index 762aa7a05e..05949aec28 100644 --- a/src/messages/data.hpp +++ b/src/messages/data.hpp @@ -30,11 +30,13 @@ #include "../common.hpp" #include "../utf8_validator/utf8_validator.hpp" +#include "../processors/hybi_util.hpp" #include #include #include #include +#include #include #include @@ -79,6 +81,8 @@ public: * pointer. */ element_ptr get() { + boost::lock_guard lock(m_lock); + element_ptr p; /*std::cout << "message requested (" @@ -110,6 +114,8 @@ public: return p; } void recycle(element_ptr p) { + boost::lock_guard lock(m_lock); + if (p->get_index()+1 > m_used.size() || m_used[p->get_index()] != p) { //std::cout << "error tried to recycle a pointer we don't control" << std::endl; // error tried to recycle a pointer we don't control @@ -133,6 +139,7 @@ public: // set a function that will be called when new elements are avaliable. void set_callback(callback_type fn) { + boost::lock_guard lock(m_lock); m_callback = fn; } @@ -144,6 +151,8 @@ private: std::vector m_used; callback_type m_callback; + + boost::mutex m_lock; }; class data { @@ -168,8 +177,8 @@ public: // validation. Returns number of bytes read. // throws a processor::exception if the message is too big, there is a fatal // istream read error, or invalid UTF8 data is read for a text message - uint64_t process_payload(std::istream& input,uint64_t size); - void process_character(unsigned char c); + //uint64_t process_payload(std::istream& input,uint64_t size); + void process_payload(char * input, size_t size); void complete(); void validate_payload(); @@ -185,7 +194,11 @@ public: void set_prepared(bool b); bool get_prepared() const; void mask(); - + + int32_t get_masking_key() const { + return m_masking_key.i; + } + // pool management interface void set_live(); size_t get_index() const; @@ -193,34 +206,24 @@ private: static const uint64_t PAYLOAD_SIZE_INIT = 1000; // 1KB static const uint64_t PAYLOAD_SIZE_MAX = 100000000;// 100MB - enum index_value { - M_MASK_KEY_ZERO = -2, - M_NOT_MASKED = -1, - M_BYTE_0 = 0, - M_BYTE_1 = 1, - M_BYTE_2 = 2, - M_BYTE_3 = 3 - }; - - union masking_key { - int32_t i; - char c[4]; - }; + typedef websocketpp::processor::hybi_util::masking_key_type masking_key_type; friend void intrusive_ptr_add_ref(const data * s) { ++s->m_ref_count; } friend void intrusive_ptr_release(const data * s) { + boost::unique_lock lock(s->m_lock); + // TODO: thread safety long count = --s->m_ref_count; if (count == 1 && s->m_live) { // recycle if endpoint exists s->m_live = false; - pool_ptr pp = s->m_pool.lock(); if (pp) { + lock.unlock(); pp->recycle(ptr(const_cast(s))); } @@ -237,8 +240,9 @@ private: utf8_validator::validator m_validator; // Masking state - masking_key m_masking_key; - index_value m_masking_index; + masking_key_type m_masking_key; + bool m_masked; + size_t m_prepared_key; std::string m_header; std::string m_payload; @@ -250,6 +254,7 @@ private: mutable boost::detail::atomic_count m_ref_count; mutable pool_weak_ptr m_pool; mutable bool m_live; + mutable boost::mutex m_lock; }; typedef boost::intrusive_ptr data_ptr; diff --git a/src/processors/hybi.hpp b/src/processors/hybi.hpp index cee755d965..39702e2a6d 100644 --- a/src/processors/hybi.hpp +++ b/src/processors/hybi.hpp @@ -44,7 +44,8 @@ namespace hybi_state { enum value { READ_HEADER = 0, READ_PAYLOAD = 1, - READY = 2 + READY = 2, + IGNORE = 3 }; } @@ -255,6 +256,15 @@ public: break; case hybi_state::READY: // shouldn't be here.. + break; + case hybi_state::IGNORE: + s.ignore(m_payload_left); + m_payload_left -= s.gcount(); + + if (m_payload_left == 0) { + reset(); + } + break; default: break; @@ -268,6 +278,7 @@ public: // processor for a new message. if (m_header.ready()) { m_header.reset(); + ignore(); } } @@ -276,6 +287,13 @@ public: } } + // Sends the processor an inturrupt signal instructing it to ignore the next + // num bytes and then reset itself. This is used to flush a bad frame out of + // the read buffer. + void ignore() { + m_state = hybi_state::IGNORE; + } + void process_header(std::istream& s) { m_header.consume(s); @@ -344,17 +362,33 @@ public: void process_payload(std::istream& input) { //std::cout << "payload left 1: " << m_payload_left << std::endl; - uint64_t written; + size_t num; + + // read bytes into processor buffer. Read the lesser of the buffer size + // and the number of bytes left in the payload. + + input.read(m_payload_buffer, std::min(m_payload_left, PAYLOAD_BUFFER_SIZE)); + num = input.gcount(); + + if (input.bad()) { + throw processor::exception("istream readsome error", + processor::error::FATAL_ERROR); + } + + if (num == 0) { + return; + } + + m_payload_left -= num; + + // tell the appropriate message to process the bytes. if (m_header.is_control()) { - written = m_control_message->process_payload(input,m_payload_left); + m_control_message->process_payload(m_payload_buffer,num); } else { //m_connection.alog().at(log::alevel::DEVEL) << "process_payload. Size: " << m_payload_left << log::endl; - written = m_data_message->process_payload(input,m_payload_left); + m_data_message->process_payload(m_payload_buffer,num); } - m_payload_left -= written; - - //std::cout << "payload left 2: " << m_payload_left << std::endl; - + if (m_payload_left == 0) { process_frame(); } @@ -406,6 +440,7 @@ public: case hybi_state::READ_HEADER: return m_header.get_bytes_needed(); case hybi_state::READ_PAYLOAD: + case hybi_state::IGNORE: return m_payload_left; case hybi_state::READY: return 0; @@ -561,6 +596,10 @@ public: prepare_frame(msg); } private: + // must be divisible by 8 (some things are hardcoded for 4 and 8 byte word + // sizes + static const size_t PAYLOAD_BUFFER_SIZE = 512; + connection_type& m_connection; int m_state; @@ -568,12 +607,16 @@ private: message::control_ptr m_control_message; hybi_header m_header; hybi_header m_write_header; - uint64_t m_payload_left; + size_t m_payload_left; + char m_payload_buffer[PAYLOAD_BUFFER_SIZE]; frame::parser m_write_frame; // TODO: refactor this out }; +template +const size_t hybi::PAYLOAD_BUFFER_SIZE; + } // namespace processor } // namespace websocketpp diff --git a/src/processors/hybi_legacy.hpp b/src/processors/hybi_legacy.hpp index 0b175fdf15..bebe5809b5 100644 --- a/src/processors/hybi_legacy.hpp +++ b/src/processors/hybi_legacy.hpp @@ -193,7 +193,16 @@ public: m_state = hybi_legacy_state::DONE; } else { if (m_data_message) { - m_data_message->process_payload(input,1); + size_t num; + + num = input.readsome(m_payload_buffer, PAYLOAD_BUFFER_SIZE); + + if (input.bad()) { + throw processor::exception("istream readsome error", + processor::error::FATAL_ERROR); + } + + m_data_message->process_payload(m_payload_buffer,num); } } } @@ -324,12 +333,18 @@ private: } } + // must be divisible by 8 (some things are hardcoded for 4 and 8 byte word + // sizes + static const size_t PAYLOAD_BUFFER_SIZE = 512; + connection_type& m_connection; hybi_legacy_state::value m_state; message::data_ptr m_data_message; std::string m_key3; + + char m_payload_buffer[PAYLOAD_BUFFER_SIZE]; }; } // processor