From fa7cd63706404acbfe70651cdb713f2604150570 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Tue, 3 Jan 2012 06:22:42 -0600 Subject: [PATCH] work on write queue/flow control --- .../broadcast_admin_handler.hpp | 92 ++++++++------ .../broadcast_handler.hpp | 17 +-- .../broadcast_server_tls.cpp | 35 ++++-- src/connection.hpp | 114 +++++++++++------- src/endpoint.hpp | 69 +++++++++++ src/messages/data.cpp | 41 +++++-- src/messages/data.hpp | 11 +- src/processors/hybi.hpp | 8 ++ src/processors/hybi_header.cpp | 11 +- src/processors/hybi_legacy.hpp | 10 ++ src/processors/processor.hpp | 2 +- src/roles/server.hpp | 2 +- 12 files changed, 295 insertions(+), 117 deletions(-) diff --git a/examples/broadcast_server_tls/broadcast_admin_handler.hpp b/examples/broadcast_server_tls/broadcast_admin_handler.hpp index 1d75307b4a..d007092a49 100644 --- a/examples/broadcast_server_tls/broadcast_admin_handler.hpp +++ b/examples/broadcast_server_tls/broadcast_admin_handler.hpp @@ -102,7 +102,15 @@ public: } void command_error(connection_ptr connection,const std::string msg) { - connection->send("{\"type\":\"error\",\"value\":\""+msg+"\"}"); + websocketpp::message::data_ptr m = connection->get_data_message(); + + if (m) { + m->reset(frame::opcode::TEXT); + m->set_payload("{\"type\":\"error\",\"value\":\""+msg+"\"}"); + connection->send(m); + } else { + // error no avaliable message buffers + } } // close: - close this connection @@ -128,41 +136,53 @@ public: return; } - long milli_seconds = get_ms(m_epoch); - - std::stringstream update; - update << "{\"type\":\"stats\"" - << ",\"timestamp\":" << milli_seconds - << ",\"connections\":" << m_broadcast_handler->get_connection_count() - << ",\"admin_connections\":" << m_connections.size() - << ",\"messages\":["; - - const msg_map& m = m_broadcast_handler->get_message_stats(); - - msg_map::const_iterator msg_it; - msg_map::const_iterator last = m.end(); - if (m.size() > 0) { - last--; - } - - for (msg_it = m.begin(); msg_it != m.end(); msg_it++) { - update << "{\"id\":" << (*msg_it).second.id - << ",\"hash\":\"" << (*msg_it).second.hash << "\"" - << ",\"sent\":" << (*msg_it).second.sent - << ",\"acked\":" << (*msg_it).second.acked - << ",\"size\":" << (*msg_it).second.size - << ",\"time\":" << (*msg_it).second.time - << "}" << (msg_it == last ? "" : ","); - } - - update << "]}"; - - m_broadcast_handler->clear_message_stats(); - - typename std::set::iterator it; - - for (it = m_connections.begin(); it != m_connections.end(); it++) { - (*it)->send(update.str(),false); + if (m_connections.size() > 0) { + long milli_seconds = get_ms(m_epoch); + + std::stringstream update; + update << "{\"type\":\"stats\"" + << ",\"timestamp\":" << milli_seconds + << ",\"connections\":" << m_broadcast_handler->get_connection_count() + << ",\"admin_connections\":" << m_connections.size() + << ",\"messages\":["; + + const msg_map& m = m_broadcast_handler->get_message_stats(); + + msg_map::const_iterator msg_it; + msg_map::const_iterator last = m.end(); + if (m.size() > 0) { + last--; + } + + for (msg_it = m.begin(); msg_it != m.end(); msg_it++) { + update << "{\"id\":" << (*msg_it).second.id + << ",\"hash\":\"" << (*msg_it).second.hash << "\"" + << ",\"sent\":" << (*msg_it).second.sent + << ",\"acked\":" << (*msg_it).second.acked + << ",\"size\":" << (*msg_it).second.size + << ",\"time\":" << (*msg_it).second.time + << "}" << (msg_it == last ? "" : ","); + } + + update << "]}"; + + m_broadcast_handler->clear_message_stats(); + + typename std::set::iterator it; + + websocketpp::message::data_ptr msg = (*m_connections.begin())->get_data_message(); + + if (msg) { + msg->reset(frame::opcode::TEXT); + + msg->set_payload(update.str()); + + for (it = m_connections.begin(); it != m_connections.end(); it++) { + (*it)->send(msg); + } + } else { + // error no avaliable message buffers + } } m_timer->expires_from_now(boost::posix_time::milliseconds(250)); diff --git a/examples/broadcast_server_tls/broadcast_handler.hpp b/examples/broadcast_server_tls/broadcast_handler.hpp index 2cac50ed78..511e605819 100644 --- a/examples/broadcast_server_tls/broadcast_handler.hpp +++ b/examples/broadcast_server_tls/broadcast_handler.hpp @@ -90,17 +90,14 @@ public: } void on_message(connection_ptr connection,message::data_ptr msg) { - typename std::set::iterator it; - - wscmd::cmd command = wscmd::parse(msg->get_payload()); + wscmd::cmd command = wscmd::parse(msg->get_payload()); if (command.command == "ack") { handle_ack(connection,command); + connection->recycle(msg); } else { broadcast_message(msg); } - - connection->recycle(msg); } void command_error(connection_ptr connection,const std::string msg) { @@ -160,11 +157,15 @@ public: typename std::set::iterator it; - // broadcast to clients + // broadcast to clients for (it = m_connections.begin(); it != m_connections.end(); it++) { - (*it)->send(msg->get_payload(),(msg->get_opcode() == frame::opcode::BINARY)); + //(*it)->send(msg->get_payload(),(msg->get_opcode() == frame::opcode::BINARY)); + for (int i = 0; i < 100; i++) { + (*it)->send(msg); + } + } - new_msg.sent = m_connections.size(); + new_msg.sent = m_connections.size()*100; new_msg.acked = 0; } diff --git a/examples/broadcast_server_tls/broadcast_server_tls.cpp b/examples/broadcast_server_tls/broadcast_server_tls.cpp index 003c0c982e..4805b0c367 100644 --- a/examples/broadcast_server_tls/broadcast_server_tls.cpp +++ b/examples/broadcast_server_tls/broadcast_server_tls.cpp @@ -49,26 +49,37 @@ int main(int argc, char* argv[]) { bool tls = false; // 12288 is max OS X limit without changing kernal settings - const rlim_t ideal_size = 100000; + const rlim_t ideal_size = 10000; rlim_t old_size; + rlim_t old_max; struct rlimit rl; int result; result = getrlimit(RLIMIT_NOFILE, &rl); if (result == 0) { - std::cout << "cur: " << rl.rlim_cur << " max: " << rl.rlim_max << std::endl; + //std::cout << "System FD limits: " << rl.rlim_cur << " max: " << rl.rlim_max << std::endl; old_size = rl.rlim_cur; - + old_max = rl.rlim_max; + if (rl.rlim_cur < ideal_size) { - rl.rlim_cur = ideal_size; - //rl.rlim_cur = rl.rlim_max; - result = setrlimit(RLIMIT_NOFILE, &rl); + std::cout << "Attempting to raise system file descriptor limit from " << rl.rlim_cur << " to " << ideal_size << std::endl; + rl.rlim_cur = ideal_size; - if (result != 0) { - std::cout << "Unable to request an increase in the file descripter limit. This server will be limited to " << old_size << " concurrent connections. Error code: " << errno << std::endl; - } + if (rl.rlim_max < ideal_size) { + rl.rlim_max = ideal_size; + } + + result = setrlimit(RLIMIT_NOFILE, &rl); + + if (result == 0) { + std::cout << "Success" << std::endl; + } else if (result == EPERM) { + std::cout << "Failed. This server will be limited to " << old_size << " concurrent connections. Error code: Insufficient permissions. Try running process as root. system max: " << old_max << std::endl; + } else { + std::cout << "Failed. This server will be limited to " << old_size << " concurrent connections. Error code: " << errno << " system max: " << old_max << std::endl; + } } } @@ -97,9 +108,11 @@ int main(int argc, char* argv[]) { plain_handler_ptr h(new websocketpp::broadcast::server_handler()); plain_endpoint_type e(h); - e.alog().set_level(websocketpp::log::alevel::ALL); - e.elog().set_level(websocketpp::log::elevel::ALL); + e.alog().unset_level(websocketpp::log::alevel::ALL); + e.elog().unset_level(websocketpp::log::elevel::ALL); + e.alog().set_level(websocketpp::log::alevel::DEVEL); + std::cout << "Starting WebSocket broadcast server on port " << port << std::endl; e.listen(port); } diff --git a/src/connection.hpp b/src/connection.hpp index 48f18382e7..c362969dd1 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -136,7 +136,7 @@ public: // Valid for OPEN state void send(const utf8_string& payload,bool binary = false) { - binary_string_ptr msg; + /*binary_string_ptr msg; if (binary) { msg = m_processor->prepare_frame(frame::opcode::BINARY,!m_endpoint.is_server(),payload); } else { @@ -154,23 +154,30 @@ public: boost::bind( &type::write_message, type::shared_from_this(), - msg)); + msg));*/ } void send(const binary_string& data) { - binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::BINARY, + /*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::BINARY, !m_endpoint.is_server(),data)); m_endpoint.endpoint_base::m_io_service.post( boost::bind( &type::write_message, type::shared_from_this(), - msg)); + msg));*/ } + void send(message::data_ptr msg) { + m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand()); + write_message(msg); + } + + void close(close::status::value code, const utf8_string& reason = "") { // TODO: overloads without code or reason? send_close(code, reason); } void ping(const binary_string& payload) { - binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PING, + // TODO: + /*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PING, !m_endpoint.is_server(), payload)); @@ -178,16 +185,17 @@ public: boost::bind( &type::write_message, type::shared_from_this(), - msg)); + msg)); */ } void pong(const binary_string& payload) { - binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PONG, + // TODO: + /*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PONG, !m_endpoint.is_server(),payload)); m_endpoint.endpoint_base::m_io_service.post( boost::bind( &type::write_message, type::shared_from_this(), - msg)); + msg));*/ } uint64_t buffered_amount() const { @@ -219,19 +227,12 @@ public: // flow control interface message::data_ptr get_data_message() { - // if we have one of this type free - if (!m_read_queue_avaliable.empty()) { - message::data_ptr p = m_read_queue_avaliable.front(); - m_read_queue_avaliable.pop(); - m_read_queue_used.insert(p); - return p; - } else { - return message::data_ptr(); - } + return m_endpoint.get_data_message(); } void recycle(message::data_ptr p) { - if (m_read_queue_used.erase(p) == 0) { + m_endpoint.recycle(p); + /*if (m_read_queue_used.erase(p) == 0) { // tried to recycle a pointer we don't control. } else { m_read_queue_avaliable.push(p); @@ -246,7 +247,7 @@ public: ); m_read_state = READING; } - } + }*/ } message::control_ptr get_control_message() { @@ -290,7 +291,7 @@ protected: role_type::async_init(); } - +public: void handle_read_frame(const boost::system::error_code& error) { // check if state changed while we were waiting for a read. if (m_state == session::state::CLOSED) { return; } @@ -353,7 +354,8 @@ protected: // we need to wait for a message to be returned by the // client. We exit the read loop. handle_read_frame // will be restarted by recycle() - m_read_state = WAITING; + //m_read_state = WAITING; + m_endpoint.wait(type::shared_from_this()); return; default: // Fatal error, forcibly end connection immediately. @@ -384,7 +386,7 @@ protected: ); } } - +protected: void process_data(message::data_ptr msg) { get_handler()->on_message(type::shared_from_this(),msg); } @@ -396,10 +398,10 @@ protected: response = get_handler()->on_ping(type::shared_from_this(), msg->get_payload()); if (response) { - // send response ping - write_message(m_processor->prepare_frame(frame::opcode::PONG, - !m_endpoint.is_server(), - msg->get_payload())); + // TODO: send response ping + //write_message(m_processor->prepare_frame(frame::opcode::PONG, + // !m_endpoint.is_server(), + // msg->get_payload())); } break; case frame::opcode::PONG: @@ -473,10 +475,13 @@ protected: m_local_close_code = code; m_local_close_reason = reason; - - write_message(m_processor->prepare_close_frame(m_local_close_code, - !m_endpoint.is_server(), - m_local_close_reason)); + // TODO: fix + message::data_ptr msg = get_data_message(); + msg->reset(frame::opcode::CLOSE); + // TODO: msg payload set_status + m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand()); + write_message(msg); + m_write_state = INTURRUPT; } @@ -511,24 +516,33 @@ protected: // current write completes. - write_message(m_processor->prepare_close_frame(m_local_close_code, - !m_endpoint.is_server(), - m_local_close_reason)); + // TODO: fix + message::data_ptr msg = get_data_message(); + msg->reset(frame::opcode::CLOSE); + // TODO: msg payload set_status + m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand()); + write_message(msg); + + //write_message(m_processor->prepare_close_frame(m_local_close_code, + // !m_endpoint.is_server(), + // m_local_close_reason)); m_write_state = INTURRUPT; } - void write_message(binary_string_ptr msg) { + void write_message(message::data_ptr msg) { if (m_write_state == INTURRUPT) { return; } - m_write_buffer += msg->size(); + m_write_buffer += msg->get_payload().size(); + msg->acquire(); m_write_queue.push(msg); + write(); } void write() { - switch (m_write_state) { + switch (m_write_state) { case IDLE: break; case WRITING: @@ -538,7 +552,11 @@ protected: case INTURRUPT: // clear the queue except for the last message while (m_write_queue.size() > 1) { - m_write_buffer -= m_write_queue.front()->size(); + m_write_buffer -= m_write_queue.front()->get_payload().size(); + m_write_queue.front()->release(); + if (m_write_queue.front()->done()) { + recycle(m_write_queue.front()); + } m_write_queue.pop(); } break; @@ -552,9 +570,10 @@ protected: m_write_state = WRITING; } - std::vector data; + std::vector data; - data.push_back(boost::asio::buffer(*m_write_queue.front())); + data.push_back(boost::asio::buffer(m_write_queue.front()->get_header())); + data.push_back(boost::asio::buffer(m_write_queue.front()->get_payload())); boost::asio::async_write( socket_type::get_socket(), @@ -575,7 +594,7 @@ protected: } void handle_write(const boost::system::error_code& error) { - if (error) { + if (error) { if (error == boost::asio::error::operation_aborted) { // previous write was aborted m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) << "handle_write was called with operation_aborted error" << log::endl; @@ -591,9 +610,18 @@ protected: return; } - m_write_buffer -= m_write_queue.front()->size(); + m_write_buffer -= m_write_queue.front()->get_payload().size(); + m_write_queue.front()->release(); + if (m_write_queue.front()->done()) { + if (m_write_queue.front()->get_payload().size() > 0 && + (m_write_queue.front()->get_payload())[0] != '{') { + m_endpoint.alog().at(log::alevel::DEVEL) << "Recycling message, maxcount: " << m_write_queue.front()->m_max_refcount << log::endl; + } + + recycle(m_write_queue.front()); + } m_write_queue.pop(); - + if (m_write_state == WRITING) { m_write_state = IDLE; } @@ -697,7 +725,7 @@ protected: processor::ptr m_processor; // Write queue - std::queue m_write_queue; + std::queue m_write_queue; uint64_t m_write_buffer; write_state m_write_state; diff --git a/src/endpoint.hpp b/src/endpoint.hpp index 5fdaf5ee9f..8c865c030e 100644 --- a/src/endpoint.hpp +++ b/src/endpoint.hpp @@ -203,11 +203,80 @@ protected: handler_ptr get_handler() { return m_handler; } + + message::data_ptr get_data_message() { + // if we have one of this type free + if (!m_read_queue_avaliable.empty()) { + message::data_ptr p = m_read_queue_avaliable.front(); + m_read_queue_avaliable.pop(); + m_read_queue_used.insert(p); + return p; + } else { + if (m_read_queue_used.size() > 10) { + return message::data_ptr(); + } else { + m_read_queue_avaliable.push(message::data_ptr(new message::data())); + alog().at(log::alevel::DEVEL) + << "Allocating new data message. Count is now: " + << m_read_queue_used.size() + 1 + << log::endl; + return get_data_message(); + } + } + } + + void recycle(message::data_ptr p) { + if (m_read_queue_used.erase(p) == 0) { + // tried to recycle a pointer we don't control. + } else { + m_read_queue_avaliable.push(p); + + // wake next + if (!m_read_waiting.empty()) { + connection_ptr next = m_read_waiting.front(); + + /*endpoint_base::m_io_service.post( + boost::bind( + &connection_type::handle_read_frame, + next, + boost::system::error_code() + ) + );*/ + (*next).handle_read_frame(boost::system::error_code()); + + m_read_waiting.pop(); + } + // wake all + /*std::list::iterator it; + + for (it = m_read_waiting.begin(); it != m_read_waiting.end(); it++) { + endpoint_base::m_io_service.post( + boost::bind( + &connection_type::handle_read_frame, + *it, + boost::system::error_code() + ) + ); + } + + m_read_waiting.empty();*/ + } + } + + void wait(connection_ptr con) { + m_read_waiting.push(con); + } + private: handler_ptr m_handler; std::set m_connections; alogger_type m_alog; elogger_type m_elog; + + // mssage buffers + std::queue m_read_queue_avaliable; + std::set m_read_queue_used; + std::queue m_read_waiting; }; /// traits class that allows looking up relevant endpoint types by the fully diff --git a/src/messages/data.cpp b/src/messages/data.cpp index aed78410c4..95db016935 100644 --- a/src/messages/data.cpp +++ b/src/messages/data.cpp @@ -32,7 +32,7 @@ using websocketpp::message::data; -data::data() { +data::data() : m_refcount(-1) { m_payload.reserve(PAYLOAD_SIZE_INIT); } @@ -43,6 +43,9 @@ websocketpp::frame::opcode::value data::get_opcode() const { const std::string& data::get_payload() const { return m_payload; } +const std::string& data::get_header() const { + return m_header; +} uint64_t data::process_payload(std::istream& input,uint64_t size) { unsigned char c; @@ -101,6 +104,8 @@ void data::reset(frame::opcode::value opcode) { m_masking_index = M_NOT_MASKED; // -1 indicates do not mask/unmask m_payload.resize(0); m_validator.reset(); + m_refcount = -1; + m_max_refcount = 0; } void data::complete() { @@ -117,22 +122,36 @@ void data::set_masking_key(int32_t key) { m_masking_index = (key == 0 ? M_MASK_KEY_ZERO : M_BYTE_0); } +void data::set_prepared(bool b) { + m_refcount = 0; +} + +bool data::get_prepared() const { + return m_refcount >= 0; +} + +void data::acquire() { + assert(m_refcount >= 0); + m_refcount++; + if (m_refcount > m_max_refcount) { + m_max_refcount = m_refcount; + } +} +void data::release() { + assert(m_refcount > 0); + m_refcount--; +} +bool data::done() const { + return m_refcount == 0; +} // This could be further optimized using methods that write directly into the // m_payload buffer void data::set_payload(const std::string& payload) { m_payload.reserve(payload.size()); - std::copy(payload.begin(), payload.end(), m_payload.begin()); + m_payload = payload; } -void data::process() { - websocketpp::processor::hybi_header header; - header.set_fin(true); - header.set_opcode(m_opcode); - - // set opcode - // set - - // mask +void data::mask() { if (m_masking_index >= 0) { for (std::string::iterator it = m_payload.begin(); it != m_payload.end(); it++) { (*it) = *it ^ m_masking_key[(m_masking_index++)%4]; diff --git a/src/messages/data.hpp b/src/messages/data.hpp index 5f8dc4a102..8ab8992479 100644 --- a/src/messages/data.hpp +++ b/src/messages/data.hpp @@ -45,6 +45,7 @@ public: frame::opcode::value get_opcode() const; const std::string& get_payload() const; + const std::string& get_header() const; // ##reading## // sets the masking key to be used to unmask as bytes are read. @@ -66,8 +67,14 @@ public: void set_header(const std::string& header); // Performs masking and header generation if it has not been done already. - void process(); + void set_prepared(bool b); + bool get_prepared() const; + void acquire(); + void release(); + bool done() const; + void mask(); + int m_max_refcount; private: static const uint64_t PAYLOAD_SIZE_INIT = 1000; // 1KB static const uint64_t PAYLOAD_SIZE_MAX = 100000000;// 100MB @@ -93,6 +100,8 @@ private: index_value m_masking_index; // Message buffers + int m_refcount; + std::string m_header; std::string m_payload; }; diff --git a/src/processors/hybi.hpp b/src/processors/hybi.hpp index 8a0b2be9d8..9c1a5cfb9a 100644 --- a/src/processors/hybi.hpp +++ b/src/processors/hybi.hpp @@ -495,6 +495,9 @@ public: // new prepare frame stuff void prepare_frame(message::data_ptr msg, bool masked, int32_t mask) { + if (msg->get_prepared()) { + return; + } m_write_header.reset(); m_write_header.set_fin(true); m_write_header.set_opcode(msg->get_opcode()); @@ -503,6 +506,11 @@ public: m_write_header.complete(); msg->set_header(m_write_header.get_header_bytes()); + if (masked) { + msg->mask(); + } + + msg->set_prepared(true); } diff --git a/src/processors/hybi_header.cpp b/src/processors/hybi_header.cpp index 003fcc9cb4..c777f269ad 100644 --- a/src/processors/hybi_header.cpp +++ b/src/processors/hybi_header.cpp @@ -112,16 +112,16 @@ void hybi_header::set_masked(bool masked,int32_t key) { } void hybi_header::set_payload_size(uint64_t size) { if (size <= frame::limits::PAYLOAD_SIZE_BASIC) { - m_header[1] &= (size & BPB1_PAYLOAD); + m_header[1] |= size; m_payload_size = size; } else if (size <= frame::limits::PAYLOAD_SIZE_EXTENDED) { if (get_masked()) { // shift mask bytes to the correct position given the new size unsigned int mask_offset = get_header_len()-4; - m_header[1] &= (BASIC_PAYLOAD_16BIT_CODE & BPB1_PAYLOAD); + m_header[1] |= BASIC_PAYLOAD_16BIT_CODE; memcpy(&m_header[get_header_len()-4], &m_header[mask_offset], 4); } else { - m_header[1] &= (BASIC_PAYLOAD_16BIT_CODE & BPB1_PAYLOAD); + m_header[1] |= BASIC_PAYLOAD_16BIT_CODE; } m_payload_size = size; *(reinterpret_cast(&m_header[BASIC_HEADER_LENGTH])) = htons(size); @@ -129,16 +129,17 @@ void hybi_header::set_payload_size(uint64_t size) { if (get_masked()) { // shift mask bytes to the correct position given the new size unsigned int mask_offset = get_header_len()-4; - m_header[1] &= (BASIC_PAYLOAD_64BIT_CODE & BPB1_PAYLOAD); + m_header[1] |= BASIC_PAYLOAD_64BIT_CODE; memcpy(&m_header[get_header_len()-4], &m_header[mask_offset], 4); } else { - m_header[1] &= (BASIC_PAYLOAD_64BIT_CODE & BPB1_PAYLOAD); + m_header[1] |= BASIC_PAYLOAD_64BIT_CODE; } m_payload_size = size; *(reinterpret_cast(&m_header[BASIC_HEADER_LENGTH])) = htonll(size); } else { throw processor::exception("set_payload_size called with value that was too large (>2^63)",processor::error::MESSAGE_TOO_BIG); } + } void hybi_header::complete() { validate_basic_header(); diff --git a/src/processors/hybi_legacy.hpp b/src/processors/hybi_legacy.hpp index b1db85fc42..da987aa27c 100644 --- a/src/processors/hybi_legacy.hpp +++ b/src/processors/hybi_legacy.hpp @@ -256,6 +256,16 @@ public: return response; } + void prepare_frame(message::data_ptr msg, bool masked, int32_t mask) { + if (msg->get_prepared()) { + return; + } + + msg->set_header(std::string(0x00)); + // TODO: append 0xFF + msg->set_prepared(true); + } + private: uint32_t decode_client_key(const std::string& key) { int spaces = 0; diff --git a/src/processors/processor.hpp b/src/processors/processor.hpp index b8940212ee..cc9b08fb0b 100644 --- a/src/processors/processor.hpp +++ b/src/processors/processor.hpp @@ -132,7 +132,7 @@ public: bool mask, const std::string& reason) = 0; - + virtual void prepare_frame(message::data_ptr msg, bool masked, int32_t mask) = 0; }; diff --git a/src/roles/server.hpp b/src/roles/server.hpp index 10a62db355..6cf0ac82c3 100644 --- a/src/roles/server.hpp +++ b/src/roles/server.hpp @@ -352,7 +352,7 @@ void server::connection::handle_read_request( } // TODO: is there a way to short circuit this or something? - m_endpoint.alog().at(log::alevel::DEBUG_HANDSHAKE) << m_request.raw() << log::endl; + //m_endpoint.alog().at(log::alevel::DEBUG_HANDSHAKE) << m_request.raw() << log::endl; std::string h = m_request.header("Upgrade"); if (boost::ifind_first(h,"websocket")) {