mirror of
https://github.com/XRPLF/rippled.git
synced 2026-04-29 15:37:57 +00:00
updates connection to use control message pool
This commit is contained in:
@@ -134,29 +134,23 @@ public:
|
||||
}
|
||||
|
||||
// Valid for OPEN state
|
||||
void send(const utf8_string& payload,bool binary = false) {
|
||||
/*binary_string_ptr msg;
|
||||
if (binary) {
|
||||
msg = m_processor->prepare_frame(frame::opcode::BINARY,!m_endpoint.is_server(),payload);
|
||||
} else {
|
||||
msg = m_processor->prepare_frame(frame::opcode::TEXT,!m_endpoint.is_server(),payload);
|
||||
}
|
||||
|
||||
// TODO: decide which of these to use. Direct function call better
|
||||
// ensures that writes triggered by reads sent immediately prior to a
|
||||
// close frame get written before the acknowledgement close frame.
|
||||
// The async option will probably reduce latency
|
||||
// See Autobahn test 7.1.3
|
||||
//write_message(msg);
|
||||
|
||||
m_endpoint.endpoint_base::m_io_service.post(
|
||||
boost::bind(
|
||||
&type::write_message,
|
||||
type::shared_from_this(),
|
||||
msg));*/
|
||||
/// convenience overload for sending a one off text message.
|
||||
void send(const std::string& payload, frame::opcode::value op = frame::opcode::TEXT) {
|
||||
websocketpp::message::data::ptr msg = get_control_message2();
|
||||
|
||||
if (!msg) {
|
||||
throw exception("Endpoint send queue is full",error::SEND_QUEUE_FULL);
|
||||
}
|
||||
if (op != frame::opcode::TEXT && op != frame::opcode::BINARY) {
|
||||
throw exception("opcode must be either TEXT or BINARY",error::GENERIC);
|
||||
}
|
||||
|
||||
msg->reset(op);
|
||||
msg->set_payload(payload);
|
||||
send(msg);
|
||||
}
|
||||
void send(message::data_ptr msg) {
|
||||
m_processor->prepare_frame(msg,!m_endpoint.is_server(),rand());
|
||||
m_processor->prepare_frame(msg);
|
||||
write_message(msg);
|
||||
}
|
||||
|
||||
@@ -204,6 +198,10 @@ public:
|
||||
return m_endpoint.get_data_message();
|
||||
}
|
||||
|
||||
message::data_ptr get_control_message2() {
|
||||
return m_endpoint.get_control_message();
|
||||
}
|
||||
|
||||
message::control_ptr get_control_message() {
|
||||
return m_control_message;
|
||||
}
|
||||
@@ -254,17 +252,20 @@ public:
|
||||
if (error == boost::asio::error::eof) {
|
||||
// got unexpected EOF
|
||||
// TODO: log error
|
||||
m_endpoint.elog().at(log::elevel::ERROR) << "Remote connection dropped unexpectedly" << log::endl;
|
||||
terminate(false);
|
||||
} else if (error == boost::asio::error::operation_aborted) {
|
||||
// got unexpected abort (likely our server issued an abort on
|
||||
// all connections on this io_service)
|
||||
|
||||
// TODO: log error
|
||||
m_endpoint.elog().at(log::elevel::ERROR) << "Terminating due to abort: " << error << log::endl;
|
||||
terminate(true);
|
||||
} else {
|
||||
// Other unexpected error
|
||||
|
||||
// TODO: log error
|
||||
m_endpoint.elog().at(log::elevel::ERROR) << "Terminating due to unknown error: " << error << log::endl;
|
||||
terminate(false);
|
||||
}
|
||||
}
|
||||
@@ -428,7 +429,7 @@ protected:
|
||||
|
||||
// TODO: optimize control messages and handle case where endpoint is
|
||||
// out of messages
|
||||
message::data_ptr msg = get_data_message();
|
||||
message::data_ptr msg = get_control_message2();
|
||||
|
||||
if (!msg) {
|
||||
// server is out of resources, close connection.
|
||||
@@ -440,7 +441,7 @@ protected:
|
||||
}
|
||||
|
||||
msg->reset(frame::opcode::CLOSE);
|
||||
m_processor->prepare_close_frame(msg,!m_endpoint.is_server(),rand(),code,reason);
|
||||
m_processor->prepare_close_frame(msg,code,reason);
|
||||
write_message(msg);
|
||||
|
||||
m_write_state = INTURRUPT;
|
||||
@@ -479,11 +480,19 @@ protected:
|
||||
|
||||
// TODO: optimize control messages and handle case where endpoint is
|
||||
// out of messages
|
||||
message::data_ptr msg = get_data_message();
|
||||
message::data_ptr msg = get_control_message2();
|
||||
|
||||
if (!msg) {
|
||||
// server is out of resources, close connection.
|
||||
m_endpoint.elog().at(log::elevel::ERROR)
|
||||
<< "Server has run out of message buffers."
|
||||
<< log::endl;
|
||||
terminate(true);
|
||||
return;
|
||||
}
|
||||
|
||||
msg->reset(frame::opcode::CLOSE);
|
||||
m_processor->prepare_close_frame(msg,
|
||||
!m_endpoint.is_server(),
|
||||
rand(),
|
||||
m_local_close_code,
|
||||
m_local_close_reason);
|
||||
write_message(msg);
|
||||
@@ -493,20 +502,20 @@ protected:
|
||||
void send_ping(const std::string& payload) {
|
||||
// TODO: optimize control messages and handle case where
|
||||
// endpoint is out of messages
|
||||
message::data_ptr control = get_data_message();
|
||||
message::data_ptr control = get_control_message2();
|
||||
control->reset(frame::opcode::PING);
|
||||
control->set_payload(payload);
|
||||
m_processor->prepare_frame(control,!m_endpoint.is_server(),rand());
|
||||
m_processor->prepare_frame(control);
|
||||
write_message(control);
|
||||
}
|
||||
|
||||
void send_pong(const std::string& payload) {
|
||||
// TODO: optimize control messages and handle case where
|
||||
// endpoint is out of messages
|
||||
message::data_ptr control = get_data_message();
|
||||
message::data_ptr control = get_control_message2();
|
||||
control->reset(frame::opcode::PONG);
|
||||
control->set_payload(payload);
|
||||
m_processor->prepare_frame(control,!m_endpoint.is_server(),rand());
|
||||
m_processor->prepare_frame(control);
|
||||
write_message(control);
|
||||
}
|
||||
|
||||
@@ -551,6 +560,8 @@ protected:
|
||||
data.push_back(boost::asio::buffer(m_write_queue.front()->get_header()));
|
||||
data.push_back(boost::asio::buffer(m_write_queue.front()->get_payload()));
|
||||
|
||||
m_endpoint.alog().at(log::alevel::DEVEL) << "write header: " << to_hex(m_write_queue.front()->get_header()) << log::endl;
|
||||
|
||||
boost::asio::async_write(
|
||||
socket_type::get_socket(),
|
||||
data,
|
||||
@@ -564,6 +575,7 @@ protected:
|
||||
// if we are in an inturrupted state and had nothing else to write
|
||||
// it is safe to terminate the connection.
|
||||
if (m_write_state == INTURRUPT) {
|
||||
m_endpoint.alog().at(log::alevel::DEBUG_CLOSE) << "Exit after inturrupt" << log::endl;
|
||||
terminate(false);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user