work on write queue/flow control

This commit is contained in:
Peter Thorson
2012-01-03 06:22:42 -06:00
parent 3ff8775c1c
commit fa7cd63706
12 changed files with 295 additions and 117 deletions

View File

@@ -102,7 +102,15 @@ public:
}
void command_error(connection_ptr connection,const std::string msg) {
connection->send("{\"type\":\"error\",\"value\":\""+msg+"\"}");
websocketpp::message::data_ptr m = connection->get_data_message();
if (m) {
m->reset(frame::opcode::TEXT);
m->set_payload("{\"type\":\"error\",\"value\":\""+msg+"\"}");
connection->send(m);
} else {
// error no avaliable message buffers
}
}
// close: - close this connection
@@ -128,41 +136,53 @@ public:
return;
}
long milli_seconds = get_ms(m_epoch);
std::stringstream update;
update << "{\"type\":\"stats\""
<< ",\"timestamp\":" << milli_seconds
<< ",\"connections\":" << m_broadcast_handler->get_connection_count()
<< ",\"admin_connections\":" << m_connections.size()
<< ",\"messages\":[";
const msg_map& m = m_broadcast_handler->get_message_stats();
msg_map::const_iterator msg_it;
msg_map::const_iterator last = m.end();
if (m.size() > 0) {
last--;
}
for (msg_it = m.begin(); msg_it != m.end(); msg_it++) {
update << "{\"id\":" << (*msg_it).second.id
<< ",\"hash\":\"" << (*msg_it).second.hash << "\""
<< ",\"sent\":" << (*msg_it).second.sent
<< ",\"acked\":" << (*msg_it).second.acked
<< ",\"size\":" << (*msg_it).second.size
<< ",\"time\":" << (*msg_it).second.time
<< "}" << (msg_it == last ? "" : ",");
}
update << "]}";
m_broadcast_handler->clear_message_stats();
typename std::set<connection_ptr>::iterator it;
for (it = m_connections.begin(); it != m_connections.end(); it++) {
(*it)->send(update.str(),false);
if (m_connections.size() > 0) {
long milli_seconds = get_ms(m_epoch);
std::stringstream update;
update << "{\"type\":\"stats\""
<< ",\"timestamp\":" << milli_seconds
<< ",\"connections\":" << m_broadcast_handler->get_connection_count()
<< ",\"admin_connections\":" << m_connections.size()
<< ",\"messages\":[";
const msg_map& m = m_broadcast_handler->get_message_stats();
msg_map::const_iterator msg_it;
msg_map::const_iterator last = m.end();
if (m.size() > 0) {
last--;
}
for (msg_it = m.begin(); msg_it != m.end(); msg_it++) {
update << "{\"id\":" << (*msg_it).second.id
<< ",\"hash\":\"" << (*msg_it).second.hash << "\""
<< ",\"sent\":" << (*msg_it).second.sent
<< ",\"acked\":" << (*msg_it).second.acked
<< ",\"size\":" << (*msg_it).second.size
<< ",\"time\":" << (*msg_it).second.time
<< "}" << (msg_it == last ? "" : ",");
}
update << "]}";
m_broadcast_handler->clear_message_stats();
typename std::set<connection_ptr>::iterator it;
websocketpp::message::data_ptr msg = (*m_connections.begin())->get_data_message();
if (msg) {
msg->reset(frame::opcode::TEXT);
msg->set_payload(update.str());
for (it = m_connections.begin(); it != m_connections.end(); it++) {
(*it)->send(msg);
}
} else {
// error no avaliable message buffers
}
}
m_timer->expires_from_now(boost::posix_time::milliseconds(250));

View File

@@ -90,17 +90,14 @@ public:
}
void on_message(connection_ptr connection,message::data_ptr msg) {
typename std::set<connection_ptr>::iterator it;
wscmd::cmd command = wscmd::parse(msg->get_payload());
wscmd::cmd command = wscmd::parse(msg->get_payload());
if (command.command == "ack") {
handle_ack(connection,command);
connection->recycle(msg);
} else {
broadcast_message(msg);
}
connection->recycle(msg);
}
void command_error(connection_ptr connection,const std::string msg) {
@@ -160,11 +157,15 @@ public:
typename std::set<connection_ptr>::iterator it;
// broadcast to clients
// broadcast to clients
for (it = m_connections.begin(); it != m_connections.end(); it++) {
(*it)->send(msg->get_payload(),(msg->get_opcode() == frame::opcode::BINARY));
//(*it)->send(msg->get_payload(),(msg->get_opcode() == frame::opcode::BINARY));
for (int i = 0; i < 100; i++) {
(*it)->send(msg);
}
}
new_msg.sent = m_connections.size();
new_msg.sent = m_connections.size()*100;
new_msg.acked = 0;
}

View File

@@ -49,26 +49,37 @@ int main(int argc, char* argv[]) {
bool tls = false;
// 12288 is max OS X limit without changing kernal settings
const rlim_t ideal_size = 100000;
const rlim_t ideal_size = 10000;
rlim_t old_size;
rlim_t old_max;
struct rlimit rl;
int result;
result = getrlimit(RLIMIT_NOFILE, &rl);
if (result == 0) {
std::cout << "cur: " << rl.rlim_cur << " max: " << rl.rlim_max << std::endl;
//std::cout << "System FD limits: " << rl.rlim_cur << " max: " << rl.rlim_max << std::endl;
old_size = rl.rlim_cur;
old_max = rl.rlim_max;
if (rl.rlim_cur < ideal_size) {
rl.rlim_cur = ideal_size;
//rl.rlim_cur = rl.rlim_max;
result = setrlimit(RLIMIT_NOFILE, &rl);
std::cout << "Attempting to raise system file descriptor limit from " << rl.rlim_cur << " to " << ideal_size << std::endl;
rl.rlim_cur = ideal_size;
if (result != 0) {
std::cout << "Unable to request an increase in the file descripter limit. This server will be limited to " << old_size << " concurrent connections. Error code: " << errno << std::endl;
}
if (rl.rlim_max < ideal_size) {
rl.rlim_max = ideal_size;
}
result = setrlimit(RLIMIT_NOFILE, &rl);
if (result == 0) {
std::cout << "Success" << std::endl;
} else if (result == EPERM) {
std::cout << "Failed. This server will be limited to " << old_size << " concurrent connections. Error code: Insufficient permissions. Try running process as root. system max: " << old_max << std::endl;
} else {
std::cout << "Failed. This server will be limited to " << old_size << " concurrent connections. Error code: " << errno << " system max: " << old_max << std::endl;
}
}
}
@@ -97,9 +108,11 @@ int main(int argc, char* argv[]) {
plain_handler_ptr h(new websocketpp::broadcast::server_handler<plain_endpoint_type>());
plain_endpoint_type e(h);
e.alog().set_level(websocketpp::log::alevel::ALL);
e.elog().set_level(websocketpp::log::elevel::ALL);
e.alog().unset_level(websocketpp::log::alevel::ALL);
e.elog().unset_level(websocketpp::log::elevel::ALL);
e.alog().set_level(websocketpp::log::alevel::DEVEL);
std::cout << "Starting WebSocket broadcast server on port " << port << std::endl;
e.listen(port);
}

View File

@@ -136,7 +136,7 @@ public:
// Valid for OPEN state
void send(const utf8_string& payload,bool binary = false) {
binary_string_ptr msg;
/*binary_string_ptr msg;
if (binary) {
msg = m_processor->prepare_frame(frame::opcode::BINARY,!m_endpoint.is_server(),payload);
} else {
@@ -154,23 +154,30 @@ public:
boost::bind(
&type::write_message,
type::shared_from_this(),
msg));
msg));*/
}
void send(const binary_string& data) {
binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::BINARY,
/*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::BINARY,
!m_endpoint.is_server(),data));
m_endpoint.endpoint_base::m_io_service.post(
boost::bind(
&type::write_message,
type::shared_from_this(),
msg));
msg));*/
}
void send(message::data_ptr msg) {
m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand());
write_message(msg);
}
void close(close::status::value code, const utf8_string& reason = "") {
// TODO: overloads without code or reason?
send_close(code, reason);
}
void ping(const binary_string& payload) {
binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PING,
// TODO:
/*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PING,
!m_endpoint.is_server(),
payload));
@@ -178,16 +185,17 @@ public:
boost::bind(
&type::write_message,
type::shared_from_this(),
msg));
msg)); */
}
void pong(const binary_string& payload) {
binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PONG,
// TODO:
/*binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PONG,
!m_endpoint.is_server(),payload));
m_endpoint.endpoint_base::m_io_service.post(
boost::bind(
&type::write_message,
type::shared_from_this(),
msg));
msg));*/
}
uint64_t buffered_amount() const {
@@ -219,19 +227,12 @@ public:
// flow control interface
message::data_ptr get_data_message() {
// if we have one of this type free
if (!m_read_queue_avaliable.empty()) {
message::data_ptr p = m_read_queue_avaliable.front();
m_read_queue_avaliable.pop();
m_read_queue_used.insert(p);
return p;
} else {
return message::data_ptr();
}
return m_endpoint.get_data_message();
}
void recycle(message::data_ptr p) {
if (m_read_queue_used.erase(p) == 0) {
m_endpoint.recycle(p);
/*if (m_read_queue_used.erase(p) == 0) {
// tried to recycle a pointer we don't control.
} else {
m_read_queue_avaliable.push(p);
@@ -246,7 +247,7 @@ public:
);
m_read_state = READING;
}
}
}*/
}
message::control_ptr get_control_message() {
@@ -290,7 +291,7 @@ protected:
role_type::async_init();
}
public:
void handle_read_frame(const boost::system::error_code& error) {
// check if state changed while we were waiting for a read.
if (m_state == session::state::CLOSED) { return; }
@@ -353,7 +354,8 @@ protected:
// we need to wait for a message to be returned by the
// client. We exit the read loop. handle_read_frame
// will be restarted by recycle()
m_read_state = WAITING;
//m_read_state = WAITING;
m_endpoint.wait(type::shared_from_this());
return;
default:
// Fatal error, forcibly end connection immediately.
@@ -384,7 +386,7 @@ protected:
);
}
}
protected:
void process_data(message::data_ptr msg) {
get_handler()->on_message(type::shared_from_this(),msg);
}
@@ -396,10 +398,10 @@ protected:
response = get_handler()->on_ping(type::shared_from_this(),
msg->get_payload());
if (response) {
// send response ping
write_message(m_processor->prepare_frame(frame::opcode::PONG,
!m_endpoint.is_server(),
msg->get_payload()));
// TODO: send response ping
//write_message(m_processor->prepare_frame(frame::opcode::PONG,
// !m_endpoint.is_server(),
// msg->get_payload()));
}
break;
case frame::opcode::PONG:
@@ -473,10 +475,13 @@ protected:
m_local_close_code = code;
m_local_close_reason = reason;
write_message(m_processor->prepare_close_frame(m_local_close_code,
!m_endpoint.is_server(),
m_local_close_reason));
// TODO: fix
message::data_ptr msg = get_data_message();
msg->reset(frame::opcode::CLOSE);
// TODO: msg payload set_status
m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand());
write_message(msg);
m_write_state = INTURRUPT;
}
@@ -511,24 +516,33 @@ protected:
// current write completes.
write_message(m_processor->prepare_close_frame(m_local_close_code,
!m_endpoint.is_server(),
m_local_close_reason));
// TODO: fix
message::data_ptr msg = get_data_message();
msg->reset(frame::opcode::CLOSE);
// TODO: msg payload set_status
m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand());
write_message(msg);
//write_message(m_processor->prepare_close_frame(m_local_close_code,
// !m_endpoint.is_server(),
// m_local_close_reason));
m_write_state = INTURRUPT;
}
void write_message(binary_string_ptr msg) {
void write_message(message::data_ptr msg) {
if (m_write_state == INTURRUPT) {
return;
}
m_write_buffer += msg->size();
m_write_buffer += msg->get_payload().size();
msg->acquire();
m_write_queue.push(msg);
write();
}
void write() {
switch (m_write_state) {
switch (m_write_state) {
case IDLE:
break;
case WRITING:
@@ -538,7 +552,11 @@ protected:
case INTURRUPT:
// clear the queue except for the last message
while (m_write_queue.size() > 1) {
m_write_buffer -= m_write_queue.front()->size();
m_write_buffer -= m_write_queue.front()->get_payload().size();
m_write_queue.front()->release();
if (m_write_queue.front()->done()) {
recycle(m_write_queue.front());
}
m_write_queue.pop();
}
break;
@@ -552,9 +570,10 @@ protected:
m_write_state = WRITING;
}
std::vector<boost::asio::mutable_buffer> data;
std::vector<boost::asio::const_buffer> data;
data.push_back(boost::asio::buffer(*m_write_queue.front()));
data.push_back(boost::asio::buffer(m_write_queue.front()->get_header()));
data.push_back(boost::asio::buffer(m_write_queue.front()->get_payload()));
boost::asio::async_write(
socket_type::get_socket(),
@@ -575,7 +594,7 @@ protected:
}
void handle_write(const boost::system::error_code& error) {
if (error) {
if (error) {
if (error == boost::asio::error::operation_aborted) {
// previous write was aborted
m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) << "handle_write was called with operation_aborted error" << log::endl;
@@ -591,9 +610,18 @@ protected:
return;
}
m_write_buffer -= m_write_queue.front()->size();
m_write_buffer -= m_write_queue.front()->get_payload().size();
m_write_queue.front()->release();
if (m_write_queue.front()->done()) {
if (m_write_queue.front()->get_payload().size() > 0 &&
(m_write_queue.front()->get_payload())[0] != '{') {
m_endpoint.alog().at(log::alevel::DEVEL) << "Recycling message, maxcount: " << m_write_queue.front()->m_max_refcount << log::endl;
}
recycle(m_write_queue.front());
}
m_write_queue.pop();
if (m_write_state == WRITING) {
m_write_state = IDLE;
}
@@ -697,7 +725,7 @@ protected:
processor::ptr m_processor;
// Write queue
std::queue<binary_string_ptr> m_write_queue;
std::queue<message::data_ptr> m_write_queue;
uint64_t m_write_buffer;
write_state m_write_state;

View File

@@ -203,11 +203,80 @@ protected:
handler_ptr get_handler() {
return m_handler;
}
message::data_ptr get_data_message() {
// if we have one of this type free
if (!m_read_queue_avaliable.empty()) {
message::data_ptr p = m_read_queue_avaliable.front();
m_read_queue_avaliable.pop();
m_read_queue_used.insert(p);
return p;
} else {
if (m_read_queue_used.size() > 10) {
return message::data_ptr();
} else {
m_read_queue_avaliable.push(message::data_ptr(new message::data()));
alog().at(log::alevel::DEVEL)
<< "Allocating new data message. Count is now: "
<< m_read_queue_used.size() + 1
<< log::endl;
return get_data_message();
}
}
}
void recycle(message::data_ptr p) {
if (m_read_queue_used.erase(p) == 0) {
// tried to recycle a pointer we don't control.
} else {
m_read_queue_avaliable.push(p);
// wake next
if (!m_read_waiting.empty()) {
connection_ptr next = m_read_waiting.front();
/*endpoint_base::m_io_service.post(
boost::bind(
&connection_type::handle_read_frame,
next,
boost::system::error_code()
)
);*/
(*next).handle_read_frame(boost::system::error_code());
m_read_waiting.pop();
}
// wake all
/*std::list<connection_ptr>::iterator it;
for (it = m_read_waiting.begin(); it != m_read_waiting.end(); it++) {
endpoint_base::m_io_service.post(
boost::bind(
&connection_type::handle_read_frame,
*it,
boost::system::error_code()
)
);
}
m_read_waiting.empty();*/
}
}
void wait(connection_ptr con) {
m_read_waiting.push(con);
}
private:
handler_ptr m_handler;
std::set<connection_ptr> m_connections;
alogger_type m_alog;
elogger_type m_elog;
// mssage buffers
std::queue<message::data_ptr> m_read_queue_avaliable;
std::set<message::data_ptr> m_read_queue_used;
std::queue<connection_ptr> m_read_waiting;
};
/// traits class that allows looking up relevant endpoint types by the fully

View File

@@ -32,7 +32,7 @@
using websocketpp::message::data;
data::data() {
data::data() : m_refcount(-1) {
m_payload.reserve(PAYLOAD_SIZE_INIT);
}
@@ -43,6 +43,9 @@ websocketpp::frame::opcode::value data::get_opcode() const {
const std::string& data::get_payload() const {
return m_payload;
}
const std::string& data::get_header() const {
return m_header;
}
uint64_t data::process_payload(std::istream& input,uint64_t size) {
unsigned char c;
@@ -101,6 +104,8 @@ void data::reset(frame::opcode::value opcode) {
m_masking_index = M_NOT_MASKED; // -1 indicates do not mask/unmask
m_payload.resize(0);
m_validator.reset();
m_refcount = -1;
m_max_refcount = 0;
}
void data::complete() {
@@ -117,22 +122,36 @@ void data::set_masking_key(int32_t key) {
m_masking_index = (key == 0 ? M_MASK_KEY_ZERO : M_BYTE_0);
}
void data::set_prepared(bool b) {
m_refcount = 0;
}
bool data::get_prepared() const {
return m_refcount >= 0;
}
void data::acquire() {
assert(m_refcount >= 0);
m_refcount++;
if (m_refcount > m_max_refcount) {
m_max_refcount = m_refcount;
}
}
void data::release() {
assert(m_refcount > 0);
m_refcount--;
}
bool data::done() const {
return m_refcount == 0;
}
// This could be further optimized using methods that write directly into the
// m_payload buffer
void data::set_payload(const std::string& payload) {
m_payload.reserve(payload.size());
std::copy(payload.begin(), payload.end(), m_payload.begin());
m_payload = payload;
}
void data::process() {
websocketpp::processor::hybi_header header;
header.set_fin(true);
header.set_opcode(m_opcode);
// set opcode
// set
// mask
void data::mask() {
if (m_masking_index >= 0) {
for (std::string::iterator it = m_payload.begin(); it != m_payload.end(); it++) {
(*it) = *it ^ m_masking_key[(m_masking_index++)%4];

View File

@@ -45,6 +45,7 @@ public:
frame::opcode::value get_opcode() const;
const std::string& get_payload() const;
const std::string& get_header() const;
// ##reading##
// sets the masking key to be used to unmask as bytes are read.
@@ -66,8 +67,14 @@ public:
void set_header(const std::string& header);
// Performs masking and header generation if it has not been done already.
void process();
void set_prepared(bool b);
bool get_prepared() const;
void acquire();
void release();
bool done() const;
void mask();
int m_max_refcount;
private:
static const uint64_t PAYLOAD_SIZE_INIT = 1000; // 1KB
static const uint64_t PAYLOAD_SIZE_MAX = 100000000;// 100MB
@@ -93,6 +100,8 @@ private:
index_value m_masking_index;
// Message buffers
int m_refcount;
std::string m_header;
std::string m_payload;
};

View File

@@ -495,6 +495,9 @@ public:
// new prepare frame stuff
void prepare_frame(message::data_ptr msg, bool masked, int32_t mask) {
if (msg->get_prepared()) {
return;
}
m_write_header.reset();
m_write_header.set_fin(true);
m_write_header.set_opcode(msg->get_opcode());
@@ -503,6 +506,11 @@ public:
m_write_header.complete();
msg->set_header(m_write_header.get_header_bytes());
if (masked) {
msg->mask();
}
msg->set_prepared(true);
}

View File

@@ -112,16 +112,16 @@ void hybi_header::set_masked(bool masked,int32_t key) {
}
void hybi_header::set_payload_size(uint64_t size) {
if (size <= frame::limits::PAYLOAD_SIZE_BASIC) {
m_header[1] &= (size & BPB1_PAYLOAD);
m_header[1] |= size;
m_payload_size = size;
} else if (size <= frame::limits::PAYLOAD_SIZE_EXTENDED) {
if (get_masked()) {
// shift mask bytes to the correct position given the new size
unsigned int mask_offset = get_header_len()-4;
m_header[1] &= (BASIC_PAYLOAD_16BIT_CODE & BPB1_PAYLOAD);
m_header[1] |= BASIC_PAYLOAD_16BIT_CODE;
memcpy(&m_header[get_header_len()-4], &m_header[mask_offset], 4);
} else {
m_header[1] &= (BASIC_PAYLOAD_16BIT_CODE & BPB1_PAYLOAD);
m_header[1] |= BASIC_PAYLOAD_16BIT_CODE;
}
m_payload_size = size;
*(reinterpret_cast<uint16_t*>(&m_header[BASIC_HEADER_LENGTH])) = htons(size);
@@ -129,16 +129,17 @@ void hybi_header::set_payload_size(uint64_t size) {
if (get_masked()) {
// shift mask bytes to the correct position given the new size
unsigned int mask_offset = get_header_len()-4;
m_header[1] &= (BASIC_PAYLOAD_64BIT_CODE & BPB1_PAYLOAD);
m_header[1] |= BASIC_PAYLOAD_64BIT_CODE;
memcpy(&m_header[get_header_len()-4], &m_header[mask_offset], 4);
} else {
m_header[1] &= (BASIC_PAYLOAD_64BIT_CODE & BPB1_PAYLOAD);
m_header[1] |= BASIC_PAYLOAD_64BIT_CODE;
}
m_payload_size = size;
*(reinterpret_cast<uint64_t*>(&m_header[BASIC_HEADER_LENGTH])) = htonll(size);
} else {
throw processor::exception("set_payload_size called with value that was too large (>2^63)",processor::error::MESSAGE_TOO_BIG);
}
}
void hybi_header::complete() {
validate_basic_header();

View File

@@ -256,6 +256,16 @@ public:
return response;
}
void prepare_frame(message::data_ptr msg, bool masked, int32_t mask) {
if (msg->get_prepared()) {
return;
}
msg->set_header(std::string(0x00));
// TODO: append 0xFF
msg->set_prepared(true);
}
private:
uint32_t decode_client_key(const std::string& key) {
int spaces = 0;

View File

@@ -132,7 +132,7 @@ public:
bool mask,
const std::string& reason) = 0;
virtual void prepare_frame(message::data_ptr msg, bool masked, int32_t mask) = 0;
};

View File

@@ -352,7 +352,7 @@ void server<endpoint>::connection<connection_type>::handle_read_request(
}
// TODO: is there a way to short circuit this or something?
m_endpoint.alog().at(log::alevel::DEBUG_HANDSHAKE) << m_request.raw() << log::endl;
//m_endpoint.alog().at(log::alevel::DEBUG_HANDSHAKE) << m_request.raw() << log::endl;
std::string h = m_request.header("Upgrade");
if (boost::ifind_first(h,"websocket")) {