diff --git a/CMakeLists.txt b/CMakeLists.txt index b97cefa..d8e5747 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,7 +25,6 @@ add_subdirectory(src/killswitch) add_executable(sagent src/conf.cpp src/comm/comm_handler.cpp - src/comm/comm_session.cpp src/util/util.cpp src/salog.cpp src/crypto.cpp @@ -51,7 +50,7 @@ add_dependencies(sagent ) add_custom_command(TARGET sagent POST_BUILD - COMMAND bash -c "cp -r ./dependencies/{hpfs,hpws,user-install.sh,user-uninstall.sh} ./build/" + COMMAND bash -c "cp -r ./dependencies/{hpfs,user-install.sh,user-uninstall.sh} ./build/" COMMAND tar xf ./dependencies/contract_template.tar -C ./build/ --no-same-owner COMMAND cp ./dependencies/hp.cfg ./build/contract_template/cfg/ COMMAND cp ./bootstrap-contract/script.sh ./build/contract_template/contract_fs/seed/state/ @@ -64,7 +63,7 @@ target_precompile_headers(sagent PUBLIC src/pchheader.hpp) # Add target to generate the installer setup. add_custom_target(installer COMMAND mkdir -p ./build/sashimono-installer - COMMAND bash -c "cp -r ./build/{sagent,hpfs,hpws,user-install.sh,user-uninstall.sh,contract_template} ./build/sashimono-installer/" + COMMAND bash -c "cp -r ./build/{sagent,hpfs,user-install.sh,user-uninstall.sh,contract_template} ./build/sashimono-installer/" COMMAND bash -c "cp -r ./installer/{docker-install.sh,registry-install.sh,registry-uninstall.sh,sashimono-install.sh,sashimono-uninstall.sh} ./build/sashimono-installer/" COMMAND bash -c "cp -r ./dependencies/{user-cgcreate.sh,libblake3.so} ./build/sashimono-installer/" COMMAND tar cfz ./build/sashimono-installer.tar.gz --directory=./build/ sashimono-installer diff --git a/README.md b/README.md index ce893ce..6e6d5f5 100644 --- a/README.md +++ b/README.md @@ -39,7 +39,7 @@ Run `make installer` ('sashimono-installer.tar.gz' will be placed in build direc ## Code structure Code is divided into subsystems via namespaces. -**comm::** Handles generic web sockets communication functionality. Mainly acts as a wrapper for [hpws](https://github.com/RichardAH/hpws). +**comm::** Handles socket related functionality. **conf::** Handles configuration. Loads and holds the central configuration object. Used by most of the subsystems. diff --git a/dependencies/hpws b/dependencies/hpws deleted file mode 100755 index 69c98f5..0000000 Binary files a/dependencies/hpws and /dev/null differ diff --git a/dev-setup.sh b/dev-setup.sh index fd01517..4dd3e6b 100755 --- a/dev-setup.sh +++ b/dev-setup.sh @@ -85,6 +85,10 @@ cgroupsuffix="-cg" ! sudo groupadd $group && echo "Group creation failed." ! sudo echo "@$group cpu,memory %u$cgroupsuffix" >>/etc/cgrules.conf && echo "Cgroup rule creation failed." +# Setting up Sashimono admin group. +admin_group="sashiadmin" +! sudo groupadd $admin_group && echo "Admin group creation failed." + # Build sagent cmake . make \ No newline at end of file diff --git a/examples/message-board/message-board.js b/examples/message-board/message-board.js index e24d2c7..f4eaa3a 100644 --- a/examples/message-board/message-board.js +++ b/examples/message-board/message-board.js @@ -118,7 +118,6 @@ const interatctiveInterface = async () => { sendToAllAgents(JSON.stringify({ id: uuidv4(), type: 'initiate', - owner_pubkey: 'ed5cb83404120ac759609819591ef839b7d222c84f1f08b3012f490586159d2b50', container_name: containerName, peers: peers ? peers.split(',') : [], unl: unl ? unl.split(',') : [], @@ -133,7 +132,6 @@ const interatctiveInterface = async () => { sendToAllAgents(JSON.stringify({ id: uuidv4(), type: 'destroy', - owner_pubkey: 'ed5cb83404120ac759609819591ef839b7d222c84f1f08b3012f490586159d2b50', container_name: containerName })) break; @@ -142,7 +140,6 @@ const interatctiveInterface = async () => { sendToAllAgents(JSON.stringify({ id: uuidv4(), type: 'start', - owner_pubkey: 'ed5cb83404120ac759609819591ef839b7d222c84f1f08b3012f490586159d2b50', container_name: containerName })) break; @@ -151,7 +148,6 @@ const interatctiveInterface = async () => { sendToAllAgents(JSON.stringify({ id: uuidv4(), type: 'stop', - owner_pubkey: 'ed5cb83404120ac759609819591ef839b7d222c84f1f08b3012f490586159d2b50', container_name: containerName })) break; diff --git a/installer/sashimono-install.sh b/installer/sashimono-install.sh index 0e480ca..df614f9 100755 --- a/installer/sashimono-install.sh +++ b/installer/sashimono-install.sh @@ -8,6 +8,7 @@ sashimono_data=/etc/sashimono sashimono_service="sashimono-agent" cgcreate_service="sashimono-cgcreate" group="sashimonousers" +admin_group="sashiadmin" cgroupsuffix="-cg" registryuser="sashidockerreg" registryport=4444 @@ -58,7 +59,7 @@ function rollback() { } # Install Sashimono agent binaries into sashimono bin dir. -cp "$script_dir"/{sagent,hpfs,hpws,user-cgcreate.sh,user-install.sh,user-uninstall.sh} $sashimono_bin +cp "$script_dir"/{sagent,hpfs,user-cgcreate.sh,user-install.sh,user-uninstall.sh} $sashimono_bin chmod -R +x $sashimono_bin # Download and install rootless dockerd. @@ -79,6 +80,9 @@ selfip=$(ip -4 a l ens3 | awk '/inet/ {print $2}' | cut -d/ -f1) ! groupadd $group && echo "Group creation failed." && rollback ! echo "@$group cpu,memory %u$cgroupsuffix" >>/etc/cgrules.conf && echo "Cgroup rule creation failed." && rollback +# Setting up Sashimono admin group. +! groupadd $admin_group && echo "Admin group creation failed." && rollback + # Setup Sashimono data dir. cp -r "$script_dir"/contract_template $sashimono_data $sashimono_bin/sagent new $sashimono_data $selfip $registry_addr diff --git a/installer/sashimono-uninstall.sh b/installer/sashimono-uninstall.sh index 5cb8099..c987fbb 100755 --- a/installer/sashimono-uninstall.sh +++ b/installer/sashimono-uninstall.sh @@ -9,6 +9,7 @@ sashimono_service="sashimono-agent" cgcreate_service="sashimono-cgcreate" registryuser="sashidockerreg" group="sashimonousers" +admin_group="sashiadmin" cgroupsuffix="-cg" quiet=$1 @@ -80,5 +81,7 @@ echo "Deleting cgroup rules..." groupdel $group sed -i -r "/^@$group\s+cpu,memory\s+%u$cgroupsuffix/d" /etc/cgrules.conf +groupdel $admin_group + echo "Sashimono uninstalled successfully." exit 0 diff --git a/src/comm/comm_handler.cpp b/src/comm/comm_handler.cpp index 43105e8..ef2a88f 100644 --- a/src/comm/comm_handler.cpp +++ b/src/comm/comm_handler.cpp @@ -1,19 +1,60 @@ #include "comm_handler.hpp" #include "../util/util.hpp" #include "../conf.hpp" -#include "hpws.hpp" + +#define __HANDLE_RESPONSE(id, type, content, ret) \ + { \ + std::string res; \ + msg_parser.build_response(res, type, id, content, type == msg::MSGTYPE_CREATE_RES && ret == 0); \ + send(res); \ + return ret; \ + } namespace comm { constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 1 * 1024 * 1024; // 1MB; bool init_success; + constexpr const int POLL_TIMEOUT = 10; + constexpr const int BUFFER_SIZE = 1024; + constexpr const int EMPTY_READ_TRESHOLD = 5; + msg::msg_parser msg_parser; comm_ctx ctx; int init() { - ctx.comm_handler_thread = std::thread(comm_handler_loop); + ctx.connection_socket = socket(AF_UNIX, SOCK_SEQPACKET, 0); + if (ctx.connection_socket == -1) + { + LOG_ERROR << errno << ": Error creating the socket."; + return -1; + } + struct sockaddr_un sock_name; + memset(&sock_name, 0, sizeof(struct sockaddr_un)); + sock_name.sun_family = AF_UNIX; + strncpy(sock_name.sun_path, conf::ctx.socket_path.c_str(), sizeof(sock_name.sun_path) - 1); + + // Remove the socket if it already exists. + unlink(conf::ctx.socket_path.c_str()); + + const std::string command = "chown :sashiadmin " + conf::ctx.socket_path; + + char mode[] = "0660"; // rw-rw---- + const mode_t permission_mode = strtol(mode, 0, 8); // Char to octal conversion. + + if (bind(ctx.connection_socket, (const struct sockaddr *)&sock_name, sizeof(struct sockaddr_un)) == -1 || + chmod(conf::ctx.socket_path.c_str(), permission_mode) == -1 || + system(command.data()) == -1 || + listen(ctx.connection_socket, 20) == -1) + { + LOG_ERROR << errno << ": Error binding the socket for " << conf::ctx.socket_path; + close(ctx.connection_socket); + return -1; + } + + msg_parser = msg::msg_parser(); + ctx.comm_handler_thread = std::thread(comm_handler_loop); init_success = true; return 0; @@ -27,48 +68,25 @@ namespace comm if (ctx.comm_handler_thread.joinable()) ctx.comm_handler_thread.join(); + + close(ctx.connection_socket); + unlink(conf::ctx.socket_path.c_str()); } } /** - * Make a connection and session to the given host. + * This accepts connections to the socket. * This only gets called whithin the comm handler thread. - * @param ip_port Ip and port of the host. * @return 0 on success -1 on error. */ - int connect(const conf::host_ip_port &ip_port) + int connect() { - std::string_view host = ip_port.host_address; - const uint16_t port = ip_port.port; - - LOG_DEBUG << "Trying to connect " << host << ":" << std::to_string(port); - - std::variant client_result = hpws::client::connect(conf::ctx.hpws_exe_path, DEFAULT_MAX_MSG_SIZE, host, port, "/", {}, util::fork_detach); - - if (std::holds_alternative(client_result)) + ctx.data_socket = accept(ctx.connection_socket, NULL, NULL); + if (ctx.data_socket == -1) { - const hpws::error error = std::get(client_result); - if (error.first != 202) - LOG_ERROR << "Connection hpws error:" << error.first << " " << error.second; + LOG_ERROR << errno << ": Error accepting the new connection."; return -1; } - else - { - hpws::client client = std::move(std::get(client_result)); - const std::variant host_result = client.host_address(); - if (std::holds_alternative(host_result)) - { - const hpws::error error = std::get(host_result); - LOG_ERROR << "Error getting ip from hpws:" << error.first << " " << error.second; - return -1; - } - else - { - const std::string &host_address = std::get(host_result); - ctx.session.emplace(host_address, std::move(client)); - ctx.session->init(); - } - } return 0; } @@ -78,11 +96,8 @@ namespace comm */ void disconnect() { - if (ctx.session.has_value()) - { - ctx.session->close(); - ctx.session.reset(); - } + close(ctx.data_socket); + ctx.data_socket = -1; } void comm_handler_loop() @@ -90,29 +105,45 @@ namespace comm LOG_INFO << "Message processor started."; util::mask_signal(); + struct pollfd pfd; + int empty_read_count = 0; // Helps to detect when the client is disconnected. while (!ctx.is_shutting_down) { - // Process queued messaged only if there's a session. - if (ctx.session.has_value()) + // Process queued messaged only if there's a socket connection. + if (ctx.data_socket != -1) { - // If no messages were processed in this cycle, wait for some time. - if (ctx.session->process_inbound_msg_queue() <= 0) - util::sleep(10); - - // If session is marked for closure since there's an issue, We disconnect the current session. - // And try to create a new session in the next round - if (ctx.session->state == SESSION_STATE::MUST_CLOSE) - { - LOG_DEBUG << "Closing the session due to a failure: " << ctx.session->display_name(); + std::string message; + const int ret = read_socket(message); + if (ret == -1) disconnect(); + else if (ret > 0) + handle_message(message); + else + { + empty_read_count++; + // Empty reads happens when client closed the connection. + // Disconnect connection after 5 consecutive empty reads. + if (empty_read_count == EMPTY_READ_TRESHOLD) + { + disconnect(); + empty_read_count = 0; + } util::sleep(1000); } } else { - // If host connection failed wait for some time. - if (connect(conf::cfg.server.ip_port) == -1) + pfd.fd = ctx.connection_socket; + pfd.events = POLLIN; + + // Wait for some time if no connections are available. + if (poll(&pfd, 1, POLL_TIMEOUT) > 0) + { + connect(); + empty_read_count = 0; + } + else util::sleep(1000); } } @@ -130,4 +161,113 @@ namespace comm { ctx.comm_handler_thread.join(); } + + /** + * Handles the received message. + * @param msg Received message. + * @return 0 on success -1 on error. + */ + int handle_message(std::string_view msg) + { + std::string id, type; + if (msg_parser.parse(msg) == -1 || msg_parser.extract_type_and_id(type, id) == -1) + __HANDLE_RESPONSE("", "error", "format_error", -1); + + if (type == msg::MSGTYPE_CREATE) + { + msg::create_msg msg; + if (msg_parser.extract_create_message(msg) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "format_error", -1); + + hp::instance_info info; + if (hp::create_new_instance(info, msg.pubkey, msg.contract_id, msg.image) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "create_error", -1); + + std::string create_res; + msg_parser.build_create_response(create_res, info); + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, create_res, 0); + } + else if (type == msg::MSGTYPE_INITIATE) + { + msg::initiate_msg msg; + if (msg_parser.extract_initiate_message(msg) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "format_error", -1); + + if (hp::initiate_instance(msg.container_name, msg) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "init_error", -1); + + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "initiated", 0); + } + else if (type == msg::MSGTYPE_DESTROY) + { + msg::destroy_msg msg; + if (msg_parser.extract_destroy_message(msg)) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "format_error", -1); + + if (hp::destroy_container(msg.container_name) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroy_error", -1); + + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroyed", 0); + } + else if (type == msg::MSGTYPE_START) + { + msg::start_msg msg; + if (msg_parser.extract_start_message(msg)) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "format_error", -1); + + if (hp::start_container(msg.container_name) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "start_error", -1); + + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "started", 0); + } + else if (type == msg::MSGTYPE_STOP) + { + msg::stop_msg msg; + if (msg_parser.extract_stop_message(msg)) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "format_error", -1); + + if (hp::stop_container(msg.container_name) == -1) + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stop_error", -1); + + __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stopped", 0); + } + else + __HANDLE_RESPONSE(id, "error", "type_error", -1); + + return 0; + } + + /** + * Sends the given message to the connected client. + * @param message Message to send. + * @return 0 on success -1 on error. + **/ + int send(std::string_view message) + { + if (ctx.data_socket == -1) + return -1; + + const int ret = write(ctx.data_socket, message.data(), message.length() + 1); + // Close connection after sending the response to the client. + disconnect(); + return ret == -1 ? -1 : 0; + } + + /** + * Reads the message from the connected client. + * @param message Placeholder to store the message. + * @return Number of bytes read on success -1 on error. + **/ + int read_socket(std::string &message) + { + char buffer[BUFFER_SIZE]; + const int ret = read(ctx.data_socket, buffer, BUFFER_SIZE); + if (ret == -1) + { + LOG_ERROR << errno << ": Error receiving data."; + return -1; + } + message = std::string(buffer); + return ret; + } } // namespace comm diff --git a/src/comm/comm_handler.hpp b/src/comm/comm_handler.hpp index 7f6b081..6ec2243 100644 --- a/src/comm/comm_handler.hpp +++ b/src/comm/comm_handler.hpp @@ -2,15 +2,16 @@ #define _SA_COMM_COMM_SERVER_ #include "../pchheader.hpp" -#include "comm_session.hpp" +#include "../msg/msg_parser.hpp" namespace comm { struct comm_ctx { - std::optional session; bool is_shutting_down = false; std::thread comm_handler_thread; // Incoming message processor thread. + int connection_socket = -1; + int data_socket = -1; }; extern comm_ctx ctx; @@ -19,14 +20,20 @@ namespace comm void deinit(); - int connect(const conf::host_ip_port &ip_port); + int connect(); void disconnect(); void comm_handler_loop(); + int handle_message(std::string_view msg); + + int send(std::string_view message); + void wait(); + int read_socket(std::string &message); + } // namespace comm #endif diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp deleted file mode 100644 index 607a0cb..0000000 --- a/src/comm/comm_session.cpp +++ /dev/null @@ -1,302 +0,0 @@ -#include "comm_session.hpp" -#include "../util/util.hpp" -#include "../hp_manager.hpp" - -#define __HANDLE_RESPONSE(id, type, content, ret) \ - { \ - std::string res; \ - msg_parser.build_response(res, type, id, content, type == msg::MSGTYPE_CREATE_RES && ret == 0); \ - send(res); \ - return ret; \ - } - -namespace comm -{ - constexpr uint16_t MAX_IN_MSG_QUEUE_SIZE = 64; // Maximum in message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... - - comm_session::comm_session( - std::string_view host_address, hpws::client &&hpws_client) - : uniqueid(host_address), - host_address(host_address), - hpws_client(std::move(hpws_client)), - msg_parser(msg::msg_parser()), - in_msg_queue(MAX_IN_MSG_QUEUE_SIZE) - { - } - - /** - * Init() should be called to activate the session. - * Because we are starting threads here, after init() is called, the session object must not be "std::moved". - * @return returns 0 on successful init, otherwise -1; - */ - int comm_session::init() - { - if (state == SESSION_STATE::NONE) - { - reader_thread = std::thread(&comm_session::reader_loop, this); - writer_thread = std::thread(&comm_session::outbound_msg_queue_processor, this); - state = SESSION_STATE::ACTIVE; - - // Send an initial message to the host. - std::string res; - msg_parser.build_response(res, msg::MSGTYPE_INIT, {}, "Connection initiated."); - send(res); - LOG_DEBUG << "Session started: " << uniqueid; - } - - return 0; - } - - /** - * Listening for receiving messages and process them. - */ - void comm_session::reader_loop() - { - util::mask_signal(); - - while (state != SESSION_STATE::CLOSED && hpws_client) - { - // If reading from the hpws_client failed we'll mark this session to closure. - bool should_disconnect = false; - - const std::variant read_result = hpws_client->read(); - if (std::holds_alternative(read_result)) - { - should_disconnect = true; - const hpws::error error = std::get(read_result); - if (error.first != 1) // 1 indicates channel has closed. - LOG_DEBUG << "hpws client read failed:" << error.first << " " << error.second; - } - else - { - // Enqueue the message for processing. - std::string_view data = std::get(read_result); - in_msg_queue.try_enqueue(std::string(data)); - - // Signal the hpws client that we are ready for next message. - const std::optional error = hpws_client->ack(data); - if (error.has_value()) - { - should_disconnect = true; - LOG_DEBUG << "hpws client ack failed:" << error->first << " " << error->second; - } - } - - if (should_disconnect) - { - // Here we mark the session as needing to close. - // The session will be properly "closed" and cleared from comm_handler. - // Then comm_handler will try to initiate a new session with the host. - mark_for_closure(); - break; - } - } - } - - /** - * Processes the unprocessed queued inbound messages (if any). - * @return 0 if no messages in queue. 1 if messages were processed. -1 error occured - */ - int comm_session::process_inbound_msg_queue() - { - if (state == SESSION_STATE::CLOSED) - return -1; - - bool messages_processed = false; - std::string msg_to_process; - - // Process all messages in queue. - while (in_msg_queue.try_dequeue(msg_to_process)) - { - handle_message(msg_to_process); - msg_to_process.clear(); - messages_processed = true; - } - - return messages_processed ? 1 : 0; - } - - /** - * This function constructs and sends the message to the target from the given message. - * @param message Message to be sent via the pipe. - * @return 0 on successful message sent and -1 on error. - */ - int comm_session::process_outbound_message(std::string_view message) - { - if (state == SESSION_STATE::CLOSED || !hpws_client) - return -1; - - const std::optional error = hpws_client->write(message); - if (error.has_value()) - { - LOG_ERROR << "hpws client write failed:" << error->first << " " << error->second; - return -1; - } - return 0; - } - - /** - * Loop to keep processing outbound messages in the queue. - */ - void comm_session::outbound_msg_queue_processor() - { - // Appling a signal mask to prevent receiving control signals from linux kernel. - util::mask_signal(); - - // Keep checking until the session is terminated. - while (state != SESSION_STATE::CLOSED) - { - bool messages_sent = false; - std::string msg_to_send; - - // Send all messages in queue. - while (out_msg_queue.try_dequeue(msg_to_send)) - { - process_outbound_message(msg_to_send); - msg_to_send.clear(); - messages_sent = true; - } - - // Wait for small delay if there were no outbound messages. - if (!messages_sent) - util::sleep(10); - } - } - - /** - * Handles the received message. - * @param msg Received message. - * @return 0 on success -1 on error. - */ - int comm_session::handle_message(std::string_view msg) - { - std::string id, type; - if (msg_parser.parse(msg) == -1 || msg_parser.extract_type_and_id(type, id) == -1) - __HANDLE_RESPONSE("", "error", "format_error", -1); - - if (type == msg::MSGTYPE_CREATE) - { - msg::create_msg msg; - if (msg_parser.extract_create_message(msg) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "format_error", -1); - - hp::instance_info info; - if (hp::create_new_instance(info, msg.pubkey, msg.contract_id, msg.image) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "create_error", -1); - - std::string create_res; - msg_parser.build_create_response(create_res, info); - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, create_res, 0); - } - else if (type == msg::MSGTYPE_INITIATE) - { - msg::initiate_msg msg; - if (msg_parser.extract_initiate_message(msg) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "format_error", -1); - - if (hp::initiate_instance(msg.container_name, msg) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "init_error", -1); - - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "initiated", 0); - } - else if (type == msg::MSGTYPE_DESTROY) - { - msg::destroy_msg msg; - if (msg_parser.extract_destroy_message(msg)) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "format_error", -1); - - if (hp::destroy_container(msg.container_name) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroy_error", -1); - - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroyed", 0); - } - else if (type == msg::MSGTYPE_START) - { - msg::start_msg msg; - if (msg_parser.extract_start_message(msg)) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "format_error", -1); - - if (hp::start_container(msg.container_name) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "start_error", -1); - - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "started", 0); - } - else if (type == msg::MSGTYPE_STOP) - { - msg::stop_msg msg; - if (msg_parser.extract_stop_message(msg)) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "format_error", -1); - - if (hp::stop_container(msg.container_name) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stop_error", -1); - - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stopped", 0); - } - else - __HANDLE_RESPONSE(id, "error", "type_error", -1); - - return 0; - } - - /** - * Adds the given message to the outbound message queue. - * @param message Message to be added to the outbound queue. - * @return 0 on successful addition and -1 if the session is already closed. - */ - int comm_session::send(std::string_view message) - { - if (state == SESSION_STATE::CLOSED) - return -1; - - // Passing the ownership of message to the queue. - out_msg_queue.enqueue(std::string(message)); - - return 0; - } - - /** - * Mark the session as needing to close. - * The session will be properly "closed" by comm_handler. - */ - void comm_session::mark_for_closure() - { - if (state == SESSION_STATE::CLOSED) - return; - - state = SESSION_STATE::MUST_CLOSE; - } - - /** - * Close the connection and wrap up any session processing threads. - * This will be only called by the global comm_handler. - */ - void comm_session::close() - { - if (state == SESSION_STATE::CLOSED) - return; - - state = SESSION_STATE::CLOSED; - - // Destruct the hpws client instance so it will close the sockets and related processes. - hpws_client.reset(); - - // Wait untill reader/writer threads gracefully stop. - if (writer_thread.joinable()) - writer_thread.join(); - - if (reader_thread.joinable()) - reader_thread.join(); - - LOG_DEBUG << "Session closed: " << uniqueid; - } - - /** - * Returns printable name for the session based on uniqueid (used for logging). - * @return The display name as a string. - */ - const std::string comm_session::display_name() const - { - return uniqueid; - } - -} // namespace comm \ No newline at end of file diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp deleted file mode 100644 index 8d7c7af..0000000 --- a/src/comm/comm_session.hpp +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef _HP_COMM_COMM_SESSION_ -#define _HP_COMM_COMM_SESSION_ - -#include "../pchheader.hpp" -#include "../conf.hpp" -#include "hpws.hpp" -#include "../msg/msg_parser.hpp" - -namespace comm -{ - enum SESSION_STATE - { - NONE, // Session is not yet initialized properly. - ACTIVE, // Session is active and functioning. - MUST_CLOSE, // Session socket is in unusable state and must be closed. - CLOSED // Session is fully closed. - }; - - /** - * Represents an active WebSocket connection - */ - class comm_session - { - private: - std::optional hpws_client; - msg::msg_parser msg_parser; // Message parser. - const std::string uniqueid; // IP address. - const std::string host_address; // Connection host address of the remote party. - - std::thread reader_thread; // The thread responsible for reading messages from the read fd. - std::thread writer_thread; // The thread responsible for writing messages to the write fd. - moodycamel::ReaderWriterQueue in_msg_queue; // Holds incoming messages waiting to be processed. - moodycamel::ConcurrentQueue out_msg_queue; // Holds outgoing messages waiting to be processed. - - void reader_loop(); - int handle_message(std::string_view msg); - int process_outbound_message(std::string_view message); - void outbound_msg_queue_processor(); - void mark_for_closure(); - - public: - SESSION_STATE state = SESSION_STATE::NONE; - comm_session( - std::string_view host_address, hpws::client &&hpws_client); - int init(); - int send(std::string_view message); - int process_inbound_msg_queue(); - void close(); - const std::string display_name() const; - }; - -} // namespace comm - -#endif diff --git a/src/comm/hpws.hpp b/src/comm/hpws.hpp deleted file mode 100644 index 737f538..0000000 --- a/src/comm/hpws.hpp +++ /dev/null @@ -1,995 +0,0 @@ -#ifndef HPWS_INCLUDE -#define HPWS_INCLUDE -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -#define DECODE_O_SIZE(control_msg, into) \ - { \ - into = ((uint32_t)control_msg[2] << 24) + ((uint32_t)control_msg[3] << 16) + \ - ((uint32_t)control_msg[4] << 8) + ((uint32_t)control_msg[5] << 0); \ - } - -#define ENCODE_O_SIZE(control_msg, from) \ - { \ - uint32_t f = from; \ - control_msg[2] = (unsigned char)((f >> 24) & 0xff); \ - control_msg[3] = (unsigned char)((f >> 16) & 0xff); \ - control_msg[4] = (unsigned char)((f >> 8) & 0xff); \ - control_msg[5] = (unsigned char)((f >> 0) & 0xff); \ - } - -#define HPWS_DEBUG 0 - -namespace hpws -{ - /*typedef enum e_retcode { - SUCCESS - } retcode; - */ - using error = std::pair; - -// used when waiting for messages that should already be on the pipe -#define HPWS_SMALL_TIMEOUT 10 -// used when waiting for server process to spawn -#define HPWS_LONG_TIMEOUT 1500 // This timeout has to account the possible delays in communication via internet. - - typedef union - { - struct sockaddr sa; - struct sockaddr_in sin; - struct sockaddr_in6 sin6; - struct sockaddr_storage ss; - } addr_t; - - class server; - - class client - { - - private: - pid_t child_pid = 0; // if this client was created by a connect this is set - // this value can't be changed once it's established between the processes - uint32_t max_buffer_size; - bool moved = false; - addr_t endpoint; - std::string get; // the get req this websocket was opened with - int control_line_fd[2]; // see below in client constructor - int buffer_fd[4]; // 0 1 - in buffers, 2 3 - out buffers - int buffer_lock[2] = {0, 0}; // this records if buffers 2 and 3 have been sent out awaiting an ack or not - void *buffer[4]; - - // private constructor - client( - std::string_view get, - addr_t endpoint, - int control_line_fd_0, // hpws -> sagent [ hpws sends bufs to us over this line ] - int control_line_fd_1, // sagent -> hpws [ we send bufs to hpws over this line ] - uint32_t max_buffer_size, - pid_t child_pid, - int buffer_fd[4], - void *buffer[4]) : endpoint(endpoint), - max_buffer_size(max_buffer_size), - child_pid(child_pid), get(get) - { - control_line_fd[0] = control_line_fd_0; - control_line_fd[1] = control_line_fd_1; - for (int i = 0; i < 4; ++i) - { - this->buffer[i] = buffer[i]; - this->buffer_fd[i] = buffer_fd[i]; - } - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] child constructed pid = %d\n", child_pid); - } - - public: - // No copy constructor - client(const client &) = delete; - - // only a move constructor - client(client &&old) : child_pid(old.child_pid), - max_buffer_size(old.max_buffer_size), - endpoint(old.endpoint), - get(old.get) - { - old.moved = true; - for (int i = 0; i <= 1; ++i) - { - this->control_line_fd[i] = old.control_line_fd[i]; - buffer_lock[i] = old.buffer_lock[i]; - } - for (int i = 0; i < 4; ++i) - { - this->buffer[i] = old.buffer[i]; - this->buffer_fd[i] = old.buffer_fd[i]; - } - } - - ~client() - { - if (!moved) - { - - // RH TODO ensure this pid terminates by following up with a SIGKILL - if (child_pid > 0) - { - kill(child_pid, SIGTERM); - int status; - waitpid(child_pid, &status, 0 /* should we use WNOHANG? */); - } - - for (int i = 0; i < 4; ++i) - { - munmap(buffer[i], max_buffer_size); - ::close(buffer_fd[i]); - } - - ::close(control_line_fd[0]); - ::close(control_line_fd[1]); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] child destructed pid = %d\n", child_pid); - } - } - - const std::variant host_address() - { - char hostname[NI_MAXHOST]; - const int ret = getnameinfo((sockaddr *)&endpoint, sizeof(sockaddr), hostname, sizeof(hostname), NULL, 0, NI_NUMERICHOST); - if (ret != 0) - return error{10, gai_strerror(ret)}; - - return hostname; - } - - std::variant read() - { - - unsigned char buf[32]; - int bytes_read = recv(control_line_fd[0], buf, sizeof(buf), 0); - if (bytes_read < 1) - { - if (HPWS_DEBUG) - { - perror("recv"); - fprintf(stderr, "[HPWS.HPP] bytes received %d\n", bytes_read); - } - return error{1, "[read] control line could not be read"}; // todo clean up somehow? - } - - switch (buf[0]) - { - case 'o': - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] o message received\n"); - - if (bytes_read != 6) - return error{3, "invalid buffer in 'o' command sent by hpws"}; - - uint32_t len = 0; - DECODE_O_SIZE(buf, len); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] o message len: %u\n", len); - - int bufno = buf[1] - '0'; - if (bufno != 0 && bufno != 1) - return error{3, "invalid buffer in 'o' command sent by hpws"}; - - if (HPWS_DEBUG) - { - fprintf(stderr, "[HPWS.HPP] read %d\n", len); - for (uint32_t i = 0; i < len; ++i) - putc(((char *)(buffer[bufno]))[i], stderr); - fprintf(stderr, "\n---\n"); - } - return std::string_view{(const char *)(buffer[bufno]), len}; - } - case 'c': - { - return error{1000, "ws closed"}; - } - default: - { - fprintf(stderr, "[HPWS.HPP] read unknown control message 1: `%.*s`\n", bytes_read, buf); - return error{2, "unknown control line command was sent by hpws"}; - } - } - } - - void close() - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] close called\n"); - - // send the control message informing hpws that we wish to close - char buf[1] = {'c'}; - - ::write(control_line_fd[1], buf, 1); - - // wait for the process to end gracefully - int status; - printf("waitpid result: %d\n", waitpid(child_pid, &status, 0)); // add timeout here? - } - - std::optional write(std::string_view to_write) - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] write called for message: `%.*s`\n", - (int)to_write.size(), to_write.data()); - - // check if we have any free buffers - if (buffer_lock[0] && buffer_lock[1]) - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] no free buffers while writing, waiting for an ack on control_line_fd[1]=%d\n", control_line_fd[1]); - - // no free buffers, wait for a ack - unsigned char buf[32]; - int bytes_read = 0; - - bytes_read = recv(control_line_fd[1], buf, sizeof(buf), 0); - if (bytes_read < 1) - { - perror("recv"); - return error{1, "[write] control line could not be read"}; // todo clean up somehow? - } - - switch (buf[0]) - { - case 'a': - { - if (bytes_read != 2) - return error{4, "received an ack longer than 2 bytes"}; - int bufno = buf[1] - '0'; - if (!(bufno == 0 || bufno == 1)) - return error{5, "received an ack with an invalid buffer, expecting 0 or 1"}; - // unlock the buffer - buffer_lock[bufno] = 0; - break; - } - case 'c': - return error{1000, "ws closed"}; - default: - fprintf(stderr, "[HPWS.HPP] read unknown control message 2: `%.*s`\n", bytes_read, buf); - return error{2, "unknown control line command was sent by hpws"}; - } - } - - // execution to here ensures at least one buffer is free - int bufno = (buffer_lock[0] == 0 ? 2 : 3); - - // update the selected buffer lock - buffer_lock[bufno - 2] = 1; - - // write into the buffer - memcpy(buffer[bufno], to_write.data(), to_write.size()); - - // send the control message informing hpws that a message is ready on this buffer - uint32_t len = to_write.size(); - char buf[6] = {'o', (char)('0' + (bufno - 2)), 0, 0, 0, 0}; - ENCODE_O_SIZE(buf, len); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] writing 'o' to control_line_fd[1]=%d\n", - control_line_fd[1]); - - if (::write(control_line_fd[1], buf, 6) != 6) - return error{6, "could not write o message to control line"}; - - return std::nullopt; - } - - std::optional ack(std::string_view from_read) - { - char msg[2] = {'a', '0'}; - if (from_read.data() == buffer[1]) - msg[1]++; - if (send(control_line_fd[0], msg, 2, 0) < 2) - return error{10, "could not send ack down control line"}; - return std::nullopt; - } - - static std::variant connect( - std::string_view bin_path, - uint32_t max_buffer_size, - std::string_view host, - uint16_t port, - std::string_view get, - std::vector argv, - std::function fork_child_init = NULL) - { - -#define HPWS_CONNECT_ERROR(code, msg) \ - { \ - error_code = code; \ - error_msg = msg; \ - goto connect_error; \ - } - - int error_code = -1; - const char *error_msg = NULL; - int fd[4] = {-1, -1, -1, -1}; // 0,1 are hpws->sagent, 2,3 are sagent->hpws - int buffer_fd[4] = {-1, -1, -1, -1}; - void *mapping[4] = {NULL, NULL, NULL, NULL}; - int pid = -1; - int count_args = 14 + argv.size(); - char const **argv_pass = NULL; - - if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd)) - HPWS_CONNECT_ERROR(100, "could not create unix domain socket pair"); - - if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd + 2)) - HPWS_CONNECT_ERROR(101, "could not create unix domain socket pair"); - - // construct the arguments - char shm_size[32]; - - if (snprintf(shm_size, 32, "%d", max_buffer_size) <= 0) - HPWS_CONNECT_ERROR(90, "couldn't write shm size to string"); - - char port_str[6]; - if (snprintf(port_str, 6, "%d", port) <= 0) - HPWS_CONNECT_ERROR(91, "couldn't write port to string"); - - char cfd1[10]; - char cfd2[10]; - - snprintf(cfd1, 10, "%d", fd[1]); - snprintf(cfd2, 10, "%d", fd[3]); - - argv_pass = - reinterpret_cast(alloca(sizeof(char *) * count_args)); - { - int upto = 0; - argv_pass[upto++] = bin_path.data(); - argv_pass[upto++] = "--client"; - argv_pass[upto++] = "--maxmsg"; - argv_pass[upto++] = shm_size; - argv_pass[upto++] = "--host"; - argv_pass[upto++] = host.data(); - argv_pass[upto++] = "--port"; - argv_pass[upto++] = port_str; - argv_pass[upto++] = "--cntlfd"; - argv_pass[upto++] = cfd1; - argv_pass[upto++] = "--cntlfd2"; - argv_pass[upto++] = cfd2; - argv_pass[upto++] = "--get"; - argv_pass[upto++] = get.data(); - for (std::string_view &arg : argv) - argv_pass[upto++] = arg.data(); - argv_pass[upto] = NULL; - } - - pid = vfork(); - - if (pid) - { - - // --- PARENT - - // Fds are set to -1, so when error occurred these fds won't get closed again. - ::close(fd[1]); - fd[1] = -1; - ::close(fd[3]); - fd[3] = -1; - - int child_fd[2] = {fd[0], fd[2]}; - - int flags[2] = { - fcntl(child_fd[0], F_GETFD, NULL), - fcntl(child_fd[1], F_GETFD, NULL)}; - - if (flags[0] < 0 || flags[1] < 0) - HPWS_CONNECT_ERROR(101, "could not get flags from unix domain socket"); - - flags[0] |= FD_CLOEXEC; - flags[1] |= FD_CLOEXEC; - if (fcntl(child_fd[0], F_SETFD, flags[0]) || fcntl(child_fd[1], F_SETFD, flags[1])) - HPWS_CONNECT_ERROR(102, "could notset flags for unix domain socket"); - - // we will set a timeout and wait for the initial startup message from hpws client mode - struct pollfd pfd; - int ret; - - pfd.fd = child_fd[0]; // we receive setup events on control line 0 (hpws->sagent) - pfd.events = POLLIN; - ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout - - // timeout or error - if (ret < 1) - HPWS_CONNECT_ERROR(1, "timeout waiting for hpws connect message"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] waiting for addr_t\n"); - // first thing we'll receive is the sockaddr union - addr_t child_addr; - - int bytes_read = - recv(child_fd[0], (unsigned char *)(&child_addr), sizeof(child_addr), 0); - - if (bytes_read < sizeof(child_addr)) - HPWS_CONNECT_ERROR(202, "received message on control line was not sizeof(addr_t)"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] waiting for buffer fds\n"); - - // second thing we will receive is the four fds for the buffers - { - struct msghdr child_msg = {0}; - memset(&child_msg, 0, sizeof(child_msg)); - char cmsgbuf[CMSG_SPACE(sizeof(int) * 4)]; - child_msg.msg_control = cmsgbuf; - child_msg.msg_controllen = sizeof(cmsgbuf); - - int bytes_read = - recvmsg(child_fd[0], &child_msg, 0); - struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); - if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) - HPWS_CONNECT_ERROR(203, "non-scm_rights message sent on accept child control line"); - memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd)); - for (int i = 0; i < 4; ++i) - { - //fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]); - if (buffer_fd[i] < 0) - HPWS_CONNECT_ERROR(203, "child accept scm_rights a passed buffer fd was negative"); - mapping[i] = - mmap(0, max_buffer_size, PROT_READ | PROT_WRITE, MAP_SHARED, buffer_fd[i], 0); - if (mapping[i] == (void *)(-1)) - HPWS_CONNECT_ERROR(204, "could not mmap scm_rights passed buffer fd"); - } - } - - for (int i = 0; i <= 1; ++i) - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] waiting for 'r' on child_fd[%d]=%d\n", i, child_fd[i]); - - pfd.fd = child_fd[i]; - // now we wait for a 'r' ready message or for the socket/client to die - ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout - - char rbuf[2]; - bytes_read = recv(child_fd[i], rbuf, sizeof(rbuf), 0); - if (bytes_read < 1) - HPWS_CONNECT_ERROR(2, "nil message sent by hpws on startup"); - - if (rbuf[0] != 'r') - HPWS_CONNECT_ERROR(3, "unexpected content in message sent by hpws client mode on startup"); - - if (rbuf[1] != '0' + i) - HPWS_CONNECT_ERROR(4, "received wrong r message on control fd"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] received 'r%c' on child_fd[%d]=%d\n", rbuf[1], i, child_fd[i]); - } - - return client{ - get, - child_addr, - child_fd[0], - child_fd[1], - max_buffer_size, - pid, - buffer_fd, - mapping}; - } - else - { - - // --- CHILD - if (fork_child_init) - fork_child_init(); - - ::close(fd[0]); - ::close(fd[2]); - - // dup fd[1] into fd 3 - /*if (dup2(fd[1], 3) == -1) - perror("dup2 fd[1]"); - if (dup2(fd[3], 4) == -1) - perror("dup2 fd[3]"); - */ - // ::close(fd[1]); - // ::close(fd[3]); - - // we're assuming all fds above 3 will have close_exec flag - execv(bin_path.data(), (char *const *)argv_pass); - // we will send a nil message down the pipe to help the parent know somethings gone wrong - char nil[1]; - nil[0] = 0; - send(3, nil, 1, 0); - exit(1); // execl failure as child will always result in exit here - } - - connect_error:; - - // NB: execution to here can only happen in parent process - // clean up any mess after error - if (pid > 0) - { - kill((pid_t)pid, SIGKILL); /* RH TODO change this to SIGTERM and set a timeout? */ - int status; - waitpid(pid, &status, 0 /* should we use WNOHANG? */); - } - for (int i = 0; i < 4; ++i) - { - if (fd[i] > 0) - ::close(fd[i]); - if (mapping[i] != MAP_FAILED && mapping[i] != NULL) - munmap(mapping[i], max_buffer_size); - if (buffer_fd[i] > -1) - ::close(buffer_fd[i]); - } - - return error{error_code, std::string{error_msg}}; - } - friend class server; - }; - - class server - { - - private: - pid_t server_pid_; - int master_control_fd_; - uint32_t max_buffer_size_; - bool moved = false; - - // private constructor - server(pid_t server_pid, int master_control_fd, uint32_t max_buffer_size) - : server_pid_(server_pid), master_control_fd_(master_control_fd), max_buffer_size_(max_buffer_size) {} - - void accept_cleanup(void *mapping[4], int child_fd[2], int buffer_fd[4], uint32_t pid_child) - { - for (int i = 0; i < 4; i++) - { - if (mapping[i] != MAP_FAILED && mapping[i] != NULL) - munmap(mapping[i], max_buffer_size_); - if (i < 2 && child_fd[i] > -1) - ::close(child_fd[i]); - if (buffer_fd[i] > -1) - ::close(buffer_fd[i]); - } - - if (pid_child > 0) - { - int ret1 = kill(pid_child, SIGTERM); - int wstat; - int ret2 = waitpid(pid_child, &wstat, 0); - } - } - - public: - // No copy constructor - server(const server &) = delete; - - // only a move constructor - server(server &&old) : server_pid_(old.server_pid_), - master_control_fd_(old.master_control_fd_), - max_buffer_size_(old.max_buffer_size_) - { - old.moved = true; - } - - pid_t server_pid() - { - return server_pid_; - } - - int master_control_fd() - { - return master_control_fd_; - } - - uint32_t max_buffer_size() - { - return max_buffer_size_; - } - - std::variant accept(const bool no_block = false) - { - - static int calls = 0; - ++calls; - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[0] called %d\n", calls); - -#define HPWS_ACCEPT_ERROR(code, msg) \ - { \ - accept_cleanup(mapping, child_fd, buffer_fd, pid); \ - return error{code, msg}; \ - } - - int child_fd[2] = {-1, -1}; - int buffer_fd[4] = {-1, -1, -1, -1}; - void *mapping[4] = {NULL, NULL, NULL, NULL}; - // must not use pid_t here since we transfer across IPC channel as a uint32. - uint32_t pid = 0; - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[1] called %d\n", calls); - { - struct msghdr child_msg = {0}; - memset(&child_msg, 0, sizeof(child_msg)); - char cmsgbuf[CMSG_SPACE(sizeof(int) * 2)]; - child_msg.msg_control = cmsgbuf; - child_msg.msg_controllen = sizeof(cmsgbuf); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[2] called %d\n", calls); - - // If no-block is specified, we first check any bytes available on control fd - // before attempting to do a blocking a read. - if (no_block) - { - struct pollfd master_pfd; - master_pfd.fd = this->master_control_fd_; - master_pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN; - const int master_poll_result = poll(&master_pfd, 1, HPWS_SMALL_TIMEOUT); - - if (master_poll_result == -1) // 1 ms timeout - HPWS_ACCEPT_ERROR(200, "poll failed on master control line"); - - if (master_poll_result == 0) // No data available - HPWS_ACCEPT_ERROR(199, "no new client available"); - } - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[3] called %d\n", calls); - - int bytes_read = - recvmsg(this->master_control_fd_, &child_msg, 0); - struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); - if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) - HPWS_ACCEPT_ERROR(200, "non-scm_rights message sent on master control line"); - memcpy(&child_fd, CMSG_DATA(cmsg), sizeof(child_fd)); - if (child_fd[0] < 0 || child_fd[1] < 0) - HPWS_ACCEPT_ERROR(201, "scm_rights passed fd/s were negative"); - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] On accept received SCM: child_fd[0] = %d, child_fd[1] = %d\n", - child_fd[0], child_fd[1]); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[4] called %d\n", calls); - } - - // read info from child control line with a timeout - struct pollfd pfd; - int ret; - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[5] called %d\n", calls); - - pfd.fd = child_fd[0]; // expect all setup messages on the hpws->sagent controlfd (0) - pfd.events = POLLIN; - ret = poll(&pfd, 1, HPWS_SMALL_TIMEOUT); // 1 ms timeout - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[6] called %d\n", calls); - - // timeout or error - if (ret < 1) - HPWS_ACCEPT_ERROR(202, "timeout waiting for hpws accept child message"); - - // first thing we'll receive is the pid of the client - if (recv(child_fd[0], (unsigned char *)(&pid), sizeof(pid), 0) < sizeof(pid)) - HPWS_ACCEPT_ERROR(212, "did not receive expected 4 byte pid of child process on accept"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[7] called %d\n", calls); - - // second thing we'll receive is IP address structure of the client - addr_t buf; - int bytes_read = - recv(child_fd[0], (unsigned char *)(&buf), sizeof(buf), 0); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[8] called %d\n", calls); - - if (bytes_read < sizeof(buf)) - HPWS_ACCEPT_ERROR(202, "received message on master control line was not sizeof(sockaddr_in6)"); - - // third thing we will receive is the four fds for the buffers - { - struct msghdr child_msg = {0}; - memset(&child_msg, 0, sizeof(child_msg)); - char cmsgbuf[CMSG_SPACE(sizeof(int) * 4)]; - child_msg.msg_control = cmsgbuf; - child_msg.msg_controllen = sizeof(cmsgbuf); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[9] called %d\n", calls); - - int bytes_read = - recvmsg(child_fd[0], &child_msg, 0); - struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); - if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) - HPWS_ACCEPT_ERROR(203, "non-scm_rights message sent on accept child control line"); - memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd)); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[10] called %d\n", calls); - - for (int i = 0; i < 4; ++i) - { - //fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]); - if (buffer_fd[i] < 0) - HPWS_ACCEPT_ERROR(203, "child accept scm_rights a passed buffer fd was negative"); - mapping[i] = - mmap(0, max_buffer_size_, PROT_READ | PROT_WRITE, MAP_SHARED, buffer_fd[i], 0); - if (mapping[i] == MAP_FAILED) - HPWS_ACCEPT_ERROR(204, "could not mmap scm_rights passed buffer fd"); - } - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[11] called %d\n", calls); - } - { - struct pollfd pfd; - int ret; - - for (int i = 0; i <= 1; ++i) - { - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] waiting for 'r' on child_fd[%d]=%d accept\n", i, child_fd[i]); - pfd.fd = child_fd[i]; - pfd.events = POLLERR | POLLHUP | POLLNVAL | POLLIN; - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[12] called %d\n", calls); - - // now we wait for a 'r' ready message or for the socket/client to die - ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout - - if (!(pfd.revents & POLLIN)) - HPWS_ACCEPT_ERROR(5, "could not read from client_fd"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[12a] called %d - ret %d\n", calls, ret); - char rbuf[2]; - bytes_read = recv(child_fd[i], rbuf, sizeof(rbuf), 0); - if (bytes_read < 1) - HPWS_ACCEPT_ERROR(2, "nil message sent by hpws on startup on accept"); - - if (rbuf[0] != 'r') - HPWS_ACCEPT_ERROR(3, "unexpected content in message sent by hpws client mode on startup"); - - if (rbuf[1] != '0' + i) - HPWS_ACCEPT_ERROR(4, "received wrong r message on control line fd"); - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] 'r%c' received on child_fd[%d]=%d\n", rbuf[1], i, child_fd[i]); - } - - if (HPWS_DEBUG) - fprintf(stderr, "[HPWS.HPP] Accept[13] called %d\n", calls); - } - - return client{ - "", - buf, - child_fd[0], - child_fd[1], - max_buffer_size_, - (pid_t)pid, - buffer_fd, - mapping}; - } - - ~server() - { - if (!moved) - { - - // RH TODO ensure this pid terminates by following up with a SIGKILL - if (server_pid_ > 0) - { - kill(server_pid_, SIGTERM); - int status; - waitpid(server_pid_, &status, 0 /* should we use WNOHANG? */); - } - - ::close(master_control_fd_); - } - } - - static std::variant create( - std::string_view bin_path, - uint32_t max_buffer_size, - uint16_t port, - uint32_t max_con, - uint16_t max_con_per_ip, - std::string_view cert_path, - std::string_view key_path, - std::vector argv, //additional_arguments - std::function fork_child_init = NULL) - { -#define HPWS_SERVER_ERROR(code, msg) \ - { \ - error_code = code; \ - error_msg = msg; \ - goto server_error; \ - } - - int error_code = -1; - const char *error_msg = NULL; - int fd[2] = {-1, -1}; - pid_t pid = -1; - int count_args = 17 + argv.size(); - char const **argv_pass = NULL; - - if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd)) - HPWS_SERVER_ERROR(100, "could not create unix domain socket pair"); - - // construct the arguments - char shm_size[32]; - - if (snprintf(shm_size, 32, "%d", max_buffer_size) <= 0) - HPWS_SERVER_ERROR(90, "couldn't write shm size to string"); - - char port_str[6]; - if (snprintf(port_str, 6, "%d", port) <= 0) - HPWS_SERVER_ERROR(91, "couldn't write port to string"); - - char max_con_str[11]; - if (snprintf(max_con_str, 11, "%d", max_con) <= 0) - HPWS_SERVER_ERROR(92, "couldn't write max_con to string"); - - char max_con_per_ip_str[6]; - if (snprintf(max_con_per_ip_str, 6, "%d", max_con_per_ip) <= 0) - HPWS_SERVER_ERROR(93, "couldn't write max_con_per_ip to string"); - - argv_pass = - reinterpret_cast(alloca(sizeof(char *) * count_args)); - { - int upto = 0; - argv_pass[upto++] = bin_path.data(); - argv_pass[upto++] = "--server"; - argv_pass[upto++] = "--maxmsg"; - argv_pass[upto++] = shm_size; - argv_pass[upto++] = "--port"; - argv_pass[upto++] = port_str; - argv_pass[upto++] = "--cert"; - argv_pass[upto++] = cert_path.data(); - argv_pass[upto++] = "--key"; - argv_pass[upto++] = key_path.data(); - argv_pass[upto++] = "--cntlfd"; - argv_pass[upto++] = "3"; - argv_pass[upto++] = "--maxcon"; - argv_pass[upto++] = max_con_str; - argv_pass[upto++] = "--maxconip"; - argv_pass[upto++] = max_con_per_ip_str; - for (std::string_view &arg : argv) - argv_pass[upto++] = arg.data(); - argv_pass[upto] = NULL; - } - - pid = vfork(); - if (pid) - { - - // --- PARENT - - // Fds are set to -1, so when error occurred these fds won't get closed again. - ::close(fd[1]); - fd[1] = -1; - - int flags = fcntl(fd[0], F_GETFD, NULL); - if (flags < 0) - HPWS_SERVER_ERROR(101, "could not get flags from unix domain socket"); - - flags |= FD_CLOEXEC; - if (fcntl(fd[0], F_SETFD, flags)) - HPWS_SERVER_ERROR(102, "could notset flags for unix domain socket"); - - // we will set a timeout and wait for the initial startup message from hpws server mode - struct pollfd pfd; - int ret; - - pfd.fd = fd[0]; - pfd.events = POLLIN; - ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout - - // timeout or error - if (ret < 1) - HPWS_SERVER_ERROR(1, "timeout waiting for hpws startup message"); - - char buf[1024]; - int bytes_read = recv(fd[0], buf, sizeof(buf) - 1, 0); - if (bytes_read < 1) - { - int status; - // Wait and obtain exit status code of hpws. - if (waitpid(pid, &status, 0) > 0) - { - switch (WEXITSTATUS(status)) - { - case 70: - HPWS_SERVER_ERROR(31, "Could not create listen socket."); - - case 72: - HPWS_SERVER_ERROR(32, "Could not bind socket for listen."); - - case 74: - HPWS_SERVER_ERROR(33, "Listen() failed."); - - default: - break; - } - } - HPWS_SERVER_ERROR(2, "nil message sent by hpws on startup"); - } - - buf[bytes_read] = '\0'; - if (strncmp(buf, "startup", 7) != 0) - { - fprintf(stderr, "startup message: `%.*s`\n", bytes_read, buf); - HPWS_SERVER_ERROR(3, "unexpected content in message sent by hpws on startup"); - } - return server{ - pid, - fd[0], - max_buffer_size}; - } - else - { - - // --- CHILD - if (fork_child_init) - fork_child_init(); - - ::close(fd[0]); - - // dup fd[1] into fd 3 - dup2(fd[1], 3); - ::close(fd[1]); - - // we're assuming all fds above 3 will have close_exec flag - execv(bin_path.data(), (char *const *)argv_pass); - // we will send a nil message down the pipe to help the parent know somethings gone wrong - char nil[1]; - nil[0] = 0; - send(3, nil, 1, 0); - exit(1); // execl failure as child will always result in exit here - } - - server_error:; - - // NB: execution to here can only happen in parent process - // clean up any mess after error - if (pid > 0) - { - kill(pid, SIGKILL); /* RH TODO change this to SIGTERM and set a timeout? */ - int status; - waitpid(pid, &status, 0 /* should we use WNOHANG? */); - } - if (fd[0] > 0) - ::close(fd[0]); - if (fd[1] > 0) - ::close(fd[1]); - - return error{error_code, std::string{error_msg}}; - } - }; -} // namespace hpws - -#endif diff --git a/src/conf.cpp b/src/conf.cpp index 23225c5..f68e991 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -61,7 +61,6 @@ namespace conf cfg.hp.host_address = host_addr.empty() ? "127.0.0.1" : std::string(host_addr); cfg.hp.init_peer_port = 22861; cfg.hp.init_user_port = 8081; - cfg.server.ip_port = {"127.0.0.1", 5000}; cfg.system.max_instance_count = 5; cfg.system.max_mem_kbytes = 1024000; // Total 1GB RAM @@ -109,11 +108,12 @@ namespace conf // If data dir is not specified, use the same dir as executables. ctx.data_dir = datadir.empty() ? ctx.exe_dir : util::realpath(datadir); - ctx.hpws_exe_path = ctx.exe_dir + "/hpws"; ctx.hpfs_exe_path = ctx.exe_dir + "/hpfs"; ctx.user_install_sh = ctx.exe_dir + "/user-install.sh"; ctx.user_uninstall_sh = ctx.exe_dir + "/user-uninstall.sh"; + ctx.socket_path = ctx.data_dir + "/sa.sock"; + ctx.contract_template_path = ctx.data_dir + "/contract_template"; ctx.config_file = ctx.data_dir + "/sa.cfg"; ctx.log_dir = ctx.data_dir + "/log"; @@ -125,11 +125,10 @@ namespace conf */ int validate_dir_paths() { - const std::string paths[7] = { + const std::string paths[6] = { ctx.config_file, ctx.log_dir, ctx.data_dir, - ctx.hpws_exe_path, ctx.contract_template_path, ctx.user_install_sh, ctx.user_uninstall_sh}; @@ -140,8 +139,6 @@ namespace conf { if (path == ctx.config_file) std::cerr << path << " does not exist. Initialize with command.\n"; - else if (path == ctx.hpws_exe_path) - std::cerr << path << " binary does not exist.\n"; else std::cerr << path << " does not exist.\n"; return -1; @@ -236,35 +233,6 @@ namespace conf } } - // server - { - jpath = "server"; - - try - { - const jsoncons::ojson &server = d["server"]; - - cfg.server.ip_port.host_address = server["host"].as(); - cfg.server.ip_port.port = server["port"].as(); - - if (cfg.server.ip_port.host_address.empty()) - { - std::cerr << "Configured server host_address is empty.\n"; - return -1; - } - else if (cfg.server.ip_port.port <= 0) - { - std::cerr << "Configured server port invalid.\n"; - return -1; - } - } - catch (const std::exception &e) - { - print_missing_field_error(jpath, e); - return -1; - } - } - // system { jpath = "system"; @@ -351,16 +319,6 @@ namespace conf d.insert_or_assign("hp", hp_config); } - // Server configs. - { - jsoncons::ojson server_config; - - server_config.insert_or_assign("host", cfg.server.ip_port.host_address); - server_config.insert_or_assign("port", cfg.server.ip_port.port); - - d.insert_or_assign("server", server_config); - } - // System configs. { jsoncons::ojson system_config; diff --git a/src/conf.hpp b/src/conf.hpp index 7a843de..8ba375d 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -66,11 +66,6 @@ namespace conf size_t max_file_count = 0; // Max no. of log files to keep. }; - struct server_config - { - host_ip_port ip_port; - }; - struct hp_config { std::string host_address; @@ -95,7 +90,6 @@ namespace conf { std::string version; hp_config hp; - server_config server; system_config system; docker_config docker; log_config log; @@ -105,10 +99,11 @@ namespace conf { std::string command; // The CLI command issued to launch Sashimono agent std::string exe_dir; // Sashimono Agent executable dir. - std::string hpws_exe_path; // hpws executable file path. std::string hpfs_exe_path; // hpfs executable file path. std::string contract_template_path; // Path to default contract. + std::string socket_path; // Path to the unix socket file. + std::string user_install_sh; std::string user_uninstall_sh; diff --git a/src/main.cpp b/src/main.cpp index e8527f7..7a6fef2 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -107,9 +107,6 @@ int main(int argc, char **argv) signal(SIGSEGV, &segfault_handler); signal(SIGABRT, &segfault_handler); - // Become a sub-reaper so we can gracefully reap hpws child processes via hpws.hpp. - // (Otherwise they will get reaped by OS init process and we'll end up with race conditions with gracefull kills) - prctl(PR_SET_CHILD_SUBREAPER, 1); // Disable SIGPIPE to avoid crashing on broken pipe IO. { diff --git a/src/msg/json/msg_json.cpp b/src/msg/json/msg_json.cpp index de5da5e..6775898 100644 --- a/src/msg/json/msg_json.cpp +++ b/src/msg/json/msg_json.cpp @@ -77,43 +77,6 @@ namespace msg::json return 0; } - /** - * Extracts type, id and pubkey in the msg. - * @param type Type in the message. - * @param id id in the message. - * @param pubkey Pubkey in the message. - * @param d The json document holding the read request message. - * Accepted signed input container format: - * { - * ... - * "type": "", - * "id": "", - * "owner_pubkey": "", - * ... - * } - * @return 0 on successful extraction. -1 for failure. - */ - int extract_commons(std::string &type, std::string &id, std::string &pubkey, const jsoncons::json &d) - { - if (extract_type_and_id(type, id, d) == -1) - return -1; - - if (!d.contains(msg::FLD_PUBKEY)) - { - LOG_ERROR << "Field owner_pubkey is missing."; - return -1; - } - - if (!d[msg::FLD_PUBKEY].is()) - { - LOG_ERROR << "Invalid owner_pubkey value."; - return -1; - } - - pubkey = d[msg::FLD_PUBKEY].as(); - return 0; - } - /** * Extracts create message from msg. * @param msg Populated msg object. @@ -129,9 +92,16 @@ namespace msg::json */ int extract_create_message(create_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; + + if (!d.contains(msg::FLD_PUBKEY)) + { + LOG_ERROR << "Field owner_pubkey is missing."; + return -1; + } + if (!d.contains(msg::FLD_CONTRACT_ID)) { LOG_ERROR << "Field contract_id is missing."; @@ -144,6 +114,12 @@ namespace msg::json return -1; } + if (!d[msg::FLD_PUBKEY].is()) + { + LOG_ERROR << "Invalid owner_pubkey value."; + return -1; + } + if (!d[msg::FLD_CONTRACT_ID].is()) { LOG_ERROR << "Invalid contract_id value."; @@ -156,6 +132,7 @@ namespace msg::json return -1; } + msg.pubkey = d[msg::FLD_PUBKEY].as(); msg.contract_id = d[msg::FLD_CONTRACT_ID].as(); msg.image = d[msg::FLD_IMAGE].as(); return 0; @@ -181,7 +158,7 @@ namespace msg::json */ int extract_initiate_message(initiate_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) @@ -327,7 +304,7 @@ namespace msg::json */ int extract_destroy_message(destroy_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) @@ -360,7 +337,7 @@ namespace msg::json */ int extract_start_message(start_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) @@ -393,7 +370,7 @@ namespace msg::json */ int extract_stop_message(stop_msg &msg, const jsoncons::json &d) { - if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + if (extract_type_and_id(msg.type, msg.id, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) diff --git a/src/msg/json/msg_json.hpp b/src/msg/json/msg_json.hpp index 5a73d43..61bcd6e 100644 --- a/src/msg/json/msg_json.hpp +++ b/src/msg/json/msg_json.hpp @@ -14,8 +14,6 @@ namespace msg::json int extract_type_and_id(std::string &extracted_type, std::string &extracted_id, const jsoncons::json &d); - int extract_commons(std::string &type, std::string &id, std::string &pubkey, const jsoncons::json &d); - int extract_create_message(create_msg &msg, const jsoncons::json &d); int extract_initiate_message(initiate_msg &msg, const jsoncons::json &d); diff --git a/src/msg/msg_common.hpp b/src/msg/msg_common.hpp index 0af6479..ccba83f 100644 --- a/src/msg/msg_common.hpp +++ b/src/msg/msg_common.hpp @@ -21,7 +21,6 @@ namespace msg { std::string id; std::string type; - std::string pubkey; std::string container_name; std::set peers; std::set unl; @@ -35,7 +34,6 @@ namespace msg { std::string id; std::string type; - std::string pubkey; std::string container_name; }; @@ -43,7 +41,6 @@ namespace msg { std::string id; std::string type; - std::string pubkey; std::string container_name; }; @@ -51,7 +48,6 @@ namespace msg { std::string id; std::string type; - std::string pubkey; std::string container_name; }; diff --git a/src/pchheader.hpp b/src/pchheader.hpp index 0a87404..236075a 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -18,14 +18,17 @@ #include #include #include +#include #include #include +#include #include #include #include #include #include #include +#include #include #include #include