From 14ada3a3a3c71d45cc2e527a8dfd472f0b804027 Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Fri, 6 Jan 2012 17:08:59 -0600 Subject: [PATCH] cleans up debugging code --- .../broadcast_admin_handler.hpp | 2 - src/connection.hpp | 114 +++++------------- src/endpoint.hpp | 84 ++----------- src/messages/data.cpp | 25 +--- src/messages/data.hpp | 46 ++++--- src/processors/hybi_legacy.hpp | 3 - 6 files changed, 71 insertions(+), 203 deletions(-) diff --git a/examples/broadcast_server_tls/broadcast_admin_handler.hpp b/examples/broadcast_server_tls/broadcast_admin_handler.hpp index d007092a49..8550414a1a 100644 --- a/examples/broadcast_server_tls/broadcast_admin_handler.hpp +++ b/examples/broadcast_server_tls/broadcast_admin_handler.hpp @@ -97,8 +97,6 @@ public: } else { command_error(connection,"Invalid Command"); } - - connection->recycle(msg); } void command_error(connection_ptr connection,const std::string msg) { diff --git a/src/connection.hpp b/src/connection.hpp index aed2768ed0..d6f55d6dde 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -113,7 +113,6 @@ public: socket_type::init(); m_control_message = message::control_ptr(new message::control()); - //m_read_queue_avaliable.push(message::data_ptr(new message::data())); } // SHOULD BE PROTECTED @@ -156,46 +155,21 @@ public: type::shared_from_this(), msg));*/ } - void send(const binary_string& data) { - /*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::BINARY, - !m_endpoint.is_server(),data)); - m_endpoint.endpoint_base::m_io_service.post( - boost::bind( - &type::write_message, - type::shared_from_this(), - msg));*/ - } void send(message::data_ptr msg) { m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand()); write_message(msg); } - void close(close::status::value code, const utf8_string& reason = "") { + void close(close::status::value code, const std::string& reason = "") { // TODO: overloads without code or reason? send_close(code, reason); } - void ping(const binary_string& payload) { - // TODO: - /*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PING, - !m_endpoint.is_server(), - payload)); - - m_endpoint.endpoint_base::m_io_service.post( - boost::bind( - &type::write_message, - type::shared_from_this(), - msg)); */ + void ping(const std::string& payload) { + send_ping(payload); } - void pong(const binary_string& payload) { - // TODO: - /*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PONG, - !m_endpoint.is_server(),payload)); - m_endpoint.endpoint_base::m_io_service.post( - boost::bind( - &type::write_message, - type::shared_from_this(), - msg));*/ + void pong(const std::string& payload) { + send_pong(payload); } uint64_t buffered_amount() const { @@ -230,26 +204,6 @@ public: return m_endpoint.get_data_message(); } - void recycle(message::data_ptr p) { - m_endpoint.recycle(p); - /*if (m_read_queue_used.erase(p) == 0) { - // tried to recycle a pointer we don't control. - } else { - m_read_queue_avaliable.push(p); - - if (m_read_state == WAITING) { - m_endpoint.endpoint_base::m_io_service.post( - boost::bind( - &type::handle_read_frame, - type::shared_from_this(), - boost::system::error_code() - ) - ); - m_read_state = READING; - } - }*/ - } - message::control_ptr get_control_message() { return m_control_message; } @@ -398,11 +352,7 @@ protected: response = get_handler()->on_ping(type::shared_from_this(), msg->get_payload()); if (response) { - message::data_ptr pong = get_data_message(); - pong->reset(frame::opcode::PONG); - pong->set_payload(msg->get_payload()); - m_processor->prepare_frame(pong,!m_endpoint.is_server(),rand()); - write_message(pong); + send_pong(msg->get_payload()); } break; case frame::opcode::PONG: @@ -476,7 +426,8 @@ protected: m_local_close_code = code; m_local_close_reason = reason; - // TODO: fix + // TODO: optimize control messages and handle case where endpoint is + // out of messages message::data_ptr msg = get_data_message(); if (!msg) { @@ -489,7 +440,6 @@ protected: } msg->reset(frame::opcode::CLOSE); - // TODO: msg payload set_status m_processor->prepare_close_frame(msg,!m_endpoint.is_server(),rand(),code,reason); write_message(msg); @@ -527,30 +477,45 @@ protected: // current write completes. - // TODO: fix + // TODO: optimize control messages and handle case where endpoint is + // out of messages message::data_ptr msg = get_data_message(); msg->reset(frame::opcode::CLOSE); - // TODO: msg payload set_status m_processor->prepare_close_frame(msg, !m_endpoint.is_server(), rand(), m_local_close_code, m_local_close_reason); write_message(msg); - - //write_message(m_processor->prepare_close_frame(m_local_close_code, - // !m_endpoint.is_server(), - // m_local_close_reason)); m_write_state = INTURRUPT; } + void send_ping(const std::string& payload) { + // TODO: optimize control messages and handle case where + // endpoint is out of messages + message::data_ptr control = get_data_message(); + control->reset(frame::opcode::PING); + control->set_payload(payload); + m_processor->prepare_frame(control,!m_endpoint.is_server(),rand()); + write_message(control); + } + + void send_pong(const std::string& payload) { + // TODO: optimize control messages and handle case where + // endpoint is out of messages + message::data_ptr control = get_data_message(); + control->reset(frame::opcode::PONG); + control->set_payload(payload); + m_processor->prepare_frame(control,!m_endpoint.is_server(),rand()); + write_message(control); + } + void write_message(message::data_ptr msg) { if (m_write_state == INTURRUPT) { return; } m_write_buffer += msg->get_payload().size(); - msg->acquire(); m_write_queue.push(msg); write(); @@ -568,10 +533,6 @@ protected: // clear the queue except for the last message while (m_write_queue.size() > 1) { m_write_buffer -= m_write_queue.front()->get_payload().size(); - m_write_queue.front()->release(); - if (m_write_queue.front()->done()) { - //recycle(m_write_queue.front()); - } m_write_queue.pop(); } break; @@ -626,15 +587,6 @@ protected: } m_write_buffer -= m_write_queue.front()->get_payload().size(); - /*m_write_queue.front()->release(); - if (m_write_queue.front()->done()) { - if (m_write_queue.front()->get_payload().size() > 0 && - (m_write_queue.front()->get_payload())[0] != '{') { - m_endpoint.alog().at(log::alevel::DEVEL) << "Recycling message, maxcount: " << m_write_queue.front()->m_max_refcount << log::endl; - } - - recycle(m_write_queue.front()); - }*/ m_write_queue.pop(); if (m_write_state == WRITING) { @@ -754,10 +706,8 @@ protected: bool m_dropped_by_me; // Read queue - read_state m_read_state; - message::control_ptr m_control_message; - std::queue m_read_queue_avaliable; - std::set m_read_queue_used; + read_state m_read_state; + message::control_ptr m_control_message; }; // connection related types that it and its policy classes need. diff --git a/src/endpoint.hpp b/src/endpoint.hpp index 69fea12566..768361df30 100644 --- a/src/endpoint.hpp +++ b/src/endpoint.hpp @@ -220,86 +220,19 @@ protected: return m_handler; } + /// Gets a shared pointer to a read/write data message. message::data::ptr get_data_message() { - // if we have one of this type free - - /*alog().at(log::alevel::DEVEL) - << "message requested (" - << m_read_queue_used.size() - << "/" - << m_read_queue_avaliable.size()+m_read_queue_used.size() - << ") with " - << m_read_waiting.size() - << " waiting" - << log::endl; - - if (!m_read_queue_avaliable.empty()) { - message::data_ptr p = m_read_queue_avaliable.front(); - m_read_queue_avaliable.pop(); - m_read_queue_used.insert(p); - return p; - } else { - if (m_read_queue_used.size() > 10) { - return message::data_ptr(); - } else { - m_read_queue_avaliable.push(message::data_ptr(new message::data())); - alog().at(log::alevel::DEVEL) - << "Allocating new data message. Count is now: " - << m_read_queue_used.size() + 1 - << log::endl; - return get_data_message(); - } - }*/ - return m_pool->get(); - } - - void recycle(message::data::ptr p) { - m_pool->recycle(p); - /*if (m_read_queue_used.erase(p) == 0) { - // tried to recycle a pointer we don't control. - } else { - m_read_queue_avaliable.push(p); - - - - // wake next - if (!m_read_waiting.empty()) { - connection_ptr next = m_read_waiting.front(); - - endpoint_base::m_io_service.post( - boost::bind( - &connection_type::handle_read_frame, - next, - boost::system::error_code() - ) - ); - (*next).handle_read_frame(boost::system::error_code()); - - m_read_waiting.pop(); - } - // wake all - std::list::iterator it; - - for (it = m_read_waiting.begin(); it != m_read_waiting.end(); it++) { - endpoint_base::m_io_service.post( - boost::bind( - &connection_type::handle_read_frame, - *it, - boost::system::error_code() - ) - ); - } - - m_read_waiting.empty(); - }*/ - - + return m_pool->get(); } + /// Asks the endpoint to restart this connection's handle_read_frame loop + /// when there are avaliable data messages. void wait(connection_ptr con) { m_read_waiting.push(con); } + /// Message pool callback indicating that there is a free data message + /// avaliable. Causes one waiting connection to get restarted. void on_new_message() { if (!m_read_waiting.empty()) { connection_ptr next = m_read_waiting.front(); @@ -313,10 +246,7 @@ private: alogger_type m_alog; elogger_type m_elog; - // mssage buffers - //std::queue m_read_queue_avaliable; - //std::set m_read_queue_used; - + // resource pools for read/write message buffers message::pool::ptr m_pool; std::queue m_read_waiting; }; diff --git a/src/messages/data.cpp b/src/messages/data.cpp index ebb060024b..c92759552e 100644 --- a/src/messages/data.cpp +++ b/src/messages/data.cpp @@ -32,7 +32,7 @@ using websocketpp::message::data; -data::data(data::pool_ptr p, size_t s) : m_refcount(-1),m_index(s),m_ref_count(0),m_pool(p) { +data::data(data::pool_ptr p, size_t s) : m_prepared(false),m_index(s),m_ref_count(0),m_pool(p),m_live(false) { m_payload.reserve(PAYLOAD_SIZE_INIT); } @@ -53,7 +53,6 @@ uint64_t data::process_payload(std::istream& input,uint64_t size) { uint64_t i; if (new_size > PAYLOAD_SIZE_MAX) { - // TODO: real exception throw processor::exception("Message too big",processor::error::MESSAGE_TOO_BIG); } @@ -104,8 +103,7 @@ void data::reset(frame::opcode::value opcode) { m_masking_index = M_NOT_MASKED; // -1 indicates do not mask/unmask m_payload.resize(0); m_validator.reset(); - m_refcount = -1; - m_max_refcount = 0; + m_prepared = false; } void data::complete() { @@ -123,26 +121,11 @@ void data::set_masking_key(int32_t key) { } void data::set_prepared(bool b) { - m_refcount = 0; + m_prepared = b; } bool data::get_prepared() const { - return m_refcount >= 0; -} - -void data::acquire() { - assert(m_refcount >= 0); - m_refcount++; - if (m_refcount > m_max_refcount) { - m_max_refcount = m_refcount; - } -} -void data::release() { - assert(m_refcount > 0); - m_refcount--; -} -bool data::done() const { - return m_refcount == 0; + return m_prepared; } // This could be further optimized using methods that write directly into the diff --git a/src/messages/data.hpp b/src/messages/data.hpp index c329a96447..1aca002687 100644 --- a/src/messages/data.hpp +++ b/src/messages/data.hpp @@ -32,9 +32,10 @@ #include "../utf8_validator/utf8_validator.hpp" #include -#include #include #include +#include +#include #include #include @@ -44,8 +45,10 @@ namespace websocketpp { namespace message { +/// message::pool impliments a reference counted pool of elements. + // element_type interface: -// constructor: +// constructor(ptr p, size_t index) // - shared pointer to the managing pool // - integer index @@ -53,7 +56,8 @@ namespace message { // set_live() template - class pool : public boost::enable_shared_from_this< pool > { + class pool : public boost::enable_shared_from_this< pool >, + boost::noncopyable { public: typedef pool type; typedef boost::shared_ptr ptr; @@ -61,16 +65,27 @@ public: typedef boost::function callback_type; pool(size_t max_elements) : m_cur_elements(0),m_max_elements(max_elements) {} + ~pool() {} + // copy/assignment constructors require C++11 + // boost::noncopyable is being used in the meantime. + // pool(pool const&) = delete; + // pool& operator=(pool const&) = delete + + /// Requests a pointer to the next free element in the resource pool. + /* If there isn't a free element a new one is created. If the maximum number + * of elements has been created then it returns an empty/null element + * pointer. + */ element_ptr get() { element_ptr p; - std::cout << "message requested (" + /*std::cout << "message requested (" << m_cur_elements-m_avaliable.size() << "/" << m_cur_elements << ")" - << std::endl; + << std::endl;*/ if (!m_avaliable.empty()) { p = m_avaliable.front(); @@ -85,16 +100,17 @@ public: m_cur_elements++; m_used.push_back(p); - std::cout << "Allocated new data message. Count is now " + /*std::cout << "Allocated new data message. Count is now " << m_cur_elements - << std::endl; + << std::endl;*/ } p->set_live(); return p; } void recycle(element_ptr p) { - if (m_used[p->get_index()] != p) { + if (p->get_index()+1 > m_used.size() || m_used[p->get_index()] != p) { + std::cout << "error tried to recycle a pointer we don't control" << std::endl; // error tried to recycle a pointer we don't control return; } @@ -158,14 +174,9 @@ public: // Performs masking and header generation if it has not been done already. void set_prepared(bool b); bool get_prepared() const; - void acquire(); - void release(); - bool done() const; void mask(); - - int m_max_refcount; - - // RC + + // pool management interface void set_live(); size_t get_index() const; private: @@ -207,13 +218,12 @@ private: unsigned char m_masking_key[4]; // m_masking_index can take on index_value m_masking_index; - - // Message buffers - int m_refcount; std::string m_header; std::string m_payload; + bool m_prepared; + // reference counting size_t m_index; mutable boost::detail::atomic_count m_ref_count; diff --git a/src/processors/hybi_legacy.hpp b/src/processors/hybi_legacy.hpp index 1139f07d33..585d06eaa3 100644 --- a/src/processors/hybi_legacy.hpp +++ b/src/processors/hybi_legacy.hpp @@ -188,9 +188,6 @@ public: void reset() { m_state = hybi_legacy_state::INIT; - if (m_data_message) { - m_connection.recycle(m_data_message); - } m_data_message.reset(); }