message processor performance improvements

This commit is contained in:
Peter Thorson
2012-04-10 08:25:46 -05:00
parent 8129787625
commit 9bc3220b5e
5 changed files with 174 additions and 103 deletions

View File

@@ -34,6 +34,9 @@
#include "../processors/processor.hpp"
#include "../websocket_frame.hpp"
#include "../utf8_validator/utf8_validator.hpp"
#include "../processors/hybi_util.hpp"
using websocketpp::processor::hybi_util::circshift_prepared_key;
namespace websocketpp {
namespace message {
@@ -52,36 +55,31 @@ public:
return m_payload;
}
uint64_t process_payload(std::istream& input,uint64_t size) {
char c;
const uint64_t new_size = m_payload.size() + size;
uint64_t i;
void process_payload(char *input,uint64_t size) {
const size_t new_size = m_payload.size() + size;
if (new_size > PAYLOAD_SIZE_MAX) {
throw processor::exception("Message payload was too large.",processor::error::MESSAGE_TOO_BIG);
}
i = 0;
while(input.good() && i < size) {
c = input.get();
if (m_masked) {
// this retrieves ceiling of size / word size
size_t n = (size + sizeof(size_t) - 1) / sizeof(size_t);
if (!input.fail()) {
if (m_masking_index >= 0) {
c = c ^ m_masking_key.c[(m_masking_index++)%4];
}
m_payload.push_back(c);
i++;
// reinterpret the input as an array of word sized integers
size_t* data = reinterpret_cast<size_t*>(input);
// unmask working buffer
for (int i = 0; i < n; i++) {
data[i] ^= m_prepared_key;
}
if (input.bad()) {
throw processor::exception("istream read error 2",
processor::error::FATAL_ERROR);
}
// circshift working key
m_prepared_key = circshift_prepared_key(m_prepared_key, size%4);
}
// successfully read all bytes
return i;
// copy working buffer into
m_payload.append(input, size);
}
void complete() {
@@ -133,9 +131,9 @@ public:
}
void set_masking_key(int32_t key) {
//*reinterpret_cast<int32_t*>(m_masking_key) = key;
m_masking_key.i = key;
m_masking_index = (key == 0 ? -1 : 0);
m_prepared_key = processor::hybi_util::prepare_masking_key(m_masking_key);
m_masked = true;
}
private:
uint16_t get_raw_close_code() const {
@@ -154,6 +152,8 @@ private:
static const uint64_t PAYLOAD_SIZE_INIT = 128; // 128B
static const uint64_t PAYLOAD_SIZE_MAX = 128; // 128B
typedef websocketpp::processor::hybi_util::masking_key_type masking_key_type;
union masking_key {
int32_t i;
char c[4];
@@ -166,9 +166,9 @@ private:
utf8_validator::validator m_validator;
// Masking state
masking_key m_masking_key;
//unsigned char m_masking_key[4];
int m_masking_index;
masking_key_type m_masking_key;
bool m_masked;
size_t m_prepared_key;
// Message payload
std::string m_payload;

View File

@@ -31,6 +31,7 @@
#include "../processors/hybi_header.hpp"
using websocketpp::message::data;
using websocketpp::processor::hybi_util::circshift_prepared_key;
data::data(data::pool_ptr p, size_t s) : m_prepared(false),m_index(s),m_ref_count(0),m_pool(p),m_live(false) {
m_payload.reserve(PAYLOAD_SIZE_INIT);
@@ -47,59 +48,63 @@ const std::string& data::get_header() const {
return m_header;
}
uint64_t data::process_payload(std::istream& input,uint64_t size) {
unsigned char c;
const uint64_t new_size = m_payload.size() + size;
uint64_t i;
// input must be a buffer with size divisible by the machine word_size and at
// least ceil(size/word_size)*word_size bytes long.
void data::process_payload(char *input, size_t size) {
//std::cout << "data message processing: " << size << " bytes" << std::endl;
const size_t new_size = m_payload.size() + size;
if (new_size > PAYLOAD_SIZE_MAX) {
throw processor::exception("Message too big",processor::error::MESSAGE_TOO_BIG);
}
if (new_size > m_payload.capacity()) {
m_payload.reserve(std::max<size_t>(
static_cast<size_t>(new_size), static_cast<size_t>(2*m_payload.capacity())
));
m_payload.reserve(std::max(new_size, 2*m_payload.capacity()));
}
i = 0;
while(input.good() && i < size) {
c = input.get();
if (m_masked) {
//std::cout << "message is masked" << std::endl;
if (!input.fail()) {
process_character(c);
i++;
//std::cout << "before: " << zsutil::to_hex(input, size) << std::endl;
// this retrieves ceiling of size / word size
size_t n = (size + sizeof(size_t) - 1) / sizeof(size_t);
// reinterpret the input as an array of word sized integers
size_t* data = reinterpret_cast<size_t*>(input);
// unmask working buffer
for (int i = 0; i < n; i++) {
data[i] ^= m_prepared_key;
}
if (input.bad()) {
throw processor::exception("istream read error 2",
processor::error::FATAL_ERROR);
//std::cout << "after: " << zsutil::to_hex(input, size) << std::endl;
// circshift working key
//std::cout << "circshift by : " << size%4 << " bytes " << zsutil::to_hex(reinterpret_cast<char*>(&m_prepared_key),sizeof(size_t));
m_prepared_key = circshift_prepared_key(m_prepared_key, size%4);
//std::cout << " to " << zsutil::to_hex(reinterpret_cast<char*>(&m_prepared_key),sizeof(size_t)) << std::endl;
}
if (m_opcode == frame::opcode::TEXT) {
if (!m_validator.decode(input, input+size)) {
throw processor::exception("Invalid UTF8 data",
processor::error::PAYLOAD_VIOLATION);
}
}
// successfully read all bytes
return i;
}
void data::process_character(unsigned char c) {
if (m_masking_index >= 0) {
c = c ^ m_masking_key.c[m_masking_index];
m_masking_index = index_value((m_masking_index+1)%4);
}
// copy working buffer into
//std::cout << "before: " << m_payload.size() << std::endl;
if (m_opcode == frame::opcode::TEXT &&
!m_validator.consume(static_cast<uint32_t>((unsigned char)(c))))
{
throw processor::exception("Invalid UTF8 data",processor::error::PAYLOAD_VIOLATION);
}
m_payload.append(input, size);
// add c to payload
m_payload.push_back(c);
//std::cout << "after: " << m_payload.size() << std::endl;
}
void data::reset(websocketpp::frame::opcode::value opcode) {
m_opcode = opcode;
m_masking_index = M_NOT_MASKED;
m_masked = false;
m_payload.clear();
m_validator.reset();
m_prepared = false;
@@ -108,10 +113,10 @@ void data::reset(websocketpp::frame::opcode::value opcode) {
void data::complete() {
if (m_opcode == frame::opcode::TEXT) {
if (!m_validator.complete()) {
throw processor::exception("Invalid UTF8 data",processor::error::PAYLOAD_VIOLATION);
throw processor::exception("Invalid UTF8 data",
processor::error::PAYLOAD_VIOLATION);
}
}
}
void data::validate_payload() {
@@ -128,7 +133,8 @@ void data::validate_payload() {
void data::set_masking_key(int32_t key) {
m_masking_key.i = key;
m_masking_index = (key == 0 ? M_MASK_KEY_ZERO : M_BYTE_0);
m_prepared_key = processor::hybi_util::prepare_masking_key(m_masking_key);
m_masked = true;
}
void data::set_prepared(bool b) {
@@ -150,7 +156,7 @@ void data::append_payload(const std::string& payload) {
m_payload.append(payload);
}
void data::mask() {
if (m_masking_index >= 0) {
if (m_masked && m_payload.size() > 0) {
// By default WebSocket++ performs block masking/unmasking in a mannor that makes
// some assumptions about the nature of the machine and STL library used. In
// particular the assumption is either a 32 or 64 bit word size and an STL with
@@ -162,9 +168,20 @@ void data::mask() {
// To disable this optimization (for use with alternative STL implementations or
// processors) define WEBSOCKETPP_STRICT_MASKING when compiling the library. This
// will force the library to perform masking in single byte chunks.
#define WEBSOCKETPP_STRICT_MASKING
//#define WEBSOCKETPP_STRICT_MASKING
#ifndef WEBSOCKETPP_STRICT_MASKING
#ifdef WEBSOCKETPP_STRICT_MASKING
size_t len = m_payload.size();
for (size_t i = 0; i < len; i++) {
m_payload[i] ^= m_masking_key.c[i%4];
}
#else
// This should trigger a write to the string in case the STL
// implimentation is copy-on-write and hasn't been written to yet.
// Performing the masking will always require a copy of the string
// in this case to hold the masked version.
m_payload[0] = m_payload[0];
size_t size = m_payload.size()/sizeof(size_t);
size_t key = m_masking_key.i;
if (sizeof(size_t) == 8) {
@@ -178,15 +195,6 @@ void data::mask() {
for (size_t i = size*sizeof(size_t); i < m_payload.size(); i++) {
m_payload[i] ^= m_masking_key.c[i%4];
}
#else
size_t len = m_payload.size();
for (size_t i = 0; i < len; i++) {
m_payload[i] ^= m_masking_key.c[i%4];
}
/*for (std::string::iterator it = m_payload.begin(); it != m_payload.end(); it++) {
(*it) = *it ^ m_masking_key.c[m_masking_index];
m_masking_index = index_value((m_masking_index+1)&3);
}*/
#endif
}
}

View File

@@ -30,11 +30,13 @@
#include "../common.hpp"
#include "../utf8_validator/utf8_validator.hpp"
#include "../processors/hybi_util.hpp"
#include <boost/detail/atomic_count.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/thread/mutex.hpp>
#include <boost/utility.hpp>
#include <algorithm>
@@ -79,6 +81,8 @@ public:
* pointer.
*/
element_ptr get() {
boost::lock_guard<boost::mutex> lock(m_lock);
element_ptr p;
/*std::cout << "message requested ("
@@ -110,6 +114,8 @@ public:
return p;
}
void recycle(element_ptr p) {
boost::lock_guard<boost::mutex> lock(m_lock);
if (p->get_index()+1 > m_used.size() || m_used[p->get_index()] != p) {
//std::cout << "error tried to recycle a pointer we don't control" << std::endl;
// error tried to recycle a pointer we don't control
@@ -133,6 +139,7 @@ public:
// set a function that will be called when new elements are avaliable.
void set_callback(callback_type fn) {
boost::lock_guard<boost::mutex> lock(m_lock);
m_callback = fn;
}
@@ -144,6 +151,8 @@ private:
std::vector<element_ptr> m_used;
callback_type m_callback;
boost::mutex m_lock;
};
class data {
@@ -168,8 +177,8 @@ public:
// validation. Returns number of bytes read.
// throws a processor::exception if the message is too big, there is a fatal
// istream read error, or invalid UTF8 data is read for a text message
uint64_t process_payload(std::istream& input,uint64_t size);
void process_character(unsigned char c);
//uint64_t process_payload(std::istream& input,uint64_t size);
void process_payload(char * input, size_t size);
void complete();
void validate_payload();
@@ -185,7 +194,11 @@ public:
void set_prepared(bool b);
bool get_prepared() const;
void mask();
int32_t get_masking_key() const {
return m_masking_key.i;
}
// pool management interface
void set_live();
size_t get_index() const;
@@ -193,34 +206,24 @@ private:
static const uint64_t PAYLOAD_SIZE_INIT = 1000; // 1KB
static const uint64_t PAYLOAD_SIZE_MAX = 100000000;// 100MB
enum index_value {
M_MASK_KEY_ZERO = -2,
M_NOT_MASKED = -1,
M_BYTE_0 = 0,
M_BYTE_1 = 1,
M_BYTE_2 = 2,
M_BYTE_3 = 3
};
union masking_key {
int32_t i;
char c[4];
};
typedef websocketpp::processor::hybi_util::masking_key_type masking_key_type;
friend void intrusive_ptr_add_ref(const data * s) {
++s->m_ref_count;
}
friend void intrusive_ptr_release(const data * s) {
boost::unique_lock<boost::mutex> lock(s->m_lock);
// TODO: thread safety
long count = --s->m_ref_count;
if (count == 1 && s->m_live) {
// recycle if endpoint exists
s->m_live = false;
pool_ptr pp = s->m_pool.lock();
if (pp) {
lock.unlock();
pp->recycle(ptr(const_cast<data *>(s)));
}
@@ -237,8 +240,9 @@ private:
utf8_validator::validator m_validator;
// Masking state
masking_key m_masking_key;
index_value m_masking_index;
masking_key_type m_masking_key;
bool m_masked;
size_t m_prepared_key;
std::string m_header;
std::string m_payload;
@@ -250,6 +254,7 @@ private:
mutable boost::detail::atomic_count m_ref_count;
mutable pool_weak_ptr m_pool;
mutable bool m_live;
mutable boost::mutex m_lock;
};
typedef boost::intrusive_ptr<data> data_ptr;

View File

@@ -44,7 +44,8 @@ namespace hybi_state {
enum value {
READ_HEADER = 0,
READ_PAYLOAD = 1,
READY = 2
READY = 2,
IGNORE = 3
};
}
@@ -255,6 +256,15 @@ public:
break;
case hybi_state::READY:
// shouldn't be here..
break;
case hybi_state::IGNORE:
s.ignore(m_payload_left);
m_payload_left -= s.gcount();
if (m_payload_left == 0) {
reset();
}
break;
default:
break;
@@ -268,6 +278,7 @@ public:
// processor for a new message.
if (m_header.ready()) {
m_header.reset();
ignore();
}
}
@@ -276,6 +287,13 @@ public:
}
}
// Sends the processor an inturrupt signal instructing it to ignore the next
// num bytes and then reset itself. This is used to flush a bad frame out of
// the read buffer.
void ignore() {
m_state = hybi_state::IGNORE;
}
void process_header(std::istream& s) {
m_header.consume(s);
@@ -344,17 +362,33 @@ public:
void process_payload(std::istream& input) {
//std::cout << "payload left 1: " << m_payload_left << std::endl;
uint64_t written;
size_t num;
// read bytes into processor buffer. Read the lesser of the buffer size
// and the number of bytes left in the payload.
input.read(m_payload_buffer, std::min(m_payload_left, PAYLOAD_BUFFER_SIZE));
num = input.gcount();
if (input.bad()) {
throw processor::exception("istream readsome error",
processor::error::FATAL_ERROR);
}
if (num == 0) {
return;
}
m_payload_left -= num;
// tell the appropriate message to process the bytes.
if (m_header.is_control()) {
written = m_control_message->process_payload(input,m_payload_left);
m_control_message->process_payload(m_payload_buffer,num);
} else {
//m_connection.alog().at(log::alevel::DEVEL) << "process_payload. Size: " << m_payload_left << log::endl;
written = m_data_message->process_payload(input,m_payload_left);
m_data_message->process_payload(m_payload_buffer,num);
}
m_payload_left -= written;
//std::cout << "payload left 2: " << m_payload_left << std::endl;
if (m_payload_left == 0) {
process_frame();
}
@@ -406,6 +440,7 @@ public:
case hybi_state::READ_HEADER:
return m_header.get_bytes_needed();
case hybi_state::READ_PAYLOAD:
case hybi_state::IGNORE:
return m_payload_left;
case hybi_state::READY:
return 0;
@@ -561,6 +596,10 @@ public:
prepare_frame(msg);
}
private:
// must be divisible by 8 (some things are hardcoded for 4 and 8 byte word
// sizes
static const size_t PAYLOAD_BUFFER_SIZE = 512;
connection_type& m_connection;
int m_state;
@@ -568,12 +607,16 @@ private:
message::control_ptr m_control_message;
hybi_header m_header;
hybi_header m_write_header;
uint64_t m_payload_left;
size_t m_payload_left;
char m_payload_buffer[PAYLOAD_BUFFER_SIZE];
frame::parser<connection_type> m_write_frame; // TODO: refactor this out
};
template <class connection_type>
const size_t hybi<connection_type>::PAYLOAD_BUFFER_SIZE;
} // namespace processor
} // namespace websocketpp

View File

@@ -193,7 +193,16 @@ public:
m_state = hybi_legacy_state::DONE;
} else {
if (m_data_message) {
m_data_message->process_payload(input,1);
size_t num;
num = input.readsome(m_payload_buffer, PAYLOAD_BUFFER_SIZE);
if (input.bad()) {
throw processor::exception("istream readsome error",
processor::error::FATAL_ERROR);
}
m_data_message->process_payload(m_payload_buffer,num);
}
}
}
@@ -324,12 +333,18 @@ private:
}
}
// must be divisible by 8 (some things are hardcoded for 4 and 8 byte word
// sizes
static const size_t PAYLOAD_BUFFER_SIZE = 512;
connection_type& m_connection;
hybi_legacy_state::value m_state;
message::data_ptr m_data_message;
std::string m_key3;
char m_payload_buffer[PAYLOAD_BUFFER_SIZE];
};
} // processor