diff --git a/src/hpsh/hpsh.cpp b/src/hpsh/hpsh.cpp index e7b5e30b..e6994e59 100644 --- a/src/hpsh/hpsh.cpp +++ b/src/hpsh/hpsh.cpp @@ -1,92 +1,254 @@ -#include "../conf.hpp" -#include "../util/util.hpp" +#include "hpsh.hpp" namespace hpsh { - pid_t hpsh_pid; - static int fd1[2]; - static int fd2[2]; + constexpr const char *HPSH_EXEC_PATH = "./executable/hpsh"; + constexpr int MAX_BUFFER_LEN = 1024; + constexpr int POLL_TIMEOUT = 1000; + constexpr uint32_t READ_BUFFER_SIZE = 128 * 1024; - int deinit() + hpsh_context ctx; + + int init() { - //kill(hpsh_pid, SIGTERM); - close(fd1[0]); - close(fd1[1]); - close(fd2[0]); - close(fd1[1]); + std::cout << "Initializing hpsh process.." << std::endl; - LOG_INFO << "HPSH stopped."; + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, ctx.control_fds) == -1) + { + std::cerr << errno << "Error initializing socket pair." << std::endl; + return -1; + } + + ctx.hpsh_pid = fork(); + if (ctx.hpsh_pid == -1) + { + std::cerr << errno << "Error forking." << std::endl; + close(ctx.control_fds[0]); + close(ctx.control_fds[1]); + return -1; + } + else if (ctx.hpsh_pid > 0) + { + close(ctx.control_fds[0]); + + ctx.watcher_thread = std::thread(response_watcher); + } + else if (ctx.hpsh_pid == 0) + { + util::fork_detach(); + + close(ctx.control_fds[1]); + + std::cout << "Starting hpsh process... " << std::endl; + + std::string fd_str; + fd_str.resize(10); + snprintf(fd_str.data(), sizeof(fd_str), "%d", ctx.control_fds[0]); + + char *argv[] = {(char *)HPSH_EXEC_PATH, (char *)("-s1"), fd_str.data(), NULL}; + execv(argv[0], argv); + + std::cerr << errno << "Error executing hpfs." << std::endl; + + close(ctx.control_fds[0]); + exit(EXIT_FAILURE); + } return 0; } - int init() + + void deinit() { - LOG_INFO << "Initializing HPSH"; + std::cout << "De-initializing hpsh process.." << std::endl; - if (socketpair(AF_UNIX, SOCK_STREAM, 0, fd1) == -1 || socketpair(AF_UNIX, SOCK_STREAM, 0, fd2) == -1) + ctx.is_shutting_down = true; + + if (ctx.hpsh_pid > 0) + kill(ctx.hpsh_pid, SIGTERM); + + // Joining consensus processing thread. + if (ctx.watcher_thread.joinable()) + ctx.watcher_thread.join(); + + // close sockets. + close(ctx.control_fds[0]); + for (const auto &command : ctx.commands) { - return -1; + close(command.child_fds[0]); + close(command.child_fds[1]); } - pid_t pid = fork(); - if (pid == -1) + if (ctx.hpsh_pid > 0) { - return -1; + // Check if the hpsh has exited voluntarily. + if (check_hpsh_exited(false) == 0) + { + // Issue kill signal to kill the hpsh process. + kill(ctx.hpsh_pid, SIGKILL); + check_hpsh_exited(true); // Blocking wait until exit. + } } - if (pid == 0) - { - hpsh_pid = getpid(); - char cfd1[10], cfd2[10]; - snprintf(cfd1, 10, "%d", fd1[0]); - snprintf(cfd2, 10, "%d", fd2[1]); + std::cout << "Stopped hpsh process.." << std::endl; + } - char *argv[] = {const_cast(conf::ctx.hpsh_exe_path.c_str()), (char *)("-s1"), cfd1, (char *)("-s2"), cfd2, NULL}; - LOG_DEBUG << "Starting HPSH Executable"; - execv(argv[0], argv); - LOG_DEBUG << "Failed to execute hpsh"; - exit(EXIT_FAILURE); - } - else - { - close(fd1[0]); - close(fd2[1]); + int check_hpsh_exited(const bool block) + { + int scstatus = 0; + const int wait_res = waitpid(ctx.hpsh_pid, &scstatus, block ? 0 : WNOHANG); + if (wait_res == 0) // Child still running. + { return 0; } + if (wait_res == -1) + { + std::cerr << errno << ": hpsh process waitpid error. pid:" << ctx.hpsh_pid << std::endl; + ctx.hpsh_pid = 0; + return -1; + } + else // Child has exited + { + ctx.hpsh_pid = 0; + + if (WIFEXITED(scstatus)) + { + std::cerr << "Contract process ended normally." << std::endl; + return 1; + } + else + { + std::cerr << "Contract process ended prematurely. Exit code " << WEXITSTATUS(scstatus) << std::endl; + return -1; + } + } } - std::string serve(const char *message) + int execute(std::string_view pubkey, std::string_view message) { - char buffer[1024]; + if (ctx.is_shutting_down) + return -1; - ssize_t bytes_written = write(fd1[1], message, strlen(message)); - if (bytes_written == -1) { - perror("Error when writing to HPSH socket"); - } - - LOG_DEBUG << "\nMessage sent from hpcore: " << message; - - while (true) + int child_fds[2]; + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, child_fds) == -1) { - int bytesRead; - bytesRead = read(fd2[0], buffer, sizeof(buffer)); - if (bytesRead < 0) - { - // Handle read error - perror("read"); - return "error when reading"; - } - - - buffer[bytesRead] = '\0'; // Null-terminate the buffer - - LOG_DEBUG << "\nMessage received in hpcore: " << buffer; - if(bytesRead<1024){ - break; - } - + std::cerr << errno << "Error initializing socket pair." << std::endl; + return -1; + } + + struct msghdr msg = {0}; + struct cmsghdr *cmsg; + char iobuf[1]; + struct iovec io = { + .iov_base = iobuf, + .iov_len = sizeof(iobuf)}; + union + { /* Ancillary data buffer, wrapped in a union + in order to ensure it is suitably aligned */ + char buf[CMSG_SPACE(sizeof(int))]; + struct cmsghdr align; + } u; + + msg.msg_iov = &io; + msg.msg_iovlen = 1; + msg.msg_control = u.buf; + msg.msg_controllen = sizeof(u.buf); + cmsg = CMSG_FIRSTHDR(&msg); + cmsg->cmsg_level = SOL_SOCKET; + cmsg->cmsg_type = SCM_RIGHTS; + cmsg->cmsg_len = CMSG_LEN(sizeof(int)); + memcpy(CMSG_DATA(cmsg), child_fds, sizeof(int)); + + if (sendmsg(ctx.control_fds[1], &msg, 0) < 0) + { + std::cerr << errno << "Error writing to control fd." << std::endl; + close(child_fds[0]); + close(child_fds[1]); + return -1; + } + + if (write(child_fds[1], message.data(), sizeof(message)) < 0) + { + std::cerr << errno << "Error writing to child fd." << std::endl; + close(child_fds[0]); + close(child_fds[1]); + return -1; + } + + { + std::scoped_lock lock(ctx.command_mutex); + ctx.commands.push_back(command_context{std::string(pubkey), {child_fds[0], child_fds[1]}, std::string(), false}); + } + + return 0; + } + + void response_watcher() + { + util::mask_signal(); + + while (!ctx.is_shutting_down) + { + if (ctx.commands.size() > 0) + { + auto itr = ctx.commands.begin(); + while (itr != ctx.commands.end()) + { + if (ctx.is_shutting_down) + break; + + struct pollfd pfd; + pfd.fd = itr->child_fds[1]; + pfd.events = POLLIN; + + if (poll(&pfd, 1, POLL_TIMEOUT) == -1) + { + std::cerr << errno << "Error in poll" << std::endl; + continue; + } + else if (pfd.revents & POLLIN) + { + itr->response.resize(READ_BUFFER_SIZE); + const int res = read(pfd.fd, itr->response.data(), READ_BUFFER_SIZE); + + if (res > 0) + itr->response.resize(res); // Resize back to the actual bytes read. + else if (res == -1) + { + // Assuming that EPIPE or ECONNRESET resulted from contract termination, consider this as a neutral read. + if (errno == EPIPE || errno == ECONNRESET) + itr->read_completed = true; + else + std::cerr << errno << "Error reading from fd" << std::endl; + } + } + else + { + itr->read_completed = true; + } + + // Send command back to user; + if (itr->read_completed) + { + std::cout << "Received full output for user " << itr->pubkey << std::endl; + std::cout << itr->response.data() << std::endl; + + std::list collected_commands; + { + std::scoped_lock lock(ctx.command_mutex); + itr = ctx.commands.erase(itr); + } + } + else + { + itr++; + } + } + } + else + { + util::sleep(1000); + } } - return buffer; } } \ No newline at end of file diff --git a/src/hpsh/hpsh.hpp b/src/hpsh/hpsh.hpp index 4d9bad19..db41dfd3 100644 --- a/src/hpsh/hpsh.hpp +++ b/src/hpsh/hpsh.hpp @@ -1,14 +1,54 @@ -#ifndef HPSH_H -#define HPSH_H +#ifndef _HP_HPSH_ +#define _HP_HPSH_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include "util.hpp" -#include "../conf.hpp" -#include "../util/util.hpp" namespace hpsh { - int deinit(); + struct command_context + { + std::string pubkey; + int child_fds[2]; + std::string response; + bool read_completed = false; + }; + + struct hpsh_context + { + std::mutex command_mutex; + std::list commands; + int control_fds[2]; + int hpsh_pid; + std::thread watcher_thread; + bool is_shutting_down; + }; + + extern hpsh_context ctx; + int init(); - std::string serve(const char* command); + + void deinit(); + + int check_hpsh_exited(const bool block); + + int execute(std::string_view pubkey, std::string_view message); + + void response_watcher(); } #endif \ No newline at end of file diff --git a/src/hpsh/util.cpp b/src/hpsh/util.cpp new file mode 100644 index 00000000..2e9d97a0 --- /dev/null +++ b/src/hpsh/util.cpp @@ -0,0 +1,45 @@ +#include "util.hpp" + +namespace util +{ + constexpr mode_t DIR_PERMS = 0755; + + /** + * Sleeps the current thread for specified no. of milliseconds. + */ + void sleep(const uint64_t milliseconds) + { + std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds)); + } + + // Applies signal mask to the calling thread. + void mask_signal() + { + sigset_t mask; + sigemptyset(&mask); + sigaddset(&mask, SIGINT); + sigaddset(&mask, SIGPIPE); + pthread_sigmask(SIG_BLOCK, &mask, NULL); + } + + /** + * Clears signal mask and signal handlers from the caller. + * Called by other processes forked from hpcore threads so they get detatched from + * the hpcore signal setup. + */ + void fork_detach() + { + // Restore signal handlers to defaults. + signal(SIGINT, SIG_DFL); + signal(SIGSEGV, SIG_DFL); + signal(SIGABRT, SIG_DFL); + + // Remove any signal masks applied by hpcore. + sigset_t mask; + sigemptyset(&mask); + pthread_sigmask(SIG_SETMASK, &mask, NULL); + + // Set process group id (so the terminal doesn't send kill signals to forked children). + setpgrp(); + } +} // namespace util diff --git a/src/hpsh/util.hpp b/src/hpsh/util.hpp new file mode 100644 index 00000000..beb9833f --- /dev/null +++ b/src/hpsh/util.hpp @@ -0,0 +1,30 @@ +#ifndef _HP_UTIL_UTIL_ +#define _HP_UTIL_UTIL_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +/** + * Contains helper functions and data structures used by multiple other subsystems. + */ + +namespace util +{ + void sleep(const uint64_t milliseconds); + + void mask_signal(); + + void fork_detach(); + +} // namespace util + +#endif diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 87655008..6871841c 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -280,8 +280,12 @@ namespace usr { LOG_INFO << "shell input received:" << content; - std::string response = hpsh::serve(content.c_str()); - LOG_INFO << "response: " << response; + // std::string response = hpsh::serve(content.c_str()); + if (hpsh::execute(std::string("user_").append(std::to_string(1)), content.c_str()) == -1) + { + std::cout << "\nError sending message:" << content.c_str() << std::endl; + } + // LOG_INFO << "response: " << response; return 0; } diff --git a/test/bin/hpsh b/test/bin/hpsh index d58f320b..cf4cd294 100755 Binary files a/test/bin/hpsh and b/test/bin/hpsh differ