write queue passes autobahn tests

This commit is contained in:
Peter Thorson
2012-01-06 15:20:08 -06:00
parent 50abd0b426
commit 93efa1ae97
5 changed files with 167 additions and 50 deletions

View File

@@ -94,7 +94,7 @@ public:
if (command.command == "ack") {
handle_ack(connection,command);
connection->recycle(msg);
//connection->recycle(msg);
} else {
broadcast_message(msg);
}

View File

@@ -113,7 +113,7 @@ public:
socket_type::init();
m_control_message = message::control_ptr(new message::control());
m_read_queue_avaliable.push(message::data_ptr(new message::data()));
//m_read_queue_avaliable.push(message::data_ptr(new message::data()));
}
// SHOULD BE PROTECTED
@@ -570,7 +570,7 @@ protected:
m_write_buffer -= m_write_queue.front()->get_payload().size();
m_write_queue.front()->release();
if (m_write_queue.front()->done()) {
recycle(m_write_queue.front());
//recycle(m_write_queue.front());
}
m_write_queue.pop();
}
@@ -626,7 +626,7 @@ protected:
}
m_write_buffer -= m_write_queue.front()->get_payload().size();
m_write_queue.front()->release();
/*m_write_queue.front()->release();
if (m_write_queue.front()->done()) {
if (m_write_queue.front()->get_payload().size() > 0 &&
(m_write_queue.front()->get_payload())[0] != '{') {
@@ -634,7 +634,7 @@ protected:
}
recycle(m_write_queue.front());
}
}*/
m_write_queue.pop();
if (m_write_state == WRITING) {

View File

@@ -34,6 +34,7 @@
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>
#include <boost/utility.hpp>
#include <iostream>
#include <set>
@@ -65,7 +66,8 @@ template <
class endpoint
: public endpoint_base,
public role< endpoint<role,socket> >,
public socket< endpoint<role,socket> >
public socket< endpoint<role,socket> >,
boost::noncopyable
{
public:
/// Type of the traits class that stores endpoint related types.
@@ -116,7 +118,21 @@ public:
explicit endpoint(handler_ptr handler)
: role_type(m_io_service),
socket_type(m_io_service),
m_handler(handler) {}
m_handler(handler),
m_pool(new message::pool<message::data>(10))
{
m_pool->set_callback(boost::bind(&type::on_new_message,this));
}
/// Destroy and endpoint
~endpoint() {
m_pool->set_callback(NULL);
}
// copy/assignment constructors require C++11
// boost::noncopyable is being used in the meantime.
// endpoint(endpoint const&) = delete;
// endpoint& operator=(endpoint const&) = delete
/// Returns a reference to the endpoint's access logger.
/**
@@ -204,10 +220,10 @@ protected:
return m_handler;
}
message::data_ptr get_data_message() {
message::data::ptr get_data_message() {
// if we have one of this type free
alog().at(log::alevel::DEVEL)
/*alog().at(log::alevel::DEVEL)
<< "message requested ("
<< m_read_queue_used.size()
<< "/"
@@ -233,11 +249,13 @@ protected:
<< log::endl;
return get_data_message();
}
}
}*/
return m_pool->get();
}
void recycle(message::data_ptr p) {
if (m_read_queue_used.erase(p) == 0) {
void recycle(message::data::ptr p) {
m_pool->recycle(p);
/*if (m_read_queue_used.erase(p) == 0) {
// tried to recycle a pointer we don't control.
} else {
m_read_queue_avaliable.push(p);
@@ -248,19 +266,19 @@ protected:
if (!m_read_waiting.empty()) {
connection_ptr next = m_read_waiting.front();
/*endpoint_base::m_io_service.post(
endpoint_base::m_io_service.post(
boost::bind(
&connection_type::handle_read_frame,
next,
boost::system::error_code()
)
);*/
);
(*next).handle_read_frame(boost::system::error_code());
m_read_waiting.pop();
}
// wake all
/*std::list<connection_ptr>::iterator it;
std::list<connection_ptr>::iterator it;
for (it = m_read_waiting.begin(); it != m_read_waiting.end(); it++) {
endpoint_base::m_io_service.post(
@@ -272,8 +290,8 @@ protected:
);
}
m_read_waiting.empty();*/
}
m_read_waiting.empty();
}*/
}
@@ -282,6 +300,13 @@ protected:
m_read_waiting.push(con);
}
void on_new_message() {
if (!m_read_waiting.empty()) {
connection_ptr next = m_read_waiting.front();
(*next).handle_read_frame(boost::system::error_code());
m_read_waiting.pop();
}
}
private:
handler_ptr m_handler;
std::set<connection_ptr> m_connections;
@@ -289,9 +314,11 @@ private:
elogger_type m_elog;
// mssage buffers
std::queue<message::data_ptr> m_read_queue_avaliable;
std::set<message::data_ptr> m_read_queue_used;
std::queue<connection_ptr> m_read_waiting;
//std::queue<message::data_ptr> m_read_queue_avaliable;
//std::set<message::data_ptr> m_read_queue_used;
message::pool<message::data>::ptr m_pool;
std::queue<connection_ptr> m_read_waiting;
};
/// traits class that allows looking up relevant endpoint types by the fully

View File

@@ -32,7 +32,7 @@
using websocketpp::message::data;
data::data() : m_refcount(-1) {
data::data(data::pool_ptr p, size_t s) : m_refcount(-1),m_index(s),m_ref_count(0),m_pool(p) {
m_payload.reserve(PAYLOAD_SIZE_INIT);
}
@@ -166,3 +166,16 @@ void data::mask() {
void data::set_header(const std::string& header) {
m_header = header;
}
//
void data::set_live() {
m_live = true;
}
size_t data::get_index() const {
return m_index;
}

View File

@@ -32,51 +32,102 @@
#include "../utf8_validator/utf8_validator.hpp"
#include <boost/detail/atomic_count.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <algorithm>
#include <istream>
#include <queue>
#include <vector>
namespace websocketpp {
namespace message {
/*class intrusive_ptr_base {
// element_type interface:
// constructor:
// - shared pointer to the managing pool
// - integer index
// get_index();
// set_live()
template <class element_type>
class pool : public boost::enable_shared_from_this< pool<element_type> > {
public:
intrusive_ptr_base() : ref_count(0) {}
intrusive_ptr_base(intrusive_ptr_base const&) : ref_count(0) {}
intrusive_ptr_base& operator=(intrusive_ptr_base const& rhs) {
return *this;
}
friend void intrusive_ptr_add_ref(intrusive_ptr_base const* s) {
assert(s->ref_count >= 0);
assert(s != 0);
++s->ref_count;
}
friend void intrusive_ptr_release(intrusive_ptr_base const* s) {
assert(s->ref_count > 0);
assert(s != 0);
typedef pool<element_type> type;
typedef boost::shared_ptr<type> ptr;
typedef typename element_type::ptr element_ptr;
typedef boost::function<void()> callback_type;
pool(size_t max_elements) : m_cur_elements(0),m_max_elements(max_elements) {}
element_ptr get() {
element_ptr p;
// TODO: thread safety
long count = --s->ref_count;
if (count == 1 && endpoint != NULL) {
// recycle if endpoint exists
endpoint->recycle();
} else if (count == 0) {
boost::checked_delete(static_cast<intrusive_ptr_base const*>(s));
std::cout << "message requested ("
<< m_cur_elements-m_avaliable.size()
<< "/"
<< m_cur_elements
<< ")"
<< std::endl;
if (!m_avaliable.empty()) {
p = m_avaliable.front();
m_avaliable.pop();
m_used[p->get_index()] = p;
} else {
if (m_cur_elements == m_max_elements) {
return element_ptr();
}
p = element_ptr(new element_type(type::shared_from_this(),m_cur_elements));
m_cur_elements++;
m_used.push_back(p);
std::cout << "Allocated new data message. Count is now "
<< m_cur_elements
<< std::endl;
}
p->set_live();
return p;
}
void recycle(element_ptr p) {
if (m_used[p->get_index()] != p) {
// error tried to recycle a pointer we don't control
return;
}
m_avaliable.push(p);
m_used[p->get_index()] = element_ptr();
if (m_callback && m_avaliable.size() == 1) {
m_callback();
}
}
detach() {
endpoint = NULL;
// set a function that will be called when new elements are avaliable.
void set_callback(callback_type fn) {
m_callback = fn;
}
private:
websocketpp::endpoint_base* endpoint;
mutable boost::detail::atomic_count ref_count;
};*/
size_t m_cur_elements;
size_t m_max_elements;
std::queue<element_ptr> m_avaliable;
std::vector<element_ptr> m_used;
callback_type m_callback;
};
class data {
public:
data();
typedef boost::intrusive_ptr<data> ptr;
typedef pool<data>::ptr pool_ptr;
data(pool_ptr p, size_t s);
void reset(frame::opcode::value opcode);
@@ -113,6 +164,10 @@ public:
void mask();
int m_max_refcount;
// RC
void set_live();
size_t get_index() const;
private:
static const uint64_t PAYLOAD_SIZE_INIT = 1000; // 1KB
static const uint64_t PAYLOAD_SIZE_MAX = 100000000;// 100MB
@@ -126,6 +181,22 @@ private:
M_BYTE_3 = 3
};
friend void intrusive_ptr_add_ref(const data * s) {
++s->m_ref_count;
}
friend void intrusive_ptr_release(const data * s) {
// TODO: thread safety
long count = --s->m_ref_count;
if (count == 1 && s->m_live) {
// recycle if endpoint exists
s->m_live = false;
s->m_pool->recycle(ptr(const_cast<data *>(s)));
} else if (count == 0) {
boost::checked_delete(static_cast<data const *>(s));
}
}
// Message state
frame::opcode::value m_opcode;
@@ -142,9 +213,15 @@ private:
std::string m_header;
std::string m_payload;
// reference counting
size_t m_index;
mutable boost::detail::atomic_count m_ref_count;
mutable pool_ptr m_pool;
mutable bool m_live;
};
typedef boost::shared_ptr<data> data_ptr;
typedef boost::intrusive_ptr<data> data_ptr;
} // namespace message
} // namespace websocketpp