Merged latest from master.

This commit is contained in:
asanka-indrajith
2019-10-09 09:02:30 -04:00
10 changed files with 485 additions and 4 deletions

View File

@@ -13,6 +13,9 @@ add_executable(hpcore
src/usr/usr.cpp
src/shared.cpp
src/p2p/message.pb.cc
src/sock/socket_client.cpp
src/sock/socket_server.cpp
src/sock/socket_session.cpp
)
target_link_libraries(hpcore

View File

@@ -48,9 +48,9 @@ struct ContractConfig
string listenip;
vector<string> peers;
vector<string> unl;
int peerport;
unsigned short peerport;
int roundtime;
int pubport;
unsigned short pubport;
int pubmaxsize;
int pubmaxcpm;
};

View File

@@ -82,5 +82,5 @@ int parse_cmd(int argc, char **argv)
cout << "hpcore <command> <contract dir> (command = run | new | rekey)\n";
cout << "Example: hpcore run ~/mycontract\n";
return -1;
}
return 0;
}

View File

@@ -0,0 +1,75 @@
#include <iostream>
#include "socket_client.h"
using tcp = net::ip::tcp;
using error = boost::system::error_code;
namespace sock
{
socket_client::socket_client(net::io_context &ioc, socket_session_handler &session_handler)
: resolver_(net::make_strand(ioc)), ws_(net::make_strand(ioc)), socket_(ioc), sess_handler_(session_handler)
{
}
void socket_client::run(char const *host, char const *port)
{
host_ = host;
port_ = (unsigned short)std::strtoul(port, NULL, 0);
// Look up the domain name
resolver_.async_resolve(
host,
port,
[self = shared_from_this()](error ec, tcp::resolver::results_type results) {
self->on_resolve(ec, results);
});
}
void socket_client::on_resolve(error ec, tcp::resolver::results_type results)
{
if (ec)
socket_client_fail(ec, "socket_client_resolve");
// Make the connection on the IP address we get from a lookup
beast::get_lowest_layer(ws_).async_connect(
results,
[self = shared_from_this()](error ec, tcp::resolver::results_type::endpoint_type type) {
self->on_connect(ec, type);
});
}
void socket_client::on_connect(error ec, tcp::resolver::results_type::endpoint_type)
{
if (ec)
socket_client_fail(ec, "socket_client_connect");
// Turn off the timeout on the tcp_stream, because
// the websocket stream has its own timeout system.
beast::get_lowest_layer(ws_).expires_never();
// Set suggested timeout settings for the websocket
ws_.set_option(
websocket::stream_base::timeout::suggested(
beast::role_type::client));
// Perform the websocket handshake
ws_.async_handshake(host_, "/",
[self = shared_from_this()](error ec) {
self->on_handshake(ec);
});
}
void socket_client::on_handshake(error ec)
{
std::make_shared<socket_session>(
std::move(socket_), sess_handler_)
->client_run(port_, host_, ec);
}
void socket_client::socket_client_fail(beast::error_code ec, char const *what)
{
std::cerr << what << ": " << ec.message() << "\n";
}
} // namespace sock

51
src/sock/socket_client.h Normal file
View File

@@ -0,0 +1,51 @@
#ifndef _SOCK_CLIENT_SESSION_H_
#define _SOCK_CLIENT_SESSION_H_
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include "socket_session.h"
#include "socket_session_handler.h"
namespace beast = boost::beast;
namespace net = boost::asio;
namespace websocket = boost::beast::websocket;
namespace http = boost::beast::http;
using tcp = net::ip::tcp;
using error = boost::system::error_code;
namespace sock
{
/**
* Represents an active WebSocket client connection
* Based on the implementation from https://github.com/vinniefalco/CppCon2018
*/
class socket_client : public std::enable_shared_from_this<socket_client>
{
tcp::resolver resolver_;
tcp::socket socket_;
websocket::stream<beast::tcp_stream> ws_;
std::string host_;
unsigned short port_;
socket_session_handler &sess_handler_;
void on_resolve(error ec, tcp::resolver::results_type results);
void on_connect(error ec, tcp::resolver::results_type::endpoint_type);
void on_handshake(error ec);
void on_close(error ec);
void socket_client_fail(beast::error_code ec, char const *what);
public:
// Resolver and socket require an io_context
socket_client(net::io_context &ioc, socket_session_handler &session_handler);
//Entry point to the client which requires an active host and port
void run(char const *host, char const *port);
};
} // namespace sock
#endif

100
src/sock/socket_server.cpp Normal file
View File

@@ -0,0 +1,100 @@
#include <iostream>
#include <string>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include <boost/asio/strand.hpp>
#include "socket_server.h"
namespace net = boost::asio; // namespace asio
using tcp = net::ip::tcp;
using error_code = boost::system::error_code;
namespace sock
{
socket_server::socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler)
: acceptor_(ioc), socket_(ioc), sess_handler_(session_handler)
{
error_code ec;
// Open the acceptor
acceptor_.open(endpoint.protocol(), ec);
if (ec)
{
fail(ec, "open");
return;
}
// Allow address reuse
acceptor_.set_option(net::socket_base::reuse_address(true));
if (ec)
{
fail(ec, "set_option");
return;
}
// Bind to the server address
acceptor_.bind(endpoint, ec);
if (ec)
{
fail(ec, "bind");
return;
}
// Start listening for connections
acceptor_.listen(
net::socket_base::max_listen_connections, ec);
if (ec)
{
fail(ec, "listen");
return;
}
}
void socket_server::run()
{
// Start accepting a connection
acceptor_.async_accept(
socket_,
[self = shared_from_this()](error_code ec) {
self->on_accept(ec);
});
}
// Report a failure
void socket_server::fail(error_code ec, char const *what)
{
// Don't report on canceled operations
if (ec == net::error::operation_aborted)
return;
std::cerr << what << ": " << ec.message() << "\n";
}
// Handle a connection
void socket_server::on_accept(error_code ec)
{
if (ec)
{
return fail(ec, "accept");
}
else
{
unsigned short port = socket_.remote_endpoint().port();
std::string address = socket_.remote_endpoint().address().to_string();
// Launch a new session for this connection
std::make_shared<socket_session>(
std::move(socket_), sess_handler_)
->server_run(port, address);
}
// Accept another connection
acceptor_.async_accept(
socket_,
[self = shared_from_this()](error_code ec) {
self->on_accept(ec);
});
}
} // namespace sock

37
src/sock/socket_server.h Normal file
View File

@@ -0,0 +1,37 @@
#ifndef _SOCK_SERVER_LISTENER_H_
#define _SOCK_SERVER_LISTENER_H_
#include <boost/asio.hpp>
#include "socket_session_handler.h"
namespace net = boost::asio; // namespace asio
using tcp = net::ip::tcp;
using error = boost::system::error_code; // from <boost/system/error_code.hpp>
namespace sock
{
/**
* Represents an active WebSocket server connection
* Based on the implementation from https://github.com/vinniefalco/CppCon2018
*/
class socket_server : public std::enable_shared_from_this<socket_server>
{
tcp::acceptor acceptor_;
tcp::socket socket_;
socket_session_handler &sess_handler_;
void fail(error ec, char const *what);
void on_accept(error ec);
public:
socket_server(net::io_context &ioc, tcp::endpoint endpoint, socket_session_handler &session_handler);
// Start accepting incoming connections
void run();
};
} // namespace sock
#endif

133
src/sock/socket_session.cpp Normal file
View File

@@ -0,0 +1,133 @@
#include <iostream>
#include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp>
#include "socket_session.h"
namespace net = boost::asio;
using tcp = net::ip::tcp;
using error_code = boost::system::error_code;
namespace sock
{
socket_session::socket_session(tcp::socket socket, socket_session_handler &sess_handler)
: ws_(std::move(socket)), sess_handler_(sess_handler)
{
}
void socket_session::server_run(const unsigned short &port, const std::string &address)
{
port_ = port;
address_ = address;
// Accept the websocket handshake
ws_.async_accept(
[sp = shared_from_this()](
error ec) {
sp->on_accept(ec);
});
}
void socket_session::client_run(const unsigned short &port, const std::string &address, error ec)
{
port_ = port;
address_ = address;
sess_handler_.on_connect(this, ec);
if (ec)
return fail(ec, "handshake");
ws_.async_read(
buffer_,
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_read(ec, bytes);
});
}
void socket_session::fail(error_code ec, char const *what)
{
// Don't report these
if (ec == net::error::operation_aborted ||
ec == websocket::error::closed)
return;
std::cerr << what << ": " << ec.message() << "\n";
}
void socket_session::on_accept(error_code ec)
{
sess_handler_.on_connect(this, ec);
// Handle the error, if any
if (ec)
return fail(ec, "accept");
// Read a message
ws_.async_read(
buffer_,
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_read(ec, bytes);
});
}
void socket_session::on_read(error_code ec, std::size_t)
{
auto const string_message = std::make_shared<std::string const>(std::move(beast::buffers_to_string(buffer_.data())));
sess_handler_.on_message(this, string_message, ec);
// Handle the error, if any
if (ec)
return fail(ec, "read");
// Clear the buffer
buffer_.consume(buffer_.size());
// Read another message
ws_.async_read(
buffer_,
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_read(ec, bytes);
});
}
void socket_session::send(std::shared_ptr<std::string const> const &ss)
{
// Always add to queue
queue_.push_back(ss);
// Are we already writing?
if (queue_.size() > 1)
return;
// We are not currently writing, so send this immediately
ws_.async_write(
net::buffer(*queue_.front()),
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_write(ec, bytes);
});
}
void socket_session::on_write(error_code ec, std::size_t)
{
// Handle the error, if any
if (ec)
return fail(ec, "write");
// Remove the string from the queue
queue_.erase(queue_.begin());
// Send the next message if any
if (!queue_.empty())
ws_.async_write(
net::buffer(*queue_.front()),
[sp = shared_from_this()](
error_code ec, std::size_t bytes) {
sp->on_write(ec, bytes);
});
}
} // namespace sock

55
src/sock/socket_session.h Normal file
View File

@@ -0,0 +1,55 @@
#ifndef _SOCK_SERVER_SESSION_H_
#define _SOCK_SERVER_SESSION_H_
#include <string>
#include <vector>
#include <boost/asio.hpp>
#include <boost/beast.hpp>
#include "socket_session_handler.h"
namespace beast = boost::beast;
namespace net = boost::asio;
namespace websocket = boost::beast::websocket;
namespace http = boost::beast::http;
using tcp = net::ip::tcp;
using error = boost::system::error_code;
namespace sock
{
//Forward Declaration
class socket_session_handler;
/**
* Represents an active WebSocket connection
*/
class socket_session : public std::enable_shared_from_this<socket_session>
{
beast::flat_buffer buffer_;
websocket::stream<tcp::socket> ws_;
std::vector<std::shared_ptr<std::string const>> queue_;
socket_session_handler &sess_handler_;
void fail(error ec, char const *what);
void on_accept(error ec);
void on_read(error ec, std::size_t bytes_transferred);
void on_write(error ec, std::size_t bytes_transferred);
public:
socket_session(tcp::socket socket, socket_session_handler &sess_handler);
unsigned short port_;
std::string address_;
void server_run(const unsigned short &port, const std::string &address);
void client_run(const unsigned short &port, const std::string &address, error ec);
//Used to send message through an active websocket connection
void send(std::shared_ptr<std::string const> const &ss);
};
} // namespace sock
#endif

View File

@@ -0,0 +1,27 @@
#ifndef _SOCK_SESSION_HANDLER_H_
#define _SOCK_SESSION_HANDLER_H_
#include "socket_session.h"
#include <boost/beast/core.hpp>
using error = boost::system::error_code;
namespace sock
{
// Forward declaration
class socket_session;
/**
* Represents a WebSocket sessions handler. Can inherit from this class and access websocket events
*/
class socket_session_handler
{
public:
virtual void on_connect(socket_session *session, error ec) = 0;
virtual void on_message(socket_session *session, std::shared_ptr<std::string const> const &message, error ec) = 0;
virtual void on_close(socket_session *session) = 0;
};
} // namespace sock
#endif