From 93efa1ae97647b74fcac1e1325c4432af78ab95d Mon Sep 17 00:00:00 2001 From: Peter Thorson Date: Fri, 6 Jan 2012 15:20:08 -0600 Subject: [PATCH] write queue passes autobahn tests --- .../broadcast_handler.hpp | 2 +- src/connection.hpp | 8 +- src/endpoint.hpp | 57 ++++++-- src/messages/data.cpp | 15 +- src/messages/data.hpp | 135 ++++++++++++++---- 5 files changed, 167 insertions(+), 50 deletions(-) diff --git a/examples/broadcast_server_tls/broadcast_handler.hpp b/examples/broadcast_server_tls/broadcast_handler.hpp index f270288113..876a586c27 100644 --- a/examples/broadcast_server_tls/broadcast_handler.hpp +++ b/examples/broadcast_server_tls/broadcast_handler.hpp @@ -94,7 +94,7 @@ public: if (command.command == "ack") { handle_ack(connection,command); - connection->recycle(msg); + //connection->recycle(msg); } else { broadcast_message(msg); } diff --git a/src/connection.hpp b/src/connection.hpp index f18437d4a2..aed2768ed0 100644 --- a/src/connection.hpp +++ b/src/connection.hpp @@ -113,7 +113,7 @@ public: socket_type::init(); m_control_message = message::control_ptr(new message::control()); - m_read_queue_avaliable.push(message::data_ptr(new message::data())); + //m_read_queue_avaliable.push(message::data_ptr(new message::data())); } // SHOULD BE PROTECTED @@ -570,7 +570,7 @@ protected: m_write_buffer -= m_write_queue.front()->get_payload().size(); m_write_queue.front()->release(); if (m_write_queue.front()->done()) { - recycle(m_write_queue.front()); + //recycle(m_write_queue.front()); } m_write_queue.pop(); } @@ -626,7 +626,7 @@ protected: } m_write_buffer -= m_write_queue.front()->get_payload().size(); - m_write_queue.front()->release(); + /*m_write_queue.front()->release(); if (m_write_queue.front()->done()) { if (m_write_queue.front()->get_payload().size() > 0 && (m_write_queue.front()->get_payload())[0] != '{') { @@ -634,7 +634,7 @@ protected: } recycle(m_write_queue.front()); - } + }*/ m_write_queue.pop(); if (m_write_state == WRITING) { diff --git a/src/endpoint.hpp b/src/endpoint.hpp index 85886bbfcc..69fea12566 100644 --- a/src/endpoint.hpp +++ b/src/endpoint.hpp @@ -34,6 +34,7 @@ #include #include +#include #include #include @@ -65,7 +66,8 @@ template < class endpoint : public endpoint_base, public role< endpoint >, - public socket< endpoint > + public socket< endpoint >, + boost::noncopyable { public: /// Type of the traits class that stores endpoint related types. @@ -116,7 +118,21 @@ public: explicit endpoint(handler_ptr handler) : role_type(m_io_service), socket_type(m_io_service), - m_handler(handler) {} + m_handler(handler), + m_pool(new message::pool(10)) + { + m_pool->set_callback(boost::bind(&type::on_new_message,this)); + } + + /// Destroy and endpoint + ~endpoint() { + m_pool->set_callback(NULL); + } + + // copy/assignment constructors require C++11 + // boost::noncopyable is being used in the meantime. + // endpoint(endpoint const&) = delete; + // endpoint& operator=(endpoint const&) = delete /// Returns a reference to the endpoint's access logger. /** @@ -204,10 +220,10 @@ protected: return m_handler; } - message::data_ptr get_data_message() { + message::data::ptr get_data_message() { // if we have one of this type free - alog().at(log::alevel::DEVEL) + /*alog().at(log::alevel::DEVEL) << "message requested (" << m_read_queue_used.size() << "/" @@ -233,11 +249,13 @@ protected: << log::endl; return get_data_message(); } - } + }*/ + return m_pool->get(); } - void recycle(message::data_ptr p) { - if (m_read_queue_used.erase(p) == 0) { + void recycle(message::data::ptr p) { + m_pool->recycle(p); + /*if (m_read_queue_used.erase(p) == 0) { // tried to recycle a pointer we don't control. } else { m_read_queue_avaliable.push(p); @@ -248,19 +266,19 @@ protected: if (!m_read_waiting.empty()) { connection_ptr next = m_read_waiting.front(); - /*endpoint_base::m_io_service.post( + endpoint_base::m_io_service.post( boost::bind( &connection_type::handle_read_frame, next, boost::system::error_code() ) - );*/ + ); (*next).handle_read_frame(boost::system::error_code()); m_read_waiting.pop(); } // wake all - /*std::list::iterator it; + std::list::iterator it; for (it = m_read_waiting.begin(); it != m_read_waiting.end(); it++) { endpoint_base::m_io_service.post( @@ -272,8 +290,8 @@ protected: ); } - m_read_waiting.empty();*/ - } + m_read_waiting.empty(); + }*/ } @@ -282,6 +300,13 @@ protected: m_read_waiting.push(con); } + void on_new_message() { + if (!m_read_waiting.empty()) { + connection_ptr next = m_read_waiting.front(); + (*next).handle_read_frame(boost::system::error_code()); + m_read_waiting.pop(); + } + } private: handler_ptr m_handler; std::set m_connections; @@ -289,9 +314,11 @@ private: elogger_type m_elog; // mssage buffers - std::queue m_read_queue_avaliable; - std::set m_read_queue_used; - std::queue m_read_waiting; + //std::queue m_read_queue_avaliable; + //std::set m_read_queue_used; + + message::pool::ptr m_pool; + std::queue m_read_waiting; }; /// traits class that allows looking up relevant endpoint types by the fully diff --git a/src/messages/data.cpp b/src/messages/data.cpp index acad60767f..ebb060024b 100644 --- a/src/messages/data.cpp +++ b/src/messages/data.cpp @@ -32,7 +32,7 @@ using websocketpp::message::data; -data::data() : m_refcount(-1) { +data::data(data::pool_ptr p, size_t s) : m_refcount(-1),m_index(s),m_ref_count(0),m_pool(p) { m_payload.reserve(PAYLOAD_SIZE_INIT); } @@ -166,3 +166,16 @@ void data::mask() { void data::set_header(const std::string& header) { m_header = header; } + + + + + +// +void data::set_live() { + m_live = true; +} +size_t data::get_index() const { + return m_index; +} + diff --git a/src/messages/data.hpp b/src/messages/data.hpp index 892754929f..c329a96447 100644 --- a/src/messages/data.hpp +++ b/src/messages/data.hpp @@ -32,51 +32,102 @@ #include "../utf8_validator/utf8_validator.hpp" #include +#include +#include +#include #include #include +#include +#include namespace websocketpp { namespace message { -/*class intrusive_ptr_base { +// element_type interface: +// constructor: +// - shared pointer to the managing pool +// - integer index + +// get_index(); +// set_live() + +template + class pool : public boost::enable_shared_from_this< pool > { public: - intrusive_ptr_base() : ref_count(0) {} - intrusive_ptr_base(intrusive_ptr_base const&) : ref_count(0) {} - intrusive_ptr_base& operator=(intrusive_ptr_base const& rhs) { - return *this; - } - friend void intrusive_ptr_add_ref(intrusive_ptr_base const* s) { - assert(s->ref_count >= 0); - assert(s != 0); - ++s->ref_count; - } - friend void intrusive_ptr_release(intrusive_ptr_base const* s) { - assert(s->ref_count > 0); - assert(s != 0); + typedef pool type; + typedef boost::shared_ptr ptr; + typedef typename element_type::ptr element_ptr; + typedef boost::function callback_type; + + pool(size_t max_elements) : m_cur_elements(0),m_max_elements(max_elements) {} + + element_ptr get() { + element_ptr p; - // TODO: thread safety - long count = --s->ref_count; - if (count == 1 && endpoint != NULL) { - // recycle if endpoint exists - endpoint->recycle(); - } else if (count == 0) { - boost::checked_delete(static_cast(s)); + std::cout << "message requested (" + << m_cur_elements-m_avaliable.size() + << "/" + << m_cur_elements + << ")" + << std::endl; + + if (!m_avaliable.empty()) { + p = m_avaliable.front(); + m_avaliable.pop(); + m_used[p->get_index()] = p; + } else { + if (m_cur_elements == m_max_elements) { + return element_ptr(); + } + + p = element_ptr(new element_type(type::shared_from_this(),m_cur_elements)); + m_cur_elements++; + m_used.push_back(p); + + std::cout << "Allocated new data message. Count is now " + << m_cur_elements + << std::endl; + } + + p->set_live(); + return p; + } + void recycle(element_ptr p) { + if (m_used[p->get_index()] != p) { + // error tried to recycle a pointer we don't control + return; + } + + m_avaliable.push(p); + m_used[p->get_index()] = element_ptr(); + + if (m_callback && m_avaliable.size() == 1) { + m_callback(); } } - detach() { - endpoint = NULL; + // set a function that will be called when new elements are avaliable. + void set_callback(callback_type fn) { + m_callback = fn; } + private: - websocketpp::endpoint_base* endpoint; - mutable boost::detail::atomic_count ref_count; -};*/ - + size_t m_cur_elements; + size_t m_max_elements; + + std::queue m_avaliable; + std::vector m_used; + + callback_type m_callback; +}; class data { public: - data(); + typedef boost::intrusive_ptr ptr; + typedef pool::ptr pool_ptr; + + data(pool_ptr p, size_t s); void reset(frame::opcode::value opcode); @@ -113,6 +164,10 @@ public: void mask(); int m_max_refcount; + + // RC + void set_live(); + size_t get_index() const; private: static const uint64_t PAYLOAD_SIZE_INIT = 1000; // 1KB static const uint64_t PAYLOAD_SIZE_MAX = 100000000;// 100MB @@ -126,6 +181,22 @@ private: M_BYTE_3 = 3 }; + friend void intrusive_ptr_add_ref(const data * s) { + ++s->m_ref_count; + } + + friend void intrusive_ptr_release(const data * s) { + // TODO: thread safety + long count = --s->m_ref_count; + if (count == 1 && s->m_live) { + // recycle if endpoint exists + s->m_live = false; + s->m_pool->recycle(ptr(const_cast(s))); + } else if (count == 0) { + boost::checked_delete(static_cast(s)); + } + } + // Message state frame::opcode::value m_opcode; @@ -142,9 +213,15 @@ private: std::string m_header; std::string m_payload; + + // reference counting + size_t m_index; + mutable boost::detail::atomic_count m_ref_count; + mutable pool_ptr m_pool; + mutable bool m_live; }; -typedef boost::shared_ptr data_ptr; +typedef boost::intrusive_ptr data_ptr; } // namespace message } // namespace websocketpp