diff --git a/README.md b/README.md index bf7777c9..a16e2102 100644 --- a/README.md +++ b/README.md @@ -14,7 +14,8 @@ A C++ version of hotpocket designed for production envrionments, original protot * P2P Protocol - https://google.github.io/flatbuffers * Fuse filesystem - https://github.com/libfuse/libfuse * Boost - https://www.boost.org -* Concurrent queue - https://github.com/cameron314/readerwriterqueue +* Reader Writer Queue - https://github.com/cameron314/readerwriterqueue +* Concurrent Queue - https://github.com/cameron314/concurrentqueue ## Steps to setup Hot Pocket (For Ubuntu/Debian) @@ -36,8 +37,7 @@ Instructions are based on [this](https://libsodium.gitbook.io/doc/installation). 1. Clone [blake3 library](https://github.com/BLAKE3-team/BLAKE3) repository 2. Navigate into the directory in a terminal. 3. `cd c` to navigate to the C implementation folder -4. `gcc -shared -fPIC -O3 -o libblake3.so blake3.c blake3_dispatch.c blake3_portable.c \` - `blake3_sse41_x86-64_unix.S blake3_avx2_x86-64_unix.S blake3_avx512_x86-64_unix.S` +4. `gcc -shared -fPIC -O3 -o libblake3.so blake3.c blake3_dispatch.c blake3_portable.c blake3_sse2_x86-64_unix.S blake3_sse41_x86-64_unix.S blake3_avx2_x86-64_unix.S blake3_avx512_x86-64_unix.S` 5. `sudo cp blake3.h /usr/local/include/` 6. `sudo cp libblake3.so /usr/local/lib/` @@ -87,6 +87,10 @@ Example: When you make a change to `p2pmsg_content_.fbc` defnition file, you nee 3. `cmake ..` 4. `sudo make install` +#### Install concurrent queue +1. Download [concurrentqueue 1.0.2](https://github.com/cameron314/concurrentqueue/archive/1.0.2.zip) and extract. +2. Run `sudo cp concurrentqueue.h /usr/local/include/` + #### Run ldconfig `sudo ldconfig` diff --git a/src/comm/comm_server.cpp b/src/comm/comm_server.cpp index 62e7fabb..af8ecb8b 100644 --- a/src/comm/comm_server.cpp +++ b/src/comm/comm_server.cpp @@ -157,6 +157,10 @@ namespace comm { std::scoped_lock lock(sessions_mutex); const auto [itr, success] = sessions.try_emplace(client_fd, std::move(session)); + + // Thread is seperately started after the moving operation to overcome the difficulty + // in accessing class member variables inside the thread. + // Class member variables gives unacceptable values if the thread starts before the move operation. itr->second.start_messaging_threads(); } } @@ -209,6 +213,9 @@ namespace comm { std::scoped_lock lock(sessions_mutex); const auto [itr, success] = sessions.try_emplace(client.read_fd, std::move(session)); + // Thread is seperately started after the moving operation to overcome the difficulty + // in accessing class member variables inside the thread. + // Class member variables gives unacceptable values if the thread starts before the move operation. itr->second.start_messaging_threads(); } diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index da5e030c..f21d568e 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -38,16 +38,20 @@ namespace comm thresholds.push_back(session_threshold(metric_thresholds[i], INTERVALMS)); } + /** + * Starts the outbound queue processing thread. + */ void comm_session::start_messaging_threads() { reader_thread = std::thread(&comm_session::reader_loop, this); + writer_thread = std::thread(&comm_session::process_outbound_msg_queue, this); } void comm_session::reader_loop() { util::mask_signal(); - while (!should_stop_messaging_threads) + while (state != SESSION_STATE::CLOSED) { pollfd pollfds[1] = {{read_fd, READER_POLL_EVENTS}}; @@ -164,17 +168,38 @@ namespace comm return 0; } - int comm_session::send(const std::vector &message) const + int comm_session::send(const std::vector &message) { std::string_view sv(reinterpret_cast(message.data()), message.size()); send(sv); } - int comm_session::send(std::string_view message) const + /** + * Adds the given message to the outbound message queue. + * @param message Message to be added to the outbound queue. + * @return 0 on successful addition and -1 if the session is already closed. + */ + int comm_session::send(std::string_view message) { + // Making a copy of the message before it is destroyed from the parent scope. + std::string msg(message); + if (state == SESSION_STATE::CLOSED) return -1; + // Passing the ownership of msg to the queue using move operator for memory efficiency. + out_msg_queue.enqueue(std::move(msg)); + + return 0; + } + + /** + * This function constructs and sends the message to the node from the given message. + * @param message Message to be sent via the pipe. + * @return 0 on successful message sent and -1 on error. + */ + int comm_session::process_outbound_message(std::string_view message) + { // Prepare the memory segments to map with writev(). iovec memsegs[2]; @@ -211,6 +236,32 @@ namespace comm return 0; } + /** + * Process message sending in the queue in the outbound_queue_thread. + */ + void comm_session::process_outbound_msg_queue() + { + // Appling a signal mask to prevent receiving control signals from linux kernel. + util::mask_signal(); + + // Keep checking until the session is terminated. + while (state != SESSION_STATE::CLOSED) + { + std::string msg_to_send; + + // If the queue is not empty, the first element will be processed, + // else wait 10ms until queue gets populated. + if (out_msg_queue.try_dequeue(msg_to_send)) + { + process_outbound_message(msg_to_send); + } + else + { + util::sleep(10); + } + } + } + /** * Mark the session as needing to close. The session will be properly "closed" * and cleaned up by the global comm_server thread. @@ -240,12 +291,14 @@ namespace comm peer_sess_handler.on_close(*this); } - should_stop_messaging_threads = true; // Set the messaging thread stop flag before closing the fds. state = SESSION_STATE::CLOSED; ::close(read_fd); if (read_fd != write_fd) ::close(write_fd); + + // Wait untill both reader & writer threads gracefully stop. reader_thread.join(); + writer_thread.join(); LOG_DBG << (session_type == SESSION_TYPE::PEER ? "Peer" : "User") << " session closed: " << uniqueid.substr(0, 10) << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : ""); diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index 89d1ad37..01822df5 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -17,10 +17,10 @@ namespace comm enum SESSION_STATE { - NOT_INITIALIZED, // Session is not yet initialized properly. - ACTIVE, // Session is active and functioning. - MUST_CLOSE, // Session socket is in unusable state and must be closed. - CLOSED // Session is fully closed. + NOT_INITIALIZED, // Session is not yet initialized properly. + ACTIVE, // Session is active and functioning. + MUST_CLOSE, // Session socket is in unusable state and must be closed. + CLOSED // Session is fully closed. }; enum SESSION_TYPE @@ -38,15 +38,16 @@ namespace comm const int write_fd = 0; const SESSION_TYPE session_type; const uint64_t max_msg_size = 0; - std::vector thresholds; // track down various communication thresholds + std::vector thresholds; // track down various communication thresholds - uint32_t expected_msg_size = 0; // Next expected message size based on size header. - std::vector read_buffer; // Local buffer to keep collecting data until a complete message can be constructed. - uint32_t read_buffer_filled_size = 0; // How many bytes have been buffered so far. + uint32_t expected_msg_size = 0; // Next expected message size based on size header. + std::vector read_buffer; // Local buffer to keep collecting data until a complete message can be constructed. + uint32_t read_buffer_filled_size = 0; // How many bytes have been buffered so far. - bool should_stop_messaging_threads = false; // Indicates whether messaging threads has been instructed to stop. - std::thread reader_thread; // The thread responsible for reading messages from the read fd. - moodycamel::ReaderWriterQueue> in_msg_queue; // Holds incoming messages waiting to be processed. + std::thread reader_thread; // The thread responsible for reading messages from the read fd. + std::thread writer_thread; // The thread responsible for writing messages to the write fd. + moodycamel::ReaderWriterQueue> in_msg_queue; // Holds incoming messages waiting to be processed. + moodycamel::ConcurrentQueue out_msg_queue; // Holds outgoing messages waiting to be processed. void reader_loop(); int attempt_read(); @@ -69,8 +70,10 @@ namespace comm int on_connect(); void start_messaging_threads(); int process_next_inbound_message(); - int send(const std::vector &message) const; - int send(std::string_view message) const; + int send(const std::vector &message); + int send(std::string_view message); + int process_outbound_message(std::string_view message); + void process_outbound_msg_queue(); void mark_for_closure(); void close(const bool invoke_handler = true); diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 3eff39cc..a0346529 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -363,7 +363,7 @@ namespace cons for (const auto &[pubkey, umsgs] : p.user_inputs) { // Locate this user's socket session in case we need to send any status messages regarding user inputs. - const comm::comm_session *session = usr::get_session_by_pubkey(pubkey); + comm::comm_session *session = usr::get_session_by_pubkey(pubkey); // Populate user list with this user's pubkey. ctx.candidate_users.emplace(pubkey); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 0c0d8a98..bf231148 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -188,7 +188,7 @@ namespace p2p std::string_view msg = std::string_view( reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - const comm::comm_session *session = peer_itr->second; + comm::comm_session *session = peer_itr->second; session->send(msg); } } @@ -222,7 +222,7 @@ namespace p2p std::advance(it, random_peer_index); //move iterator to point to random selected peer. //send message to selected peer. - const comm::comm_session *session = it->second; + comm::comm_session *session = it->second; if (!session->is_self) // Exclude self peer. { std::string_view msg = std::string_view( diff --git a/src/pchheader.hpp b/src/pchheader.hpp index fe38afbe..371de951 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -55,5 +55,6 @@ #include #include #include +#include #endif \ No newline at end of file diff --git a/src/state/state_serve.cpp b/src/state/state_serve.cpp index c094c12b..795e2efb 100644 --- a/src/state/state_serve.cpp +++ b/src/state/state_serve.cpp @@ -93,7 +93,7 @@ namespace state_serve std::string_view msg = std::string_view( reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - const comm::comm_session *session = peer_itr->second; + comm::comm_session *session = peer_itr->second; session->send(msg); } } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 47bd1f14..715f89c1 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -203,7 +203,7 @@ namespace usr /** * Send the specified contract input status result via the provided session. */ - void send_input_status(const msg::usrmsg::usrmsg_parser &parser, const comm::comm_session &session, + void send_input_status(const msg::usrmsg::usrmsg_parser &parser, comm::comm_session &session, std::string_view status, std::string_view reason, std::string_view input_sig) { std::vector msg; @@ -220,7 +220,7 @@ namespace usr * @param protocol Messaging protocol used by user. * @return 0 on successful additions. -1 on failure. */ - int add_user(const comm::comm_session &session, const std::string &pubkey, const util::PROTOCOL protocol) + int add_user(comm::comm_session &session, const std::string &pubkey, const util::PROTOCOL protocol) { const std::string &sessionid = session.uniqueid; if (ctx.users.count(sessionid) == 1) @@ -269,11 +269,11 @@ namespace usr } /** - * Finds and returns the socket session for the proided user pubkey. - * @param pubkey User binary pubkey. - * @return Pointer to the socket session. NULL of not found. - */ - const comm::comm_session *get_session_by_pubkey(const std::string &pubkey) + * Finds and returns the socket session for the proided user pubkey. + * @param pubkey User binary pubkey. + * @return Pointer to the socket session. NULL of not found. + */ + comm::comm_session *get_session_by_pubkey(const std::string &pubkey) { const auto sessionid_itr = ctx.sessionids.find(pubkey); if (sessionid_itr != ctx.sessionids.end()) diff --git a/src/usr/usr.hpp b/src/usr/usr.hpp index 3c7d06f5..b4c50ff3 100644 --- a/src/usr/usr.hpp +++ b/src/usr/usr.hpp @@ -31,16 +31,16 @@ namespace usr // Holds the websocket session of this user. // We don't need to own the session object since the lifetime of user and session are coupled. - const comm::comm_session &session; + comm::comm_session &session; // The messaging protocol used by this user. const util::PROTOCOL protocol = util::PROTOCOL::JSON; /** - * @param session The web socket session the user is connected to. - * @param pubkey The public key of the user in binary format. - */ - connected_user(const comm::comm_session &session, std::string_view pubkey, util::PROTOCOL protocol) + * @param session The web socket session the user is connected to. + * @param pubkey The public key of the user in binary format. + */ + connected_user(comm::comm_session &session, std::string_view pubkey, util::PROTOCOL protocol) : session(session), pubkey(pubkey), protocol(protocol) { } @@ -75,14 +75,14 @@ namespace usr int handle_user_message(connected_user &user, std::string_view message); - void send_input_status(const msg::usrmsg::usrmsg_parser &parser, const comm::comm_session &session, + void send_input_status(const msg::usrmsg::usrmsg_parser &parser, comm::comm_session &session, std::string_view status, std::string_view reason, std::string_view input_sig); - int add_user(const comm::comm_session &session, const std::string &pubkey, const util::PROTOCOL protocol); + int add_user(comm::comm_session &session, const std::string &pubkey, const util::PROTOCOL protocol); int remove_user(const std::string &sessionid); - const comm::comm_session *get_session_by_pubkey(const std::string &pubkey); + comm::comm_session *get_session_by_pubkey(const std::string &pubkey); } // namespace usr