diff --git a/CMakeLists.txt b/CMakeLists.txt index eac653a4..c131209b 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -34,7 +34,6 @@ add_executable(hpcore src/hpfs/hpfs.cpp src/comm/comm_session.cpp src/comm/comm_server.cpp - src/comm/comm_client.cpp src/msg/fbuf/common_helpers.cpp src/msg/fbuf/p2pmsg_helpers.cpp src/msg/fbuf/ledger_helpers.cpp @@ -67,7 +66,7 @@ add_dependencies(hpcore add_custom_command(TARGET hpcore POST_BUILD # COMMAND strip ./build/hpcore # COMMAND strip ./build/appbill - COMMAND cp ./test/bin/websocketd ./test/bin/websocat ./test/bin/hpfs ./build/ + COMMAND cp ./test/bin/hpws ./test/bin/hpfs ./build/ ) target_precompile_headers(hpcore PUBLIC src/pchheader.hpp) @@ -77,7 +76,7 @@ target_precompile_headers(hpcore PUBLIC src/pchheader.hpp) add_custom_target(docker COMMAND mkdir -p ./test/local-cluster/bin COMMAND cp ./build/hpcore ./build/appbill ./test/local-cluster/bin/ - COMMAND cp ./test/bin/fusermount3 ./test/bin/libfuse3.so.3 ./test/bin/libblake3.so ./test/bin/websocketd ./test/bin/websocat ./test/bin/hpfs ./test/local-cluster/bin/ + COMMAND cp ./test/bin/fusermount3 ./test/bin/libfuse3.so.3 ./test/bin/libblake3.so ./test/bin/hpws ./test/bin/hpfs ./test/local-cluster/bin/ COMMAND docker build -t hpcore:latest ./test/local-cluster ) set_target_properties(docker PROPERTIES EXCLUDE_FROM_ALL TRUE) diff --git a/README.md b/README.md index 11a14830..0c33fc00 100644 --- a/README.md +++ b/README.md @@ -7,13 +7,12 @@ A C++ version of hotpocket designed for production envrionments, original protot ## Libraries * Crypto - Libsodium https://github.com/jedisct1/libsodium -* Websockets - Server: [Websocketd (forked)](https://github.com/codetsunami/websocketd) | Client: [Websocat](https://github.com/vi/websocat) | Pipe: [netcat (OpenBSD)](https://man.openbsd.org/nc.1) * jsoncons (for JSON and BSON) - https://github.com/danielaparker/jsoncons * P2P Protocol - https://google.github.io/flatbuffers * Fuse filesystem - https://github.com/libfuse/libfuse * Reader Writer Queue - https://github.com/cameron314/readerwriterqueue * Concurrent Queue - https://github.com/cameron314/concurrentqueue -* Boost Stacktrace) - https://www.boost.org +* Boost Stacktrace - https://www.boost.org ## Setting up Hot Pocket development environment Run the setup script located at the repo root (tested on Ubuntu 18.04). @@ -31,7 +30,7 @@ If you update flatbuffers message definitions, you need to run the flatbuffers c `sudo snap install flatbuffers --edge` -Example: When you make a change to `p2pmsg_content_.fbc` defnition file, you need to run this: +Example: When you make a change to `p2pmsg_content.fbc` defnition file, you need to run this: `flatc -o src/msg/fbuf/ --gen-mutable --cpp src/msg/fbuf/p2pmsg_content.fbs` @@ -52,7 +51,7 @@ Code is divided into subsystems via namespaces. **ledger::** Maintains the ledger and handles ledger syncing activites. -**comm::** Handles generic web sockets communication functionality. Mainly acts as a wrapper for websocketd/websocat. +**comm::** Handles generic web sockets communication functionality. Mainly acts as a wrapper for [hpws](https://github.com/RichardAH/hpws). **util::** Contains shared data structures/helper functions used by multiple subsystems. diff --git a/dev-setup.sh b/dev-setup.sh index e064811b..bb157c78 100755 --- a/dev-setup.sh +++ b/dev-setup.sh @@ -5,7 +5,7 @@ set -e # exit on error sudo apt-get update -sudo apt-get install -y build-essential +sudo apt-get install -y build-essential libssl-dev workdir=~/hpcore-setup diff --git a/src/comm/comm_client.cpp b/src/comm/comm_client.cpp deleted file mode 100644 index 827486fd..00000000 --- a/src/comm/comm_client.cpp +++ /dev/null @@ -1,100 +0,0 @@ -#include "comm_client.hpp" -#include "comm_session.hpp" -#include "comm_session_handler.hpp" -#include "../hplog.hpp" -#include "../util.hpp" - -namespace comm -{ - - int comm_client::start(std::string_view host, const uint16_t port, const uint64_t (&metric_thresholds)[4], const uint64_t max_msg_size) - { - return start_websocat_process(host, port); - } - - void comm_client::stop() - { - if (read_fd > 0) - close(read_fd); - if (write_fd > 0) - close(write_fd); - - if (websocat_pid > 0) - util::kill_process(websocat_pid, false); // Kill websocat. - } - - int comm_client::start_websocat_process(std::string_view host, const uint16_t port) - { - // setup pipe I/O - if (pipe(read_pipe) < 0 || pipe(write_pipe) < 0) - { - LOG_ERROR << errno << ": websocat pipe creation failed."; - return -1; - } - - const pid_t pid = fork(); - - if (pid > 0) - { - // HotPocket process. - - read_fd = read_pipe[0]; - write_fd = write_pipe[1]; - - // Close unused fds by us. - close(write_pipe[0]); - close(read_pipe[1]); - - // Wait for some time and check if websocat process has closed the pipe. - util::sleep(300); - pollfd fds[1] = {{read_fd}}; - if (poll(fds, 1, 0) == -1 || (fds[0].revents & POLLHUP)) - { - close(read_fd); - close(write_fd); - - util::kill_process(pid, false); - return -1; - } - - websocat_pid = pid; - } - else if (pid == 0) - { - // Websocat process. - util::fork_detach(); - - close(write_pipe[1]); //parent write - close(read_pipe[0]); //parent read - - dup2(write_pipe[0], STDIN_FILENO); //child read - close(write_pipe[0]); - dup2(read_pipe[1], STDOUT_FILENO); //child write - close(read_pipe[1]); - - std::string url = std::string("wss://").append(host).append(":").append(std::to_string(port)); - - // Fill process args. - char *execv_args[] = { - conf::ctx.websocat_exe_path.data(), - url.data(), - (char *)"-k", // Accept invalid certificates - (char *)"-b", // Binary mode - (char *)"-E", // Close on EOF - (char *)"-q", // Quiet mode - NULL}; - - const int ret = execv(execv_args[0], execv_args); - std::cerr << errno << ": websocat process execv failed.\n"; - exit(1); - } - else - { - LOG_ERROR << errno << ": fork() failed when starting websocat process."; - return -1; - } - - return 0; - } - -} // namespace comm diff --git a/src/comm/comm_client.hpp b/src/comm/comm_client.hpp deleted file mode 100644 index 18fd38a5..00000000 --- a/src/comm/comm_client.hpp +++ /dev/null @@ -1,27 +0,0 @@ -#ifndef _HP_COMM_CLIENT_ -#define _HP_COMM_CLIENT_ - -#include "../pchheader.hpp" -#include "comm_session.hpp" - -namespace comm -{ - -class comm_client -{ - pid_t websocat_pid = 0; - int read_pipe[2]; // parent to child pipe - int write_pipe[2]; // child to parent pipe - - int start_websocat_process(std::string_view host, const uint16_t port); - -public: - int read_fd = 0, write_fd = 0; - - int start(std::string_view host, const uint16_t port, const uint64_t (&metric_thresholds)[4], const uint64_t max_msg_size); - void stop(); -}; - -} // namespace comm - -#endif diff --git a/src/comm/comm_server.cpp b/src/comm/comm_server.cpp index c3d2b185..f6d2366f 100644 --- a/src/comm/comm_server.cpp +++ b/src/comm/comm_server.cpp @@ -1,74 +1,36 @@ #include "comm_server.hpp" -#include "comm_client.hpp" #include "comm_session.hpp" #include "comm_session_handler.hpp" #include "../hplog.hpp" #include "../util.hpp" #include "../bill/corebill.h" +#include "../hpws/hpws.hpp" namespace comm { + constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 16 * 1024 * 1024; int comm_server::start( - const uint16_t port, const char *domain_socket_name, const SESSION_TYPE session_type, - const bool is_binary, const bool use_size_header, const bool require_tls, - const uint64_t (&metric_thresholds)[4], const std::set &req_known_remotes, const uint64_t max_msg_size) + const uint16_t port, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4], + const std::set &req_known_remotes, const uint64_t max_msg_size) { - int accept_fd = open_domain_socket(domain_socket_name); - if (accept_fd > 0) - { - watchdog_thread = std::thread( - &comm_server::connection_watchdog, this, accept_fd, session_type, is_binary, - std::ref(metric_thresholds), req_known_remotes, max_msg_size); + const uint64_t final_max_msg_size = max_msg_size > 0 ? max_msg_size : DEFAULT_MAX_MSG_SIZE; - inbound_message_processor_thread = std::thread(&comm_server::inbound_message_processor_loop, this, session_type); - - return start_websocketd_process(port, domain_socket_name, is_binary, - use_size_header, require_tls, max_msg_size); - } - - return -1; - } - - int comm_server::open_domain_socket(const char *domain_socket_name) - { - int fd = socket(AF_UNIX, SOCK_STREAM, 0); - if (fd == -1) - { - LOG_ERROR << errno << ": Domain socket open error"; + if (start_hpws_server(port, final_max_msg_size) == -1) return -1; - } - sockaddr_un addr; - memset(&addr, 0, sizeof(addr)); - addr.sun_family = AF_UNIX; + watchdog_thread = std::thread( + &comm_server::connection_watchdog, this, session_type, + std::ref(metric_thresholds), req_known_remotes, final_max_msg_size); - strncpy(addr.sun_path, domain_socket_name, sizeof(addr.sun_path) - 1); - unlink(domain_socket_name); + inbound_message_processor_thread = std::thread(&comm_server::inbound_message_processor_loop, this, session_type); - if (bind(fd, (struct sockaddr *)&addr, sizeof(addr)) == -1) - { - LOG_ERROR << errno << ": Domain socket bind error"; - return -1; - } - - if (listen(fd, 5) == -1) - { - LOG_ERROR << errno << ": Domain socket listen error"; - return -1; - } - - // Set non-blocking behaviour. - // We do this so the accept() call returns immediately without blocking the listening thread. - int flags = fcntl(fd, F_GETFL); - fcntl(fd, F_SETFL, flags | O_NONBLOCK); - - return fd; // This is the fd we should call accept() on. + return 0; } void comm_server::connection_watchdog( - const int accept_fd, const SESSION_TYPE session_type, const bool is_binary, - const uint64_t (&metric_thresholds)[4], const std::set &req_known_remotes, const uint64_t max_msg_size) + const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4], + const std::set &req_known_remotes, const uint64_t max_msg_size) { util::mask_signal(); @@ -80,7 +42,7 @@ namespace comm util::sleep(100); // Accept any new incoming connection if available. - check_for_new_connection(sessions, accept_fd, session_type, is_binary, metric_thresholds, max_msg_size); + check_for_new_connection(sessions, session_type, metric_thresholds); // Restore any missing outbound connections. if (!req_known_remotes.empty()) @@ -88,100 +50,91 @@ namespace comm if (loop_counter == 20) { loop_counter = 0; - maintain_known_connections(sessions, outbound_clients, req_known_remotes, session_type, is_binary, max_msg_size, metric_thresholds); + maintain_known_connections(sessions, req_known_remotes, session_type, max_msg_size, metric_thresholds); } loop_counter++; } // Cleanup any sessions that needs closure. - std::set closed_session_fds; - for (auto &[fd, session] : sessions) + for (auto itr = sessions.begin(); itr != sessions.end();) { - if (session.state == SESSION_STATE::MUST_CLOSE) - session.close(true); + if (itr->state == SESSION_STATE::MUST_CLOSE) + itr->close(true); - if (session.state == SESSION_STATE::CLOSED) - closed_session_fds.emplace(fd); - } - for (const int fd : closed_session_fds) - { - // Delete from sessions. - sessions.erase(fd); - - // Delete from outbound clients. - const auto client_itr = outbound_clients.find(fd); - if (client_itr != outbound_clients.end()) - { - client_itr->second.stop(); - outbound_clients.erase(client_itr); - } + if (itr->state == SESSION_STATE::CLOSED) + itr = sessions.erase(itr); + else + ++itr; } } // If we reach this point that means we are shutting down. - // Close all sessions and clients - for (auto &[fd, session] : sessions) + // Close and erase all sessions. + for (comm_session &session : sessions) session.close(false); - for (auto &[fd, client] : outbound_clients) - client.stop(); + + sessions.clear(); LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " listener stopped."; } void comm_server::check_for_new_connection( - std::unordered_map &sessions, const int accept_fd, - const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4], - const uint64_t max_msg_size) + std::list &sessions, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4]) { - // Accept new client connection (if available) - int client_fd = accept(accept_fd, NULL, NULL); - if (client_fd == -1 && errno != EAGAIN) - { - LOG_ERROR << errno << ": Domain socket accept error"; - } - else if (client_fd > 0) - { - // New client connected. - const std::string ip = get_cgi_ip(client_fd); - if (!ip.empty()) - { - if (corebill::is_banned(ip)) - { - LOG_DEBUG << "Dropping connection for banned host " << ip; - close(client_fd); - } - else - { - comm_session session(ip, client_fd, client_fd, session_type, is_binary, true, metric_thresholds, max_msg_size); - if (session.on_connect() == 0) - { - std::scoped_lock lock(sessions_mutex); - const auto [itr, success] = sessions.try_emplace(client_fd, std::move(session)); + std::variant accept_result = hpws_server.value().accept(true); - // Thread is seperately started after the moving operation to overcome the difficulty - // in accessing class member variables inside the thread. - // Class member variables gives unacceptable values if the thread starts before the move operation. - itr->second.start_messaging_threads(); - } - } + if (std::holds_alternative(accept_result)) + { + const hpws::error error = std::get(accept_result); + if (error.first == 199) // No client connected. + return; + + LOG_ERROR << "Error in hpws accept():" << error.first << " " << error.second; + return; + } + + // New client connected. + hpws::client client = std::move(std::get(accept_result)); + const std::variant host_result = client.host_address(); + if (std::holds_alternative(host_result)) + { + const hpws::error error = std::get(host_result); + LOG_ERROR << "Error getting ip from hpws:" << error.first << " " << error.second; + } + else + { + const std::string &host_address = std::get(host_result); + + if (corebill::is_banned(host_address)) + { + // We just let the client object gets destructed without adding it to a session. + LOG_DEBUG << "Dropping connection for banned host " << host_address; } else { - close(client_fd); - LOG_ERROR << "Closed bad client socket: " << client_fd; + comm_session session(host_address, std::move(client), session_type, true, metric_thresholds); + if (session.on_connect() == 0) + { + std::scoped_lock lock(sessions_mutex); + comm_session &inserted_session = sessions.emplace_back(std::move(session)); + + // Thread is seperately started after the moving operation to overcome the difficulty + // in accessing class member variables inside the thread. + // Class member variables gives unacceptable values if the thread starts before the move operation. + inserted_session.start_messaging_threads(); + } } } } void comm_server::maintain_known_connections( - std::unordered_map &sessions, std::unordered_map &outbound_clients, - const std::set &req_known_remotes, const SESSION_TYPE session_type, const bool is_binary, - const uint64_t max_msg_size, const uint64_t (&metric_thresholds)[4]) + std::list &sessions, const std::set &req_known_remotes, + const SESSION_TYPE session_type, const uint64_t max_msg_size, const uint64_t (&metric_thresholds)[4]) { // Find already connected known remote parties list std::set known_remotes; - for (const auto &[fd, session] : sessions) + for (const comm_session &session : sessions) { if (session.state != SESSION_STATE::CLOSED && !session.known_ipport.first.empty()) known_remotes.emplace(session.known_ipport); @@ -200,28 +153,39 @@ namespace comm const uint16_t port = ipport.second; LOG_DEBUG << "Trying to connect " << host << ":" << std::to_string(port); - comm::comm_client client; - if (client.start(host, port, metric_thresholds, conf::cfg.peermaxsize) == -1) + std::variant client_result = hpws::client::connect(conf::ctx.hpws_exe_path, max_msg_size, host, port, "/", {}, util::fork_detach); + + if (std::holds_alternative(client_result)) { - LOG_ERROR << "Outbound connection attempt failed: " << host << ":" << std::to_string(port); + const hpws::error error = std::get(client_result); + LOG_ERROR << "Outbound connection hpws error:" << error.first << " " << error.second; } else { - comm::comm_session session(host, client.read_fd, client.write_fd, comm::SESSION_TYPE::PEER, is_binary, false, metric_thresholds, max_msg_size); - session.known_ipport = ipport; - if (session.on_connect() == 0) + hpws::client client = std::move(std::get(client_result)); + const std::variant host_result = client.host_address(); + if (std::holds_alternative(host_result)) { + const hpws::error error = std::get(host_result); + LOG_ERROR << "Error getting ip from hpws:" << error.first << " " << error.second; + } + else + { + const std::string &host_address = std::get(host_result); + comm::comm_session session(host_address, std::move(client), session_type, false, metric_thresholds); + session.known_ipport = ipport; + if (session.on_connect() == 0) { std::scoped_lock lock(sessions_mutex); - const auto [itr, success] = sessions.try_emplace(client.read_fd, std::move(session)); + comm_session &inserted_session = sessions.emplace_back(std::move(session)); + // Thread is seperately started after the moving operation to overcome the difficulty // in accessing class member variables inside the thread. // Class member variables gives unacceptable values if the thread starts before the move operation. - itr->second.start_messaging_threads(); - } + inserted_session.start_messaging_threads(); - outbound_clients.emplace(client.read_fd, std::move(client)); - known_remotes.emplace(ipport); + known_remotes.emplace(ipport); + } } } } @@ -238,7 +202,7 @@ namespace comm { // Process one message from each session in round-robin fashion. std::scoped_lock lock(sessions_mutex); - for (auto &[fd, session] : sessions) + for (comm_session &session : sessions) { const int result = session.process_next_inbound_message(); @@ -258,170 +222,38 @@ namespace comm LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " message processor stopped."; } - int comm_server::start_websocketd_process( - const uint16_t port, const char *domain_socket_name, const bool is_binary, - const bool use_size_header, const bool require_tls, const uint64_t max_msg_size) + int comm_server::start_hpws_server(const uint16_t port, const uint64_t max_msg_size) { - // setup pipe for firewall - int firewall_pipe[2]; // parent to child pipe + std::variant result = hpws::server::create( + conf::ctx.hpws_exe_path, + max_msg_size, + port, + 512, // Max connections + 2, // Max connections per IP. + conf::ctx.tls_cert_file, + conf::ctx.tls_key_file, + {}, + util::fork_detach); - if (pipe(firewall_pipe)) + if (std::holds_alternative(result)) { - LOG_ERROR << errno << ": pipe() call failed for firewall"; - } - else - { - firewall_out = firewall_pipe[1]; - } - - const pid_t pid = fork(); - - if (pid > 0) - { - // HotPocket process. - - // Close the child reading end of the pipe in the parent - if (firewall_out > 0) - close(firewall_pipe[0]); - - // Wait for some time and check if websocketd is still running properly. - // Sending signal 0 to test whether process exist. - util::sleep(20); - if (util::kill_process(pid, false, 0) == -1) - return -1; - - websocketd_pid = pid; - } - else if (pid == 0) - { - // Websocketd process. - util::fork_detach(); - - // We are using websocketd forked repo: https://github.com/codetsunami/websocketd - - if (firewall_out > 0) - { - // Close parent writing end of the pipe in the child - close(firewall_pipe[1]); - // Override stdin in the child's file table - dup2(firewall_pipe[0], 0); - } - - std::vector args_vec; - args_vec.reserve(16); - - const std::string max_msg_size_str = (max_msg_size > 0) ? std::to_string(max_msg_size) : "134217728"; //128MB - - // Fill process args. - args_vec.push_back(conf::ctx.websocketd_exe_path); - - if (require_tls) - { - args_vec.push_back("--ssl"); - args_vec.push_back("--sslcert"); - args_vec.push_back(conf::ctx.tls_cert_file); - args_vec.push_back("--sslkey"); - args_vec.push_back(conf::ctx.tls_key_file); - } - - args_vec.push_back("--port"); - args_vec.push_back(std::to_string(port)); - args_vec.push_back(is_binary ? "--binary=true" : "--binary=false"); - args_vec.push_back(use_size_header ? "--sizeheader=true" : "--sizeheader=false"); - args_vec.push_back(std::string("--maxframe=").append(max_msg_size_str)); - args_vec.push_back("--loglevel=error"); - args_vec.push_back("nc"); // netcat (OpenBSD) is used for domain socket redirection. - args_vec.push_back("-U"); // Use UNIX domain socket - args_vec.push_back(domain_socket_name); - - char *execv_args[args_vec.size()]; - int idx = 0; - for (std::string &arg : args_vec) - execv_args[idx++] = arg.data(); - execv_args[idx] = NULL; - - const int ret = execv(execv_args[0], execv_args); - std::cerr << errno << ": websocketd process execv failed.\n"; - exit(1); - } - else - { - LOG_ERROR << errno << ": fork() failed when starting websocketd process."; + const hpws::error e = std::get(result); + LOG_ERROR << "Error creating hpws server:" << e.first << " " << e.second; return -1; } + hpws_server.emplace(std::move(std::get(result))); + return 0; } - void comm_server::firewall_ban(std::string_view ip, const bool unban) - { - if (firewall_out < 0) - return; - - iovec iov[]{ - {(void *)(unban ? "r" : "a"), 1}, - {(void *)ip.data(), ip.length()}}; - writev(firewall_out, iov, 2); - } - - /** - * If the fd supplied was produced by accept()ing unix domain socket connection - * the process at the other end is inspected for CGI environment variables - * and the REMOTE_ADDR variable is returned as std::string, otherwise empty string - */ - std::string comm_server::get_cgi_ip(const int fd) - { - socklen_t length; - ucred uc; - length = sizeof(struct ucred); - - // Ask the operating system for information about the other process - if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &uc, &length) == -1) - { - LOG_ERROR << errno << ": Could not retrieve PID from unix domain socket"; - return ""; - } - - // Open /proc//environ for that process - std::stringstream ss; - ss << "/proc/" << uc.pid << "/environ"; - std::string fn = ss.str(); - - const int envfd = open(fn.c_str(), O_RDONLY | O_CLOEXEC); - if (!envfd) - { - LOG_ERROR << errno << ": Could not open environ block for process on other end of unix domain socket PID=" << uc.pid; - return ""; - } - - // Read environ block - char envblock[0x7fff]; - const ssize_t bytes_read = read(envfd, envblock, 0x7fff); //0x7fff bytes is an operating system size limit for this block - close(envfd); - - // Find the REMOTE_ADDR entry. Envrion block delimited by \0 - for (char *upto = envblock, *last = envblock; upto - envblock < bytes_read; ++upto) - { - if (*upto == '\0') - { - if (upto - last > 12 && strncmp(last, "REMOTE_ADDR=", 12) == 0) - return std::string((const char *)(last + 12)); - last = upto + 1; - } - } - - LOG_ERROR << "Could not find REMOTE_ADDR variable in /proc/" << uc.pid << "/environ"; - return ""; - } - void comm_server::stop() { should_stop_listening = true; watchdog_thread.join(); - inbound_message_processor_thread.join(); + hpws_server.reset(); - if (websocketd_pid > 0) - util::kill_process(websocketd_pid, false); // Kill websocketd. + inbound_message_processor_thread.join(); } } // namespace comm diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index ec5368ac..fdddcd79 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -3,58 +3,45 @@ #include "../pchheader.hpp" #include "comm_session.hpp" -#include "comm_client.hpp" +#include "../hpws/hpws.hpp" namespace comm { class comm_server { - pid_t websocketd_pid = 0; - int firewall_out = -1; // at some point we may want to listen for firewall_in but at the moment unimplemented + std::optional hpws_server; std::thread watchdog_thread; // Connection watcher thread. std::thread inbound_message_processor_thread; // Incoming message processor thread. bool should_stop_listening = false; - // Map with read fd to connected session mappings. - std::unordered_map sessions; + std::list sessions; std::mutex sessions_mutex; - // Map with read fd to connected comm client mappings. - std::unordered_map outbound_clients; - - int open_domain_socket(const char *domain_socket_name); - void connection_watchdog( - const int accept_fd, const SESSION_TYPE session_type, const bool is_binary, - const uint64_t (&metric_thresholds)[4], const std::set &eq_known_remotes, const uint64_t max_msg_size); + const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4], + const std::set &req_known_remotes, const uint64_t max_msg_size); void inbound_message_processor_loop(const SESSION_TYPE session_type); - int start_websocketd_process( - const uint16_t port, const char *domain_socket_name, const bool is_binary, - const bool use_size_header, const bool require_tls, const uint64_t max_msg_size); + int start_hpws_server(const uint16_t port, const uint64_t max_msg_size); - int poll_fds(pollfd *pollfds, const int accept_fd, const std::unordered_map &sessions); + int poll_fds(pollfd *pollfds, const int accept_fd, const std::list &sessions); void check_for_new_connection( - std::unordered_map &sessions, const int accept_fd, - const SESSION_TYPE session_type, const bool is_binary, const uint64_t (&metric_thresholds)[4], - const uint64_t max_msg_size); + std::list &sessions, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4]); void maintain_known_connections( - std::unordered_map &sessions, std::unordered_map &outbound_clients, - const std::set &req_known_remotes, const SESSION_TYPE session_type, const bool is_binary, - const uint64_t max_msg_size, const uint64_t (&metric_thresholds)[4]); + std::list &sessions, const std::set &req_known_remotes, + const SESSION_TYPE session_type, const uint64_t max_msg_size, const uint64_t (&metric_thresholds)[4]); std::string get_cgi_ip(const int fd); public: // Start accepting incoming connections int start( - const uint16_t port, const char *domain_socket_name, const SESSION_TYPE session_type, - const bool is_binary, const bool use_size_header, const bool require_tls, - const uint64_t (&metric_thresholds)[4], const std::set &req_known_remotes, const uint64_t max_msg_size); + const uint16_t port, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4], + const std::set &req_known_remotes, const uint64_t max_msg_size); void stop(); void firewall_ban(std::string_view ip, const bool unban); }; diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index d1b9734d..d01889cc 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -6,28 +6,25 @@ #include "../util.hpp" #include "../conf.hpp" #include "../bill/corebill.h" +#include "../hpws/hpws.hpp" namespace comm { constexpr uint32_t INTERVALMS = 60000; - constexpr uint8_t SIZE_HEADER_LEN = 8; - constexpr short READER_POLL_EVENTS = POLLIN | POLLRDHUP; // Global instances of user and peer session handlers. usr::user_session_handler user_sess_handler; p2p::peer_session_handler peer_sess_handler; comm_session::comm_session( - std::string_view ip, const int read_fd, const int write_fd, const SESSION_TYPE session_type, - const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4], const uint64_t max_msg_size) + std::string_view ip, hpws::client &&hpws_client, const SESSION_TYPE session_type, + const bool is_inbound, const uint64_t (&metric_thresholds)[4]) - : read_fd(read_fd), - write_fd(write_fd), + : address(ip), + hpws_client(std::move(hpws_client)), session_type(session_type), - uniqueid(std::to_string(read_fd).append(":").append(ip)), - is_binary(is_binary), + uniqueid(ip), is_inbound(is_inbound), - max_msg_size(max_msg_size), in_msg_queue(32) { // Create new session_thresholds and insert it to thresholds vector. @@ -53,31 +50,33 @@ namespace comm while (state != SESSION_STATE::CLOSED) { - pollfd pollfds[1] = {{read_fd, READER_POLL_EVENTS}}; - - if (poll(pollfds, 1, 20) == -1) - { - LOG_ERROR << errno << ": Session reader poll failed."; - break; - } - - const short result = pollfds[0].revents; bool should_disconnect = false; + hpws::client &client = hpws_client.value(); - if (result & POLLIN) + std::variant read_result = client.read(); + if (std::holds_alternative(read_result)) { - // read_result -1 means error and we should disconnect the client. - // read_result 0 means no bytes were read. - // read_result 1 means some bytes were read. - // read_result 2 means full message were read and processed successfully. - const int read_result = attempt_read(); - - if (read_result == -1) - should_disconnect = true; - } - - if (!should_disconnect && (result & (POLLERR | POLLHUP | POLLRDHUP | POLLNVAL))) should_disconnect = true; + const hpws::error error = std::get(read_result); + if (error.first != 1) // 1 indicates channel has closed. + LOG_DEBUG << "hpws client read failed:" << error.first << " " << error.second; + } + else + { + // Enqueue the message for processing. + std::string_view data = std::get(read_result); + std::vector msg(data.size()); + memcpy(msg.data(), data.data(), data.size()); + in_msg_queue.enqueue(std::move(msg)); + + // Signal the hpws client that we are ready for next message. + std::optional error = client.ack(data); + if (error.has_value()) + { + LOG_DEBUG << "hpws client ack failed:" << error.value().first << " " << error.value().second; + should_disconnect = true; + } + } if (should_disconnect) { @@ -99,50 +98,6 @@ namespace comm return peer_sess_handler.on_connect(*this); } - /** - * Attempts to read message data from the given socket fd and passes the message on to the session. - * @return -1 on error and client must be disconnected. 0 if no message data bytes were read. 1 if some - * bytes were read but a full message is not yet formed. 2 if a fully formed message has been - * read into the read buffer. - */ - int comm_session::attempt_read() - { - size_t available_bytes = 0; - if (ioctl(read_fd, FIONREAD, &available_bytes) == -1 || - (max_msg_size > 0 && - available_bytes > (max_msg_size + (is_binary ? SIZE_HEADER_LEN : 0)))) - return -1; - - int res = 0; - - // Try to read a complete message using available bytes. - if (available_bytes > 0) - { - increment_metric(SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, available_bytes); - - if (is_binary) - { - res = attempt_binary_msg_construction(available_bytes); - } - else - { - read_buffer.resize(available_bytes); - res = read(read_fd, read_buffer.data(), available_bytes) < available_bytes ? -1 : 2; - } - - if (res == 2) // Full message has been read into read buffer. - { - std::vector msg; - msg.swap(read_buffer); - read_buffer_filled_size = 0; - - in_msg_queue.enqueue(std::move(msg)); - } - } - - return res; - } - /** * Processes the next queued message (if any). * @return 0 if no messages in queue. 1 if message was processed. -1 means session must be closed. @@ -182,57 +137,25 @@ namespace comm */ int comm_session::send(std::string_view message) { - // Making a copy of the message before it is destroyed from the parent scope. - std::string msg(message); - if (state == SESSION_STATE::CLOSED) return -1; - // Passing the ownership of msg to the queue using move operator for memory efficiency. - out_msg_queue.enqueue(std::move(msg)); - + // Passing the ownership of message to the queue. + out_msg_queue.enqueue(std::string(message)); return 0; } /** - * This function constructs and sends the message to the node from the given message. + * This function constructs and sends the message to the target from the given message. * @param message Message to be sent via the pipe. * @return 0 on successful message sent and -1 on error. */ int comm_session::process_outbound_message(std::string_view message) { - // Prepare the memory segments to map with writev(). - iovec memsegs[2]; - uint8_t header_buf[SIZE_HEADER_LEN] = {0, 0, 0, 0, 0, 0, 0, 0}; - - if (is_binary) + std::optional error = hpws_client.value().write(message); + if (error.has_value()) { - // In binary mode, we need to prefix every message with the message size header. - uint32_t len = message.length(); - - // Reserve the first 4 bytes for future (TODO). - header_buf[4] = len >> 24; - header_buf[5] = (len >> 16) & 0xff; - header_buf[6] = (len >> 8) & 0xff; - header_buf[7] = len & 0xff; - - memsegs[0].iov_base = header_buf; - memsegs[0].iov_len = SIZE_HEADER_LEN; - memsegs[1].iov_base = (char *)message.data(); - memsegs[1].iov_len = message.length(); - } - else - { - // In text mode, we need to append every message with '\n' - memsegs[0].iov_base = (char *)message.data(); - memsegs[0].iov_len = message.length(); - memsegs[1].iov_base = (char *)"\n"; - memsegs[1].iov_len = 1; - } - - if (writev(write_fd, memsegs, 2) == -1) - { - LOG_ERROR << errno << ": Session " << uniqueid.substr(0, 10) << " send writev failed."; + LOG_DEBUG << "hpws client write failed:" << error.value().first << " " << error.value().second; return -1; } return 0; @@ -294,85 +217,21 @@ namespace comm } state = SESSION_STATE::CLOSED; - ::close(read_fd); - if (read_fd != write_fd) - ::close(write_fd); - // Wait untill both reader & writer threads gracefully stop. - reader_thread.join(); + // Destruct the hpws client instance so it will close the sockets and related processes. + hpws_client.reset(); + + // Wait untill reader/writer threads gracefully stop. writer_thread.join(); + reader_thread.join(); LOG_DEBUG << (session_type == SESSION_TYPE::PEER ? "Peer" : "User") << " session closed: " << uniqueid.substr(0, 10) << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : ""); } /** - * Attempts to construct the full binary message pending to be read. Only relevant for Binary mode. - * @param available_bytes Count of bytes that is available to read from the client socket. - * @return -1 on error and client must be disconnected. 0 if no message data bytes were read. 1 if some - * bytes were read but a full message is not yet formed. 2 if a fully formed message has been - * read into the read buffer. - */ - int comm_session::attempt_binary_msg_construction(const size_t available_bytes) - { - // If we have previously encountered a size header and we are waiting until all message - // bytes are received, we must have the expected message size > 0. - - size_t data_bytes = available_bytes; - - // If we are not tracking a previous size header, then we must check for a size header. - if (expected_msg_size == 0 && available_bytes >= SIZE_HEADER_LEN) - { - // Read the size header. - uint8_t header_buf[SIZE_HEADER_LEN]; - if (read(read_fd, header_buf, SIZE_HEADER_LEN) == -1) - return -1; // Indicates that we should disconnect the client. - - data_bytes -= SIZE_HEADER_LEN; - - // We are using last 4 bytes (big endian) in the header for the message size. - uint32_t upcoming_msg_size = (header_buf[4] << 24) + (header_buf[5] << 16) + (header_buf[6] << 8) + header_buf[7]; - - // Remember the expected msg size until sufficient bytes are available. - expected_msg_size = upcoming_msg_size; - read_buffer.resize(expected_msg_size); - } - - if (expected_msg_size > 0 && data_bytes > 0) - { - // Claculate bytes remaining to form complete message. - const size_t remaining_len = expected_msg_size - read_buffer_filled_size; - - // We know expected message size, and enough bytes are available to read complete expected message. - if (data_bytes >= remaining_len) - { - // Complete the buffer by reading remaining bytes. - if (read(read_fd, read_buffer.data() + read_buffer_filled_size, remaining_len) == -1) - return -1; // Indicates that we should disconnect the client. - read_buffer_filled_size += remaining_len; - - const size_t read_len = expected_msg_size; - expected_msg_size = 0; // reset the expected msg size. - - return 2; // Full message has been read. - } - else - { - // Collect any available bytes to the buffer. - if (read(read_fd, read_buffer.data() + read_buffer_filled_size, data_bytes) == -1) - return -1; // Indicates that we should disconnect the client. - read_buffer_filled_size += data_bytes; - - return 1; // Some bytes were read, but full message is not yet formed. - } - } - - return 0; // No message data bytes was read. - } - - /** - * Set thresholds to the socket session -*/ + * Set thresholds to the socket session + */ void comm_session::set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms) { session_threshold &t = thresholds[threshold_type]; @@ -382,9 +241,9 @@ namespace comm } /* -* Increment the provided thresholds counter value with the provided amount and validate it against the -* configured threshold limit. -*/ + * Increment the provided thresholds counter value with the provided amount and validate it against the + * configured threshold limit. + */ void comm_session::increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount) { session_threshold &t = thresholds[threshold_type]; diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index 01822df5..cb75ede7 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -4,6 +4,7 @@ #include "../pchheader.hpp" #include "comm_session_threshold.hpp" #include "../conf.hpp" +#include "../hpws/hpws.hpp" namespace comm { @@ -34,30 +35,22 @@ namespace comm */ class comm_session { - const int read_fd = 0; - const int write_fd = 0; + private: + std::optional hpws_client; const SESSION_TYPE session_type; - const uint64_t max_msg_size = 0; std::vector thresholds; // track down various communication thresholds - uint32_t expected_msg_size = 0; // Next expected message size based on size header. - std::vector read_buffer; // Local buffer to keep collecting data until a complete message can be constructed. - uint32_t read_buffer_filled_size = 0; // How many bytes have been buffered so far. - std::thread reader_thread; // The thread responsible for reading messages from the read fd. std::thread writer_thread; // The thread responsible for writing messages to the write fd. moodycamel::ReaderWriterQueue> in_msg_queue; // Holds incoming messages waiting to be processed. moodycamel::ConcurrentQueue out_msg_queue; // Holds outgoing messages waiting to be processed. void reader_loop(); - int attempt_read(); - int attempt_binary_msg_construction(const size_t available_bytes); public: - const std::string address; // IP address of the remote party. - const bool is_binary; const bool is_inbound; bool is_self = false; + const std::string address; // IP address of the remote party. std::string uniqueid; std::string issued_challenge; conf::ip_port_pair known_ipport; @@ -65,8 +58,8 @@ namespace comm CHALLENGE_STATUS challenge_status = CHALLENGE_STATUS::NOT_ISSUED; comm_session( - std::string_view ip, const int read_fd, const int write_fd, const SESSION_TYPE session_type, - const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4], const uint64_t max_msg_size); + std::string_view ip, hpws::client &&hpws_client, const SESSION_TYPE session_type, + const bool is_inbound, const uint64_t (&metric_thresholds)[4]); int on_connect(); void start_messaging_threads(); int process_next_inbound_message(); diff --git a/src/conf.cpp b/src/conf.cpp index 6dbe30d4..d79f0236 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -91,7 +91,6 @@ namespace conf cfg.peerport = 22860; cfg.roundtime = 1000; cfg.pubport = 8080; - cfg.pubtls = true; #ifndef NDEBUG cfg.loglevel_type = conf::LOG_SEVERITY::DEBUG; @@ -141,8 +140,7 @@ namespace conf // Take the parent directory path. ctx.exe_dir = dirname(exepath.data()); - ctx.websocketd_exe_path = ctx.exe_dir + "/" + "websocketd"; - ctx.websocat_exe_path = ctx.exe_dir + "/" + "websocat"; + ctx.hpws_exe_path = ctx.exe_dir + "/" + "hpws"; ctx.hpfs_exe_path = ctx.exe_dir + "/" + "hpfs"; ctx.contract_dir = basedir; @@ -281,9 +279,6 @@ namespace conf cfg.pubport = d["pubport"].as(); cfg.roundtime = d["roundtime"].as(); - if (d.contains("pubtls")) // For backwards compatibility. - cfg.pubtls = d["pubtls"].as(); - cfg.pubmaxsize = d["pubmaxsize"].as(); cfg.pubmaxcpm = d["pubmaxcpm"].as(); cfg.pubmaxbadmpm = d["pubmaxbadmpm"].as(); @@ -352,7 +347,6 @@ namespace conf d.insert_or_assign("pubport", cfg.pubport); d.insert_or_assign("roundtime", cfg.roundtime); - d.insert_or_assign("pubtls", cfg.pubtls); d.insert_or_assign("pubmaxsize", cfg.pubmaxsize); d.insert_or_assign("pubmaxcpm", cfg.pubmaxcpm); d.insert_or_assign("pubmaxbadmpm", cfg.pubmaxbadmpm); diff --git a/src/conf.hpp b/src/conf.hpp index 52e0d2dc..11ed1a8f 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -36,8 +36,7 @@ namespace conf { std::string command; // The CLI command issued to launch HotPocket std::string exe_dir; // Hot Pocket executable dir. - std::string websocketd_exe_path; // Websocketd executable file path. - std::string websocat_exe_path; // Websocketd executable file path. + std::string hpws_exe_path; // hpws executable file path. std::string hpfs_exe_path; // hpfs executable file path. std::string contract_dir; // Contract base directory full path @@ -77,7 +76,6 @@ namespace conf uint16_t roundtime = 0; // Consensus round time in ms uint16_t pubport = 0; // Listening port for public user connections - bool pubtls = true; // Whether user connections are secured with TLS. uint64_t pubmaxsize = 0; // User message max size in bytes uint64_t pubmaxcpm = 0; // User message rate (characters(bytes) per minute) uint64_t pubmaxbadmpm = 0; // User bad messages per minute diff --git a/src/hpws/hpws.hpp b/src/hpws/hpws.hpp new file mode 100644 index 00000000..004f2137 --- /dev/null +++ b/src/hpws/hpws.hpp @@ -0,0 +1,873 @@ +#ifndef HPWS_INCLUDE +#define HPWS_INCLUDE +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#define DECODE_O_SIZE(control_msg, into) \ + { \ + into = ((uint32_t)control_msg[2] << 24) + ((uint32_t)control_msg[3] << 16) + \ + ((uint32_t)control_msg[4] << 8) + ((uint32_t)control_msg[5] << 0); \ + } + +#define ENCODE_O_SIZE(control_msg, from) \ + { \ + uint32_t f = from; \ + control_msg[2] = (unsigned char)((f >> 24) & 0xff); \ + control_msg[3] = (unsigned char)((f >> 16) & 0xff); \ + control_msg[4] = (unsigned char)((f >> 8) & 0xff); \ + control_msg[5] = (unsigned char)((f >> 0) & 0xff); \ + } + +#define HPWS_DEBUG 0 + +namespace hpws +{ + /*typedef enum e_retcode { + SUCCESS + } retcode; + */ + using error = std::pair; + +// used when waiting for messages that should already be on the pipe +#define HPWS_SMALL_TIMEOUT 10 +// used when waiting for server process to spawn +#define HPWS_LONG_TIMEOUT 2500 + + typedef union + { + struct sockaddr sa; + struct sockaddr_in sin; + struct sockaddr_in6 sin6; + struct sockaddr_storage ss; + } addr_t; + + class server; + + class client + { + + private: + pid_t child_pid = 0; // if this client was created by a connect this is set + // this value can't be changed once it's established between the processes + uint32_t max_buffer_size; + bool moved = false; + addr_t endpoint; + std::string get; // the get req this websocket was opened with + int control_line_fd; + int buffer_fd[4]; // 0 1 - in buffers, 2 3 - out buffers + int buffer_lock[2] = {0, 0}; // this records if buffers 2 and 3 have been sent out awaiting an ack or not + void *buffer[4]; + int pending_read[2] = {0, 0}; // if we receive a read message in a non-read function we place the pending size + // here in position 0 if buffer 0 and position 1 if buffer 1, then when read is + // called we return immediately with the content + // to prevent pending buffers becoming out of order a read counter is kept incrementing for each read + // and when there are pending reads the counter at the time of the read is inserted into this array + uint64_t pending_read_counter[2] = {0xFFFFFFFFFFFFFFFFULL, 0xFFFFFFFFFFFFFFFFULL}; + uint64_t read_counter = 0; + + // private constructor + client( + std::string_view get, + addr_t endpoint, + int control_line_fd, + uint32_t max_buffer_size, + pid_t child_pid, + int buffer_fd[4], + void *buffer[4]) : endpoint(endpoint), + control_line_fd(control_line_fd), + max_buffer_size(max_buffer_size), + child_pid(child_pid), get(get) + { + for (int i = 0; i < 4; ++i) + { + this->buffer[i] = buffer[i]; + this->buffer_fd[i] = buffer_fd[i]; + } + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] child constructed pid = %d\n", child_pid); + } + + public: + // No copy constructor + client(const client &) = delete; + + // only a move constructor + client(client &&old) : child_pid(old.child_pid), + max_buffer_size(old.max_buffer_size), + endpoint(old.endpoint), + control_line_fd(old.control_line_fd), + get(old.get) + { + old.moved = true; + for (int i = 0; i < 4; ++i) + { + this->buffer[i] = old.buffer[i]; + this->buffer_fd[i] = old.buffer_fd[i]; + } + + for (int i = 0; i < 2; ++i) + { + buffer_lock[i] = old.buffer_lock[i]; + pending_read[i] = old.pending_read[i]; + pending_read_counter[i] = old.pending_read_counter[i]; + read_counter = old.read_counter; + } + } + + ~client() + { + if (!moved) + { + + // RH TODO ensure this pid terminates by following up with a SIGKILL + if (child_pid > 0) + { + kill(child_pid, SIGTERM); + int status; + waitpid(child_pid, &status, 0 /* should we use WNOHANG? */); + } + + for (int i = 0; i < 4; ++i) + { + munmap(buffer[i], max_buffer_size); + close(buffer_fd[i]); + } + + close(control_line_fd); + } + } + + const std::variant host_address() + { + char hostname[NI_MAXHOST]; + const int ret = getnameinfo((sockaddr *)&endpoint, sizeof(sockaddr), hostname, sizeof(hostname), NULL, 0, NI_NUMERICHOST); + if (ret != 0) + return error{10, gai_strerror(ret)}; + + return hostname; + } + + std::variant read() + { + + unsigned char buf[32]; + int bytes_read = 0; + + read_start:; + + // if during writng we got a read message it's queued as a pending read, so process that first + int do_pending_read = -1; + if (pending_read[0] || pending_read[1]) + do_pending_read = (pending_read_counter[0] > pending_read_counter[1] ? 1 : 0); + + if (do_pending_read > -1) + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] pending read from buffer %d\n", do_pending_read); + bytes_read = pending_read[do_pending_read]; + uint32_t len = pending_read[do_pending_read]; + pending_read[do_pending_read] = 0; + pending_read_counter[do_pending_read] = 0xFFFFFFFFFFFFFFFFULL; + return std::string_view{(const char *)(buffer[do_pending_read]), len}; + } + else + { + + bytes_read = recv(control_line_fd, buf, sizeof(buf), 0); + if (bytes_read < 1) + { + if (HPWS_DEBUG) + { + perror("recv"); + fprintf(stderr, "[HPWS.HPP] bytes received %d\n", bytes_read); + } + return error{1, "[read] control line could not be read"}; // todo clean up somehow? + } + } + + switch (buf[0]) + { + case 'o': + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] o message received\n"); + + if (bytes_read != 6) + return error{3, "invalid buffer in 'o' command sent by hpws"}; + ++read_counter; + // there's a pending buffer for us + uint32_t len = 0; + DECODE_O_SIZE(buf, len); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] o message len: %u\n", len); + + int bufno = buf[1] - '0'; + if (bufno != 0 && bufno != 1) + return error{3, "invalid buffer in 'o' command sent by hpws"}; + + if (HPWS_DEBUG) + { + fprintf(stderr, "[HPWS.HPP] read %d\n", len); + for (uint32_t i = 0; i < len; ++i) + putc(((char *)(buffer[bufno]))[i], stderr); + fprintf(stderr, "\n---\n"); + } + return std::string_view{(const char *)(buffer[bufno]), len}; + } + case 'a': + { + if (bytes_read != 2) + return error{4, "received an ack longer than 2 bytes"}; + int bufno = buf[1] - '0'; + if (!(bufno == 0 || bufno == 1)) + return error{5, "received an ack with an invalid buffer, expecting 0 or 1"}; + // unlock the buffer + buffer_lock[bufno] = 0; + goto read_start; + } + case 'c': + return error{1000, "ws closed"}; + default: + fprintf(stderr, "[HPWS.HPP] read unknown control message 1: `%.*s`\n", bytes_read, buf); + return error{2, "unknown control line command was sent by hpws"}; + } + } + + std::optional write(std::string_view to_write) + { + // check if we have any free buffers + if (buffer_lock[0] && buffer_lock[1]) + { + // no free buffers, wait for a ack + unsigned char buf[32]; + int bytes_read = 0; + + write_start:; + bytes_read = recv(control_line_fd, buf, sizeof(buf), 0); + if (bytes_read < 1) + { + perror("recv"); + return error{1, "[write] control line could not be read"}; // todo clean up somehow? + } + + switch (buf[0]) + { + case 'o': + { + if (bytes_read != 6) + return error{3, "invalid buffer in 'o' command sent by hpws"}; + ++read_counter; + uint32_t len = 0; + DECODE_O_SIZE(buf, len); + + int bufno = buf[1] - '0'; + if (bufno != 0 && bufno != 1) + return error{3, "invalid buffer in 'o' command sent by hpws"}; + pending_read[bufno] = len; + pending_read_counter[bufno] = read_counter; + goto write_start; + } + case 'a': + { + if (bytes_read != 2) + return error{4, "received an ack longer than 2 bytes"}; + int bufno = buf[1] - '0'; + if (!(bufno == 0 || bufno == 1)) + return error{5, "received an ack with an invalid buffer, expecting 0 or 1"}; + // unlock the buffer + buffer_lock[bufno] = 0; + break; + } + case 'c': + return error{1000, "ws closed"}; + default: + fprintf(stderr, "[HPWS.HPP] read unknown control message 2: `%.*s`\n", bytes_read, buf); + return error{2, "unknown control line command was sent by hpws"}; + } + } + + // execution to here ensures at least one buffer is free + int bufno = (buffer_lock[0] == 0 ? 2 : 3); + + // update the selected buffer lock + buffer_lock[bufno - 2] = 1; + + // write into the buffer + memcpy(buffer[bufno], to_write.data(), to_write.size()); + + // send the control message informing hpws that a message is ready on this buffer + uint32_t len = to_write.size(); + char buf[6] = {'o', (char)('0' + (bufno - 2)), 0, 0, 0, 0}; + ENCODE_O_SIZE(buf, len); + + if (::write(control_line_fd, buf, 6) != 6) + return error{6, "could not write o message to control line"}; + + return std::nullopt; + } + + std::optional ack(std::string_view from_read) + { + char msg[2] = {'a', '0'}; + if (from_read.data() == buffer[1]) + msg[1]++; + if (send(control_line_fd, msg, 2, 0) < 2) + return error{10, "could not send ack down control line"}; + return std::nullopt; + } + + static std::variant connect( + std::string_view bin_path, + uint32_t max_buffer_size, + std::string_view host, + uint16_t port, + std::string_view get, + std::vector argv, + std::function fork_child_init = NULL) + { + +#define HPWS_CONNECT_ERROR(code, msg) \ + { \ + error_code = code; \ + error_msg = msg; \ + goto connect_error; \ + } + + int error_code = -1; + const char *error_msg = NULL; + int fd[2] = {-1, -1}; + int pid = -1; + int count_args = 12 + argv.size(); + char const **argv_pass = NULL; + + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd)) + HPWS_CONNECT_ERROR(100, "could not create unix domain socket pair"); + + // construct the arguments + char shm_size[32]; + + if (snprintf(shm_size, 32, "%d", max_buffer_size) <= 0) + HPWS_CONNECT_ERROR(90, "couldn't write shm size to string"); + + char port_str[6]; + if (snprintf(port_str, 6, "%d", port) <= 0) + HPWS_CONNECT_ERROR(91, "couldn't write port to string"); + + argv_pass = + reinterpret_cast(alloca(sizeof(char *) * count_args)); + { + int upto = 0; + argv_pass[upto++] = bin_path.data(); + argv_pass[upto++] = "--client"; + argv_pass[upto++] = "--maxmsg"; + argv_pass[upto++] = shm_size; + argv_pass[upto++] = "--host"; + argv_pass[upto++] = host.data(); + argv_pass[upto++] = "--port"; + argv_pass[upto++] = port_str; + argv_pass[upto++] = "--cntlfd"; + argv_pass[upto++] = "3"; + argv_pass[upto++] = "--get"; + argv_pass[upto++] = get.data(); + for (std::string_view &arg : argv) + argv_pass[upto++] = arg.data(); + argv_pass[upto] = NULL; + } + + pid = vfork(); + + if (pid) + { + + // --- PARENT + + close(fd[1]); + + int child_fd = fd[0]; + + int flags = fcntl(child_fd, F_GETFD, NULL); + if (flags < 0) + HPWS_CONNECT_ERROR(101, "could not get flags from unix domain socket"); + + flags |= FD_CLOEXEC; + if (fcntl(child_fd, F_SETFD, flags)) + HPWS_CONNECT_ERROR(102, "could notset flags for unix domain socket"); + + // we will set a timeout and wait for the initial startup message from hpws client mode + struct pollfd pfd; + int ret; + + pfd.fd = child_fd; + pfd.events = POLLIN; + ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout + + // timeout or error + if (ret < 1) + HPWS_CONNECT_ERROR(1, "timeout waiting for hpws connect message"); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] waiting for addr_t\n"); + // first thing we'll receive is the sockaddr union + addr_t child_addr; + + int bytes_read = + recv(child_fd, (unsigned char *)(&child_addr), sizeof(child_addr), 0); + + if (bytes_read < sizeof(child_addr)) + HPWS_CONNECT_ERROR(202, "received message on control line was not sizeof(addr_t)"); + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] waiting for buffer fds\n"); + + // second thing we will receive is the four fds for the buffers + int buffer_fd[4] = {-1, -1, -1, -1}; + void *mapping[4]; + { + struct msghdr child_msg = {0}; + memset(&child_msg, 0, sizeof(child_msg)); + char cmsgbuf[CMSG_SPACE(sizeof(int) * 4)]; + child_msg.msg_control = cmsgbuf; + child_msg.msg_controllen = sizeof(cmsgbuf); + + int bytes_read = + recvmsg(child_fd, &child_msg, 0); + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); + if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) + HPWS_CONNECT_ERROR(203, "non-scm_rights message sent on accept child control line"); + memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd)); + for (int i = 0; i < 4; ++i) + { + //fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]); + if (buffer_fd[i] < 0) + HPWS_CONNECT_ERROR(203, "child accept scm_rights a passed buffer fd was negative"); + mapping[i] = + mmap(0, max_buffer_size, PROT_READ | PROT_WRITE, MAP_SHARED, buffer_fd[i], 0); + if (mapping[i] == (void *)(-1)) + HPWS_CONNECT_ERROR(204, "could not mmap scm_rights passed buffer fd"); + } + } + + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] waiting for 'r'\n"); + + // now we wait for a 'r' ready message or for the socket/client to die + ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout + + char rbuf[1]; + bytes_read = recv(fd[0], rbuf, sizeof(rbuf), 0); + if (bytes_read < 1) + HPWS_CONNECT_ERROR(2, "nil message sent by hpws on startup"); + + if (rbuf[0] != 'r') + HPWS_CONNECT_ERROR(3, "unexpected content in message sent by hpws client mode on startup"); + + return client{ + get, + child_addr, + child_fd, + max_buffer_size, + pid, + buffer_fd, + mapping}; + } + else + { + + // --- CHILD + if (fork_child_init) + fork_child_init(); + + close(fd[0]); + + // dup fd[1] into fd 3 + dup2(fd[1], 3); + close(fd[1]); + + // we're assuming all fds above 3 will have close_exec flag + execv(bin_path.data(), (char *const *)argv_pass); + // we will send a nil message down the pipe to help the parent know somethings gone wrong + char nil[1]; + nil[0] = 0; + send(3, nil, 1, 0); + exit(1); // execl failure as child will always result in exit here + } + + connect_error:; + + // NB: execution to here can only happen in parent process + // clean up any mess after error + if (pid > 0) + { + kill((pid_t)pid, SIGKILL); /* RH TODO change this to SIGTERM and set a timeout? */ + int status; + waitpid(pid, &status, 0 /* should we use WNOHANG? */); + } + if (fd[0] > 0) + close(fd[0]); + if (fd[1] > 0) + close(fd[1]); + + return error{error_code, std::string{error_msg}}; + } + friend class server; + }; + + class server + { + + private: + pid_t server_pid_; + int master_control_fd_; + uint32_t max_buffer_size_; + bool moved = false; + + // private constructor + server(pid_t server_pid, int master_control_fd, uint32_t max_buffer_size) + : server_pid_(server_pid), master_control_fd_(master_control_fd), max_buffer_size_(max_buffer_size) {} + + public: + // No copy constructor + server(const server &) = delete; + + // only a move constructor + server(server &&old) : server_pid_(old.server_pid_), + master_control_fd_(old.master_control_fd_), + max_buffer_size_(old.max_buffer_size_) + { + old.moved = true; + } + + pid_t server_pid() + { + return server_pid_; + } + + int master_control_fd() + { + return master_control_fd_; + } + + uint32_t max_buffer_size() + { + return max_buffer_size_; + } + + std::variant accept(const bool no_block = false) + { +#define HPWS_ACCEPT_ERROR(code, msg) \ + { \ + return error{code, msg}; \ + } + + int child_fd = -1; + { + struct msghdr child_msg = {0}; + memset(&child_msg, 0, sizeof(child_msg)); + char cmsgbuf[CMSG_SPACE(sizeof(int))]; + child_msg.msg_control = cmsgbuf; + child_msg.msg_controllen = sizeof(cmsgbuf); + + // If no-block is specified, we first check any bytes available on control fd + // before attempting to do a blocking a read. + if (no_block) + { + struct pollfd master_pfd; + master_pfd.fd = this->master_control_fd_; + master_pfd.events = POLLIN; + const int master_poll_result = poll(&master_pfd, 1, HPWS_SMALL_TIMEOUT); + + if (master_poll_result == -1) // 1 ms timeout + HPWS_ACCEPT_ERROR(200, "poll failed on master control line"); + + if (master_poll_result == 0) // No data available + HPWS_ACCEPT_ERROR(199, "no new client available"); + } + + int bytes_read = + recvmsg(this->master_control_fd_, &child_msg, 0); + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); + if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) + HPWS_ACCEPT_ERROR(200, "non-scm_rights message sent on master control line"); + memcpy(&child_fd, CMSG_DATA(cmsg), sizeof(child_fd)); + if (child_fd < 0) + HPWS_ACCEPT_ERROR(201, "scm_rights passed fd was negative"); + } + + // read info from child control line with a timeout + struct pollfd pfd; + int ret; + + pfd.fd = child_fd; + pfd.events = POLLIN; + ret = poll(&pfd, 1, HPWS_SMALL_TIMEOUT); // 1 ms timeout + + // timeout or error + if (ret < 1) + return error{202, "timeout waiting for hpws accept child message"}; + + // first thing we'll receive is the pid of the client + // must not use pid_t here since we transfer across IPC channel as a uint32. + uint32_t pid = 0; + if (recv(child_fd, (unsigned char *)(&pid), sizeof(pid), 0) < sizeof(pid)) + HPWS_ACCEPT_ERROR(212, "did not receive expected 4 byte pid of child process on accept"); + + // second thing we'll receive is IP address structure of the client + addr_t buf; + int bytes_read = + recv(child_fd, (unsigned char *)(&buf), sizeof(buf), 0); + + if (bytes_read < sizeof(buf)) + return error{202, "received message on master control line was not sizeof(sockaddr_in6)"}; + + // third thing we will receive is the four fds for the buffers + int buffer_fd[4] = {-1, -1, -1, -1}; + void *mapping[4]; + { + struct msghdr child_msg = {0}; + memset(&child_msg, 0, sizeof(child_msg)); + char cmsgbuf[CMSG_SPACE(sizeof(int) * 4)]; + child_msg.msg_control = cmsgbuf; + child_msg.msg_controllen = sizeof(cmsgbuf); + + int bytes_read = + recvmsg(child_fd, &child_msg, 0); + struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg); + if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS) + return error{203, "non-scm_rights message sent on accept child control line"}; + memcpy(&buffer_fd, CMSG_DATA(cmsg), sizeof(buffer_fd)); + for (int i = 0; i < 4; ++i) + { + //fprintf(stderr, "scm passed buffer_fd[%d] = %d\n", i, buffer_fd[i]); + if (buffer_fd[i] < 0) + return error{203, "child accept scm_rights a passed buffer fd was negative"}; + mapping[i] = + mmap(0, max_buffer_size_, PROT_READ | PROT_WRITE, MAP_SHARED, buffer_fd[i], 0); + if (mapping[i] == (void *)(-1)) + return error{204, "could not mmap scm_rights passed buffer fd"}; + } + } + { + if (HPWS_DEBUG) + fprintf(stderr, "[HPWS.HPP] waiting for 'r' on accept\n"); + struct pollfd pfd; + int ret; + + pfd.fd = child_fd; + pfd.events = POLLIN; + // now we wait for a 'r' ready message or for the socket/client to die + ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout + + char rbuf[1]; + bytes_read = recv(child_fd, rbuf, sizeof(rbuf), 0); + if (bytes_read < 1) + HPWS_ACCEPT_ERROR(2, "nil message sent by hpws on startup on accept"); + + if (rbuf[0] != 'r') + HPWS_ACCEPT_ERROR(3, "unexpected content in message sent by hpws client mode on startup"); + } + + // RH TODO: accept needs a proper child cleanup on failure + return client{ + "", + buf, + child_fd, + max_buffer_size_, + (pid_t)pid, + buffer_fd, + mapping}; + } + + ~server() + { + if (!moved) + { + + // RH TODO ensure this pid terminates by following up with a SIGKILL + if (server_pid_ > 0) + { + kill(server_pid_, SIGTERM); + int status; + waitpid(server_pid_, &status, 0 /* should we use WNOHANG? */); + } + + close(master_control_fd_); + } + } + + static std::variant create( + std::string_view bin_path, + uint32_t max_buffer_size, + uint16_t port, + uint32_t max_con, + uint16_t max_con_per_ip, + std::string_view cert_path, + std::string_view key_path, + std::vector argv, //additional_arguments + std::function fork_child_init = NULL) + { +#define HPWS_SERVER_ERROR(code, msg) \ + { \ + error_code = code; \ + error_msg = msg; \ + goto server_error; \ + } + + int error_code = -1; + const char *error_msg = NULL; + int fd[2] = {-1, -1}; + pid_t pid = -1; + int count_args = 17 + argv.size(); + char const **argv_pass = NULL; + + if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, fd)) + HPWS_SERVER_ERROR(100, "could not create unix domain socket pair"); + + // construct the arguments + char shm_size[32]; + + if (snprintf(shm_size, 32, "%d", max_buffer_size) <= 0) + HPWS_SERVER_ERROR(90, "couldn't write shm size to string"); + + char port_str[6]; + if (snprintf(port_str, 6, "%d", port) <= 0) + HPWS_SERVER_ERROR(91, "couldn't write port to string"); + + char max_con_str[11]; + if (snprintf(max_con_str, 11, "%d", max_con) <= 0) + HPWS_SERVER_ERROR(92, "couldn't write max_con to string"); + + char max_con_per_ip_str[6]; + if (snprintf(max_con_per_ip_str, 6, "%d", max_con_per_ip) <= 0) + HPWS_SERVER_ERROR(93, "couldn't write max_con_per_ip to string"); + + argv_pass = + reinterpret_cast(alloca(sizeof(char *) * count_args)); + { + int upto = 0; + argv_pass[upto++] = bin_path.data(); + argv_pass[upto++] = "--server"; + argv_pass[upto++] = "--maxmsg"; + argv_pass[upto++] = shm_size; + argv_pass[upto++] = "--port"; + argv_pass[upto++] = port_str; + argv_pass[upto++] = "--cert"; + argv_pass[upto++] = cert_path.data(); + argv_pass[upto++] = "--key"; + argv_pass[upto++] = key_path.data(); + argv_pass[upto++] = "--cntlfd"; + argv_pass[upto++] = "3"; + argv_pass[upto++] = "--maxcon"; + argv_pass[upto++] = max_con_str; + argv_pass[upto++] = "--maxconip"; + argv_pass[upto++] = max_con_per_ip_str; + for (std::string_view &arg : argv) + argv_pass[upto++] = arg.data(); + argv_pass[upto] = NULL; + } + + pid = vfork(); + if (pid) + { + + // --- PARENT + + close(fd[1]); + + int flags = fcntl(fd[0], F_GETFD, NULL); + if (flags < 0) + HPWS_SERVER_ERROR(101, "could not get flags from unix domain socket"); + + flags |= FD_CLOEXEC; + if (fcntl(fd[0], F_SETFD, flags)) + HPWS_SERVER_ERROR(102, "could notset flags for unix domain socket"); + + // we will set a timeout and wait for the initial startup message from hpws server mode + struct pollfd pfd; + int ret; + + pfd.fd = fd[0]; + pfd.events = POLLIN; + ret = poll(&pfd, 1, HPWS_LONG_TIMEOUT); // default= 1500 ms timeout + + // timeout or error + if (ret < 1) + HPWS_SERVER_ERROR(1, "timeout waiting for hpws startup message"); + + char buf[1024]; + int bytes_read = recv(fd[0], buf, sizeof(buf) - 1, 0); + if (bytes_read < 1) + HPWS_SERVER_ERROR(2, "nil message sent by hpws on startup"); + + buf[bytes_read] = '\0'; + if (strncmp(buf, "startup", 7) != 0) + { + fprintf(stderr, "startup message: `%.*s`\n", bytes_read, buf); + HPWS_SERVER_ERROR(3, "unexpected content in message sent by hpws on startup"); + } + return server{ + pid, + fd[0], + max_buffer_size}; + } + else + { + + // --- CHILD + if (fork_child_init) + fork_child_init(); + + close(fd[0]); + + // dup fd[1] into fd 3 + dup2(fd[1], 3); + close(fd[1]); + + // we're assuming all fds above 3 will have close_exec flag + execv(bin_path.data(), (char *const *)argv_pass); + // we will send a nil message down the pipe to help the parent know somethings gone wrong + char nil[1]; + nil[0] = 0; + send(3, nil, 1, 0); + exit(1); // execl failure as child will always result in exit here + } + + server_error:; + + // NB: execution to here can only happen in parent process + // clean up any mess after error + if (pid > 0) + { + kill(pid, SIGKILL); /* RH TODO change this to SIGTERM and set a timeout? */ + int status; + waitpid(pid, &status, 0 /* should we use WNOHANG? */); + } + if (fd[0] > 0) + close(fd[0]); + if (fd[1] > 0) + close(fd[1]); + + return error{error_code, std::string{error_msg}}; + } + }; +} // namespace hpws + +#endif diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index e7be4875..12d3341d 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -1,6 +1,5 @@ #include "../pchheader.hpp" #include "../comm/comm_server.hpp" -#include "../comm/comm_client.hpp" #include "../conf.hpp" #include "../crypto.hpp" #include "../util.hpp" @@ -48,8 +47,7 @@ namespace p2p int start_peer_connections() { if (ctx.listener.start( - conf::cfg.peerport, ".sock-peer", comm::SESSION_TYPE::PEER, - true, false, true, metric_thresholds, conf::cfg.peers, conf::cfg.peermaxsize) == -1) + conf::cfg.peerport, comm::SESSION_TYPE::PEER, metric_thresholds, conf::cfg.peers, conf::cfg.peermaxsize) == -1) return -1; LOG_INFO << "Started listening for peer connections on " << std::to_string(conf::cfg.peerport); diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 4ed07075..39809332 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -3,7 +3,6 @@ #include "../pchheader.hpp" #include "../comm/comm_server.hpp" -#include "../comm/comm_client.hpp" #include "../comm/comm_session.hpp" #include "../usr/user_input.hpp" #include "peer_session_handler.hpp" diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 93710f51..19c55ae0 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -9,7 +9,6 @@ #include "../msg/fbuf/p2pmsg_helpers.hpp" #include "../msg/fbuf/common_helpers.hpp" #include "../comm/comm_session.hpp" -#include "../comm/comm_client.hpp" #include "p2p.hpp" #include "peer_session_handler.hpp" #include "../state/state_sync.hpp" diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index a131abb2..4cccfde8 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -57,8 +57,7 @@ namespace usr int start_listening() { if (ctx.listener.start( - conf::cfg.pubport, ".sock-user", comm::SESSION_TYPE::USER, - true, true, conf::cfg.pubtls, metric_thresholds, std::set(), conf::cfg.pubmaxsize) == -1) + conf::cfg.pubport, comm::SESSION_TYPE::USER, metric_thresholds, std::set(), conf::cfg.pubmaxsize) == -1) return -1; LOG_INFO << "Started listening for user connections on " << std::to_string(conf::cfg.pubport); diff --git a/test/bin/hpws b/test/bin/hpws new file mode 100755 index 00000000..f078f862 Binary files /dev/null and b/test/bin/hpws differ diff --git a/test/bin/websocat b/test/bin/websocat deleted file mode 100755 index fd69e788..00000000 Binary files a/test/bin/websocat and /dev/null differ diff --git a/test/bin/websocketd b/test/bin/websocketd deleted file mode 100755 index bf48e30b..00000000 Binary files a/test/bin/websocketd and /dev/null differ diff --git a/test/local-cluster/Dockerfile b/test/local-cluster/Dockerfile index 62a6f2cf..f18238e7 100644 --- a/test/local-cluster/Dockerfile +++ b/test/local-cluster/Dockerfile @@ -3,8 +3,7 @@ FROM node:12.18.3-buster-slim RUN apt-get update -# Install netcat for websocketd <--> domain socket redirection. -RUN apt-get install -y netcat-openbsd libgomp1 +RUN apt-get install -y libgomp1 libssl-dev # Install shared libraries. # Copy shared libraries and register it. @@ -17,7 +16,6 @@ COPY ./bin/fusermount3 /usr/local/bin/ WORKDIR /hp COPY ./bin/hpcore . COPY ./bin/hpfs . -COPY ./bin/websocketd . -COPY ./bin/websocat . +COPY ./bin/hpws . ENTRYPOINT ["/hp/hpcore"] \ No newline at end of file diff --git a/test/vm-cluster/cluster.sh b/test/vm-cluster/cluster.sh index c83cf976..5241f898 100755 --- a/test/vm-cluster/cluster.sh +++ b/test/vm-cluster/cluster.sh @@ -261,7 +261,7 @@ if [ $mode = "new" ] || [ $mode = "update" ]; then fi if [ $mode = "new" ]; then - cp ../bin/{libfuse3.so.3,libblake3.so,fusermount3,websocketd,websocat,hpfs} hpfiles/bin/ + cp ../bin/{libfuse3.so.3,libblake3.so,fusermount3,hpws,hpfs} hpfiles/bin/ cp ./setup-hp.sh hpfiles/ fi diff --git a/test/vm-cluster/setup-hp.sh b/test/vm-cluster/setup-hp.sh index 9503800c..3a71d9a0 100755 --- a/test/vm-cluster/setup-hp.sh +++ b/test/vm-cluster/setup-hp.sh @@ -27,7 +27,7 @@ if [ -x "$(command -v fusermount3)" ]; then echo "FUSE already installed." else echo "Installing FUSE and other shared libraries..." - sudo apt-get -y install libgomp1 + sudo apt-get -y install libgomp1 libssl-dev sudo cp $basedir/hpfiles/bin/{libfuse3.so.3,libblake3.so} /usr/local/lib/ sudo ldconfig sudo cp $basedir/hpfiles/bin/fusermount3 /usr/local/bin/ @@ -69,11 +69,11 @@ if [ $mode = "new" ] || [ $mode = "reconfig" ]; then sudo chmod +x $contdir/stop.sh # Create check.sh script (print pids belonging to this contract dir) - echo "echo hpcore pid:\$($contdir/getpid.sh hpcore) hpfs pid:\$($contdir/getpid.sh hpfs) websocketd pid:\$($contdir/getpid.sh websocketd) websocat pid:\$($contdir/getpid.sh websocat)" > $contdir/check.sh + echo "echo hpcore pid:\$($contdir/getpid.sh hpcore) hpfs pid:\$($contdir/getpid.sh hpfs) hpws pid:\$($contdir/getpid.sh hpws)" > $contdir/check.sh sudo chmod +x $contdir/check.sh # Create kill.sh script - echo "sudo kill \$($contdir/getpid.sh hpcore hpfs websocketd websocat)" > $contdir/kill.sh + echo "sudo kill \$($contdir/getpid.sh hpcore hpfs hpws)" > $contdir/kill.sh sudo chmod +x $contdir/kill.sh # Configure .screenrc