cleans up debugging code

This commit is contained in:
Peter Thorson
2012-01-06 17:08:59 -06:00
parent 93efa1ae97
commit 14ada3a3a3
6 changed files with 71 additions and 203 deletions

View File

@@ -97,8 +97,6 @@ public:
} else {
command_error(connection,"Invalid Command");
}
connection->recycle(msg);
}
void command_error(connection_ptr connection,const std::string msg) {

View File

@@ -113,7 +113,6 @@ public:
socket_type::init();
m_control_message = message::control_ptr(new message::control());
//m_read_queue_avaliable.push(message::data_ptr(new message::data()));
}
// SHOULD BE PROTECTED
@@ -156,46 +155,21 @@ public:
type::shared_from_this(),
msg));*/
}
void send(const binary_string& data) {
/*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::BINARY,
!m_endpoint.is_server(),data));
m_endpoint.endpoint_base::m_io_service.post(
boost::bind(
&type::write_message,
type::shared_from_this(),
msg));*/
}
void send(message::data_ptr msg) {
m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand());
write_message(msg);
}
void close(close::status::value code, const utf8_string& reason = "") {
void close(close::status::value code, const std::string& reason = "") {
// TODO: overloads without code or reason?
send_close(code, reason);
}
void ping(const binary_string& payload) {
// TODO:
/*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PING,
!m_endpoint.is_server(),
payload));
m_endpoint.endpoint_base::m_io_service.post(
boost::bind(
&type::write_message,
type::shared_from_this(),
msg)); */
void ping(const std::string& payload) {
send_ping(payload);
}
void pong(const binary_string& payload) {
// TODO:
/*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PONG,
!m_endpoint.is_server(),payload));
m_endpoint.endpoint_base::m_io_service.post(
boost::bind(
&type::write_message,
type::shared_from_this(),
msg));*/
void pong(const std::string& payload) {
send_pong(payload);
}
uint64_t buffered_amount() const {
@@ -230,26 +204,6 @@ public:
return m_endpoint.get_data_message();
}
void recycle(message::data_ptr p) {
m_endpoint.recycle(p);
/*if (m_read_queue_used.erase(p) == 0) {
// tried to recycle a pointer we don't control.
} else {
m_read_queue_avaliable.push(p);
if (m_read_state == WAITING) {
m_endpoint.endpoint_base::m_io_service.post(
boost::bind(
&type::handle_read_frame,
type::shared_from_this(),
boost::system::error_code()
)
);
m_read_state = READING;
}
}*/
}
message::control_ptr get_control_message() {
return m_control_message;
}
@@ -398,11 +352,7 @@ protected:
response = get_handler()->on_ping(type::shared_from_this(),
msg->get_payload());
if (response) {
message::data_ptr pong = get_data_message();
pong->reset(frame::opcode::PONG);
pong->set_payload(msg->get_payload());
m_processor->prepare_frame(pong,!m_endpoint.is_server(),rand());
write_message(pong);
send_pong(msg->get_payload());
}
break;
case frame::opcode::PONG:
@@ -476,7 +426,8 @@ protected:
m_local_close_code = code;
m_local_close_reason = reason;
// TODO: fix
// TODO: optimize control messages and handle case where endpoint is
// out of messages
message::data_ptr msg = get_data_message();
if (!msg) {
@@ -489,7 +440,6 @@ protected:
}
msg->reset(frame::opcode::CLOSE);
// TODO: msg payload set_status
m_processor->prepare_close_frame(msg,!m_endpoint.is_server(),rand(),code,reason);
write_message(msg);
@@ -527,30 +477,45 @@ protected:
// current write completes.
// TODO: fix
// TODO: optimize control messages and handle case where endpoint is
// out of messages
message::data_ptr msg = get_data_message();
msg->reset(frame::opcode::CLOSE);
// TODO: msg payload set_status
m_processor->prepare_close_frame(msg,
!m_endpoint.is_server(),
rand(),
m_local_close_code,
m_local_close_reason);
write_message(msg);
//write_message(m_processor->prepare_close_frame(m_local_close_code,
// !m_endpoint.is_server(),
// m_local_close_reason));
m_write_state = INTURRUPT;
}
void send_ping(const std::string& payload) {
// TODO: optimize control messages and handle case where
// endpoint is out of messages
message::data_ptr control = get_data_message();
control->reset(frame::opcode::PING);
control->set_payload(payload);
m_processor->prepare_frame(control,!m_endpoint.is_server(),rand());
write_message(control);
}
void send_pong(const std::string& payload) {
// TODO: optimize control messages and handle case where
// endpoint is out of messages
message::data_ptr control = get_data_message();
control->reset(frame::opcode::PONG);
control->set_payload(payload);
m_processor->prepare_frame(control,!m_endpoint.is_server(),rand());
write_message(control);
}
void write_message(message::data_ptr msg) {
if (m_write_state == INTURRUPT) {
return;
}
m_write_buffer += msg->get_payload().size();
msg->acquire();
m_write_queue.push(msg);
write();
@@ -568,10 +533,6 @@ protected:
// clear the queue except for the last message
while (m_write_queue.size() > 1) {
m_write_buffer -= m_write_queue.front()->get_payload().size();
m_write_queue.front()->release();
if (m_write_queue.front()->done()) {
//recycle(m_write_queue.front());
}
m_write_queue.pop();
}
break;
@@ -626,15 +587,6 @@ protected:
}
m_write_buffer -= m_write_queue.front()->get_payload().size();
/*m_write_queue.front()->release();
if (m_write_queue.front()->done()) {
if (m_write_queue.front()->get_payload().size() > 0 &&
(m_write_queue.front()->get_payload())[0] != '{') {
m_endpoint.alog().at(log::alevel::DEVEL) << "Recycling message, maxcount: " << m_write_queue.front()->m_max_refcount << log::endl;
}
recycle(m_write_queue.front());
}*/
m_write_queue.pop();
if (m_write_state == WRITING) {
@@ -754,10 +706,8 @@ protected:
bool m_dropped_by_me;
// Read queue
read_state m_read_state;
message::control_ptr m_control_message;
std::queue<message::data_ptr> m_read_queue_avaliable;
std::set<message::data_ptr> m_read_queue_used;
read_state m_read_state;
message::control_ptr m_control_message;
};
// connection related types that it and its policy classes need.

View File

@@ -220,86 +220,19 @@ protected:
return m_handler;
}
/// Gets a shared pointer to a read/write data message.
message::data::ptr get_data_message() {
// if we have one of this type free
/*alog().at(log::alevel::DEVEL)
<< "message requested ("
<< m_read_queue_used.size()
<< "/"
<< m_read_queue_avaliable.size()+m_read_queue_used.size()
<< ") with "
<< m_read_waiting.size()
<< " waiting"
<< log::endl;
if (!m_read_queue_avaliable.empty()) {
message::data_ptr p = m_read_queue_avaliable.front();
m_read_queue_avaliable.pop();
m_read_queue_used.insert(p);
return p;
} else {
if (m_read_queue_used.size() > 10) {
return message::data_ptr();
} else {
m_read_queue_avaliable.push(message::data_ptr(new message::data()));
alog().at(log::alevel::DEVEL)
<< "Allocating new data message. Count is now: "
<< m_read_queue_used.size() + 1
<< log::endl;
return get_data_message();
}
}*/
return m_pool->get();
}
void recycle(message::data::ptr p) {
m_pool->recycle(p);
/*if (m_read_queue_used.erase(p) == 0) {
// tried to recycle a pointer we don't control.
} else {
m_read_queue_avaliable.push(p);
// wake next
if (!m_read_waiting.empty()) {
connection_ptr next = m_read_waiting.front();
endpoint_base::m_io_service.post(
boost::bind(
&connection_type::handle_read_frame,
next,
boost::system::error_code()
)
);
(*next).handle_read_frame(boost::system::error_code());
m_read_waiting.pop();
}
// wake all
std::list<connection_ptr>::iterator it;
for (it = m_read_waiting.begin(); it != m_read_waiting.end(); it++) {
endpoint_base::m_io_service.post(
boost::bind(
&connection_type::handle_read_frame,
*it,
boost::system::error_code()
)
);
}
m_read_waiting.empty();
}*/
return m_pool->get();
}
/// Asks the endpoint to restart this connection's handle_read_frame loop
/// when there are avaliable data messages.
void wait(connection_ptr con) {
m_read_waiting.push(con);
}
/// Message pool callback indicating that there is a free data message
/// avaliable. Causes one waiting connection to get restarted.
void on_new_message() {
if (!m_read_waiting.empty()) {
connection_ptr next = m_read_waiting.front();
@@ -313,10 +246,7 @@ private:
alogger_type m_alog;
elogger_type m_elog;
// mssage buffers
//std::queue<message::data_ptr> m_read_queue_avaliable;
//std::set<message::data_ptr> m_read_queue_used;
// resource pools for read/write message buffers
message::pool<message::data>::ptr m_pool;
std::queue<connection_ptr> m_read_waiting;
};

View File

@@ -32,7 +32,7 @@
using websocketpp::message::data;
data::data(data::pool_ptr p, size_t s) : m_refcount(-1),m_index(s),m_ref_count(0),m_pool(p) {
data::data(data::pool_ptr p, size_t s) : m_prepared(false),m_index(s),m_ref_count(0),m_pool(p),m_live(false) {
m_payload.reserve(PAYLOAD_SIZE_INIT);
}
@@ -53,7 +53,6 @@ uint64_t data::process_payload(std::istream& input,uint64_t size) {
uint64_t i;
if (new_size > PAYLOAD_SIZE_MAX) {
// TODO: real exception
throw processor::exception("Message too big",processor::error::MESSAGE_TOO_BIG);
}
@@ -104,8 +103,7 @@ void data::reset(frame::opcode::value opcode) {
m_masking_index = M_NOT_MASKED; // -1 indicates do not mask/unmask
m_payload.resize(0);
m_validator.reset();
m_refcount = -1;
m_max_refcount = 0;
m_prepared = false;
}
void data::complete() {
@@ -123,26 +121,11 @@ void data::set_masking_key(int32_t key) {
}
void data::set_prepared(bool b) {
m_refcount = 0;
m_prepared = b;
}
bool data::get_prepared() const {
return m_refcount >= 0;
}
void data::acquire() {
assert(m_refcount >= 0);
m_refcount++;
if (m_refcount > m_max_refcount) {
m_max_refcount = m_refcount;
}
}
void data::release() {
assert(m_refcount > 0);
m_refcount--;
}
bool data::done() const {
return m_refcount == 0;
return m_prepared;
}
// This could be further optimized using methods that write directly into the

View File

@@ -32,9 +32,10 @@
#include "../utf8_validator/utf8_validator.hpp"
#include <boost/detail/atomic_count.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/enable_shared_from_this.hpp>
#include <boost/function.hpp>
#include <boost/intrusive_ptr.hpp>
#include <boost/utility.hpp>
#include <algorithm>
#include <istream>
@@ -44,8 +45,10 @@
namespace websocketpp {
namespace message {
/// message::pool impliments a reference counted pool of elements.
// element_type interface:
// constructor:
// constructor(ptr p, size_t index)
// - shared pointer to the managing pool
// - integer index
@@ -53,7 +56,8 @@ namespace message {
// set_live()
template <class element_type>
class pool : public boost::enable_shared_from_this< pool<element_type> > {
class pool : public boost::enable_shared_from_this< pool<element_type> >,
boost::noncopyable {
public:
typedef pool<element_type> type;
typedef boost::shared_ptr<type> ptr;
@@ -61,16 +65,27 @@ public:
typedef boost::function<void()> callback_type;
pool(size_t max_elements) : m_cur_elements(0),m_max_elements(max_elements) {}
~pool() {}
// copy/assignment constructors require C++11
// boost::noncopyable is being used in the meantime.
// pool(pool const&) = delete;
// pool& operator=(pool const&) = delete
/// Requests a pointer to the next free element in the resource pool.
/* If there isn't a free element a new one is created. If the maximum number
* of elements has been created then it returns an empty/null element
* pointer.
*/
element_ptr get() {
element_ptr p;
std::cout << "message requested ("
/*std::cout << "message requested ("
<< m_cur_elements-m_avaliable.size()
<< "/"
<< m_cur_elements
<< ")"
<< std::endl;
<< std::endl;*/
if (!m_avaliable.empty()) {
p = m_avaliable.front();
@@ -85,16 +100,17 @@ public:
m_cur_elements++;
m_used.push_back(p);
std::cout << "Allocated new data message. Count is now "
/*std::cout << "Allocated new data message. Count is now "
<< m_cur_elements
<< std::endl;
<< std::endl;*/
}
p->set_live();
return p;
}
void recycle(element_ptr p) {
if (m_used[p->get_index()] != p) {
if (p->get_index()+1 > m_used.size() || m_used[p->get_index()] != p) {
std::cout << "error tried to recycle a pointer we don't control" << std::endl;
// error tried to recycle a pointer we don't control
return;
}
@@ -158,14 +174,9 @@ public:
// Performs masking and header generation if it has not been done already.
void set_prepared(bool b);
bool get_prepared() const;
void acquire();
void release();
bool done() const;
void mask();
int m_max_refcount;
// RC
// pool management interface
void set_live();
size_t get_index() const;
private:
@@ -207,13 +218,12 @@ private:
unsigned char m_masking_key[4];
// m_masking_index can take on
index_value m_masking_index;
// Message buffers
int m_refcount;
std::string m_header;
std::string m_payload;
bool m_prepared;
// reference counting
size_t m_index;
mutable boost::detail::atomic_count m_ref_count;

View File

@@ -188,9 +188,6 @@ public:
void reset() {
m_state = hybi_legacy_state::INIT;
if (m_data_message) {
m_connection.recycle(m_data_message);
}
m_data_message.reset();
}