mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
updates connection to new processor read interface
This commit is contained in:
@@ -48,6 +48,7 @@
|
||||
#include <vector>
|
||||
#include <string>
|
||||
#include <queue>
|
||||
#include <set>
|
||||
|
||||
namespace websocketpp {
|
||||
|
||||
@@ -84,8 +85,6 @@ public:
|
||||
//friend class role<endpoint>;
|
||||
//friend class socket<endpoint>;
|
||||
|
||||
//friend class role<endpoint>::template connection<type>;
|
||||
//friend class socket<endpoint>::template connection<type>;
|
||||
friend class role< connection<endpoint,role,socket> >;
|
||||
friend class socket< connection<endpoint,role,socket> >;
|
||||
|
||||
@@ -95,6 +94,11 @@ public:
|
||||
INTURRUPT = 2
|
||||
};
|
||||
|
||||
enum read_state {
|
||||
READING = 0,
|
||||
WAITING = 1
|
||||
};
|
||||
|
||||
connection(endpoint_type& e,handler_ptr h)
|
||||
: role_type(e),
|
||||
socket_type(e),
|
||||
@@ -103,9 +107,14 @@ public:
|
||||
m_timer(e.endpoint_base::m_io_service,boost::posix_time::seconds(0)),
|
||||
m_state(session::state::CONNECTING),
|
||||
m_write_buffer(0),
|
||||
m_write_state(IDLE)
|
||||
m_write_state(IDLE),
|
||||
m_remote_close_code(close::status::ABNORMAL_CLOSE),
|
||||
m_read_state(READING)
|
||||
{
|
||||
socket_type::init();
|
||||
|
||||
m_control_message = message::control_ptr(new message::control());
|
||||
m_read_queue_avaliable.push(message::data_ptr(new message::data()));
|
||||
}
|
||||
|
||||
// SHOULD BE PROTECTED
|
||||
@@ -127,15 +136,26 @@ public:
|
||||
}
|
||||
|
||||
// Valid for OPEN state
|
||||
void send(const utf8_string& payload) {
|
||||
binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::TEXT,
|
||||
false,payload));
|
||||
void send(const utf8_string& payload,bool binary = false) {
|
||||
binary_string_ptr msg;
|
||||
if (binary) {
|
||||
msg = m_processor->prepare_frame(frame::opcode::BINARY,false,payload);
|
||||
} else {
|
||||
msg = m_processor->prepare_frame(frame::opcode::TEXT,false,payload);
|
||||
}
|
||||
|
||||
m_endpoint.endpoint_base::m_io_service.post(
|
||||
// TODO: decide which of these to use. Direct function call better
|
||||
// ensures that writes triggered by reads sent immediately prior to a
|
||||
// close frame get written before the acknowledgement close frame.
|
||||
// The async option will probably reduce latency
|
||||
// See Autobahn test 7.1.3
|
||||
write_message(msg);
|
||||
|
||||
/*m_endpoint.endpoint_base::m_io_service.post(
|
||||
boost::bind(
|
||||
&type::write_message,
|
||||
type::shared_from_this(),
|
||||
msg));
|
||||
msg)); */
|
||||
}
|
||||
void send(const binary_string& data) {
|
||||
binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::BINARY,
|
||||
@@ -153,7 +173,7 @@ public:
|
||||
binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PING,
|
||||
false,payload));
|
||||
|
||||
m_endpoint.m_io_service.post(
|
||||
m_endpoint.endpoint_base::m_io_service.post(
|
||||
boost::bind(
|
||||
&type::write_message,
|
||||
type::shared_from_this(),
|
||||
@@ -162,7 +182,7 @@ public:
|
||||
void pong(const binary_string& payload) {
|
||||
binary_string_ptr msg(m_processor->prepare_frame(frame::opcode::PONG,
|
||||
false,payload));
|
||||
m_endpoint.m_io_service.post(
|
||||
m_endpoint.endpoint_base::m_io_service.post(
|
||||
boost::bind(
|
||||
&type::write_message,
|
||||
type::shared_from_this(),
|
||||
@@ -199,18 +219,47 @@ public:
|
||||
// flow control interface
|
||||
message::data_ptr get_data_message() {
|
||||
// if we have one of this type free
|
||||
if (!m_read_queue_data.empty()) {
|
||||
message::data_ptr p = m_read_queue_data.front();
|
||||
m_read_queue_data.pop();
|
||||
if (!m_read_queue_avaliable.empty()) {
|
||||
message::data_ptr p = m_read_queue_avaliable.front();
|
||||
m_read_queue_avaliable.pop();
|
||||
m_read_queue_used.insert(p);
|
||||
return p;
|
||||
} else {
|
||||
return message::data_ptr();
|
||||
}
|
||||
}
|
||||
|
||||
void recycle(message::data_ptr p) {
|
||||
if (m_read_queue_used.erase(p) == 0) {
|
||||
// tried to recycle a pointer we don't control.
|
||||
} else {
|
||||
m_read_queue_avaliable.push(p);
|
||||
|
||||
if (m_read_state == WAITING) {
|
||||
m_endpoint.endpoint_base::m_io_service.post(
|
||||
boost::bind(
|
||||
&type::handle_read_frame,
|
||||
type::shared_from_this(),
|
||||
boost::system::error_code()
|
||||
)
|
||||
);
|
||||
m_read_state = READING;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
message::control_ptr get_control_message() {
|
||||
return m_control_message;
|
||||
}
|
||||
|
||||
// TODO: deprecated. will change to get_rng?
|
||||
int32_t gen() {
|
||||
return 0;
|
||||
}
|
||||
|
||||
typename endpoint::alogger_type& alog() {
|
||||
return m_endpoint.alog();
|
||||
}
|
||||
protected:
|
||||
void handle_socket_init(const boost::system::error_code& error) {
|
||||
if (error) {
|
||||
@@ -272,7 +321,7 @@ protected:
|
||||
case processor::error::PAYLOAD_VIOLATION:
|
||||
send_close(close::status::INVALID_PAYLOAD, e.what());
|
||||
break;
|
||||
case processor::error::INTERNAL_SERVER_ERROR:
|
||||
case processor::error::INTERNAL_ENDPOINT_ERROR:
|
||||
send_close(close::status::INTERNAL_ENDPOINT_ERROR, e.what());
|
||||
break;
|
||||
case processor::error::SOFT_ERROR:
|
||||
@@ -284,6 +333,7 @@ protected:
|
||||
// we need to wait for a message to be returned by the
|
||||
// client. We exit the read loop. handle_read_frame
|
||||
// will be restarted by recycle()
|
||||
m_read_state = WAITING;
|
||||
return;
|
||||
default:
|
||||
// Fatal error, forcibly end connection immediately.
|
||||
@@ -314,141 +364,50 @@ protected:
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
void handle_read_frame_old(const boost::system::error_code& error) {
|
||||
// check if state changed while we were waiting for a read.
|
||||
if (m_state == session::state::CLOSED) { return; }
|
||||
|
||||
if (error) {
|
||||
if (error == boost::asio::error::eof) {
|
||||
// got unexpected EOF
|
||||
// TODO: log error
|
||||
terminate(false);
|
||||
} else if (error == boost::asio::error::operation_aborted) {
|
||||
// got unexpected abort (likely our server issued an abort on
|
||||
// all connections on this io_service)
|
||||
|
||||
// TODO: log error
|
||||
terminate(true);
|
||||
} else {
|
||||
// Other unexpected error
|
||||
|
||||
// TODO: log error
|
||||
terminate(false);
|
||||
}
|
||||
}
|
||||
|
||||
// process data from the buffer just read into
|
||||
std::istream s(&m_buf);
|
||||
|
||||
while (m_state != session::state::CLOSED && m_buf.size() > 0) {
|
||||
try {
|
||||
m_processor->consume(s);
|
||||
|
||||
if (m_processor->ready()) {
|
||||
process_message();
|
||||
m_processor->reset();
|
||||
}
|
||||
} catch (const processor::exception& e) {
|
||||
if (m_processor->ready()) {
|
||||
m_processor->reset();
|
||||
}
|
||||
|
||||
if (e.code() == processor::error::PROTOCOL_VIOLATION) {
|
||||
send_close(close::status::PROTOCOL_ERROR, e.what());
|
||||
} else if (e.code() == processor::error::PAYLOAD_VIOLATION) {
|
||||
send_close(close::status::INVALID_PAYLOAD, e.what());
|
||||
} else if (e.code() == processor::error::INTERNAL_SERVER_ERROR) {
|
||||
send_close(close::status::POLICY_VIOLATION, e.what());
|
||||
} else if (e.code() == processor::error::SOFT_ERROR) {
|
||||
// ignore and continue processing frames
|
||||
continue;
|
||||
} else {
|
||||
// Fatal error, forcibly end connection immediately.
|
||||
m_endpoint.elog().at(log::elevel::DEVEL)
|
||||
<< "Dropping TCP due to unrecoverable exception"
|
||||
<< log::endl;
|
||||
terminate(true);
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// try and read more
|
||||
if (m_state != session::state::CLOSED &&
|
||||
m_processor->get_bytes_needed() > 0) {
|
||||
// TODO: read timeout timer?
|
||||
|
||||
boost::asio::async_read(
|
||||
socket_type::get_socket(),
|
||||
m_buf,
|
||||
boost::asio::transfer_at_least(m_processor->get_bytes_needed()),
|
||||
boost::bind(
|
||||
&type::handle_read_frame,
|
||||
type::shared_from_this(),
|
||||
boost::asio::placeholders::error
|
||||
)
|
||||
);
|
||||
}
|
||||
void process_data(message::data_ptr msg) {
|
||||
m_endpoint.get_handler()->on_message(type::shared_from_this(),msg);
|
||||
}
|
||||
|
||||
void process_message() {
|
||||
void process_control(message::control_ptr msg) {
|
||||
bool response;
|
||||
switch (m_processor->get_opcode()) {
|
||||
case frame::opcode::TEXT:
|
||||
m_endpoint.get_handler()->on_message(
|
||||
type::shared_from_this(),
|
||||
m_processor->get_utf8_payload());
|
||||
break;
|
||||
case frame::opcode::BINARY:
|
||||
m_endpoint.get_handler()->on_message(
|
||||
type::shared_from_this(),
|
||||
m_processor->get_binary_payload());
|
||||
break;
|
||||
switch (msg->get_opcode()) {
|
||||
case frame::opcode::PING:
|
||||
response = m_endpoint.get_handler()->on_ping(
|
||||
type::shared_from_this(),
|
||||
m_processor->get_binary_payload());
|
||||
msg->get_payload()
|
||||
);
|
||||
|
||||
if (response) {
|
||||
// send response ping
|
||||
write_message(m_processor->prepare_frame(frame::opcode::PONG,false,*m_processor->get_binary_payload()));
|
||||
write_message(m_processor->prepare_frame(frame::opcode::PONG,false,msg->get_payload()));
|
||||
}
|
||||
break;
|
||||
case frame::opcode::PONG:
|
||||
m_endpoint.get_handler()->on_pong(
|
||||
type::shared_from_this(),
|
||||
m_processor->get_binary_payload());
|
||||
msg->get_payload());
|
||||
|
||||
// TODO: disable ping response timer
|
||||
|
||||
break;
|
||||
case frame::opcode::CLOSE:
|
||||
m_remote_close_code = m_processor->get_close_code();
|
||||
m_remote_close_reason = m_processor->get_close_reason();
|
||||
m_remote_close_code = msg->get_close_code();
|
||||
m_remote_close_reason = msg->get_close_reason();
|
||||
|
||||
// check that the codes we got over the wire are valid
|
||||
|
||||
if (close::status::invalid(m_remote_close_code)) {
|
||||
throw processor::exception("Invalid close code",processor::error::PROTOCOL_VIOLATION);
|
||||
}
|
||||
|
||||
if (close::status::reserved(m_remote_close_code)) {
|
||||
throw processor::exception("Reserved close code",processor::error::PROTOCOL_VIOLATION);
|
||||
}
|
||||
|
||||
if (m_state == session::state::OPEN) {
|
||||
// other end is initiating
|
||||
m_endpoint.elog().at(log::elevel::DEVEL)
|
||||
<< "sending close ack" << log::endl;
|
||||
<< "sending close ack" << log::endl;
|
||||
|
||||
// TODO:
|
||||
send_close_ack();
|
||||
} else if (m_state == session::state::CLOSING) {
|
||||
// ack of our close
|
||||
m_endpoint.elog().at(log::elevel::DEVEL)
|
||||
<< "got close ack" << log::endl;
|
||||
<< "got close ack" << log::endl;
|
||||
|
||||
terminate(false);
|
||||
// TODO: start terminate timer (if client)
|
||||
@@ -458,7 +417,6 @@ protected:
|
||||
throw processor::exception("Invalid Opcode",processor::error::PROTOCOL_VIOLATION);
|
||||
break;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
void send_close(close::status::value code, const std::string& reason) {
|
||||
@@ -515,9 +473,11 @@ protected:
|
||||
// frame
|
||||
throw "shouldn't be here";
|
||||
} else if (close::status::invalid(m_remote_close_code)) {
|
||||
// TODO: shouldn't be able to get here now either
|
||||
m_local_close_code = close::status::PROTOCOL_ERROR;
|
||||
m_local_close_reason = "Status code is invalid";
|
||||
} else if (close::status::reserved(m_remote_close_code)) {
|
||||
// TODO: shouldn't be able to get here now either
|
||||
m_local_close_code = close::status::PROTOCOL_ERROR;
|
||||
m_local_close_reason = "Status code is reserved";
|
||||
} else {
|
||||
@@ -537,6 +497,10 @@ protected:
|
||||
}
|
||||
|
||||
void write_message(binary_string_ptr msg) {
|
||||
if (m_write_state == INTURRUPT) {
|
||||
return;
|
||||
}
|
||||
|
||||
m_write_buffer += msg->size();
|
||||
m_write_queue.push(msg);
|
||||
write();
|
||||
@@ -722,8 +686,10 @@ protected:
|
||||
bool m_dropped_by_me;
|
||||
|
||||
// Read queue
|
||||
read_state m_read_state;
|
||||
message::control_ptr m_control_message;
|
||||
std::queue<message::data_ptr> m_read_queue_data;
|
||||
std::queue<message::data_ptr> m_read_queue_avaliable;
|
||||
std::set<message::data_ptr> m_read_queue_used;
|
||||
};
|
||||
|
||||
// connection related types that it and its policy classes need.
|
||||
|
||||
Reference in New Issue
Block a user