completes frame reading changes.

fixes 37, fixes 32
This commit is contained in:
Peter Thorson
2011-10-10 18:35:38 -05:00
parent eec0390618
commit 1ff8d333a5
10 changed files with 375 additions and 188 deletions

View File

@@ -47,6 +47,7 @@ session::session (boost::asio::io_service& io_service,
websocketpp::connection_handler_ptr defc,
uint64_t buf_size)
: m_state(STATE_CONNECTING),
m_writing(false),
m_local_close_code(CLOSE_STATUS_NO_STATUS),
m_remote_close_code(CLOSE_STATUS_NO_STATUS),
m_was_clean(false),
@@ -165,9 +166,6 @@ void session::send_close(uint16_t status,const std::string &message) {
m_state = STATE_CLOSING;
m_close_code = status;
m_close_message = message;
m_local_close_code = status;
m_local_close_msg = message;
@@ -177,8 +175,8 @@ void session::send_close(uint16_t status,const std::string &message) {
if (status == CLOSE_STATUS_NO_STATUS) {
m_write_frame.set_status(CLOSE_STATUS_NORMAL,"");
} else if (status == CLOSE_STATUS_ABNORMAL_CLOSE) {
// unknown internal error, don't set a status? use protocol error?
log("Tried to disconnect with status ABNORMAL_CLOSE",LOG_DEBUG);
// Internal implimentation error. There is no good close code for this.
m_write_frame.set_status(CLOSE_STATUS_POLICY_VIOLATION,message);
} else {
m_write_frame.set_status(status,message);
}
@@ -210,75 +208,136 @@ void session::pong(const std::string &msg) {
write_frame();
}
void session::read_frame() {
// the initial read in the handshake may have read in the first frame.
// handle it (if it exists) before we read anything else.
handle_read_frame(boost::system::error_code());
}
// handle_read_frame reads and processes all socket read commands for the
// session by consuming the read buffer and then starting an async read with
// itself as the callback. The connection is over when this method returns.
void session::handle_read_frame(const boost::system::error_code& error) {
if (error) {
handle_error("Error reading extended frame header",error);
// TODO: close behavior
if (m_state != STATE_OPEN && m_state != STATE_CLOSING) {
log("handle_read_frame called in invalid state",LOG_ERROR);
return;
}
if (m_state != STATE_OPEN && m_state != STATE_CLOSING) {
// stop processing frames.
log("handle_read_frame called in invalid state",LOG_ERROR);
return;
if (error) {
if (error == boost::asio::error::eof) {
// if this is a case where we are expecting eof, return, else log & drop
log_error("Recieved EOF",error);
//drop_tcp(false);
//m_state = STATE_CLOSED;
} else if (error == boost::asio::error::operation_aborted) {
// some other part of our client called shutdown on our socket.
// This is usually due to a write error. Everything should have
// already been logged and dropped so we just return here
return;
} else {
log_error("Error reading frame",error);
//drop_tcp(false);
m_state = STATE_CLOSED;
}
}
std::istream s(&m_buf);
try {
while (m_buf.size() > 0) {
m_read_frame.consume(s);
if (m_read_frame.get_state() == frame::STATE_READY) {
process_frame();
// should we break?
while (m_buf.size() > 0 && m_state != STATE_CLOSED) {
try {
if (m_read_frame.get_bytes_needed() == 0) {
throw frame_error("have bytes that no frame needs",frame::FERR_FATAL_SESSION_ERROR);
}
// Consume will read bytes from s
// will throw a frame_error on error.
std::stringstream err;
err << "consuming. have: " << m_buf.size() << " bytes. Need: " << m_read_frame.get_bytes_needed() << " state: " << (int)m_read_frame.get_state();
log(err.str(),LOG_DEBUG);
m_read_frame.consume(s);
err.str("");
err << "consume complete, " << m_buf.size() << " bytes left, " << m_read_frame.get_bytes_needed() << " still needed, state: " << (int)m_read_frame.get_state();
log(err.str(),LOG_DEBUG);
if (m_read_frame.get_state() == frame::STATE_READY) {
// process frame and reset frame state for the next frame.
// will throw a frame_error on error. May set m_state to CLOSED,
// if so no more frames should be processed.
err.str("");
err << "processing frame " << m_buf.size();
log(err.str(),LOG_DEBUG);
process_frame();
}
} catch (const frame_error& e) {
std::stringstream err;
err << "Caught frame exception: " << e.what();
access_log(e.what(),ALOG_FRAME);
log(err.str(),LOG_ERROR);
// process different types of frame errors
//
if (e.code() == frame::FERR_PROTOCOL_VIOLATION) {
send_close(CLOSE_STATUS_PROTOCOL_ERROR, e.what());
} else if (e.code() == frame::FERR_PAYLOAD_VIOLATION) {
send_close(CLOSE_STATUS_INVALID_PAYLOAD, e.what());
} else if (e.code() == frame::FERR_INTERNAL_SERVER_ERROR) {
send_close(CLOSE_STATUS_ABNORMAL_CLOSE, e.what());
} else if (e.code() == frame::FERR_SOFT_SESSION_ERROR) {
// ignore and continue processing frames
continue;
} else {
// Fatal error, forcibly end connection immediately.
drop_tcp(true);
}
break;
}
} catch (const frame_error& e) {
std::stringstream err;
err << "Caught frame exception: " << e.what();
access_log(e.what(),ALOG_FRAME);
log(err.str(),LOG_ERROR);
if (e.code() == frame::FERR_PROTOCOL_VIOLATION) {
disconnect(CLOSE_STATUS_PROTOCOL_ERROR, e.what());
} else if (e.code() == frame::FERR_PAYLOAD_VIOLATION) {
disconnect(CLOSE_STATUS_INVALID_PAYLOAD, e.what());
} else if (e.code() == frame::FERR_SOFT_SESSION_ERROR) {
// ???
} else {
// Fatal error
disconnect(CLOSE_STATUS_NO_STATUS, "");
}
disconnect(CLOSE_STATUS_PROTOCOL_ERROR,"");
// TODO: close behavior
return;
}
if (error == boost::asio::error::eof) {
m_state = STATE_CLOSED;
}
// we have read everything, check if we should read more
if (m_state != STATE_OPEN && m_state != STATE_CLOSING) {
// stop processing frames.
if ((m_state == STATE_OPEN || m_state == STATE_CLOSING) && m_read_frame.get_bytes_needed() > 0) {
std::stringstream msg;
msg << "starting async read for " << m_read_frame.get_bytes_needed() << " bytes.";
log(msg.str(),LOG_DEBUG);
// TODO: set a timer here in case we don't want to read forever.
// Ex: when the frame is in a degraded state.
boost::asio::async_read(
m_socket,
m_buf,
boost::asio::transfer_at_least(m_read_frame.get_bytes_needed()),
boost::bind(
&session::handle_read_frame,
shared_from_this(),
boost::asio::placeholders::error
)
);
} else if (m_state == STATE_CLOSED) {
log_close_result();
if (m_local_interface) {
m_local_interface->on_close(shared_from_this(),m_remote_close_code,m_remote_close_msg);
}
} else {
log("handle_read_frame called in invalid state",LOG_ERROR);
return;
}
// read more
boost::asio::async_read(
m_socket,
m_buf,
boost::asio::transfer_at_least(m_read_frame.get_bytes_needed()),
boost::bind(
&session::handle_read_frame,
shared_from_this(),
boost::asio::placeholders::error
)
);
}
void session::process_frame () {
log("process_frame",LOG_DEBUG);
if (m_state == STATE_OPEN) {
switch (m_read_frame.get_opcode()) {
case frame::CONTINUATION_FRAME:
@@ -291,6 +350,7 @@ void session::process_frame () {
process_binary();
break;
case frame::CONNECTION_CLOSE:
log("process_close",LOG_DEBUG);
process_close();
break;
case frame::PING:
@@ -300,8 +360,8 @@ void session::process_frame () {
process_pong();
break;
default:
disconnect(CLOSE_STATUS_PROTOCOL_ERROR,"Invalid Opcode");
// TODO: close behavior
throw frame_error("Invalid Opcode",
frame::FERR_PROTOCOL_VIOLATION);
break;
}
} else if (m_state == STATE_CLOSING) {
@@ -309,42 +369,24 @@ void session::process_frame () {
process_close();
} else {
// Ignore all other frames in closing state
log("ignoring this frame",LOG_DEBUG);
}
} else {
// Recieved message before or after connection was opened/closed
// TODO: close behavior
return;
throw frame_error("process_frame called from invalid state");
}
// check if there was an error processing this frame and fail the connection
if (m_error) {
log("Connection has been closed uncleanly",LOG_ERROR);
// TODO: close behavior
return;
}
if (m_status == CLOSED) {
log_close_result();
if (m_local_interface) {
m_local_interface->on_close(shared_from_this(),
m_close_code,
m_close_message);
}
// TODO: close behavior
return;
}
m_read_frame.reset();
}
void session::handle_write_frame (const boost::system::error_code& error) {
if (error) {
handle_error("Error writing frame data",error);
// TODO: close behavior
log_error("Error writing frame data",error);
drop_tcp(false);
}
//std::cout << "Successfully wrote frame." << std::endl;
access_log("handle_write_frame complete",ALOG_FRAME);
m_writing = false;
}
void session::process_ping() {
@@ -417,11 +459,13 @@ void session::process_close() {
m_remote_close_msg = message;
if (m_state == STATE_OPEN) {
log("process_close sending ack",LOG_DEBUG);
// This is the case where the remote initiated the close.
m_closed_by_me = false;
// TODO: close behavior
disconnect(status,message);
// send acknowledgement
send_close(status,message);
} else if (m_state == STATE_CLOSING) {
log("process_close got ack",LOG_DEBUG);
// this is an ack of our close message
m_closed_by_me = true;
} else {
@@ -501,7 +545,9 @@ void session::write_frame() {
);
log("Write Frame: "+m_write_frame.print_frame(),LOG_DEBUG);
m_writing = true;
boost::asio::async_write(
m_socket,
data,
@@ -547,19 +593,13 @@ void session::log_open_result() {
access_log(msg.str(),ALOG_HANDSHAKE);
}
void session::handle_error(std::string msg,
const boost::system::error_code& error) {
std::stringstream e;
// this is called when an async asio call encounters an error
void session::log_error(std::string msg,const boost::system::error_code& e) {
std::stringstream err;
e << "[Connection " << this << "] " << msg << " (" << error << ")";
err << "[Connection " << this << "] " << msg << " (" << e << ")";
log(e.str(),LOG_ERROR);
if (m_local_interface) {
m_local_interface->on_close(shared_from_this(),1006,e.str());
}
m_error = true;
log(err.str(),LOG_ERROR);
}
// validates status codes that the end application is allowed to use
@@ -574,3 +614,12 @@ bool session::validate_app_close_status(uint16_t status) {
return false;
}
void session::drop_tcp(bool dropped_by_me) {
if (m_socket.is_open()) {
m_socket.shutdown(tcp::socket::shutdown_both);
m_socket.close();
}
m_dropped_by_me = dropped_by_me;
m_state = STATE_CLOSED;
}