From 2dfd7cf6bc4530fd7744c2a9ef0170a7663c0739 Mon Sep 17 00:00:00 2001 From: Chalith Desaman Date: Fri, 4 Jun 2021 12:06:22 +0530 Subject: [PATCH] Introduce message threads and message handling (#4) --- CMakeLists.txt | 5 +- README.md | 6 + dev-setup.sh | 23 +++ examples/message-board/message-board.js | 17 +- src/comm/comm_handler.cpp | 81 ++++++-- src/comm/comm_handler.hpp | 15 +- src/comm/comm_session.cpp | 209 ++++++++++++++++++-- src/comm/comm_session.hpp | 20 +- src/conf.cpp | 10 +- src/main.cpp | 3 + src/msg/json/msg_json.cpp | 252 ++++++++++++++++++++++++ src/msg/json/msg_json.hpp | 30 +++ src/msg/msg_common.hpp | 55 ++++++ src/msg/msg_parser.cpp | 41 ++++ src/msg/msg_parser.hpp | 25 +++ src/pchheader.hpp | 3 + src/util/util.cpp | 8 + src/util/util.hpp | 2 + 18 files changed, 762 insertions(+), 43 deletions(-) create mode 100644 src/msg/json/msg_json.cpp create mode 100644 src/msg/json/msg_json.hpp create mode 100644 src/msg/msg_common.hpp create mode 100644 src/msg/msg_parser.cpp create mode 100644 src/msg/msg_parser.hpp diff --git a/CMakeLists.txt b/CMakeLists.txt index d4b7d08..b33a8d5 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,16 +13,19 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY build) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-unused-result -Wreturn-type") add_executable(sagent + src/conf.cpp src/comm/comm_handler.cpp src/comm/comm_session.cpp src/util/util.cpp - src/conf.cpp src/salog.cpp src/sqlite.cpp src/main.cpp + src/msg/msg_parser.cpp + src/msg/json/msg_json.cpp ) target_link_libraries(sagent + libboost_stacktrace_backtrace.a sqlite3 pthread ${CMAKE_DL_LIBS} # Needed for stacktrace support diff --git a/README.md b/README.md index 53e78b2..f8af004 100644 --- a/README.md +++ b/README.md @@ -6,6 +6,10 @@ A C++ version of sashimono agent ## Libraries +* jsoncons (for JSON and BSON) - https://github.com/danielaparker/jsoncons +* Boost Stacktrace - https://www.boost.org +* Reader Writer Queue - https://github.com/cameron314/readerwriterqueue +* Concurrent Queue - https://github.com/cameron314/concurrentqueue ## Setting up Sashimono Agent environment Run the setup script located at the repo root (tested on Ubuntu 18.04). @@ -28,4 +32,6 @@ Code is divided into subsystems via namespaces. **util::** Contains shared data structures/helper functions used by multiple subsystems. +**msg::** Extract message data from received raw messages. + **sqlite::** Contains sqlite database management related helper functions. \ No newline at end of file diff --git a/dev-setup.sh b/dev-setup.sh index 4f8e376..4ece42b 100755 --- a/dev-setup.sh +++ b/dev-setup.sh @@ -41,6 +41,29 @@ sudo cp -r include/plog /usr/local/include/ popd > /dev/null 2>&1 rm 1.1.5.tar.gz && rm -r plog-1.1.5 +# Boost stacktrace +sudo apt-get install -y libboost-stacktrace-dev + +# Reader-Writer queue +wget https://github.com/cameron314/readerwriterqueue/archive/v1.0.3.tar.gz +tar -zxvf v1.0.3.tar.gz +pushd readerwriterqueue-1.0.3 > /dev/null 2>&1 +mkdir build +pushd build > /dev/null 2>&1 +cmake .. +sudo make install +popd > /dev/null 2>&1 +popd > /dev/null 2>&1 +rm v1.0.3.tar.gz && rm -r readerwriterqueue-1.0.3 + +# Concurrent queue +wget https://github.com/cameron314/concurrentqueue/archive/1.0.2.tar.gz +tar -zxvf 1.0.2.tar.gz +pushd concurrentqueue-1.0.2 > /dev/null 2>&1 +sudo cp concurrentqueue.h /usr/local/include/ +popd > /dev/null 2>&1 +rm 1.0.2.tar.gz && rm -r concurrentqueue-1.0.2 + # Update linker library cache. sudo ldconfig diff --git a/examples/message-board/message-board.js b/examples/message-board/message-board.js index 7acd996..6a47200 100644 --- a/examples/message-board/message-board.js +++ b/examples/message-board/message-board.js @@ -22,7 +22,7 @@ const wss = new WebSocket.Server({ server }); wss.on('connection', (ws) => { ws.on('message', (msg) => { - console.log('Received: ', msg); + console.log('Received: ', Buffer.from(msg).toString()); }); }); @@ -71,7 +71,7 @@ server.listen(8080, () => { sendToAll(JSON.stringify({ id: uuidv4(), type: 'create', - ownerPubKey: 'ed7a4b931bdc5dd79b77a8b6ac293d998c123db42bb3ec2613' + owner_pubkey: 'ed7a4b931bdc5dd79b77a8b6ac293d998c123db42bb3ec2613' })); break; case 'destroy': @@ -79,18 +79,17 @@ server.listen(8080, () => { sendToAll(JSON.stringify({ id: uuidv4(), type: 'destroy', - ownerPubKey: 'ed7a4b931bdc5dd79b77a8b6ac293d998c123db42bb3ec2613', - contractId + owner_pubkey: 'ed7a4b931bdc5dd79b77a8b6ac293d998c123db42bb3ec2613', + contract_id: contractId })) - break; case 'start': contractId = await askForContractId(); sendToAll(JSON.stringify({ id: uuidv4(), type: 'start', - ownerPubKey: 'ed7a4b931bdc5dd79b77a8b6ac293d998c123db42bb3ec2613', - contractId + owner_pubkey: 'ed7a4b931bdc5dd79b77a8b6ac293d998c123db42bb3ec2613', + contract_id: contractId })) break; case 'stop': @@ -98,8 +97,8 @@ server.listen(8080, () => { sendToAll(JSON.stringify({ id: uuidv4(), type: 'stop', - ownerPubKey: 'ed7a4b931bdc5dd79b77a8b6ac293d998c123db42bb3ec2613', - contractId + owner_pubkey: 'ed7a4b931bdc5dd79b77a8b6ac293d998c123db42bb3ec2613', + contract_id: contractId })) break; diff --git a/src/comm/comm_handler.cpp b/src/comm/comm_handler.cpp index 9b920e0..43105e8 100644 --- a/src/comm/comm_handler.cpp +++ b/src/comm/comm_handler.cpp @@ -1,19 +1,19 @@ #include "comm_handler.hpp" #include "../util/util.hpp" +#include "../conf.hpp" #include "hpws.hpp" -#include "comm_session.hpp" namespace comm { - constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 5 * 1024 * 1024; - std::optional session; + constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 1 * 1024 * 1024; // 1MB; bool init_success; + comm_ctx ctx; + int init() { - if (connect(conf::cfg.server.ip_port) == -1) - return -1; - + ctx.comm_handler_thread = std::thread(comm_handler_loop); + init_success = true; return 0; @@ -22,9 +22,20 @@ namespace comm void deinit() { if (init_success) - disconnect(); + { + ctx.is_shutting_down = true; + + if (ctx.comm_handler_thread.joinable()) + ctx.comm_handler_thread.join(); + } } + /** + * Make a connection and session to the given host. + * This only gets called whithin the comm handler thread. + * @param ip_port Ip and port of the host. + * @return 0 on success -1 on error. + */ int connect(const conf::host_ip_port &ip_port) { std::string_view host = ip_port.host_address; @@ -54,27 +65,69 @@ namespace comm else { const std::string &host_address = std::get(host_result); - session.emplace(host_address, std::move(client)); - session->init(); + ctx.session.emplace(host_address, std::move(client)); + ctx.session->init(); } } return 0; } + /** + * Disconnect the session. + * This only gets called whithin the comm handler thread. + */ void disconnect() { - if (session.has_value()) + if (ctx.session.has_value()) { - session->close(); - session.reset(); + ctx.session->close(); + ctx.session.reset(); } } + void comm_handler_loop() + { + LOG_INFO << "Message processor started."; + + util::mask_signal(); + + while (!ctx.is_shutting_down) + { + // Process queued messaged only if there's a session. + if (ctx.session.has_value()) + { + // If no messages were processed in this cycle, wait for some time. + if (ctx.session->process_inbound_msg_queue() <= 0) + util::sleep(10); + + // If session is marked for closure since there's an issue, We disconnect the current session. + // And try to create a new session in the next round + if (ctx.session->state == SESSION_STATE::MUST_CLOSE) + { + LOG_DEBUG << "Closing the session due to a failure: " << ctx.session->display_name(); + disconnect(); + util::sleep(1000); + } + } + else + { + // If host connection failed wait for some time. + if (connect(conf::cfg.server.ip_port) == -1) + util::sleep(1000); + } + } + + // Disconnect the host at the termination. + disconnect(); + + LOG_INFO << "Message processor stopped."; + } + /** - * Wait for the session. + * Wait for the comm handler thread. */ void wait() { - session->wait(); + ctx.comm_handler_thread.join(); } } // namespace comm diff --git a/src/comm/comm_handler.hpp b/src/comm/comm_handler.hpp index f2e00dd..7f6b081 100644 --- a/src/comm/comm_handler.hpp +++ b/src/comm/comm_handler.hpp @@ -2,10 +2,19 @@ #define _SA_COMM_COMM_SERVER_ #include "../pchheader.hpp" -#include "../conf.hpp" +#include "comm_session.hpp" namespace comm { + struct comm_ctx + { + std::optional session; + bool is_shutting_down = false; + std::thread comm_handler_thread; // Incoming message processor thread. + }; + + extern comm_ctx ctx; + int init(); void deinit(); @@ -14,8 +23,10 @@ namespace comm void disconnect(); + void comm_handler_loop(); + void wait(); - + } // namespace comm #endif diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index f050949..9e1dbde 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -1,17 +1,17 @@ -#include "../pchheader.hpp" -#include "../util/util.hpp" -#include "../conf.hpp" -#include "hpws.hpp" #include "comm_session.hpp" +#include "../util/util.hpp" namespace comm { + constexpr uint16_t MAX_IN_MSG_QUEUE_SIZE = 64; // Maximum in message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31).... comm_session::comm_session( std::string_view host_address, hpws::client &&hpws_client) : uniqueid(host_address), host_address(host_address), - hpws_client(std::move(hpws_client)) + hpws_client(std::move(hpws_client)), + msg_parser(msg::msg_parser()), + in_msg_queue(MAX_IN_MSG_QUEUE_SIZE) { } @@ -25,22 +25,35 @@ namespace comm if (state == SESSION_STATE::NONE) { reader_thread = std::thread(&comm_session::reader_loop, this); + writer_thread = std::thread(&comm_session::outbound_msg_queue_processor, this); state = SESSION_STATE::ACTIVE; + + // Send an initial message to the host. + std::string res; + msg_parser.create_response(res, msg::MSGTYPE_INIT, "Connection initiated."); + send(res); LOG_DEBUG << "Session started: " << uniqueid; } return 0; } + /** + * Listening for receiving messages and process them. + */ void comm_session::reader_loop() { util::mask_signal(); while (state != SESSION_STATE::CLOSED && hpws_client) { + // If reading from the hpws_client failed we'll mark this session to closure. + bool should_disconnect = false; + const std::variant read_result = hpws_client->read(); if (std::holds_alternative(read_result)) { + should_disconnect = true; const hpws::error error = std::get(read_result); if (error.first != 1) // 1 indicates channel has closed. LOG_DEBUG << "hpws client read failed:" << error.first << " " << error.second; @@ -49,20 +62,189 @@ namespace comm { // Enqueue the message for processing. std::string_view data = std::get(read_result); - - LOG_INFO << "Received message : " << data; + in_msg_queue.try_enqueue(std::string(data)); // Signal the hpws client that we are ready for next message. const std::optional error = hpws_client->ack(data); if (error.has_value()) + { + should_disconnect = true; LOG_DEBUG << "hpws client ack failed:" << error->first << " " << error->second; + } + } + + if (should_disconnect) + { + // Here we mark the session as needing to close. + // The session will be properly "closed" and cleared from comm_handler. + // Then comm_handler will try to initiate a new session with the host. + mark_for_closure(); + break; } } } + /** + * Processes the unprocessed queued inbound messages (if any). + * @return 0 if no messages in queue. 1 if messages were processed. -1 error occured + */ + int comm_session::process_inbound_msg_queue() + { + if (state == SESSION_STATE::CLOSED) + return -1; + + bool messages_processed = false; + std::string msg_to_process; + + // Process all messages in queue. + while (in_msg_queue.try_dequeue(msg_to_process)) + { + handle_message(msg_to_process); + msg_to_process.clear(); + messages_processed = true; + } + + return messages_processed ? 1 : 0; + } + + /** + * This function constructs and sends the message to the target from the given message. + * @param message Message to be sent via the pipe. + * @return 0 on successful message sent and -1 on error. + */ + int comm_session::process_outbound_message(std::string_view message) + { + if (state == SESSION_STATE::CLOSED || !hpws_client) + return -1; + + const std::optional error = hpws_client->write(message); + if (error.has_value()) + { + LOG_ERROR << "hpws client write failed:" << error->first << " " << error->second; + return -1; + } + return 0; + } + + /** + * Loop to keep processing outbound messages in the queue. + */ + void comm_session::outbound_msg_queue_processor() + { + // Appling a signal mask to prevent receiving control signals from linux kernel. + util::mask_signal(); + + // Keep checking until the session is terminated. + while (state != SESSION_STATE::CLOSED) + { + bool messages_sent = false; + std::string msg_to_send; + + // Send all messages in queue. + while (out_msg_queue.try_dequeue(msg_to_send)) + { + process_outbound_message(msg_to_send); + msg_to_send.clear(); + messages_sent = true; + } + + // Wait for small delay if there were no outbound messages. + if (!messages_sent) + util::sleep(10); + } + } + + /** + * Handles the received message. + * @param msg Received message. + * @return 0 on success -1 on error. + */ + int comm_session::handle_message(std::string_view msg) + { + std::string type; + std::string id; + if (msg_parser.parse(msg) == -1 || msg_parser.extract_type(type) == -1) + return -1; + + if (type == msg::MSGTYPE_CREATE) + { + msg::create_msg msg; + if (msg_parser.extract_create_message(msg) == -1) + return -1; + id = msg.id; + LOG_INFO << "---------------Create signal received--------------"; + LOG_INFO << "---------------Pubkey: " << msg.pubkey << "--------------"; + } + else if (type == msg::MSGTYPE_DESTROY) + { + msg::destroy_msg msg; + if (msg_parser.extract_destroy_message(msg)) + return -1; + id = msg.id; + LOG_INFO << "---------------Destroy signal received--------------"; + LOG_INFO << "---------------Pubkey: " << msg.pubkey << ", ContractId: " << msg.contract_id << "--------------"; + } + else if (type == msg::MSGTYPE_START) + { + msg::start_msg msg; + if (msg_parser.extract_start_message(msg)) + return -1; + id = msg.id; + LOG_INFO << "---------------Start signal received--------------"; + LOG_INFO << "---------------Pubkey: " << msg.pubkey << ", ContractId: " << msg.contract_id << "--------------"; + } + else if (type == msg::MSGTYPE_STOP) + { + msg::stop_msg msg; + if (msg_parser.extract_stop_message(msg)) + return -1; + id = msg.id; + LOG_INFO << "---------------Stop signal received--------------"; + LOG_INFO << "---------------Pubkey: " << msg.pubkey << ", ContractId: " << msg.contract_id << "--------------"; + } + else + { + LOG_ERROR << "Received invalid message type."; + return -1; + } + + std::string res; + msg_parser.create_response(res, type, "Acknowledgment for message " + id); + send(res); + return 0; + } + + /** + * Adds the given message to the outbound message queue. + * @param message Message to be added to the outbound queue. + * @return 0 on successful addition and -1 if the session is already closed. + */ + int comm_session::send(std::string_view message) + { + if (state == SESSION_STATE::CLOSED) + return -1; + + // Passing the ownership of message to the queue. + out_msg_queue.enqueue(std::string(message)); + + return 0; + } + + /** + * Mark the session as needing to close. + * The session will be properly "closed" by comm_handler. + */ + void comm_session::mark_for_closure() + { + if (state == SESSION_STATE::CLOSED) + return; + + state = SESSION_STATE::MUST_CLOSE; + } + /** * Close the connection and wrap up any session processing threads. - * This will be only called by the global comm_server thread. + * This will be only called by the global comm_handler. */ void comm_session::close() { @@ -74,6 +256,10 @@ namespace comm // Destruct the hpws client instance so it will close the sockets and related processes. hpws_client.reset(); + // Wait untill reader/writer threads gracefully stop. + if (writer_thread.joinable()) + writer_thread.join(); + if (reader_thread.joinable()) reader_thread.join(); @@ -81,11 +267,12 @@ namespace comm } /** - * Joins the listner thread. + * Returns printable name for the session based on uniqueid (used for logging). + * @return The display name as a string. */ - void comm_session::wait() + const std::string comm_session::display_name() const { - reader_thread.join(); + return uniqueid; } } // namespace comm \ No newline at end of file diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index ce1ebb8..8d7c7af 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -4,6 +4,7 @@ #include "../pchheader.hpp" #include "../conf.hpp" #include "hpws.hpp" +#include "../msg/msg_parser.hpp" namespace comm { @@ -21,20 +22,31 @@ namespace comm class comm_session { private: - SESSION_STATE state = SESSION_STATE::NONE; std::optional hpws_client; - const std::string uniqueid; // Verified session: Pubkey in hex format, Unverified session: IP address. + msg::msg_parser msg_parser; // Message parser. + const std::string uniqueid; // IP address. const std::string host_address; // Connection host address of the remote party. - std::thread reader_thread; // The thread responsible for reading messages from the read fd. + + std::thread reader_thread; // The thread responsible for reading messages from the read fd. + std::thread writer_thread; // The thread responsible for writing messages to the write fd. + moodycamel::ReaderWriterQueue in_msg_queue; // Holds incoming messages waiting to be processed. + moodycamel::ConcurrentQueue out_msg_queue; // Holds outgoing messages waiting to be processed. void reader_loop(); + int handle_message(std::string_view msg); + int process_outbound_message(std::string_view message); + void outbound_msg_queue_processor(); + void mark_for_closure(); public: + SESSION_STATE state = SESSION_STATE::NONE; comm_session( std::string_view host_address, hpws::client &&hpws_client); int init(); + int send(std::string_view message); + int process_inbound_msg_queue(); void close(); - void wait(); + const std::string display_name() const; }; } // namespace comm diff --git a/src/conf.cpp b/src/conf.cpp index 60b0559..bba8bdb 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -116,7 +116,9 @@ namespace conf { if (!util::is_file_exists(path) && !util::is_dir_exists(path)) { - if (path == ctx.hpws_exe_path) + if (path == ctx.config_file) + std::cerr << path << " does not exist. Initialize with command.\n"; + else if (path == ctx.hpws_exe_path) std::cerr << path << " binary does not exist.\n"; else std::cerr << path << " does not exist.\n"; @@ -187,12 +189,16 @@ namespace conf cfg.server.ip_port.host_address = server["host"].as(); cfg.server.ip_port.port = server["port"].as(); - // Push the peer address and the port to peers set if (cfg.server.ip_port.host_address.empty()) { std::cerr << "Configured server host_address is empty.\n"; return -1; } + else if (cfg.server.ip_port.port <= 0) + { + std::cerr << "Configured server port invalid.\n"; + return -1; + } } catch (const std::exception &e) { diff --git a/src/main.cpp b/src/main.cpp index 077971f..a9a549d 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -55,6 +55,7 @@ void sig_exit_handler(int signum) void segfault_handler(int signum) { + LOG_ERROR << boost::stacktrace::stacktrace(); exit(SIGABRT); } @@ -84,6 +85,8 @@ void std_terminate() noexcept LOG_ERROR << "std error: Terminated due to unknown reason"; } + LOG_ERROR << boost::stacktrace::stacktrace(); + exit(1); } diff --git a/src/msg/json/msg_json.cpp b/src/msg/json/msg_json.cpp new file mode 100644 index 0000000..d832c41 --- /dev/null +++ b/src/msg/json/msg_json.cpp @@ -0,0 +1,252 @@ +#include "msg_json.hpp" + +namespace msg::json +{ + // JSON separators + constexpr const char *SEP_COMMA = "\",\""; + constexpr const char *SEP_COLON = "\":\""; + constexpr const char *SEP_COMMA_NOQUOTE = ",\""; + constexpr const char *SEP_COLON_NOQUOTE = "\":"; + constexpr const char *DOUBLE_QUOTE = "\""; + + /** + * Parses a json message sent by the message board. + * @param d Jsoncons document to which the parsed json should be loaded. + * @param message The message to parse. + * Accepted message format: + * { + * 'type': '' + * ... + * } + * @return 0 on successful parsing. -1 for failure. + */ + int parse_message(jsoncons::json &d, std::string_view message) + { + try + { + d = jsoncons::json::parse(message, jsoncons::strict_json_parsing()); + } + catch (const std::exception &e) + { + LOG_ERROR << "JSON message parsing failed. " << e.what(); + return -1; + } + + // Check existence of msg type field. + if (!d.contains(msg::FLD_TYPE) || !d[msg::FLD_TYPE].is()) + { + LOG_ERROR << "JSON message 'type' missing or invalid."; + return -1; + } + + return 0; + } + + /** + * Extracts the message 'type' value from the json document. + */ + int extract_type(std::string &extracted_type, const jsoncons::json &d) + { + extracted_type = d[msg::FLD_TYPE].as(); + return 0; + } + + /** + * Extracts type, id and pubkey in the msg. + * @param type Type in the message. + * @param id id in the message. + * @param pubkey Pubkey in the message. + * @param d The json document holding the read request message. + * Accepted signed input container format: + * { + * ... + * "type": "", + * "id": "", + * "owner_pubkey": "", + * ... + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_commons(std::string &type, std::string &id, std::string &pubkey, const jsoncons::json &d) + { + if (extract_type(type, d) == -1) + return -1; + + if (!d.contains(msg::FLD_ID)) + { + LOG_ERROR << "Field id is missing."; + return -1; + } + + if (!d[msg::FLD_ID].is()) + { + LOG_ERROR << "Invalid id value."; + return -1; + } + + if (!d.contains(msg::FLD_PUBKEY)) + { + LOG_ERROR << "Field owner_pubkey is missing."; + return -1; + } + + if (!d[msg::FLD_PUBKEY].is()) + { + LOG_ERROR << "Invalid owner_pubkey value."; + return -1; + } + + id = d[msg::FLD_ID].as(); + pubkey = d[msg::FLD_PUBKEY].as(); + return 0; + } + + /** + * Extracts create message from msg. + * @param msg Populated msg object. + * @param d The json document holding the read request message. + * Accepted signed input container format: + * { + * "type": "create", + * "owner_pubkey": "" + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_create_message(create_msg &msg, const jsoncons::json &d) + { + if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + return -1; + return 0; + } + + /** + * Extracts destroy message from msg. + * @param msg Populated msg object. + * @param d The json document holding the read request message. + * Accepted signed input container format: + * { + * "type": "destroy", + * "owner_pubkey": "", + * "contract_id": "", + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_destroy_message(destroy_msg &msg, const jsoncons::json &d) + { + if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + return -1; + + if (!d.contains(msg::FLD_CONTRACT_ID)) + { + LOG_ERROR << "Field contract_id is missing."; + return -1; + } + + if (!d[msg::FLD_CONTRACT_ID].is()) + { + LOG_ERROR << "Invalid contract_id value."; + return -1; + } + + msg.contract_id = d[msg::FLD_CONTRACT_ID].as(); + return 0; + + return 0; + } + + /** + * Extracts start message from msg. + * @param msg Populated msg object. + * @param d The json document holding the read request message. + * Accepted signed input container format: + * { + * "type": "start", + * "owner_pubkey": "", + * "contract_id": "", + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_start_message(start_msg &msg, const jsoncons::json &d) + { + if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + return -1; + + if (!d.contains(msg::FLD_CONTRACT_ID)) + { + LOG_ERROR << "Field contract_id is missing."; + return -1; + } + + if (!d[msg::FLD_CONTRACT_ID].is()) + { + LOG_ERROR << "Invalid contract_id value."; + return -1; + } + + msg.contract_id = d[msg::FLD_CONTRACT_ID].as(); + return 0; + + return 0; + } + + /** + * Extracts stop message from msg. + * @param msg Populated msg object. + * @param d The json document holding the read request message. + * Accepted signed input container format: + * { + * "type": "stop", + * "owner_pubkey": "", + * "contract_id": "", + * } + * @return 0 on successful extraction. -1 for failure. + */ + int extract_stop_message(stop_msg &msg, const jsoncons::json &d) + { + if (extract_commons(msg.type, msg.id, msg.pubkey, d) == -1) + return -1; + + if (!d.contains(msg::FLD_CONTRACT_ID)) + { + LOG_ERROR << "Field contract_id is missing."; + return -1; + } + + if (!d[msg::FLD_CONTRACT_ID].is()) + { + LOG_ERROR << "Invalid contract_id value."; + return -1; + } + + msg.contract_id = d[msg::FLD_CONTRACT_ID].as(); + return 0; + + return 0; + } + + /** + * Constructs a response json. + * @param msg Buffer to construct the generated json message string into. + * Message format: + * { + * 'type': '', + * "content": "" + * } + * @param response_type Type of the response. + * @param content Content inside the response. + */ + void create_response(std::string &msg, std::string_view response_type, std::string_view content) + { + msg.reserve(1024); + msg += "{\""; + msg += msg::FLD_TYPE; + msg += SEP_COLON; + msg += response_type; + msg += SEP_COMMA; + msg += msg::FLD_CONTENT; + msg += SEP_COLON; + msg += content; + msg += "\"}"; + } + +} // namespace msg::json \ No newline at end of file diff --git a/src/msg/json/msg_json.hpp b/src/msg/json/msg_json.hpp new file mode 100644 index 0000000..449f4a1 --- /dev/null +++ b/src/msg/json/msg_json.hpp @@ -0,0 +1,30 @@ +#ifndef _HP_MSG_MSG_JSON_ +#define _HP_MSG_MSG_JSON_ + +#include "../../pchheader.hpp" +#include "../msg_common.hpp" + +/** + * Parser helpers for json messages. + */ +namespace msg::json +{ + int parse_message(jsoncons::json &d, std::string_view message); + + int extract_type(std::string &extracted_type, const jsoncons::json &d); + + int extract_commons(std::string &type, std::string &id, std::string &pubkey, const jsoncons::json &d); + + int extract_create_message(create_msg &msg, const jsoncons::json &d); + + int extract_destroy_message(destroy_msg &msg, const jsoncons::json &d); + + int extract_start_message(start_msg &msg, const jsoncons::json &d); + + int extract_stop_message(stop_msg &msg, const jsoncons::json &d); + + void create_response(std::string &msg, std::string_view response_type, std::string_view content); + +} // namespace msg::json + +#endif \ No newline at end of file diff --git a/src/msg/msg_common.hpp b/src/msg/msg_common.hpp new file mode 100644 index 0000000..3e21569 --- /dev/null +++ b/src/msg/msg_common.hpp @@ -0,0 +1,55 @@ +#ifndef _HP_MSG_MSG_COMMON_ +#define _HP_MSG_MSG_COMMON_ + +#include "../pchheader.hpp" + +namespace msg +{ + struct create_msg + { + std::string id; + std::string type; + std::string pubkey; + }; + + struct destroy_msg + { + std::string id; + std::string type; + std::string pubkey; + std::string contract_id; + }; + + struct start_msg + { + std::string id; + std::string type; + std::string pubkey; + std::string contract_id; + }; + + struct stop_msg + { + std::string id; + std::string type; + std::string pubkey; + std::string contract_id; + }; + + // Message field names + constexpr const char *FLD_TYPE = "type"; + constexpr const char *FLD_CONTENT = "content"; + constexpr const char *FLD_PUBKEY = "owner_pubkey"; + constexpr const char *FLD_CONTRACT_ID = "contract_id"; + constexpr const char *FLD_ID = "id"; + + // Message types + constexpr const char *MSGTYPE_INIT = "init"; + constexpr const char *MSGTYPE_CREATE = "create"; + constexpr const char *MSGTYPE_DESTROY = "destroy"; + constexpr const char *MSGTYPE_START = "start"; + constexpr const char *MSGTYPE_STOP = "stop"; + +} // namespace msg + +#endif \ No newline at end of file diff --git a/src/msg/msg_parser.cpp b/src/msg/msg_parser.cpp new file mode 100644 index 0000000..da75cc4 --- /dev/null +++ b/src/msg/msg_parser.cpp @@ -0,0 +1,41 @@ +#include "msg_parser.hpp" +#include "json/msg_json.hpp" + +namespace msg +{ + int msg_parser::parse(std::string_view message) + { + return json::parse_message(jdoc, message); + } + + int msg_parser::extract_type(std::string &extracted_type) const + { + return json::extract_type(extracted_type, jdoc); + } + + int msg_parser::extract_create_message(create_msg &msg) const + { + return json::extract_create_message(msg, jdoc); + } + + int msg_parser::extract_destroy_message(destroy_msg &msg) const + { + return json::extract_destroy_message(msg, jdoc); + } + + int msg_parser::extract_start_message(start_msg &msg) const + { + return json::extract_start_message(msg, jdoc); + } + + int msg_parser::extract_stop_message(stop_msg &msg) const + { + return json::extract_stop_message(msg, jdoc); + } + + void msg_parser::create_response(std::string &msg, std::string_view response_type, std::string_view content) const + { + json::create_response(msg, response_type, content); + } + +} // namespace msg \ No newline at end of file diff --git a/src/msg/msg_parser.hpp b/src/msg/msg_parser.hpp new file mode 100644 index 0000000..d9fb710 --- /dev/null +++ b/src/msg/msg_parser.hpp @@ -0,0 +1,25 @@ +#ifndef _SA_MSG_MSG_PARSER_ +#define _SA_MSG_MSG_PARSER_ + +#include "../pchheader.hpp" +#include "msg_common.hpp" + +namespace msg +{ + class msg_parser + { + jsoncons::json jdoc; + + public: + int parse(std::string_view message); + int extract_type(std::string &extracted_type) const; + int extract_create_message(create_msg &msg) const; + int extract_destroy_message(destroy_msg &msg) const; + int extract_start_message(start_msg &msg) const; + int extract_stop_message(stop_msg &msg) const; + void create_response(std::string &msg, std::string_view response_type, std::string_view content) const; + }; + +} // namespace msg + +#endif \ No newline at end of file diff --git a/src/pchheader.hpp b/src/pchheader.hpp index b877602..ac1c9cc 100644 --- a/src/pchheader.hpp +++ b/src/pchheader.hpp @@ -2,7 +2,9 @@ #define _SA_PCHHEADER_ #include +#include #include +#include #include #include #include @@ -19,5 +21,6 @@ #include #include #include +#include #endif \ No newline at end of file diff --git a/src/util/util.cpp b/src/util/util.cpp index cf78346..f89a4cb 100644 --- a/src/util/util.cpp +++ b/src/util/util.cpp @@ -128,4 +128,12 @@ namespace util pthread_sigmask(SIG_BLOCK, &mask, NULL); } + /** + * Sleeps the current thread for specified no. of milliseconds. + */ + void sleep(const uint64_t milliseconds) + { + std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds)); + } + } // namespace util diff --git a/src/util/util.hpp b/src/util/util.hpp index 6f971af..8aba668 100644 --- a/src/util/util.hpp +++ b/src/util/util.hpp @@ -22,6 +22,8 @@ namespace util void mask_signal(); + void sleep(const uint64_t milliseconds); + } // namespace util #endif