mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Refactored socket send method and message recieved method (#25)
* Enhanced sockets * Removed unnecessary copy operations * Enhanced sock session send method to recieve rref * Enhanced on_read with rvalue reference * Added string_view * Added comments
This commit is contained in:
@@ -12,11 +12,15 @@ socket_client::socket_client(net::io_context &ioc, socket_session_handler &sessi
|
||||
{
|
||||
}
|
||||
|
||||
void socket_client::run(char const *host, char const *port)
|
||||
/**
|
||||
* Entry point to socket client which will intiate a connection to server
|
||||
*/
|
||||
// boost async_resolve function requires a port as a string because of that port is passed as a string
|
||||
void socket_client::run(std::string_view host, std::string_view port)
|
||||
{
|
||||
host_ = host;
|
||||
port_ = (unsigned short)std::strtoul(port, NULL, 0);
|
||||
|
||||
port_ = port;
|
||||
|
||||
// Look up the domain name
|
||||
resolver_.async_resolve(
|
||||
host,
|
||||
@@ -26,6 +30,9 @@ void socket_client::run(char const *host, char const *port)
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on completion of resolving the server
|
||||
*/
|
||||
void socket_client::on_resolve(error ec, tcp::resolver::results_type results)
|
||||
{
|
||||
if (ec)
|
||||
@@ -39,6 +46,9 @@ void socket_client::on_resolve(error ec, tcp::resolver::results_type results)
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on completion of connecting to the server
|
||||
*/
|
||||
void socket_client::on_connect(error ec, tcp::resolver::results_type::endpoint_type)
|
||||
{
|
||||
if (ec)
|
||||
@@ -60,13 +70,20 @@ void socket_client::on_connect(error ec, tcp::resolver::results_type::endpoint_t
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on completion of handshake
|
||||
*/
|
||||
void socket_client::on_handshake(error ec)
|
||||
{
|
||||
//Creates a new socket session object
|
||||
std::make_shared<socket_session>(
|
||||
ws_, sess_handler_)
|
||||
->client_run(port_, host_, ec);
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on error
|
||||
*/
|
||||
void socket_client::socket_client_fail(beast::error_code ec, char const *what)
|
||||
{
|
||||
std::cerr << what << ": " << ec.message() << "\n";
|
||||
|
||||
@@ -16,18 +16,17 @@ using error = boost::system::error_code;
|
||||
|
||||
namespace sock
|
||||
{
|
||||
|
||||
/**
|
||||
* Represents an active WebSocket client connection
|
||||
* Based on the implementation from https://github.com/vinniefalco/CppCon2018
|
||||
*/
|
||||
class socket_client : public std::enable_shared_from_this<socket_client>
|
||||
{
|
||||
tcp::resolver resolver_;
|
||||
websocket::stream<beast::tcp_stream> ws_;
|
||||
std::string host_;
|
||||
unsigned short port_;
|
||||
socket_session_handler &sess_handler_;
|
||||
tcp::resolver resolver_; // resolver used to resolve host and the port
|
||||
websocket::stream<beast::tcp_stream> ws_; // web socket stream used to send and receive messages
|
||||
std::string host_; // address of the server in which the client connects
|
||||
std::string_view port_; // port of the server in which client connects
|
||||
socket_session_handler &sess_handler_; // handler passed to gain access to websocket events
|
||||
|
||||
void on_resolve(error ec, tcp::resolver::results_type results);
|
||||
|
||||
@@ -46,7 +45,7 @@ public:
|
||||
socket_client(net::io_context &ioc, socket_session_handler &session_handler);
|
||||
|
||||
//Entry point to the client which requires an active host and port
|
||||
void run(char const *host, char const *port);
|
||||
void run(std::string_view host, std::string_view port);
|
||||
};
|
||||
} // namespace sock
|
||||
#endif
|
||||
@@ -53,8 +53,12 @@ socket_server::socket_server(net::io_context &ioc, tcp::endpoint endpoint, socke
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Entry point to socket server which accepts new connections
|
||||
*/
|
||||
void socket_server::run()
|
||||
{
|
||||
|
||||
// Start accepting a connection
|
||||
acceptor_.async_accept(
|
||||
socket_,
|
||||
@@ -63,7 +67,9 @@ void socket_server::run()
|
||||
});
|
||||
}
|
||||
|
||||
// Report a failure
|
||||
/**
|
||||
* Executes on error
|
||||
*/
|
||||
void socket_server::fail(error_code ec, char const *what)
|
||||
{
|
||||
// Don't report on canceled operations
|
||||
@@ -72,7 +78,9 @@ void socket_server::fail(error_code ec, char const *what)
|
||||
std::cerr << what << ": " << ec.message() << "\n";
|
||||
}
|
||||
|
||||
// Handle a connection
|
||||
/**
|
||||
* Executes on acceptance of new connection
|
||||
*/
|
||||
void socket_server::on_accept(error_code ec)
|
||||
{
|
||||
if (ec)
|
||||
@@ -81,8 +89,10 @@ void socket_server::on_accept(error_code ec)
|
||||
}
|
||||
else
|
||||
{
|
||||
unsigned short port = socket_.remote_endpoint().port();
|
||||
std::string address = socket_.remote_endpoint().address().to_string();
|
||||
std::string_view port = std::to_string(socket_.remote_endpoint().port());
|
||||
std::string_view address = socket_.remote_endpoint().address().to_string();
|
||||
|
||||
//Creating websocket stream required to pass to initiate a new session
|
||||
websocket::stream<beast::tcp_stream> ws(std::move(socket_));
|
||||
|
||||
// Launch a new session for this connection
|
||||
|
||||
@@ -18,9 +18,9 @@ namespace sock
|
||||
*/
|
||||
class socket_server : public std::enable_shared_from_this<socket_server>
|
||||
{
|
||||
tcp::acceptor acceptor_;
|
||||
tcp::socket socket_;
|
||||
socket_session_handler &sess_handler_;
|
||||
tcp::acceptor acceptor_; // acceptor which accepts new connections
|
||||
tcp::socket socket_; // socket in which the client connects
|
||||
socket_session_handler &sess_handler_; // handler passed to gain access to websocket events
|
||||
|
||||
void fail(error ec, char const *what);
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ socket_session::socket_session(websocket::stream<beast::tcp_stream> &websocket,
|
||||
}
|
||||
|
||||
//port and address will be used to identify from which client the message recieved in the handler
|
||||
void socket_session::server_run(const std::uint16_t port, std::string_view address)
|
||||
void socket_session::server_run(std::string_view port, std::string_view address)
|
||||
{
|
||||
port_ = port;
|
||||
address_ = address;
|
||||
@@ -31,7 +31,7 @@ void socket_session::server_run(const std::uint16_t port, std::string_view addre
|
||||
}
|
||||
|
||||
//port and address will be used to identify from which server the message recieved in the handler
|
||||
void socket_session::client_run(const std::uint16_t port, std::string_view address, error ec)
|
||||
void socket_session::client_run(std::string_view port, std::string_view address, error ec)
|
||||
{
|
||||
port_ = port;
|
||||
address_ = address;
|
||||
@@ -49,6 +49,9 @@ void socket_session::client_run(const std::uint16_t port, std::string_view addre
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on error
|
||||
*/
|
||||
void socket_session::fail(error_code ec, char const *what)
|
||||
{
|
||||
// std::cerr << what << ": " << ec.message() << std::endl;
|
||||
@@ -59,6 +62,9 @@ void socket_session::fail(error_code ec, char const *what)
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Executes on acceptance of new connection
|
||||
*/
|
||||
void socket_session::on_accept(error_code ec)
|
||||
{
|
||||
// Handle the error, if any
|
||||
@@ -76,8 +82,12 @@ void socket_session::on_accept(error_code ec)
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Executes on completion of recieiving a new message
|
||||
*/
|
||||
void socket_session::on_read(error_code ec, std::size_t)
|
||||
{
|
||||
//if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
|
||||
// read may get called when operation_aborted as well.
|
||||
// We don't need to process read operation in that case.
|
||||
if (ec == net::error::operation_aborted)
|
||||
@@ -92,7 +102,7 @@ void socket_session::on_read(error_code ec, std::size_t)
|
||||
}
|
||||
|
||||
std::string message = beast::buffers_to_string(buffer_.data());
|
||||
sess_handler_.on_message(this, message);
|
||||
sess_handler_.on_message(this, std::move(message));
|
||||
|
||||
// Clear the buffer
|
||||
buffer_.consume(buffer_.size());
|
||||
@@ -106,7 +116,10 @@ void socket_session::on_read(error_code ec, std::size_t)
|
||||
});
|
||||
}
|
||||
|
||||
void socket_session::send(std::shared_ptr<std::string const> const &ss)
|
||||
/*
|
||||
* Send message through an active websocket connection
|
||||
*/
|
||||
void socket_session::send(std::string &&ss)
|
||||
{
|
||||
// Always add to queue
|
||||
queue_.push_back(ss);
|
||||
@@ -117,13 +130,16 @@ void socket_session::send(std::shared_ptr<std::string const> const &ss)
|
||||
|
||||
// We are not currently writing, so send this immediately
|
||||
ws_.async_write(
|
||||
net::buffer(*queue_.front()),
|
||||
net::buffer(queue_.front()),
|
||||
[sp = shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_write(ec, bytes);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Executes on completion of write operation to a socket
|
||||
*/
|
||||
void socket_session::on_write(error_code ec, std::size_t)
|
||||
{
|
||||
// Handle the error, if any
|
||||
@@ -136,13 +152,16 @@ void socket_session::on_write(error_code ec, std::size_t)
|
||||
// Send the next message if any
|
||||
if (!queue_.empty())
|
||||
ws_.async_write(
|
||||
net::buffer(*queue_.front()),
|
||||
net::buffer(queue_.front()),
|
||||
[sp = shared_from_this()](
|
||||
error_code ec, std::size_t bytes) {
|
||||
sp->on_write(ec, bytes);
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Close an active websocket connection gracefully
|
||||
*/
|
||||
void socket_session::close()
|
||||
{
|
||||
// Close the WebSocket connection
|
||||
@@ -153,6 +172,9 @@ void socket_session::close()
|
||||
});
|
||||
}
|
||||
|
||||
/*
|
||||
* Executes on completion of closing a socket connection
|
||||
*/
|
||||
//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method
|
||||
void socket_session::on_close(error_code ec, std::int8_t type)
|
||||
{
|
||||
@@ -171,7 +193,7 @@ void socket_session::init_uniqueid()
|
||||
// Create a unique id for the session combining ip and port.
|
||||
// We prepare this appended string here because we need to use it for finding elemends from the maps
|
||||
// for validation purposes whenever a message is received.
|
||||
uniqueid_.append(address_).append(":").append(std::to_string(port_));
|
||||
uniqueid_.append(address_).append(":").append(port_);
|
||||
}
|
||||
|
||||
} // namespace sock
|
||||
@@ -27,10 +27,10 @@ class socket_session_handler;
|
||||
*/
|
||||
class socket_session : public std::enable_shared_from_this<socket_session>
|
||||
{
|
||||
beast::flat_buffer buffer_;
|
||||
websocket::stream<beast::tcp_stream> ws_;
|
||||
std::vector<std::shared_ptr<std::string const>> queue_;
|
||||
socket_session_handler &sess_handler_;
|
||||
beast::flat_buffer buffer_; // used to store incoming messages
|
||||
websocket::stream<beast::tcp_stream> ws_; // websocket stream used send an recieve messages
|
||||
std::vector<std::string> queue_; // uses to store messages temporarily until it is sent to the relevant party
|
||||
socket_session_handler &sess_handler_; // handler passed to gain access to websocket events
|
||||
|
||||
void fail(error ec, char const *what);
|
||||
|
||||
@@ -45,11 +45,15 @@ class socket_session : public std::enable_shared_from_this<socket_session>
|
||||
public:
|
||||
socket_session(websocket::stream<beast::tcp_stream> &websocket, socket_session_handler &sess_handler);
|
||||
|
||||
// Port and the address of the remote party is being saved to used in the session handler
|
||||
// to identify from which remote party the message recieved. Since the port is passed as a string
|
||||
// from the parent we store as it is, since we are not going to pass it anywhere or used in a method
|
||||
|
||||
// The port of the remote party.
|
||||
std::uint16_t port_;
|
||||
std::string_view port_;
|
||||
|
||||
// The IP address of the remote party.
|
||||
std::string address_;
|
||||
std::string_view address_;
|
||||
|
||||
// The unique identifier of the remote party (format <ip>:<port>).
|
||||
std::string uniqueid_;
|
||||
@@ -59,11 +63,10 @@ public:
|
||||
// Setting and reading flags to this is completely managed by user-code.
|
||||
std::bitset<8> flags_;
|
||||
|
||||
void server_run(const std::uint16_t port, std::string_view address);
|
||||
void client_run(const std::uint16_t port, std::string_view address, error ec);
|
||||
void server_run(std::string_view port, std::string_view address);
|
||||
void client_run(std::string_view port, std::string_view address, error ec);
|
||||
|
||||
// Used to send message through an active websocket connection.
|
||||
void send(std::shared_ptr<std::string const> const &ss);
|
||||
void send(std::string &&ss);
|
||||
|
||||
// When called, initializes the unique id string for this session.
|
||||
void init_uniqueid();
|
||||
|
||||
@@ -15,8 +15,19 @@ class socket_session;
|
||||
class socket_session_handler
|
||||
{
|
||||
public:
|
||||
/**
|
||||
* Executes on initiation of a new connection
|
||||
*/
|
||||
virtual void on_connect(socket_session *session) = 0;
|
||||
virtual void on_message(socket_session *session, const std::string &message) = 0;
|
||||
|
||||
/**
|
||||
* Executes on recieval of new message
|
||||
*/
|
||||
virtual void on_message(socket_session *session, std::string &&message) = 0;
|
||||
|
||||
/**
|
||||
* Executes on websocket connection close
|
||||
*/
|
||||
virtual void on_close(socket_session *session) = 0;
|
||||
};
|
||||
} // namespace sock
|
||||
|
||||
@@ -24,7 +24,7 @@ void user_session_handler::on_connect(sock::socket_session *session)
|
||||
{
|
||||
std::cout << "User client connected " << session->address_ << ":" << session->port_ << std::endl;
|
||||
|
||||
// As a soon as a user conntects, we issue them a challenge message. We remember the
|
||||
// As soon as a user conntects, we issue them a challenge message. We remember the
|
||||
// challenge we issued and later verifies the user's response with it.
|
||||
|
||||
std::string msg;
|
||||
@@ -37,8 +37,7 @@ void user_session_handler::on_connect(sock::socket_session *session)
|
||||
// Create an entry in pending_challenges for later tracking upon challenge response.
|
||||
usr::pending_challenges[session->uniqueid_] = challengeb64;
|
||||
|
||||
// TODO: This needs to be reviewed to optimise passing the message.
|
||||
session->send(std::make_shared<std::string>(msg));
|
||||
session->send(std::move(msg));
|
||||
|
||||
// Set the challenge-issued flag to help later checks in on_message.
|
||||
session->flags_.set(util::SESSION_FLAG::USER_CHALLENGE_ISSUED);
|
||||
@@ -47,7 +46,7 @@ void user_session_handler::on_connect(sock::socket_session *session)
|
||||
/**
|
||||
* This gets hit every time we receive some data from a client connected to the HP public port.
|
||||
*/
|
||||
void user_session_handler::on_message(sock::socket_session *session, const std::string &message)
|
||||
void user_session_handler::on_message(sock::socket_session *session, std::string &&message)
|
||||
{
|
||||
// First check whether this session is pending challenge.
|
||||
// Meaning we have previously issued a challenge to the client,
|
||||
|
||||
@@ -11,8 +11,8 @@ class user_session_handler : public sock::socket_session_handler
|
||||
{
|
||||
public:
|
||||
void on_connect(sock::socket_session *session);
|
||||
void on_message(sock::socket_session *session, const std::string &message);
|
||||
void on_message(sock::socket_session *session, std::string &&message);
|
||||
void on_close(sock::socket_session *session);
|
||||
};
|
||||
|
||||
}
|
||||
} // namespace usr
|
||||
Reference in New Issue
Block a user