From 3ea0299964cdf0dc3bee6a00dfc275c97f3274c5 Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Sun, 8 Nov 2020 22:14:08 +0530 Subject: [PATCH] Comm session re-architecture and self comm channel. (#145) * Introduced self comm channel instead of loopback websocket. * Introduced comm_session and comm_server inheritance hierarchy. * Separated peer session and user session classes. --- CMakeLists.txt | 5 +- src/comm/comm_server.cpp | 297 --------------------------- src/comm/comm_server.hpp | 243 +++++++++++++++++++--- src/comm/comm_session.cpp | 100 ++++----- src/comm/comm_session.hpp | 47 ++--- src/comm/comm_session_handler.hpp | 23 --- src/comm/comm_session_threshold.hpp | 4 +- src/conf.cpp | 5 - src/p2p/p2p.cpp | 145 ++++++------- src/p2p/p2p.hpp | 17 +- src/p2p/peer_comm_server.cpp | 114 ++++++++++ src/p2p/peer_comm_server.hpp | 29 +++ src/p2p/peer_comm_session.cpp | 36 ++++ src/p2p/peer_comm_session.hpp | 30 +++ src/p2p/peer_session_handler.cpp | 170 +++++++++------ src/p2p/peer_session_handler.hpp | 26 +-- src/p2p/self_node.cpp | 28 +++ src/p2p/self_node.hpp | 12 ++ src/usr/user_comm_server.hpp | 15 ++ src/usr/user_comm_session.cpp | 41 ++++ src/usr/user_comm_session.hpp | 27 +++ src/usr/user_session_handler.cpp | 138 ++++++------- src/usr/user_session_handler.hpp | 14 +- src/usr/usr.cpp | 16 +- src/usr/usr.hpp | 26 +-- test/local-cluster/cluster-create.sh | 3 +- 26 files changed, 891 insertions(+), 720 deletions(-) delete mode 100644 src/comm/comm_server.cpp delete mode 100644 src/comm/comm_session_handler.hpp create mode 100644 src/p2p/peer_comm_server.cpp create mode 100644 src/p2p/peer_comm_server.hpp create mode 100644 src/p2p/peer_comm_session.cpp create mode 100644 src/p2p/peer_comm_session.hpp create mode 100644 src/p2p/self_node.cpp create mode 100644 src/p2p/self_node.hpp create mode 100644 src/usr/user_comm_server.hpp create mode 100644 src/usr/user_comm_session.cpp create mode 100644 src/usr/user_comm_session.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index 0cc9e7ec..535948ad 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,15 +33,18 @@ add_executable(hpcore src/hpfs/h32.cpp src/hpfs/hpfs.cpp src/comm/comm_session.cpp - src/comm/comm_server.cpp src/msg/fbuf/common_helpers.cpp src/msg/fbuf/p2pmsg_helpers.cpp src/msg/fbuf/ledger_helpers.cpp src/msg/json/usrmsg_json.cpp src/msg/bson/usrmsg_bson.cpp src/msg/usrmsg_parser.cpp + src/p2p/peer_comm_server.cpp + src/p2p/peer_comm_session.cpp src/p2p/peer_session_handler.cpp + src/p2p/self_node.cpp src/p2p/p2p.cpp + src/usr/user_comm_session.cpp src/usr/user_session_handler.cpp src/usr/usr.cpp src/usr/read_req.cpp diff --git a/src/comm/comm_server.cpp b/src/comm/comm_server.cpp deleted file mode 100644 index 1681ac69..00000000 --- a/src/comm/comm_server.cpp +++ /dev/null @@ -1,297 +0,0 @@ -#include "comm_server.hpp" -#include "comm_session.hpp" -#include "comm_session_handler.hpp" -#include "../hplog.hpp" -#include "../util.hpp" -#include "../bill/corebill.h" -#include "../hpws/hpws.hpp" -#include "../p2p/p2p.hpp" - -namespace comm -{ - constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 16 * 1024 * 1024; - constexpr float WEAKLY_CONNECTED_THRESHOLD = 0.7; - - int comm_server::start( - const uint16_t port, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4], - const std::set &req_known_remotes, const uint64_t max_msg_size) - { - const uint64_t final_max_msg_size = max_msg_size > 0 ? max_msg_size : DEFAULT_MAX_MSG_SIZE; - - if (start_hpws_server(port, final_max_msg_size) == -1) - return -1; - - watchdog_thread = std::thread( - &comm_server::connection_watchdog, this, session_type, - std::ref(metric_thresholds), req_known_remotes, final_max_msg_size); - - inbound_message_processor_thread = std::thread(&comm_server::inbound_message_processor_loop, this, session_type); - - return 0; - } - - void comm_server::connection_watchdog( - const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4], - const std::set &req_known_remotes, const uint64_t max_msg_size) - { - util::mask_signal(); - - // Counter to track when to initiate outbound client connections. - int16_t loop_counter = -1; - - while (!should_stop_listening) - { - util::sleep(100); - - // Accept any new incoming connection if available. - check_for_new_connection(sessions, session_type, metric_thresholds); - - // Restore any missing outbound connections. - if (!req_known_remotes.empty()) - { - if (loop_counter == 20) - { - loop_counter = 0; - maintain_known_connections(sessions, req_known_remotes, session_type, max_msg_size, metric_thresholds); - } - loop_counter++; - } - - // Cleanup any sessions that needs closure. - for (auto itr = sessions.begin(); itr != sessions.end();) - { - if (itr->state == SESSION_STATE::MUST_CLOSE) - itr->close(true); - - if (itr->state == SESSION_STATE::CLOSED) - itr = sessions.erase(itr); - else - ++itr; - } - - flatbuffers::FlatBufferBuilder fbuf(1024); - if (sessions.size() > 1) - { - if (is_weakly_connected()) - { - if (!weakly_connected_status_sent) - { - LOG_DEBUG << "Weakly connected status announcement sent"; - p2p::send_connected_status_announcement(fbuf, true); - // Mark that the p2p message forwarding is requested. - weakly_connected_status_sent = true; - } - } - else - { - if (weakly_connected_status_sent) - { - LOG_DEBUG << "Strongly connected status announcement sent"; - p2p::send_connected_status_announcement(fbuf, false); - weakly_connected_status_sent = false; - } - } - } - } - - // If we reach this point that means we are shutting down. - - // Close and erase all sessions. - for (comm_session &session : sessions) - session.close(false); - - sessions.clear(); - - LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " listener stopped."; - } - - bool comm_server::is_weakly_connected() - { - return (sessions.size() - 1) < (conf::cfg.unl.size() * WEAKLY_CONNECTED_THRESHOLD); - } - - void comm_server::check_for_new_connection( - std::list &sessions, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4]) - { - std::variant accept_result = hpws_server.value().accept(true); - - if (std::holds_alternative(accept_result)) - { - const hpws::error error = std::get(accept_result); - if (error.first == 199) // No client connected. - return; - - LOG_ERROR << "Error in hpws accept():" << error.first << " " << error.second; - return; - } - - // New client connected. - hpws::client client = std::move(std::get(accept_result)); - const std::variant host_result = client.host_address(); - if (std::holds_alternative(host_result)) - { - const hpws::error error = std::get(host_result); - LOG_ERROR << "Error getting ip from hpws:" << error.first << " " << error.second; - } - else - { - const std::string &host_address = std::get(host_result); - - if (corebill::is_banned(host_address)) - { - // We just let the client object gets destructed without adding it to a session. - LOG_DEBUG << "Dropping connection for banned host " << host_address; - } - else - { - comm_session session(host_address, std::move(client), session_type, true, metric_thresholds); - if (session.on_connect() == 0) - { - std::scoped_lock lock(sessions_mutex); - comm_session &inserted_session = sessions.emplace_back(std::move(session)); - - // Thread is seperately started after the moving operation to overcome the difficulty - // in accessing class member variables inside the thread. - // Class member variables gives unacceptable values if the thread starts before the move operation. - inserted_session.start_messaging_threads(); - // Making sure the newly connected node get the weakly connected announcement if the number of - // connected peers are under the threshold. - if ((sessions.size() > 1) && is_weakly_connected()) - { - weakly_connected_status_sent = false; - } - } - } - } - } - - void comm_server::maintain_known_connections( - std::list &sessions, const std::set &req_known_remotes, - const SESSION_TYPE session_type, const uint64_t max_msg_size, const uint64_t (&metric_thresholds)[4]) - { - // Find already connected known remote parties list - std::set known_remotes; - for (const comm_session &session : sessions) - { - if (session.state != SESSION_STATE::CLOSED && !session.known_ipport.first.empty()) - known_remotes.emplace(session.known_ipport); - } - - for (const auto &ipport : req_known_remotes) - { - if (should_stop_listening) - break; - - // Check if we are already connected to this remote party. - if (known_remotes.find(ipport) != known_remotes.end()) - continue; - - std::string_view host = ipport.first; - const uint16_t port = ipport.second; - LOG_DEBUG << "Trying to connect " << host << ":" << std::to_string(port); - - std::variant client_result = hpws::client::connect(conf::ctx.hpws_exe_path, max_msg_size, host, port, "/", {}, util::fork_detach); - - if (std::holds_alternative(client_result)) - { - const hpws::error error = std::get(client_result); - if (error.first != 202) - LOG_ERROR << "Outbound connection hpws error:" << error.first << " " << error.second; - } - else - { - hpws::client client = std::move(std::get(client_result)); - const std::variant host_result = client.host_address(); - if (std::holds_alternative(host_result)) - { - const hpws::error error = std::get(host_result); - LOG_ERROR << "Error getting ip from hpws:" << error.first << " " << error.second; - } - else - { - const std::string &host_address = std::get(host_result); - comm::comm_session session(host_address, std::move(client), session_type, false, metric_thresholds); - session.known_ipport = ipport; - if (session.on_connect() == 0) - { - std::scoped_lock lock(sessions_mutex); - comm_session &inserted_session = sessions.emplace_back(std::move(session)); - - // Thread is seperately started after the moving operation to overcome the difficulty - // in accessing class member variables inside the thread. - // Class member variables gives unacceptable values if the thread starts before the move operation. - inserted_session.start_messaging_threads(); - - known_remotes.emplace(ipport); - } - } - } - } - } - - void comm_server::inbound_message_processor_loop(const SESSION_TYPE session_type) - { - util::mask_signal(); - - while (!should_stop_listening) - { - bool messages_processed = false; - - { - // Process one message from each session in round-robin fashion. - std::scoped_lock lock(sessions_mutex); - for (comm_session &session : sessions) - { - const int result = session.process_next_inbound_message(); - - if (result != 0) - messages_processed = true; - - if (result == -1) - session.mark_for_closure(); - } - } - - // If no messages were processed in this cycle, wait for some time. - if (!messages_processed) - util::sleep(10); - } - - LOG_INFO << (session_type == SESSION_TYPE::USER ? "User" : "Peer") << " message processor stopped."; - } - - int comm_server::start_hpws_server(const uint16_t port, const uint64_t max_msg_size) - { - std::variant result = hpws::server::create( - conf::ctx.hpws_exe_path, - max_msg_size, - port, - 512, // Max connections - 2, // Max connections per IP. - conf::ctx.tls_cert_file, - conf::ctx.tls_key_file, - {}, - util::fork_detach); - - if (std::holds_alternative(result)) - { - const hpws::error e = std::get(result); - LOG_ERROR << "Error creating hpws server:" << e.first << " " << e.second; - return -1; - } - - hpws_server.emplace(std::move(std::get(result))); - - return 0; - } - - void comm_server::stop() - { - should_stop_listening = true; - watchdog_thread.join(); - hpws_server.reset(); - - inbound_message_processor_thread.join(); - } - -} // namespace comm diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index bc8bb5a9..02040c16 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -1,52 +1,243 @@ -#ifndef _HP_COMM_SERVER_ -#define _HP_COMM_SERVER_ +#ifndef _HP_COMM_COMM_SERVER_ +#define _HP_COMM_COMM_SERVER_ #include "../pchheader.hpp" -#include "comm_session.hpp" +#include "../hplog.hpp" +#include "../util.hpp" +#include "../bill/corebill.h" #include "../hpws/hpws.hpp" +#include "comm_session.hpp" namespace comm { + constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 16 * 1024 * 1024; + template class comm_server { + protected: + const uint64_t (&metric_thresholds)[4]; + const uint64_t max_msg_size; + bool is_shutting_down = false; + std::list sessions; + std::list new_sessions; // Sessions that haven't been initialized properly which are yet to be merge to "sessions" list. + std::mutex sessions_mutex; + std::mutex new_sessions_mutex; + + virtual void start_custom_jobs() + { + } + + virtual void stop_custom_jobs() + { + } + + virtual int process_custom_messages() + { + return 0; + } + + virtual void custom_connections() + { + } + + private: + const std::string name; + const uint16_t listen_port; std::optional hpws_server; std::thread watchdog_thread; // Connection watcher thread. std::thread inbound_message_processor_thread; // Incoming message processor thread. - bool should_stop_listening = false; - bool weakly_connected_status_sent = false; // keep track whether the weakly connected status announcement is sent or not. - std::list sessions; - std::mutex sessions_mutex; + void connection_watchdog() + { + util::mask_signal(); - void connection_watchdog( - const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4], - const std::set &req_known_remotes, const uint64_t max_msg_size); + while (!is_shutting_down) + { + util::sleep(100); - void inbound_message_processor_loop(const SESSION_TYPE session_type); + // Accept any new incoming connection if available. + check_for_new_connection(); - int start_hpws_server(const uint16_t port, const uint64_t max_msg_size); + // Any connection logic in inherited classes. + custom_connections(); - int poll_fds(pollfd *pollfds, const int accept_fd, const std::list &sessions); + std::scoped_lock lock(sessions_mutex); - void check_for_new_connection( - std::list &sessions, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4]); + // Initialize any new sessions. + { + // Get current last session. + auto ex_last_session = std::prev(sessions.end()); - void maintain_known_connections( - std::list &sessions, const std::set &req_known_remotes, - const SESSION_TYPE session_type, const uint64_t max_msg_size, const uint64_t (&metric_thresholds)[4]); + { + // Move new sessions to the end of "sessions" list. + std::scoped_lock lock(new_sessions_mutex); + sessions.splice(sessions.end(), new_sessions); + } - std::string get_cgi_ip(const int fd); + // Initialize newly inserted sessions. + // This must be performed after session objects end up in their final location. + for (auto itr = ++ex_last_session; itr != sessions.end(); itr++) + itr->init(); + } - bool is_weakly_connected(); + // Cleanup any sessions that needs closure. + for (auto itr = sessions.begin(); itr != sessions.end();) + { + if (itr->state == SESSION_STATE::MUST_CLOSE) + itr->close(true); + + if (itr->state == SESSION_STATE::CLOSED) + itr = sessions.erase(itr); + else + ++itr; + } + } + + // If we reach this point that means we are shutting down. + + // Close and erase all sessions. + for (T &session : sessions) + session.close(false); + + sessions.clear(); + + LOG_INFO << name << " listener stopped."; + } + + void check_for_new_connection() + { + std::variant accept_result = hpws_server.value().accept(true); + + if (std::holds_alternative(accept_result)) + { + const hpws::error error = std::get(accept_result); + if (error.first == 199) // No client connected. + return; + + LOG_ERROR << "Error in hpws accept():" << error.first << " " << error.second; + return; + } + + // New client connected. + hpws::client client = std::move(std::get(accept_result)); + const std::variant host_result = client.host_address(); + if (std::holds_alternative(host_result)) + { + const hpws::error error = std::get(host_result); + LOG_ERROR << "Error getting " << name << " ip from hpws:" << error.first << " " << error.second; + } + else + { + const std::string &host_address = std::get(host_result); + if (!corebill::is_banned(host_address)) + { + // We do not directly add to sessions list. We simply add to new_sessions list under a lock so the main server + // loop will take care of initialize the new sessions. This is because inherited classes (eg. peer_comm_server) + // need a way to safely inject new sessions from another thread. + std::scoped_lock lock(new_sessions_mutex); + new_sessions.emplace_back(host_address, std::move(client), true, metric_thresholds); + } + else + { + LOG_DEBUG << "Dropping " << name << " connection for banned host " << host_address; + } + } + + // If the hpws client object was not added to a session so far, in will get dstructed and the channel will close. + } + + void inbound_message_processor_loop() + { + util::mask_signal(); + + while (!is_shutting_down) + { + bool messages_processed = false; + + if (process_custom_messages() != 0) + messages_processed = true; + + { + // Process one message from each session in round-robin fashion. + std::scoped_lock lock(sessions_mutex); + for (T &session : sessions) + { + const int result = session.process_next_inbound_message(); + + if (result != 0) + messages_processed = true; + + if (result == -1) + session.mark_for_closure(); + } + } + + // If no messages were processed in this cycle, wait for some time. + if (!messages_processed) + util::sleep(10); + } + + LOG_INFO << name << " message processor stopped."; + } + + int start_hpws_server() + { + std::variant result = hpws::server::create( + conf::ctx.hpws_exe_path, + max_msg_size, + listen_port, + 512, // Max connections + 2, // Max connections per IP. + conf::ctx.tls_cert_file, + conf::ctx.tls_key_file, + {}, + util::fork_detach); + + if (std::holds_alternative(result)) + { + const hpws::error e = std::get(result); + LOG_ERROR << "Error creating hpws server:" << e.first << " " << e.second; + return -1; + } + + hpws_server.emplace(std::move(std::get(result))); + + return 0; + } public: - // Start accepting incoming connections - int start( - const uint16_t port, const SESSION_TYPE session_type, const uint64_t (&metric_thresholds)[4], - const std::set &req_known_remotes, const uint64_t max_msg_size); - void stop(); - void firewall_ban(std::string_view ip, const bool unban); + comm_server(std::string_view name, const uint16_t port, const uint64_t (&metric_thresholds)[4], const uint64_t max_msg_size) + : name(name), + listen_port(port), + metric_thresholds(metric_thresholds), + max_msg_size(max_msg_size > 0 ? max_msg_size : DEFAULT_MAX_MSG_SIZE) + { + } + + int start() + { + if (start_hpws_server() == -1) + return -1; + + watchdog_thread = std::thread(&comm_server::connection_watchdog, this); + inbound_message_processor_thread = std::thread(&comm_server::inbound_message_processor_loop, this); + start_custom_jobs(); + + return 0; + } + + void stop() + { + is_shutting_down = true; + + stop_custom_jobs(); + + watchdog_thread.join(); + hpws_server.reset(); + + inbound_message_processor_thread.join(); + } }; } // namespace comm diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index e315d243..af6b78f6 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -1,29 +1,20 @@ #include "../pchheader.hpp" -#include "../usr/user_session_handler.hpp" -#include "../p2p/peer_session_handler.hpp" -#include "comm_session.hpp" #include "../hplog.hpp" #include "../util.hpp" #include "../conf.hpp" #include "../bill/corebill.h" #include "../hpws/hpws.hpp" +#include "comm_session.hpp" namespace comm { constexpr uint32_t INTERVALMS = 60000; - // Global instances of user and peer session handlers. - usr::user_session_handler user_sess_handler; - p2p::peer_session_handler peer_sess_handler; - comm_session::comm_session( - std::string_view ip, hpws::client &&hpws_client, const SESSION_TYPE session_type, - const bool is_inbound, const uint64_t (&metric_thresholds)[4]) - - : address(ip), + std::string_view host_address, hpws::client &&hpws_client, const bool is_inbound, const uint64_t (&metric_thresholds)[4]) + : uniqueid(host_address), + host_address(host_address), hpws_client(std::move(hpws_client)), - session_type(session_type), - uniqueid(ip), is_inbound(is_inbound), in_msg_queue(32) { @@ -36,12 +27,18 @@ namespace comm } /** - * Starts the outbound queue processing thread. - */ - void comm_session::start_messaging_threads() + * Init() should be called to activate the session. + * Because we are starting threads here, after init() is called, the session object must not be "std::moved". + */ + void comm_session::init() { - reader_thread = std::thread(&comm_session::reader_loop, this); - writer_thread = std::thread(&comm_session::process_outbound_msg_queue, this); + if (state == SESSION_STATE::NONE) + { + handle_connect(); + reader_thread = std::thread(&comm_session::reader_loop, this); + writer_thread = std::thread(&comm_session::process_outbound_msg_queue, this); + state = SESSION_STATE::ACTIVE; + } } void comm_session::reader_loop() @@ -87,16 +84,6 @@ namespace comm } } - int comm_session::on_connect() - { - state = SESSION_STATE::ACTIVE; - - if (session_type == SESSION_TYPE::USER) - return user_sess_handler.on_connect(*this); - else - return peer_sess_handler.on_connect(*this); - } - /** * Processes the next queued message (if any). * @return 0 if no messages in queue. 1 if message was processed. -1 means session must be closed. @@ -110,9 +97,7 @@ namespace comm if (in_msg_queue.try_dequeue(msg)) { std::string_view sv(msg.data(), msg.size()); - const int sess_handler_result = (session_type == SESSION_TYPE::USER) - ? user_sess_handler.on_message(*this, sv) - : peer_sess_handler.on_message(*this, sv); + const int sess_handler_result = handle_message(sv); // If session handler returns -1 then that means the session must be closed. // Otherwise it's considered message processing is successful. @@ -211,12 +196,7 @@ namespace comm return; if (invoke_handler) - { - if (session_type == SESSION_TYPE::USER) - user_sess_handler.on_close(*this); - else - peer_sess_handler.on_close(*this); - } + handle_close(); state = SESSION_STATE::CLOSED; @@ -224,11 +204,13 @@ namespace comm hpws_client.reset(); // Wait untill reader/writer threads gracefully stop. - writer_thread.join(); - reader_thread.join(); + if (writer_thread.joinable()) + writer_thread.join(); - LOG_DEBUG << (session_type == SESSION_TYPE::PEER ? "Peer" : "User") << " session closed: " - << display_name() << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : ""); + if (reader_thread.joinable()) + reader_thread.join(); + + LOG_DEBUG << "Session closed: " << display_name(); } /** @@ -236,26 +218,7 @@ namespace comm */ const std::string comm_session::display_name() { - if (challenge_status == CHALLENGE_STATUS::CHALLENGE_VERIFIED) - { - if (session_type == SESSION_TYPE::PEER) - { - // Peer sessions use pubkey hex as unique id (skipping first 2 bytes key type prefix). - return uniqueid.substr(2, 10); - } - else - { - // User sessions use binary pubkey as unique id. So we need to convert to hex. - std::string hex; - util::bin2hex(hex, - reinterpret_cast(uniqueid.data()), - uniqueid.length()); - return hex.substr(2, 10); // Skipping first 2 bytes key type prefix. - } - } - - // Unverified sessions just use the ip/host address as the unique id. - return uniqueid; + return uniqueid + (is_inbound ? ":in" : ":out"); } /** @@ -301,7 +264,7 @@ namespace comm t.counter_value = 0; LOG_INFO << "Session " << uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; - corebill::report_violation(address); + corebill::report_violation(host_address); } else if (elapsed_time > t.intervalms) { @@ -311,4 +274,17 @@ namespace comm } } + void comm_session::handle_connect() + { + } + + int comm_session::handle_message(std::string_view msg) + { + return 0; + } + + void comm_session::handle_close() + { + } + } // namespace comm \ No newline at end of file diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index 2deb4ae4..d63d5676 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -1,10 +1,10 @@ -#ifndef _HP_COMM_SESSION_ -#define _HP_COMM_SESSION_ +#ifndef _HP_COMM_COMM_SESSION_ +#define _HP_COMM_COMM_SESSION_ #include "../pchheader.hpp" -#include "comm_session_threshold.hpp" #include "../conf.hpp" #include "../hpws/hpws.hpp" +#include "comm_session_threshold.hpp" namespace comm { @@ -18,26 +18,19 @@ namespace comm enum SESSION_STATE { - NOT_INITIALIZED, // Session is not yet initialized properly. - ACTIVE, // Session is active and functioning. - MUST_CLOSE, // Session socket is in unusable state and must be closed. - CLOSED // Session is fully closed. - }; - - enum SESSION_TYPE - { - USER = 0, - PEER = 1 + NONE, // Session is not yet initialized properly. + ACTIVE, // Session is active and functioning. + MUST_CLOSE, // Session socket is in unusable state and must be closed. + CLOSED // Session is fully closed. }; /** - * Represents an active WebSocket connection -*/ + * Represents an active WebSocket connection + */ class comm_session { private: std::optional hpws_client; - const SESSION_TYPE session_type; std::vector thresholds; // track down various communication thresholds std::thread reader_thread; // The thread responsible for reading messages from the read fd. @@ -47,22 +40,22 @@ namespace comm void reader_loop(); + protected: + virtual void handle_connect(); + virtual int handle_message(std::string_view msg); + virtual void handle_close(); + public: - const bool is_inbound; - bool is_self = false; - const std::string address; // IP address of the remote party. std::string uniqueid; + const bool is_inbound; + const std::string host_address; // Connection host address of the remote party. std::string issued_challenge; - conf::ip_port_pair known_ipport; - SESSION_STATE state = SESSION_STATE::NOT_INITIALIZED; + SESSION_STATE state = SESSION_STATE::NONE; CHALLENGE_STATUS challenge_status = CHALLENGE_STATUS::NOT_ISSUED; - bool is_weakly_connected = false; // Holds whether this node is weakly connected to the other nodes. comm_session( - std::string_view ip, hpws::client &&hpws_client, const SESSION_TYPE session_type, - const bool is_inbound, const uint64_t (&metric_thresholds)[4]); - int on_connect(); - void start_messaging_threads(); + std::string_view host_address, hpws::client &&hpws_client, const bool is_inbound, const uint64_t (&metric_thresholds)[4]); + void init(); int process_next_inbound_message(); int send(const std::vector &message); int send(std::string_view message); @@ -70,7 +63,7 @@ namespace comm void process_outbound_msg_queue(); void mark_for_closure(); void close(const bool invoke_handler = true); - const std::string display_name(); + virtual const std::string display_name(); void set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms); void increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount); diff --git a/src/comm/comm_session_handler.hpp b/src/comm/comm_session_handler.hpp deleted file mode 100644 index 7a32f400..00000000 --- a/src/comm/comm_session_handler.hpp +++ /dev/null @@ -1,23 +0,0 @@ -#ifndef _HP_COMM_SESSION_HANDLER_ -#define _HP_COMM_SESSION_HANDLER_ - -#include "../pchheader.hpp" - -namespace comm -{ - -// Forward declaration -class comm_session; - -class comm_session_handler -{ - -public: - int on_connect(comm_session &session) const; - int on_message(comm_session &session, std::string_view message) const; - void on_close(const comm_session &session) const; -}; - -} // namespace comm - -#endif \ No newline at end of file diff --git a/src/comm/comm_session_threshold.hpp b/src/comm/comm_session_threshold.hpp index 73ba77bb..2a973e58 100644 --- a/src/comm/comm_session_threshold.hpp +++ b/src/comm/comm_session_threshold.hpp @@ -1,5 +1,5 @@ -#ifndef _HP_COMM_SESSION_THRESHOLD_ -#define _HP_COMM_SESSION_THRESHOLD_ +#ifndef _HP_COMM_COMM_SESSION_THRESHOLD_ +#define _HP_COMM_COMM_SESSION_THRESHOLD_ #include "../pchheader.hpp" diff --git a/src/conf.cpp b/src/conf.cpp index 1ab27d01..057fb0b1 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -29,11 +29,6 @@ namespace conf if (validate_contract_dir_paths() != 0 || load_config() != 0 || validate_config() != 0) return -1; - // Append self peer to peer list. - const std::string portstr = std::to_string(cfg.peerport); - - cfg.peers.emplace(std::make_pair(SELF_HOST, cfg.peerport)); - // Append self pubkey to unl list. cfg.unl.emplace(cfg.pubkey); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index bdc4a52c..01f99320 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -1,12 +1,12 @@ #include "../pchheader.hpp" -#include "../comm/comm_server.hpp" #include "../conf.hpp" #include "../crypto.hpp" #include "../util.hpp" #include "../hplog.hpp" -#include "p2p.hpp" #include "../msg/fbuf/p2pmsg_helpers.hpp" #include "../ledger.hpp" +#include "p2p.hpp" +#include "self_node.hpp" namespace p2p { @@ -18,9 +18,9 @@ namespace p2p bool init_success = false; /** - * Initializes the p2p subsystem. Must be called once during application startup. - * @return 0 for successful initialization. -1 for failure. - */ + * Initializes the p2p subsystem. Must be called once during application startup. + * @return 0 for successful initialization. -1 for failure. + */ int init() { metric_thresholds[0] = conf::cfg.peermaxcpm; @@ -37,27 +37,26 @@ namespace p2p } /** - * Cleanup any running processes. - */ + * Cleanup any running processes. + */ void deinit() { if (init_success) - ctx.listener.stop(); + ctx.server->stop(); } int start_peer_connections() { - if (ctx.listener.start( - conf::cfg.peerport, comm::SESSION_TYPE::PEER, metric_thresholds, conf::cfg.peers, conf::cfg.peermaxsize) == -1) + ctx.server.emplace(conf::cfg.peerport, metric_thresholds, conf::cfg.peermaxsize, conf::cfg.peers); + if (ctx.server->start() == -1) return -1; LOG_INFO << "Started listening for peer connections on " << std::to_string(conf::cfg.peerport); return 0; } - int resolve_peer_challenge(comm::comm_session &session, const peer_challenge_response &challenge_resp) + int resolve_peer_challenge(peer_comm_session &session, const peer_challenge_response &challenge_resp) { - // Compare the response challenge string with the original issued challenge. if (session.issued_challenge != challenge_resp.challenge) { @@ -66,10 +65,7 @@ namespace p2p } // Verify the challenge signature. - if (crypto::verify( - challenge_resp.challenge, - challenge_resp.signature, - challenge_resp.pubkey) != 0) + if (crypto::verify(challenge_resp.challenge, challenge_resp.signature, challenge_resp.pubkey) != 0) { LOG_DEBUG << "Peer challenge response signature verification failed."; return -1; @@ -82,35 +78,34 @@ namespace p2p const int res = challenge_resp.pubkey.compare(conf::cfg.pubkey); - // If pub key is same as our (self) pub key, then this is the loopback connection to ourselves. - // Hence we must keep the connection but only one of two sessions must be added to peer_connections. // If pub key is greater than our id (< 0), then we should give priority to any existing inbound connection // from the same peer and drop the outbound connection. // If pub key is lower than our id (> 0), then we should give priority to any existing outbound connection // from the same peer and drop the inbound connection. + // If the pub key is same as ours then we reject the connection. + if (res == 0) + { + LOG_DEBUG << "Pubkey violation. Rejecting new peer connection [" << session.display_name() << "]"; + return -1; + } + std::scoped_lock lock(ctx.peer_connections_mutex); - const auto iter = p2p::ctx.peer_connections.find(pubkeyhex); - if (iter == p2p::ctx.peer_connections.end()) + const auto iter = ctx.peer_connections.find(pubkeyhex); + if (iter == ctx.peer_connections.end()) { // Add the new connection straight away, if we haven't seen it before. - session.is_self = (res == 0); session.uniqueid.swap(pubkeyhex); - session.challenge_status = comm::CHALLENGE_VERIFIED; - p2p::ctx.peer_connections.try_emplace(session.uniqueid, &session); + session.challenge_status = comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED; + ctx.peer_connections.try_emplace(session.uniqueid, &session); + + LOG_DEBUG << "Accepted verified connection [" << session.display_name() << "]"; return 0; } - else if (res == 0) // New connection is self (There can be two sessions for self (inbound/outbound)) + else // Peer pub key already exists in our sessions. { - session.is_self = true; - session.uniqueid.swap(pubkeyhex); - session.challenge_status = comm::CHALLENGE_VERIFIED; - return 0; - } - else // New connection is not self but peer pub key already exists in our sessions. - { - comm::comm_session &ex_session = *iter->second; + peer_comm_session &ex_session = *iter->second; // We don't allow duplicate sessions to the same peer to same direction. if (ex_session.is_inbound != session.is_inbound) { @@ -122,27 +117,28 @@ namespace p2p if (session.known_ipport.first.empty()) session.known_ipport.swap(ex_session.known_ipport); session.uniqueid.swap(pubkeyhex); - session.challenge_status = comm::CHALLENGE_VERIFIED; + session.challenge_status = comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED; ex_session.mark_for_closure(); - p2p::ctx.peer_connections.erase(iter); // remove existing session. + ctx.peer_connections.erase(iter); // remove existing session. // We have to keep the weekly connected status of the removed session object. // If not, connected status received prior to connection dropping will be lost. session.is_weakly_connected = ex_session.is_weakly_connected; - p2p::ctx.peer_connections.try_emplace(session.uniqueid, &session); // add new session. + ctx.peer_connections.try_emplace(session.uniqueid, &session); // add new session. - LOG_DEBUG << "Replacing existing connection [" << session.display_name() << "]"; + LOG_DEBUG << "Replacing existing connection [" << ex_session.display_name() << "] with [" << session.display_name() << "]"; return 0; } else if (ex_session.known_ipport.first.empty() || !session.known_ipport.first.empty()) { // If we have any known ip-port info from the new session, transfer them to the existing session. ex_session.known_ipport.swap(session.known_ipport); + LOG_DEBUG << "Merging new connection [" << session.display_name() << "] with [" << ex_session.display_name() << "]"; } } // Reaching this point means we don't need the new session. - LOG_DEBUG << "Rejecting new peer connection because existing connection takes priority [" << pubkeyhex.substr(0, 10) << "]"; + LOG_DEBUG << "Rejecting new connection [" << session.display_name() << "] in favour of [" << ex_session.display_name() << "]"; return -1; } } @@ -167,39 +163,36 @@ namespace p2p * @param is_msg_forwarding Whether this broadcast is for message forwarding. * @param skipping_session Session to be skipped in message forwarding(optional). */ - void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const comm::comm_session *skipping_session) + void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const peer_comm_session *skipping_session) { - if (ctx.peer_connections.size() == 0) - { - LOG_DEBUG << "No peers to broadcast (not even self). Cannot broadcast."; - return; - } + if (send_to_self) + self::send(message); //Broadcast while locking the peer_connections. std::scoped_lock lock(ctx.peer_connections_mutex); for (const auto &[k, session] : ctx.peer_connections) { - // Exclude given session and self if provided. + // Exclude given session if provided. // Messages are forwarded only to the weakly connected nodes only in the message forwarding mode. - if ((!send_to_self && session->is_self) || - (skipping_session && skipping_session == session) || + if ((skipping_session && skipping_session == session) || (is_msg_forwarding && !session->is_weakly_connected)) continue; session->send(message); } } + /** * Check whether the given message is qualified to be forwarded to peers. * @param container The message container. * @param content_message_type The message type. * @return Returns true if the message is qualified for forwarding to peers. False otherwise. */ - bool validate_for_peer_msg_forwarding(const comm::comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type) + bool validate_for_peer_msg_forwarding(const peer_comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type) { - // Checking whether the message forwarding is enabled and skip if the message is sent from self. - if (!conf::cfg.msgforwarding || session.is_self) + // Checking whether the message forwarding is enabled. + if (!conf::cfg.msgforwarding) { return false; } @@ -222,35 +215,25 @@ namespace p2p } /** - * Sends the given message to self (this node). - * @param fbuf Peer outbound message to be sent to self. - */ + * Sends the given message to self (this node). + * @param fbuf Peer outbound message to be sent to self. + */ void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf) { - //Send while locking the peer_connections. - std::scoped_lock lock(p2p::ctx.peer_connections_mutex); - - // Find the peer session connected to self. - const auto peer_itr = ctx.peer_connections.find(conf::cfg.pubkeyhex); - if (peer_itr != ctx.peer_connections.end()) - { - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - - comm::comm_session *session = peer_itr->second; - session->send(msg); - } + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + self::send(msg); } /** - * Sends the given message to a random peer (except self). - * @param fbuf Peer outbound message to be sent to peer. - * @param target_pubkey Randomly selected target peer pubkey. - */ + * Sends the given message to a random peer (except self). + * @param fbuf Peer outbound message to be sent to peer. + * @param target_pubkey Randomly selected target peer pubkey. + */ void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey) { //Send while locking the peer_connections. - std::scoped_lock lock(p2p::ctx.peer_connections_mutex); + std::scoped_lock lock(ctx.peer_connections_mutex); const size_t connected_peers = ctx.peer_connections.size(); if (connected_peers == 0) @@ -258,11 +241,6 @@ namespace p2p LOG_DEBUG << "No peers to random send."; return; } - else if (connected_peers == 1 && ctx.peer_connections.begin()->second->is_self) - { - LOG_DEBUG << "Only self is connected. Cannot random send."; - return; - } while (true) { @@ -272,16 +250,13 @@ namespace p2p std::advance(it, random_peer_index); //move iterator to point to random selected peer. //send message to selected peer. - comm::comm_session *session = it->second; - if (!session->is_self) // Exclude self peer. - { - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + peer_comm_session *session = it->second; + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - session->send(msg); - target_pubkey = session->uniqueid; - break; - } + session->send(msg); + target_pubkey = session->uniqueid; + break; } } @@ -293,7 +268,7 @@ namespace p2p void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected) { msg::fbuf::p2pmsg::create_msg_for_connected_status_announcement(fbuf, is_weakly_connected, ledger::ctx.get_lcl()); - p2p::broadcast_message(fbuf, false); + broadcast_message(fbuf, false); } } // namespace p2p \ No newline at end of file diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 03fe7987..e8632c93 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -2,17 +2,16 @@ #define _HP_P2P_ #include "../pchheader.hpp" -#include "../comm/comm_server.hpp" -#include "../comm/comm_session.hpp" #include "../usr/user_input.hpp" -#include "peer_session_handler.hpp" #include "../hpfs/h32.hpp" #include "../conf.hpp" #include "../msg/fbuf/p2pmsg_container_generated.h" +#include "peer_comm_server.hpp" +#include "peer_comm_session.hpp" +#include "peer_session_handler.hpp" namespace p2p { - struct proposal { std::string pubkey; @@ -125,11 +124,11 @@ namespace p2p message_collection collected_msgs; // Set of currently connected peer connections mapped by the uniqueid of socket session. - std::unordered_map peer_connections; + std::unordered_map peer_connections; std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions. - comm::comm_server listener; + std::optional server; }; extern connected_context ctx; @@ -140,17 +139,17 @@ namespace p2p int start_peer_connections(); - int resolve_peer_challenge(comm::comm_session &session, const peer_challenge_response &challenge_resp); + int resolve_peer_challenge(peer_comm_session &session, const peer_challenge_response &challenge_resp); void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false); - void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const comm::comm_session *skipping_session = NULL); + void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const peer_comm_session *skipping_session = NULL); void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf); void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey); - bool validate_for_peer_msg_forwarding(const comm::comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type); + bool validate_for_peer_msg_forwarding(const peer_comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type); void send_connected_status_announcement(flatbuffers::FlatBufferBuilder &fbuf, const bool is_weakly_connected); } // namespace p2p diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp new file mode 100644 index 00000000..44b98119 --- /dev/null +++ b/src/p2p/peer_comm_server.cpp @@ -0,0 +1,114 @@ +#include "../comm/comm_server.hpp" +#include "../util.hpp" +#include "peer_comm_server.hpp" +#include "peer_comm_session.hpp" +#include "self_node.hpp" + +namespace p2p +{ + peer_comm_server::peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[4], + const uint64_t max_msg_size, const std::set &req_known_remotes) + : comm::comm_server("Peer", port, metric_thresholds, max_msg_size), + req_known_remotes(req_known_remotes) + { + } + + void peer_comm_server::start_custom_jobs() + { + // known_peers_thread = std::thread(&peer_comm_server::peer_monitor_loop, this); + } + + void peer_comm_server::stop_custom_jobs() + { + // known_peers_thread.join(); + } + + int peer_comm_server::process_custom_messages() + { + return self::process_next_message(); + } + + void peer_comm_server::custom_connections() + { + if (custom_connection_invocations == 20 || custom_connection_invocations == -1) + { + maintain_known_connections(); + custom_connection_invocations = 0; + } + + custom_connection_invocations++; + } + + // void peer_comm_server::peer_monitor_loop() + // { + // util::mask_signal(); + + // LOG_INFO << "Started peer monitor."; + + // while (!is_shutting_down) + // { + // util::sleep(2000); + // maintain_known_connections(); + // } + + // LOG_INFO << "Stopped peer monitor."; + // } + + void peer_comm_server::maintain_known_connections() + { + // Find already connected known remote parties list. + std::set known_remotes; + + { + std::scoped_lock lock(sessions_mutex); + for (const p2p::peer_comm_session &session : sessions) + { + if (session.state != comm::SESSION_STATE::CLOSED && !session.known_ipport.first.empty()) + known_remotes.emplace(session.known_ipport); + } + } + + for (const auto &ipport : req_known_remotes) + { + if (is_shutting_down) + break; + + // Check if we are already connected to this remote party. + if (known_remotes.find(ipport) != known_remotes.end()) + continue; + + std::string_view host = ipport.first; + const uint16_t port = ipport.second; + LOG_DEBUG << "Trying to connect " << host << ":" << std::to_string(port); + + std::variant client_result = hpws::client::connect(conf::ctx.hpws_exe_path, max_msg_size, host, port, "/", {}, util::fork_detach); + + if (std::holds_alternative(client_result)) + { + const hpws::error error = std::get(client_result); + if (error.first != 202) + LOG_DEBUG << "Outbound connection hpws error:" << error.first << " " << error.second; + } + else + { + hpws::client client = std::move(std::get(client_result)); + const std::variant host_result = client.host_address(); + if (std::holds_alternative(host_result)) + { + const hpws::error error = std::get(host_result); + LOG_ERROR << "Error getting ip from hpws:" << error.first << " " << error.second; + } + else + { + const std::string &host_address = std::get(host_result); + p2p::peer_comm_session session(host_address, std::move(client), false, metric_thresholds); + session.known_ipport = ipport; + + std::scoped_lock lock(new_sessions_mutex); + new_sessions.emplace_back(std::move(session)); + } + } + } + } + +} // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_comm_server.hpp b/src/p2p/peer_comm_server.hpp new file mode 100644 index 00000000..6dbfbea9 --- /dev/null +++ b/src/p2p/peer_comm_server.hpp @@ -0,0 +1,29 @@ +#ifndef _HP_P2P_PEER_COMM_SERVER_ +#define _HP_P2P_PEER_COMM_SERVER_ + +#include "../comm/comm_server.hpp" +#include "peer_comm_session.hpp" + +namespace p2p +{ + class peer_comm_server : public comm::comm_server + { + private: + const std::set &req_known_remotes; + int custom_connection_invocations = -1; + // std::thread known_peers_thread; // Known peers connection establishment thread. + void maintain_known_connections(); + + protected: + void start_custom_jobs(); + void stop_custom_jobs(); + int process_custom_messages(); + void custom_connections(); + + public: + peer_comm_server(const uint16_t port, const uint64_t (&metric_thresholds)[4], + const uint64_t max_msg_size, const std::set &req_known_remotes); + }; +} // namespace p2p + +#endif \ No newline at end of file diff --git a/src/p2p/peer_comm_session.cpp b/src/p2p/peer_comm_session.cpp new file mode 100644 index 00000000..04b0200c --- /dev/null +++ b/src/p2p/peer_comm_session.cpp @@ -0,0 +1,36 @@ +#include "../pchheader.hpp" +#include "peer_comm_session.hpp" +#include "peer_session_handler.hpp" + +namespace p2p +{ + void peer_comm_session::handle_connect() + { + p2p::handle_peer_connect(*this); + } + + int peer_comm_session::handle_message(std::string_view msg) + { + return p2p::handle_peer_message(*this, msg); + } + + void peer_comm_session::handle_close() + { + p2p::handle_peer_close(*this); + } + + /** + * Returns printable name for the session based on uniqueid (used for logging). + */ + const std::string peer_comm_session::display_name() + { + if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) + { + // Peer sessions use pubkey hex as unique id (skipping first 2 bytes key type prefix). + return uniqueid.substr(2, 10) + (is_inbound ? ":in" : ":out"); + } + + return comm_session::display_name(); + } + +} // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_comm_session.hpp b/src/p2p/peer_comm_session.hpp new file mode 100644 index 00000000..b81ad3ee --- /dev/null +++ b/src/p2p/peer_comm_session.hpp @@ -0,0 +1,30 @@ +#ifndef _HP_P2P_PEER_COMM_SESSION_ +#define _HP_P2P_PEER_COMM_SESSION_ + +#include "../pchheader.hpp" +#include "../conf.hpp" +#include "../comm/comm_session.hpp" + +namespace p2p +{ + /** + * Represents a WebSocket connection to a Hot Pocket peer. + */ + class peer_comm_session : public comm::comm_session + { + using comm_session::comm_session; // Inherit constructors. + + private: + void handle_connect(); + int handle_message(std::string_view msg); + void handle_close(); + + public: + conf::ip_port_pair known_ipport; // A known ip/port information that matches with our peer list configuration. + bool is_weakly_connected = false; // Holds whether this node is weakly connected to the other nodes. + const std::string display_name(); + }; + +} // namespace comm + +#endif diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 20ea5139..e04d36ea 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -8,35 +8,23 @@ #include "../msg/fbuf/p2pmsg_content_generated.h" #include "../msg/fbuf/p2pmsg_helpers.hpp" #include "../msg/fbuf/common_helpers.hpp" -#include "../comm/comm_session.hpp" -#include "p2p.hpp" -#include "peer_session_handler.hpp" #include "../state/state_sync.hpp" #include "../ledger.hpp" +#include "peer_comm_session.hpp" +#include "p2p.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; namespace p2p { - // The set of recent peer message hashes used for duplicate detection. util::rollover_hashset recent_peermsg_hashes(200); /** - * This gets hit every time a peer connects to HP via the peer port (configured in contract config). - */ - int peer_session_handler::on_connect(comm::comm_session &session) const + * This gets hit every time a peer connects to HP via the peer port (configured in contract config). + */ + void handle_peer_connect(p2p::peer_comm_session &session) { - if (session.is_inbound) - { - // Limit max number of inbound connections. - if (conf::cfg.peermaxcons > 0 && ctx.peer_connections.size() >= conf::cfg.peermaxcons) - { - LOG_DEBUG << "Max peer connections reached. Dropped connection " << session.display_name(); - return -1; - } - } - // Send peer challenge. flatbuffers::FlatBufferBuilder fbuf(1024); p2pmsg::create_msg_from_peer_challenge(fbuf, session.issued_challenge); @@ -44,12 +32,11 @@ namespace p2p reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); session.send(msg); session.challenge_status = comm::CHALLENGE_ISSUED; - return 0; } - //peer session on message callback method - //validate and handle each type of peer messages. - int peer_session_handler::on_message(comm::comm_session &session, std::string_view message) const + // peer session on message callback method. + // validate and handle each type of peer messages. + int handle_peer_message(p2p::peer_comm_session &session, std::string_view message) { const p2pmsg::Container *container; if (p2pmsg::validate_and_extract_container(&container, message) != 0) @@ -74,6 +61,7 @@ namespace p2p } const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc + // Check whether the message is qualified for forwarding. if (p2p::validate_for_peer_msg_forwarding(session, container, content_message_type)) { @@ -99,8 +87,7 @@ namespace p2p reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); return session.send(msg); } - - if (content_message_type == p2pmsg::Message_Peer_Challenge_Response_Message) // message is a peer challenge response + else if (content_message_type == p2pmsg::Message_Peer_Challenge_Response_Message) // message is a peer challenge response { // Ignore if challenge is already resolved. if (session.challenge_status == comm::CHALLENGE_ISSUED) @@ -116,49 +103,7 @@ namespace p2p return 0; } - if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message - { - // We only trust proposals coming from trusted peers. - if (p2pmsg::validate_container_trust(container) != 0) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Proposal rejected due to trust failure. " << session.display_name(); - return 0; - } - - std::scoped_lock lock(ctx.collected_msgs.proposals_mutex); // Insert proposal with lock. - - ctx.collected_msgs.proposals.push_back( - p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp(), container->lcl())); - } - else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message - { - std::scoped_lock lock(ctx.collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock. - - ctx.collected_msgs.nonunl_proposals.push_back( - p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp())); - } - else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message - { - if (p2pmsg::validate_container_trust(container) != 0) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); - LOG_DEBUG << "NPL message rejected due to trust failure. " << session.display_name(); - return 0; - } - - const p2pmsg::Npl_Message *npl_p2p_msg = content->message_as_Npl_Message(); - npl_message msg; - msg.data = msg::fbuf::flatbuff_bytes_to_sv(npl_p2p_msg->data()); - msg.pubkey = msg::fbuf::flatbuff_bytes_to_sv(container->pubkey()); - msg.lcl = msg::fbuf::flatbuff_bytes_to_sv(container->lcl()); - - if (!consensus::push_npl_message(msg)) - { - LOG_DEBUG << "NPL message enqueue failure. " << session.display_name(); - } - } - else if (content_message_type == p2pmsg::Message_Connected_Status_Announcement_Message) // This message is the connected status announcement message. + if (content_message_type == p2pmsg::Message_Connected_Status_Announcement_Message) // This message is the connected status announcement message. { const p2pmsg::Connected_Status_Announcement_Message *announcement_msg = content->message_as_Connected_Status_Announcement_Message(); session.is_weakly_connected = announcement_msg->is_weakly_connected(); @@ -171,6 +116,33 @@ namespace p2p LOG_DEBUG << "Strongly connected announcement received from " << session.display_name(); } } + else if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message + { + // We only trust proposals coming from trusted peers. + if (p2pmsg::validate_container_trust(container) != 0) + { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); + LOG_DEBUG << "Proposal rejected due to trust failure. " << session.display_name(); + return 0; + } + + handle_proposal_message(container, content); + } + else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message + { + handle_nonunl_proposal_message(container, content); + } + else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message + { + if (p2pmsg::validate_container_trust(container) != 0) + { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1); + LOG_DEBUG << "NPL message rejected due to trust failure. " << session.display_name(); + return 0; + } + + handle_npl_message(container, content); + } else if (content_message_type == p2pmsg::Message_State_Request_Message) { // Insert request with lock. @@ -211,14 +183,78 @@ namespace p2p return 0; } + /** + * Handles messages that we receive from ourselves. + */ + int handle_self_message(std::string_view message) + { + const p2pmsg::Container *container; + if (p2pmsg::validate_and_extract_container(&container, message) != 0) + return 0; + + //Get serialised message content. + const flatbuffers::Vector *container_content = container->content(); + + //Accessing message content and size. + const uint8_t *content_ptr = container_content->Data(); + const flatbuffers::uoffset_t content_size = container_content->size(); + + const p2pmsg::Content *content; + if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0) + return 0; + + const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc + + if (content_message_type == p2pmsg::Message_Proposal_Message) // message is a proposal message + handle_proposal_message(container, content); + else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message + handle_nonunl_proposal_message(container, content); + else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message + handle_npl_message(container, content); + + return 0; + } + + void handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content) + { + std::scoped_lock lock(ctx.collected_msgs.proposals_mutex); // Insert proposal with lock. + + ctx.collected_msgs.proposals.push_back( + p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp(), container->lcl())); + } + + void handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content) + { + std::scoped_lock lock(ctx.collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock. + + ctx.collected_msgs.nonunl_proposals.push_back( + p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp())); + } + + void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content) + { + const p2pmsg::Npl_Message *npl_p2p_msg = content->message_as_Npl_Message(); + npl_message msg; + msg.data = msg::fbuf::flatbuff_bytes_to_sv(npl_p2p_msg->data()); + msg.pubkey = msg::fbuf::flatbuff_bytes_to_sv(container->pubkey()); + msg.lcl = msg::fbuf::flatbuff_bytes_to_sv(container->lcl()); + + if (!consensus::push_npl_message(msg)) + { + LOG_DEBUG << "NPL message from self enqueue failure."; + } + } + //peer session on message callback method - void peer_session_handler::on_close(const comm::comm_session &session) const + int handle_peer_close(const comm::comm_session &session) { // Erase the corresponding uniqueid peer connection if it's this session. std::scoped_lock lock(ctx.peer_connections_mutex); const auto itr = ctx.peer_connections.find(session.uniqueid); if (itr != ctx.peer_connections.end() && itr->second == &session) ctx.peer_connections.erase(itr); + + return 0; } } // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_session_handler.hpp b/src/p2p/peer_session_handler.hpp index 36ab7c45..46fe8dbe 100644 --- a/src/p2p/peer_session_handler.hpp +++ b/src/p2p/peer_session_handler.hpp @@ -1,20 +1,22 @@ -#ifndef _HP_PEER_SESSION_HANDLER_ -#define _HP_PEER_SESSION_HANDLER_ +#ifndef _HP_P2P_PEER_SESSION_HANDLER_ +#define _HP_P2P_PEER_SESSION_HANDLER_ #include "../pchheader.hpp" -#include "../comm/comm_session_handler.hpp" -#include "../comm/comm_session.hpp" +#include "../msg/fbuf/p2pmsg_container_generated.h" +#include "../msg/fbuf/p2pmsg_content_generated.h" +#include "peer_comm_session.hpp" + +namespace p2pmsg = msg::fbuf::p2pmsg; namespace p2p { - -class peer_session_handler : public comm::comm_session_handler -{ -public: - int on_connect(comm::comm_session &session) const; - int on_message(comm::comm_session &session, std::string_view message) const; - void on_close(const comm::comm_session &session) const; -}; + void handle_peer_connect(p2p::peer_comm_session &session); + int handle_peer_message(p2p::peer_comm_session &session, std::string_view message); + int handle_self_message(std::string_view message); + int handle_peer_close(const comm::comm_session &session); + void handle_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content); + void handle_nonunl_proposal_message(const p2pmsg::Container *container, const p2pmsg::Content *content); + void handle_npl_message(const p2pmsg::Container *container, const p2pmsg::Content *content); } // namespace p2p #endif \ No newline at end of file diff --git a/src/p2p/self_node.cpp b/src/p2p/self_node.cpp new file mode 100644 index 00000000..b62d8688 --- /dev/null +++ b/src/p2p/self_node.cpp @@ -0,0 +1,28 @@ +#include "../pchheader.hpp" +#include "peer_session_handler.hpp" + +namespace p2p::self +{ + // Holds self messages waiting to be processed. + moodycamel::ConcurrentQueue msg_queue; + + /** + * Processes the next queued message (if any). + * @return 0 if no messages in queue. 1 if message was processed successfully. -1 on error. + */ + int process_next_message() + { + std::string msg; + if (msg_queue.try_dequeue(msg)) + return p2p::handle_self_message(msg); + + return 0; + } + + void send(std::string_view message) + { + // Passing the ownership of message to the queue. + msg_queue.enqueue(std::string(message)); + } + +} // namespace p2p::self \ No newline at end of file diff --git a/src/p2p/self_node.hpp b/src/p2p/self_node.hpp new file mode 100644 index 00000000..f26e16b6 --- /dev/null +++ b/src/p2p/self_node.hpp @@ -0,0 +1,12 @@ +#ifndef _HP_P2P_SELF_NODE_ +#define _HP_P2P_SELF_NODE_ + +#include "../pchheader.hpp" + +namespace p2p::self +{ + int process_next_message(); + void send(std::string_view message); + +} // namespace p2p +#endif \ No newline at end of file diff --git a/src/usr/user_comm_server.hpp b/src/usr/user_comm_server.hpp new file mode 100644 index 00000000..4ccf0403 --- /dev/null +++ b/src/usr/user_comm_server.hpp @@ -0,0 +1,15 @@ +#ifndef _HP_USR_USER_COMM_SERVER_ +#define _HP_USR_USER_COMM_SERVER_ + +#include "../comm/comm_server.hpp" +#include "user_comm_session.hpp" + +namespace usr +{ + class user_comm_server : public comm::comm_server + { + using comm::comm_server::comm_server; // Inherit constructors. + }; +} // namespace usr + +#endif \ No newline at end of file diff --git a/src/usr/user_comm_session.cpp b/src/usr/user_comm_session.cpp new file mode 100644 index 00000000..533b26c0 --- /dev/null +++ b/src/usr/user_comm_session.cpp @@ -0,0 +1,41 @@ +#include "../pchheader.hpp" +#include "../util.hpp" +#include "user_comm_session.hpp" +#include "user_session_handler.hpp" + +namespace usr +{ + void user_comm_session::handle_connect() + { + usr::handle_user_connect(*this); + } + + int user_comm_session::handle_message(std::string_view msg) + { + return usr::handle_user_message(*this, msg); + } + + void user_comm_session::handle_close() + { + usr::handle_user_close(*this); + } + + /** + * Returns printable name for the session based on uniqueid (used for logging). + */ + const std::string user_comm_session::display_name() + { + if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) + { + // User sessions use binary pubkey as unique id. So we need to convert to hex. + std::string hex; + util::bin2hex(hex, + reinterpret_cast(uniqueid.data()), + uniqueid.length()); + return hex.substr(2, 10) + (is_inbound ? ":in" : ":out"); // Skipping first 2 bytes key type prefix. + } + + return comm_session::display_name(); + } + +} // namespace usr \ No newline at end of file diff --git a/src/usr/user_comm_session.hpp b/src/usr/user_comm_session.hpp new file mode 100644 index 00000000..207930c2 --- /dev/null +++ b/src/usr/user_comm_session.hpp @@ -0,0 +1,27 @@ +#ifndef _HP_USR_USER_COMM_SESSION_ +#define _HP_USR_USER_COMM_SESSION_ + +#include "../pchheader.hpp" +#include "../comm/comm_session.hpp" + +namespace usr +{ + /** + * Represents a WebSocket connection to a Hot Pocket user. + */ + class user_comm_session : public comm::comm_session + { + using comm_session::comm_session; // Inherit constructors. + + private: + void handle_connect(); + int handle_message(std::string_view msg); + void handle_close(); + + public: + const std::string display_name(); + }; + +} // namespace usr + +#endif diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 9867d5ca..101e4456 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -4,90 +4,84 @@ #include "../bill/corebill.h" #include "usr.hpp" #include "user_session_handler.hpp" +#include "user_comm_session.hpp" namespace jusrmsg = msg::usrmsg::json; namespace usr { - -/** - * This gets hit every time a client connects to HP via the public port (configured in contract config). - */ -int user_session_handler::on_connect(comm::comm_session &session) const -{ - if (conf::cfg.pubmaxcons > 0 && ctx.users.size() >= conf::cfg.pubmaxcons) + /** + * This gets hit every time a client connects to HP via the public port (configured in contract config). + */ + void handle_user_connect(usr::user_comm_session &session) { - LOG_DEBUG << "Max user connections reached. Dropped connection " << session.display_name(); + LOG_DEBUG << "User client connected " << session.display_name(); + + // As soon as a user connects, we issue them a challenge message. We remember the + // challenge we issued and later verify the user's response with it. + std::vector msg; + jusrmsg::create_user_challenge(msg, session.issued_challenge); + session.send(msg); + + // Set the challenge-issued value to true. + session.challenge_status = comm::CHALLENGE_ISSUED; + } + + /** + * This gets hit every time we receive some data from a client connected to the HP public port. + */ + int handle_user_message(usr::user_comm_session &session, std::string_view message) + { + // First check whether this session is pending challenge. + // Meaning we have previously issued a challenge to the client. + if (session.challenge_status == comm::CHALLENGE_ISSUED) + { + if (verify_challenge(message, session) == 0) + return 0; + } + // Check whether this session belongs to an authenticated (challenge-verified) user. + else if (session.challenge_status == comm::CHALLENGE_VERIFIED) + { + // Check whether this user is among authenticated users + // and perform authenticated msg processing. + + const auto itr = ctx.users.find(session.uniqueid); + if (itr != ctx.users.end()) + { + // This is an authed user. + connected_user &user = itr->second; + if (handle_user_message(user, message) != 0) + { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + LOG_DEBUG << "Bad message from user " << session.display_name(); + } + } + else + { + session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); + LOG_DEBUG << "User session id not found: " << session.display_name(); + } + + return 0; + } + + // If for any reason we reach this point, we should drop the connection because none of the + // valid cases match. + LOG_DEBUG << "Dropping the user connection " << session.display_name(); + corebill::report_violation(session.host_address); return -1; } - LOG_DEBUG << "User client connected " << session.display_name(); - - // As soon as a user connects, we issue them a challenge message. We remember the - // challenge we issued and later verify the user's response with it. - std::vector msg; - jusrmsg::create_user_challenge(msg, session.issued_challenge); - session.send(msg); - - // Set the challenge-issued value to true. - session.challenge_status = comm::CHALLENGE_ISSUED; - - return 0; -} - -/** - * This gets hit every time we receive some data from a client connected to the HP public port. - */ -int user_session_handler::on_message(comm::comm_session &session, std::string_view message) const -{ - // First check whether this session is pending challenge. - // Meaning we have previously issued a challenge to the client. - if (session.challenge_status == comm::CHALLENGE_ISSUED) + /** + * This gets hit every time a client disconnects from the HP public port. + */ + int handle_user_close(const usr::user_comm_session &session) { - if (verify_challenge(message, session) == 0) - return 0; - } - // Check whether this session belongs to an authenticated (challenge-verified) user. - else if (session.challenge_status == comm::CHALLENGE_VERIFIED) - { - // Check whether this user is among authenticated users - // and perform authenticated msg processing. - - const auto itr = ctx.users.find(session.uniqueid); - if (itr != ctx.users.end()) - { - // This is an authed user. - connected_user &user = itr->second; - if (handle_user_message(user, message) != 0) - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DEBUG << "Bad message from user " << session.display_name(); - } - } - else - { - session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); - LOG_DEBUG << "User session id not found: " << session.display_name(); - } + // Session belongs to an authed user. + if (session.challenge_status == comm::CHALLENGE_VERIFIED) + remove_user(session.uniqueid); return 0; } - // If for any reason we reach this point, we should drop the connection because none of the - // valid cases match. - LOG_DEBUG << "Dropping the user connection " << session.display_name(); - corebill::report_violation(session.address); - return -1; -} - -/** - * This gets hit every time a client disconnects from the HP public port. - */ -void user_session_handler::on_close(const comm::comm_session &session) const -{ - // Session belongs to an authed user. - if (session.challenge_status == comm::CHALLENGE_VERIFIED) - remove_user(session.uniqueid); -} - } // namespace usr \ No newline at end of file diff --git a/src/usr/user_session_handler.hpp b/src/usr/user_session_handler.hpp index 36a0eeeb..73511eb4 100644 --- a/src/usr/user_session_handler.hpp +++ b/src/usr/user_session_handler.hpp @@ -2,19 +2,13 @@ #define _HP_USER_SESSION_HANDLER_ #include "../pchheader.hpp" -#include "../comm/comm_session_handler.hpp" -#include "../comm/comm_session.hpp" +#include "user_comm_session.hpp" namespace usr { - -class user_session_handler : public comm::comm_session_handler -{ -public: - int on_connect(comm::comm_session &session) const; - int on_message(comm::comm_session &session, std::string_view message) const; - void on_close(const comm::comm_session &session) const; -}; + void handle_user_connect(usr::user_comm_session &session); + int handle_user_message(usr::user_comm_session &session, std::string_view message); + int handle_user_close(const usr::user_comm_session &session); } // namespace usr diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 6bc35796..aa93deaa 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -2,8 +2,6 @@ #include "../msg/json/usrmsg_json.hpp" #include "../msg/usrmsg_parser.hpp" #include "../msg/usrmsg_common.hpp" -#include "../comm/comm_server.hpp" -#include "../comm/comm_session.hpp" #include "../util.hpp" #include "../conf.hpp" #include "../crypto.hpp" @@ -11,6 +9,8 @@ #include "../ledger.hpp" #include "usr.hpp" #include "user_session_handler.hpp" +#include "user_comm_session.hpp" +#include "user_comm_server.hpp" #include "user_input.hpp" #include "read_req.hpp" @@ -48,7 +48,7 @@ namespace usr void deinit() { if (init_success) - ctx.listener.stop(); + ctx.server->stop(); } /** @@ -56,8 +56,8 @@ namespace usr */ int start_listening() { - if (ctx.listener.start( - conf::cfg.pubport, comm::SESSION_TYPE::USER, metric_thresholds, std::set(), conf::cfg.pubmaxsize) == -1) + ctx.server.emplace("User", conf::cfg.pubport, metric_thresholds, conf::cfg.pubmaxsize); + if (ctx.server->start() == -1) return -1; LOG_INFO << "Started listening for user connections on " << std::to_string(conf::cfg.pubport); @@ -70,7 +70,7 @@ namespace usr * @param session The socket session that received the response. * @return 0 for successful verification. -1 for failure. */ - int verify_challenge(std::string_view message, comm::comm_session &session) + int verify_challenge(std::string_view message, usr::user_comm_session &session) { // The received message must be the challenge response. We need to verify it. if (session.issued_challenge.empty()) @@ -173,7 +173,7 @@ namespace usr /** * Send the specified contract input status result via the provided session. */ - void send_input_status(const msg::usrmsg::usrmsg_parser &parser, comm::comm_session &session, + void send_input_status(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, std::string_view status, std::string_view reason, std::string_view input_sig) { std::vector msg; @@ -190,7 +190,7 @@ namespace usr * @param protocol_code Messaging protocol used by user. * @return 0 on successful additions. -1 on failure. */ - int add_user(comm::comm_session &session, const std::string &pubkey_hex, std::string_view protocol_code) + int add_user(usr::user_comm_session &session, const std::string &pubkey_hex, std::string_view protocol_code) { // Decode hex pubkey and get binary pubkey. We are only going to keep // the binary pubkey due to reduced memory footprint. diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 08dcbc85..b9e13b93 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -3,9 +3,9 @@ #include "../pchheader.hpp" #include "../util.hpp" -#include "../comm/comm_server.hpp" -#include "../comm/comm_session.hpp" #include "../msg/usrmsg_parser.hpp" +#include "user_comm_session.hpp" +#include "user_comm_server.hpp" #include "user_session_handler.hpp" #include "user_input.hpp" @@ -15,9 +15,9 @@ namespace usr { /** - * Holds information about an authenticated (challenge-verified) user - * connected to the HotPocket node. - */ + * Holds information about an authenticated (challenge-verified) user + * connected to the HotPocket node. + */ struct connected_user { // User binary public key @@ -31,7 +31,7 @@ namespace usr // Holds the websocket session of this user. // We don't need to own the session object since the lifetime of user and session are coupled. - comm::comm_session &session; + usr::user_comm_session &session; // The messaging protocol used by this user. const util::PROTOCOL protocol = util::PROTOCOL::JSON; @@ -40,15 +40,15 @@ namespace usr * @param session The web socket session the user is connected to. * @param pubkey The public key of the user in binary format. */ - connected_user(comm::comm_session &session, std::string_view pubkey, util::PROTOCOL protocol) + connected_user(usr::user_comm_session &session, std::string_view pubkey, util::PROTOCOL protocol) : session(session), pubkey(pubkey), protocol(protocol) { } }; /** - * The context struct to hold global connected-users and related objects. - */ + * The context struct to hold global connected-users and related objects. + */ struct connected_context { // Connected (authenticated) user list. @@ -56,7 +56,7 @@ namespace usr std::unordered_map users; std::mutex users_mutex; // Mutex for users access race conditions. - comm::comm_server listener; + std::optional server; }; extern connected_context ctx; @@ -66,14 +66,14 @@ namespace usr int start_listening(); - int verify_challenge(std::string_view message, comm::comm_session &session); + int verify_challenge(std::string_view message, usr::user_comm_session &session); int handle_user_message(connected_user &user, std::string_view message); - void send_input_status(const msg::usrmsg::usrmsg_parser &parser, comm::comm_session &session, + void send_input_status(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, std::string_view status, std::string_view reason, std::string_view input_sig); - int add_user(comm::comm_session &session, const std::string &user_pubkey_hex, std::string_view protocol_code); + int add_user(usr::user_comm_session &session, const std::string &user_pubkey_hex, std::string_view protocol_code); int remove_user(const std::string &pubkey); diff --git a/test/local-cluster/cluster-create.sh b/test/local-cluster/cluster-create.sh index 5f2e64e1..e29b091e 100755 --- a/test/local-cluster/cluster-create.sh +++ b/test/local-cluster/cluster-create.sh @@ -69,7 +69,8 @@ do cp ../bin/appbill ./node$n/bin/ pushd ./node$n/bin > /dev/null 2>&1 - npm install + # Uncomment this if the contract needs npm install. + # npm install popd done