From 4820f9660e79556461db79f272e5e37326b36f68 Mon Sep 17 00:00:00 2001 From: Savinda Senevirathne Date: Mon, 26 Jul 2021 12:14:41 +0530 Subject: [PATCH] Support additional hp config. (#39) --- dependencies/hp.cfg | 2 +- examples/message-board/message-board.js | 202 +++++++++++-- sashi-cli/cli-manager.cpp | 2 +- sashi-cli/main.cpp | 2 +- src/comm/comm_handler.cpp | 83 +++--- src/comm/comm_handler.hpp | 4 +- src/hp_manager.cpp | 180 +++++++++--- src/hp_manager.hpp | 2 +- src/msg/json/msg_json.cpp | 370 ++++++++++++++++-------- src/msg/json/msg_json.hpp | 4 +- src/msg/msg_common.hpp | 138 +++++++-- src/msg/msg_parser.cpp | 8 +- src/msg/msg_parser.hpp | 4 +- 13 files changed, 735 insertions(+), 266 deletions(-) diff --git a/dependencies/hp.cfg b/dependencies/hp.cfg index dfd68fa..456f345 100644 --- a/dependencies/hp.cfg +++ b/dependencies/hp.cfg @@ -71,7 +71,7 @@ "max_bad_msgs_per_min": 0, "max_connections": 0, "max_in_connections_per_host": 0, - "concurrent_read_reqeuests": 4 + "concurrent_read_requests": 4 }, "hpfs": { "external": true, diff --git a/examples/message-board/message-board.js b/examples/message-board/message-board.js index 787aa5e..b84a513 100644 --- a/examples/message-board/message-board.js +++ b/examples/message-board/message-board.js @@ -69,7 +69,6 @@ const interatctiveInterface = async () => { } sendToAgent(JSON.stringify({ - id: uuidv4(), type: 'create', owner_pubkey: 'ed5cb83404120ac759609819591ef839b7d222c84f1f08b3012f490586159d2b50', contract_id: contractId, @@ -78,43 +77,189 @@ const interatctiveInterface = async () => { break; case 'initiate': containerName = await askForInput('Container Name'); - role = await askForInput('Role: validator(default) | observer', "validator"); - if (role != 'validator' && role != 'observer') { - console.error('Invalid role. (Should be "validator" or "observer").') - break; + let config = {}; + modifyNode = await askForInput('Modify node section? [y/N]', 'n'); + if (modifyNode === 'y' || modifyNode === 'Y') { + role = await askForInput('Role: validator | observer(optional)'); + if (role && role != 'validator' && role != 'observer') { + console.error('Invalid role. (Should be "validator" or "observer").') + break; + } + history = await askForInput('History <{full|custom},max_primary_shards,max_raw_shards> (custom,1,1)', "custom,1,1"); + split = []; + if (history) { + split = history.split(','); + if (split.length == 0 || split.length !== 3) { + console.error('Invalid history.') + break; + } + else if (split[0] != 'full' && split[0] != 'custom') { + console.error('Invalid history. (Should be "full" or "custom").') + break; + } + } + config.node = { + role: role, + history: history ? split[0] : undefined, + history_config: history ? { + max_primary_shards: parseInt(split[1]), + max_raw_shards: parseInt(split[2]) + } : undefined + }; } - history = await askForInput('History <{full|custom},max_primary_shards,max_raw_shards> (custom,1,1)', "custom,1,1"); - split = []; - if (history) { - split = history.split(','); - if (split.length == 0 || split.length == 0 > 3) { - console.error('Invalid history.') - break; + modifyContract = await askForInput('Modify contract section? [y/N]', 'n'); + if (modifyContract === 'y' || modifyContract === 'Y') { + unl = await askForInput('Comma seperated UNL ,,...'); + execute = await askForInput('Execute contract? (optional)'); + log = await askForInput('log <{true|false},max_mbytes_per_file,max_file_count> (optional)'); + if (log) { + split = log.split(','); + if (split.length == 0 || split.length !== 3) { + console.error('Invalid log config.') + break; + } + else if (split[0] != 'true' && split[0] != 'false') { + console.error('Log enable tag should be either true or false') + break; + } } - else if (split[0] != 'full' && split[0] != 'custom') { - console.error('Invalid history. (Should be "full" or "custom").') - break; + config.contract = { + execute: execute ? (execute === 'true' ? true : false) : undefined, + log: log ? { + enable: split[0] === 'true' ? true : false, + max_mbytes_per_file: parseInt(split[1]), + max_file_count: parseInt(split[2]) + } : undefined, + unl: unl ? unl.split(',') : undefined } } - peers = await askForInput('Comma seperated Peer List ,,...'); - unl = await askForInput('Comma seperated UNL ,,...'); + modifyMesh = await askForInput('Modify mesh section? [y/N]', 'n'); + if (modifyMesh === 'y' || modifyMesh === 'Y') { + idleTimeout = await askForInput('Idle timeout?(optional)'); + peers = await askForInput('Comma seperated Peer List ,,...(optional)'); + msgForwarding = await askForInput('Message forwarding [true|false]?(optional)'); + set01 = await askForInput('Comma seperated max_connections, max_known_connections and max_in_connections_per_host?(optional)'); + if (set01) { + split01 = set01.split(','); + if (split01.length == 0 || split01.length !== 3) { + console.error('Make sure to add all three. Eg: 1,1,1'); + break; + } + } + + set02 = await askForInput('Comma seperated max_bytes_per_msg, max_bytes_per_min and max_bad_msgs_per_min?(optional)'); + if (set02) { + split02 = set02.split(','); + if (split02.length == 0 || split02.length !== 3) { + console.error('Make sure to add all three. Eg: 1,1,1'); + break; + } + } + + set03 = await askForInput('Comma seperated max_bad_msgsigs_per_min and max_dup_msgs_per_min?(optional)'); + if (set03) { + split03 = set03.split(','); + if (split03.length == 0 || split03.length !== 2) { + console.error('Make sure to add all two. Eg: 1,1'); + break; + } + } + + peerDiscovery = await askForInput('Peer discovery <{true|false}, Interval>?(optional)'); + if (peerDiscovery) { + peerDiscovery = peerDiscovery.split(','); + if (peerDiscovery.length == 0 || peerDiscovery.length !== 2) { + console.error('Make sure to add all two. Eg: true,10000'); + break; + } + } + + config.mesh = { + idle_timeout: idleTimeout ? parseInt(idleTimeout) : undefined, + known_peers: peers ? peers.split(',') : undefined, + msg_forwarding: msgForwarding ? (msgForwarding === 'true' ? true : false) : undefined, + max_connections: set01 ? parseInt(split01[0]) : undefined, + max_known_connections: set01 ? parseInt(split01[1]) : undefined, + max_in_connections_per_host: set01 ? parseInt(split01[2]) : undefined, + max_bytes_per_msg: set02 ? parseInt(split02[0]) : undefined, + max_bytes_per_min: set02 ? parseInt(split02[1]) : undefined, + max_bad_msgs_per_min: set02 ? parseInt(split02[2]) : undefined, + max_bad_msgsigs_per_min: set03 ? parseInt(split03[0]) : undefined, + max_dup_msgs_per_min: set03 ? parseInt(split03[1]) : undefined, + peer_discovery: peerDiscovery ? { + enabled: peerDiscovery[0] === 'true' ? true : false, + interval: parseInt(peerDiscovery[1]) + } : undefined + }; + + + } + modifyUser = await askForInput('Modify user section? [y/N]', 'n'); + if (modifyUser === 'y' || modifyUser === 'Y') { + idleTimeout = await askForInput('Idle timeout?(optional)'); + set01 = await askForInput('Comma seperated max_bytes_per_msg, max_bytes_per_min and max_bad_msgs_per_min?(optional)'); + if (set01) { + split01 = set01.split(','); + if (split01.length == 0 || split01.length !== 3) { + console.error('Make sure to add all three. Eg: 1,1,1'); + break; + } + } + set02 = await askForInput('Comma seperated max_connections, max_in_connections_per_host and concurrent_read_reqeuests?(optional)'); + if (set02) { + split02 = set02.split(','); + if (split02.length == 0 || split02.length !== 3) { + console.error('Make sure to add all three. Eg: 1,1,1'); + break; + } + } + config.user = { + idle_timeout: idleTimeout ? parseInt(idleTimeout) : undefined, + max_bytes_per_msg: set01 ? parseInt(split01[0]) : undefined, + max_bytes_per_min: set01 ? parseInt(split01[1]) : undefined, + max_bad_msgs_per_min: set01 ? parseInt(split01[2]) : undefined, + max_connections: set02 ? parseInt(split02[0]) : undefined, + max_in_connections_per_host: set02 ? parseInt(split02[1]) : undefined, + concurrent_read_requests: set02 ? parseInt(split02[2]) : undefined + }; + } + modifyHpfs = await askForInput('Modify hpfs section? [y/N]', 'n'); + if (modifyHpfs === 'y' || modifyHpfs === 'Y') { + logLevel = await askForInput('Hpfs log level?(optional)'); + config.hpfs = logLevel ? { + log_level: logLevel ? logLevel : undefined + } : undefined; + } + + modifyLogs = await askForInput('Modify log section? [y/N]', 'n'); + if (modifyLogs === 'y' || modifyLogs === 'Y') { + logLevel = await askForInput('HP log level?(optional)'); + set01 = await askForInput('Comma seperated max_mbytes_per_file and max_file_count?(optional)'); + if (set01) { + split01 = set01.split(','); + if (split01.length == 0 || split01.length !== 2) { + console.error('Make sure to add all two. Eg: 1,1'); + break; + } + } + loggers = await askForInput('Comma seperated loggers?(optional)'); + config.log = { + log_level: logLevel ? logLevel : undefined, + max_mbytes_per_file: set01 ? parseInt(split01[0]) : undefined, + max_file_count: set01 ? parseInt(split01[1]) : undefined, + loggers: loggers ? loggers.split(',') : undefined + }; + } sendToAgent(JSON.stringify({ - id: uuidv4(), type: 'initiate', container_name: containerName, - peers: peers ? peers.split(',') : [], - unl: unl ? unl.split(',') : [], - role: role, - history: split.length > 0 ? split[0] : '', - max_primary_shards: split.length > 1 ? parseInt(split[1]) : '', - max_raw_shards: split.length > 2 ? parseInt(split[2]) : '' + config: config })); break; case 'destroy': containerName = await askForInput('Container Name'); sendToAgent(JSON.stringify({ - id: uuidv4(), type: 'destroy', container_name: containerName })) @@ -122,7 +267,6 @@ const interatctiveInterface = async () => { case 'start': containerName = await askForInput('Container Name'); sendToAgent(JSON.stringify({ - id: uuidv4(), type: 'start', container_name: containerName })) @@ -130,7 +274,6 @@ const interatctiveInterface = async () => { case 'stop': containerName = await askForInput('Container Name'); sendToAgent(JSON.stringify({ - id: uuidv4(), type: 'stop', container_name: containerName })) @@ -196,7 +339,6 @@ const restApi = async () => { checkAgentStatus(res); }); app.post("/create", (req, res) => { - const id = uuidv4(); const msg = { id, type: 'create', @@ -207,7 +349,6 @@ const restApi = async () => { sendToAgent(JSON.stringify(msg), res); }); app.post("/initiate", (req, res) => { - const id = uuidv4(); const msg = { id, type: 'initiate', @@ -222,7 +363,6 @@ const restApi = async () => { sendToAgent(JSON.stringify(msg), res); }); app.post("/start", (req, res) => { - const id = uuidv4(); const msg = { id, type: 'start', @@ -231,7 +371,6 @@ const restApi = async () => { sendToAgent(JSON.stringify(msg), res); }); app.post("/stop", (req, res) => { - const id = uuidv4(); const msg = { id, type: 'stop', @@ -240,7 +379,6 @@ const restApi = async () => { sendToAgent(JSON.stringify(msg), res); }); app.post("/destroy", (req, res) => { - const id = uuidv4(); const msg = { id, type: 'destroy', diff --git a/sashi-cli/cli-manager.cpp b/sashi-cli/cli-manager.cpp index 0a1d690..7a1ad49 100644 --- a/sashi-cli/cli-manager.cpp +++ b/sashi-cli/cli-manager.cpp @@ -5,7 +5,7 @@ namespace cli { constexpr const char *SOCKET_NAME = "sa.sock"; // Name of the sashimono socket. constexpr const char *DATA_DIR = "/etc/sashimono"; // Sashimono data directory. - constexpr const int BUFFER_SIZE = 1024; // Max read buffer size. + constexpr const int BUFFER_SIZE = 4096; // Max read buffer size. cli_context ctx; diff --git a/sashi-cli/main.cpp b/sashi-cli/main.cpp index fe3e466..1ff7572 100644 --- a/sashi-cli/main.cpp +++ b/sashi-cli/main.cpp @@ -10,7 +10,7 @@ std::cerr << "Usage:\n"; \ std::cerr << "sashi status\n"; \ std::cerr << "sashi json \n"; \ - std::cerr << "Example: sashi json '{\"container_name\":\"\", ...}'\n"; \ + std::cerr << "Example: sashi json '{\"type\":\"\", ...}'\n"; \ return -1; \ } diff --git a/src/comm/comm_handler.cpp b/src/comm/comm_handler.cpp index 2641639..086bac2 100644 --- a/src/comm/comm_handler.cpp +++ b/src/comm/comm_handler.cpp @@ -2,12 +2,12 @@ #include "../util/util.hpp" #include "../conf.hpp" -#define __HANDLE_RESPONSE(id, type, content, ret) \ - { \ - std::string res; \ - msg_parser.build_response(res, type, id, content, type == msg::MSGTYPE_CREATE_RES && ret == 0); \ - send(res); \ - return ret; \ +#define __HANDLE_RESPONSE(type, content, ret) \ + { \ + std::string res; \ + msg_parser.build_response(res, type, content, type == msg::MSGTYPE_CREATE_RES && ret == 0); \ + send(res); \ + return ret; \ } namespace comm @@ -15,9 +15,10 @@ namespace comm constexpr uint32_t DEFAULT_MAX_MSG_SIZE = 1 * 1024 * 1024; // 1MB; bool init_success; constexpr const int POLL_TIMEOUT = 10; - constexpr const int BUFFER_SIZE = 1024; + constexpr const int BUFFER_SIZE = 4096; constexpr const int EMPTY_READ_TRESHOLD = 5; msg::msg_parser msg_parser; + std::vector read_buffer(BUFFER_SIZE, 0); // Global buffer storing the current message. comm_ctx ctx; @@ -113,12 +114,11 @@ namespace comm // Process queued messaged only if there's a socket connection. if (ctx.data_socket != -1) { - std::string message; - const int ret = read_socket(message); - if (ret == -1) + const int message_size = read_socket(); + if (message_size == -1) disconnect(); - else if (ret > 0) - handle_message(message); + else if (message_size > 0) + handle_message(message_size); else { empty_read_count++; @@ -164,75 +164,82 @@ namespace comm /** * Handles the received message. - * @param msg Received message. + * @param message_size Message size. * @return 0 on success -1 on error. */ - int handle_message(std::string_view msg) + int handle_message(const int message_size) { - std::string id, type; - if (msg_parser.parse(msg) == -1 || msg_parser.extract_type_and_id(type, id) == -1) - __HANDLE_RESPONSE("", "error", "format_error", -1); + std::string_view msg((char *)read_buffer.data(), message_size); + std::string type; + if (msg_parser.parse(msg) == -1 || msg_parser.extract_type(type) == -1) + { + read_buffer.clear(); + __HANDLE_RESPONSE("error", "format_error", -1); + } + + // Clear the buffer after the message is parsed. + read_buffer.clear(); if (type == msg::MSGTYPE_CREATE) { msg::create_msg msg; if (msg_parser.extract_create_message(msg) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "format_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_CREATE_RES, "format_error", -1); hp::instance_info info; if (hp::create_new_instance(info, msg.pubkey, msg.contract_id, msg.image) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, "create_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_CREATE_RES, "create_error", -1); std::string create_res; msg_parser.build_create_response(create_res, info); - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_CREATE_RES, create_res, 0); + __HANDLE_RESPONSE(msg::MSGTYPE_CREATE_RES, create_res, 0); } else if (type == msg::MSGTYPE_INITIATE) { msg::initiate_msg msg; if (msg_parser.extract_initiate_message(msg) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "format_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_INITIATE_RES, "format_error", -1); if (hp::initiate_instance(msg.container_name, msg) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "init_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_INITIATE_RES, "init_error", -1); - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_INITIATE_RES, "initiated", 0); + __HANDLE_RESPONSE(msg::MSGTYPE_INITIATE_RES, "initiated", 0); } else if (type == msg::MSGTYPE_DESTROY) { msg::destroy_msg msg; if (msg_parser.extract_destroy_message(msg)) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "format_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_DESTROY_RES, "format_error", -1); if (hp::destroy_container(msg.container_name) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroy_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_DESTROY_RES, "destroy_error", -1); - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_DESTROY_RES, "destroyed", 0); + __HANDLE_RESPONSE(msg::MSGTYPE_DESTROY_RES, "destroyed", 0); } else if (type == msg::MSGTYPE_START) { msg::start_msg msg; if (msg_parser.extract_start_message(msg)) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "format_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_START_RES, "format_error", -1); if (hp::start_container(msg.container_name) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "start_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_START_RES, "start_error", -1); - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_START_RES, "started", 0); + __HANDLE_RESPONSE(msg::MSGTYPE_START_RES, "started", 0); } else if (type == msg::MSGTYPE_STOP) { msg::stop_msg msg; if (msg_parser.extract_stop_message(msg)) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "format_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_STOP_RES, "format_error", -1); if (hp::stop_container(msg.container_name) == -1) - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stop_error", -1); + __HANDLE_RESPONSE(msg::MSGTYPE_STOP_RES, "stop_error", -1); - __HANDLE_RESPONSE(msg.id, msg::MSGTYPE_STOP_RES, "stopped", 0); + __HANDLE_RESPONSE(msg::MSGTYPE_STOP_RES, "stopped", 0); } else - __HANDLE_RESPONSE(id, "error", "type_error", -1); + __HANDLE_RESPONSE("error", "type_error", -1); return 0; } @@ -254,21 +261,17 @@ namespace comm } /** - * Reads the message from the connected client. - * @param message Placeholder to store the message. + * Reads the message from the connected client to the global buffer. * @return Number of bytes read on success -1 on error. **/ - int read_socket(std::string &message) + int read_socket() { - // Resize the message to max length and resize to original read length after reading. - message.resize(BUFFER_SIZE); - const int ret = read(ctx.data_socket, message.data(), message.length()); + const int ret = read(ctx.data_socket, read_buffer.data(), BUFFER_SIZE); if (ret == -1) { LOG_ERROR << errno << ": Error receiving data."; return -1; } - message.resize(ret); return ret; } } // namespace comm diff --git a/src/comm/comm_handler.hpp b/src/comm/comm_handler.hpp index 6ec2243..0662964 100644 --- a/src/comm/comm_handler.hpp +++ b/src/comm/comm_handler.hpp @@ -26,13 +26,13 @@ namespace comm void comm_handler_loop(); - int handle_message(std::string_view msg); + int handle_message(const int message_size); int send(std::string_view message); void wait(); - int read_socket(std::string &message); + int read_socket(); } // namespace comm diff --git a/src/hp_manager.cpp b/src/hp_manager.cpp index ed734a5..a39903f 100644 --- a/src/hp_manager.cpp +++ b/src/hp_manager.cpp @@ -271,7 +271,7 @@ namespace hp std::string hpfs_log_level; bool is_full_history; if (util::read_json_file(config_fd, d) == -1 || - write_json_values(d, config_msg) == -1 || + write_json_values(d, config_msg.config) == -1 || read_json_values(d, hpfs_log_level, is_full_history) == -1 || util::write_json_file(config_fd, d) == -1 || hpfs::update_service_conf(info.username, hpfs_log_level, is_full_history) == -1 || @@ -704,59 +704,165 @@ namespace hp /** * Write contract config values (only updated if provided config values are not empty) into the json file. * @param d Json file to be populated. - * @param config_msg Config values to be updated. + * @param config Config values to be updated. * @return 0 on success. -1 on failure. */ - int write_json_values(jsoncons::ojson &d, const msg::initiate_msg &config_msg) + int write_json_values(jsoncons::ojson &d, const msg::config_struct &config) { - if (!config_msg.unl.empty()) + // Contract { - jsoncons::ojson unl(jsoncons::json_array_arg); - for (auto &pubkey : config_msg.unl) - unl.push_back(util::to_hex(pubkey)); - d["contract"]["unl"] = unl; - } - - if (!config_msg.peers.empty()) - { - jsoncons::ojson peers(jsoncons::json_array_arg); - for (auto &peer : config_msg.peers) - peers.push_back(peer.host_address + ":" + std::to_string(peer.port)); - d["mesh"]["known_peers"] = peers; - } - - if (!config_msg.role.empty()) - { - if (config_msg.role != "observer" && config_msg.role != "validator") + if (!config.contract.unl.empty()) { - LOG_ERROR << "Invalid role value observer|validator"; + jsoncons::ojson unl(jsoncons::json_array_arg); + for (auto &pubkey : config.contract.unl) + unl.push_back(util::to_hex(pubkey)); + d["contract"]["unl"] = unl; + } + + if (config.contract.execute.has_value()) + d["contract"]["execute"] = config.contract.execute.value(); + + if (config.contract.log.enable.has_value()) + d["contract"]["log"]["enable"] = config.contract.log.enable.value(); + + if (config.contract.log.max_mbytes_per_file.has_value()) + d["contract"]["log"]["max_mbytes_per_file"] = config.contract.log.max_mbytes_per_file.value(); + + if (config.contract.log.max_file_count.has_value()) + d["contract"]["log"]["max_file_count"] = config.contract.log.max_file_count.value(); + } + + // Node + { + if (!config.node.role.empty()) + { + if (config.node.role != "observer" && config.node.role != "validator") + { + LOG_ERROR << "Invalid role value observer|validator"; + return -1; + } + d["node"]["role"] = config.node.role; + } + + if (!config.node.history.empty()) + { + if (config.node.history != "full" && config.node.history != "custom") + { + LOG_ERROR << "Invalid history value full|custom"; + return -1; + } + d["node"]["history"] = config.node.history; + } + + if (config.node.history_config.max_primary_shards.has_value()) + d["node"]["history_config"]["max_primary_shards"] = config.node.history_config.max_primary_shards.value(); + + if (config.node.history_config.max_raw_shards.has_value()) + d["node"]["history_config"]["max_raw_shards"] = config.node.history_config.max_raw_shards.value(); + + if (d["node"]["history"].as() == "custom" && d["node"]["history_config"]["max_primary_shards"].as() == 0) + { + LOG_ERROR << "'max_primary_shards' cannot be zero in history=custom mode."; return -1; } - d["node"]["role"] = config_msg.role; } - if (!config_msg.history.empty()) + // Mesh { - if (config_msg.history != "full" && config_msg.history != "custom") + if (config.mesh.idle_timeout.has_value()) + d["mesh"]["idle_timeout"] = config.mesh.idle_timeout.value(); + + if (!config.mesh.known_peers.empty()) { - LOG_ERROR << "Invalid history value full|custom"; - return -1; + jsoncons::ojson known_peers(jsoncons::json_array_arg); + for (auto &peer : config.mesh.known_peers) + known_peers.push_back(peer.host_address + ":" + std::to_string(peer.port)); + d["mesh"]["known_peers"] = known_peers; } - d["node"]["history"] = config_msg.history; + + if (config.mesh.msg_forwarding.has_value()) + d["mesh"]["msg_forwarding"] = config.mesh.msg_forwarding.value(); + + if (config.mesh.max_connections.has_value()) + d["mesh"]["max_connections"] = config.mesh.max_connections.value(); + + if (config.mesh.max_known_connections.has_value()) + d["mesh"]["max_known_connections"] = config.mesh.max_known_connections.value(); + + if (config.mesh.max_in_connections_per_host.has_value()) + d["mesh"]["max_in_connections_per_host"] = config.mesh.max_in_connections_per_host.value(); + + if (config.mesh.max_bytes_per_msg.has_value()) + d["mesh"]["max_bytes_per_msg"] = config.mesh.max_bytes_per_msg.value(); + + if (config.mesh.max_bytes_per_min.has_value()) + d["mesh"]["max_bytes_per_min"] = config.mesh.max_bytes_per_min.value(); + + if (config.mesh.max_bad_msgs_per_min.has_value()) + d["mesh"]["max_bad_msgs_per_min"] = config.mesh.max_bad_msgs_per_min.value(); + + if (config.mesh.max_bad_msgsigs_per_min.has_value()) + d["mesh"]["max_bad_msgsigs_per_min"] = config.mesh.max_bad_msgsigs_per_min.value(); + + if (config.mesh.max_dup_msgs_per_min.has_value()) + d["mesh"]["max_dup_msgs_per_min"] = config.mesh.max_dup_msgs_per_min.value(); + + if (config.mesh.peer_discovery.enabled.has_value()) + d["mesh"]["peer_discovery"]["enabled"] = config.mesh.peer_discovery.enabled.value(); + + if (config.mesh.peer_discovery.interval.has_value()) + d["mesh"]["peer_discovery"]["interval"] = config.mesh.peer_discovery.interval.value(); } - if (config_msg.max_primary_shards.has_value()) - d["node"]["history_config"]["max_primary_shards"] = config_msg.max_primary_shards.value(); - - if (config_msg.max_raw_shards.has_value()) - d["node"]["history_config"]["max_raw_shards"] = config_msg.max_raw_shards.value(); - - if (d["node"]["history"].as() == "custom" && d["node"]["history_config"]["max_primary_shards"].as() == 0) + // User { - LOG_ERROR << "'max_primary_shards' cannot be zero in history=custom mode."; - return -1; + if (config.user.idle_timeout.has_value()) + d["user"]["idle_timeout"] = config.user.idle_timeout.value(); + + if (config.user.max_bytes_per_msg.has_value()) + d["user"]["max_bytes_per_msg"] = config.user.max_bytes_per_msg.value(); + + if (config.user.max_bytes_per_min.has_value()) + d["user"]["max_bytes_per_min"] = config.user.max_bytes_per_min.value(); + + if (config.user.max_bad_msgs_per_min.has_value()) + d["user"]["max_bad_msgs_per_min"] = config.user.max_bad_msgs_per_min.value(); + + if (config.user.max_connections.has_value()) + d["user"]["max_connections"] = config.user.max_connections.value(); + + if (config.user.max_in_connections_per_host.has_value()) + d["user"]["max_in_connections_per_host"] = config.user.max_in_connections_per_host.value(); + + if (config.user.concurrent_read_requests.has_value()) + d["user"]["concurrent_read_requests"] = config.user.concurrent_read_requests.value(); } + // Hpfs + { + if (!config.hpfs.log.log_level.empty()) + d["hpfs"]["log"]["log_level"] = config.hpfs.log.log_level; + } + + // Log + { + if (!config.log.log_level.empty()) + d["log"]["log_level"] = config.log.log_level; + + if (config.log.max_mbytes_per_file.has_value()) + d["log"]["max_mbytes_per_file"] = config.log.max_mbytes_per_file.value(); + + if (config.log.max_file_count.has_value()) + d["log"]["max_file_count"] = config.log.max_file_count.value(); + + if (!config.log.loggers.empty()) + { + jsoncons::ojson loggers(jsoncons::json_array_arg); + for (auto &log : config.log.loggers) + loggers.push_back(log); + d["log"]["loggers"] = loggers; + } + } return 0; } diff --git a/src/hp_manager.hpp b/src/hp_manager.hpp index 333691a..62f498e 100644 --- a/src/hp_manager.hpp +++ b/src/hp_manager.hpp @@ -83,7 +83,7 @@ namespace hp int read_json_values(const jsoncons::ojson &d, std::string &hpfs_log_level, bool &is_full_history); - int write_json_values(jsoncons::ojson &d, const msg::initiate_msg &config_msg); + int write_json_values(jsoncons::ojson &d, const msg::config_struct &config); int install_user(int &user_id, std::string &username, const size_t max_cpu_us, const size_t max_mem_kbytes, const size_t storage_kbytes, const std::string container_name); diff --git a/src/msg/json/msg_json.cpp b/src/msg/json/msg_json.cpp index 6775898..0e8d3a0 100644 --- a/src/msg/json/msg_json.cpp +++ b/src/msg/json/msg_json.cpp @@ -44,9 +44,9 @@ namespace msg::json } /** - * Extracts the message 'type' and 'id' values from the json document. + * Extracts the message 'type' values from the json document. */ - int extract_type_and_id(std::string &extracted_type, std::string &extracted_id, const jsoncons::json &d) + int extract_type(std::string &extracted_type, const jsoncons::json &d) { if (!d.contains(msg::FLD_TYPE)) { @@ -61,19 +61,6 @@ namespace msg::json } extracted_type = d[msg::FLD_TYPE].as(); - if (!d.contains(msg::FLD_ID)) - { - LOG_ERROR << "Field id is missing."; - return -1; - } - - if (!d[msg::FLD_ID].is()) - { - LOG_ERROR << "Invalid id value."; - return -1; - } - extracted_id = d[msg::FLD_ID].as(); - return 0; } @@ -92,10 +79,9 @@ namespace msg::json */ int extract_create_message(create_msg &msg, const jsoncons::json &d) { - if (extract_type_and_id(msg.type, msg.id, d) == -1) + if (extract_type(msg.type, d) == -1) return -1; - if (!d.contains(msg::FLD_PUBKEY)) { LOG_ERROR << "Field owner_pubkey is missing."; @@ -145,25 +131,19 @@ namespace msg::json * Accepted signed input container format: * { * "type": "initiate", - * "owner_pubkey": "", * "container_name": "", - * "peers": [<'ip:port' peer list>], - * "unl": [], - * "role": , - * "history": , - * "max_primary_shards": , - * "max_raw_shards": + * "config": {---config overrides----} * } * @return 0 on successful extraction. -1 for failure. */ int extract_initiate_message(initiate_msg &msg, const jsoncons::json &d) { - if (extract_type_and_id(msg.type, msg.id, d) == -1) + if (extract_type(msg.type, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) { - LOG_ERROR << "Field contract_name is missing."; + LOG_ERROR << "Field container_name is missing."; return -1; } @@ -174,119 +154,262 @@ namespace msg::json } msg.container_name = d[msg::FLD_CONTAINER_NAME].as(); - - if (d.contains(msg::FLD_PEERS)) + if (!d.contains(msg::FLD_CONFIG)) { - if (!d[msg::FLD_PEERS].empty() && !d[msg::FLD_PEERS].is_array()) + LOG_ERROR << "Field config is missing."; + return -1; + } + + const jsoncons::json &config = d[msg::FLD_CONFIG]; + + if (config.contains(msg::FLD_MESH)) + { + const jsoncons::json &mesh = config[msg::FLD_MESH]; + if (mesh.contains(msg::FLD_IDLE_TIMEOUT)) + msg.config.mesh.idle_timeout = mesh[msg::FLD_IDLE_TIMEOUT].as(); + + if (mesh.contains(msg::FLD_KNOWN_PEERS)) { - LOG_ERROR << "Invalid peers value."; - return -1; - } - else if (!d[msg::FLD_PEERS].empty() && d[msg::FLD_PEERS].size() > 0) - { - std::vector splitted; - for (auto &val : d[msg::FLD_PEERS].array_range()) + if (!mesh[msg::FLD_KNOWN_PEERS].empty() && !mesh[msg::FLD_KNOWN_PEERS].is_array()) { - if (!val.is()) + LOG_ERROR << "Invalid known_peers value."; + return -1; + } + else if (!mesh[msg::FLD_KNOWN_PEERS].empty() && mesh[msg::FLD_KNOWN_PEERS].size() > 0) + { + std::vector splitted; + for (auto &val : mesh[msg::FLD_KNOWN_PEERS].array_range()) { - LOG_ERROR << "Invalid peer value."; + if (!val.is()) + { + LOG_ERROR << "Invalid peer value."; + return -1; + } + + const std::string peer = val.as(); + util::split_string(splitted, peer, ":"); + if (splitted.size() != 2) + { + LOG_ERROR << "Invalid peer value: " << peer; + return -1; + } + + uint16_t port; + if (util::stoul(splitted.back(), port) == -1) + { + LOG_ERROR << "Invalid peer port value: " << peer; + return -1; + } + + msg.config.mesh.known_peers.emplace(conf::host_ip_port{splitted.front(), port}); + splitted.clear(); + } + } + } + + if (mesh.contains(msg::FLD_MSG_FORWARDING)) + msg.config.mesh.msg_forwarding = mesh[msg::FLD_MSG_FORWARDING].as(); + + if (mesh.contains(msg::FLD_MAX_CONS)) + msg.config.mesh.max_connections = mesh[msg::FLD_MAX_CONS].as(); + + if (mesh.contains(msg::FLD_MAX_KNOWN_CONS)) + msg.config.mesh.max_known_connections = mesh[msg::FLD_MAX_KNOWN_CONS].as(); + + if (mesh.contains(msg::FLD_MAX_IN_CONS_HOST)) + msg.config.mesh.max_in_connections_per_host = mesh[msg::FLD_MAX_IN_CONS_HOST].as(); + + if (mesh.contains(msg::FLD_MAX_BYTES_MSG)) + msg.config.mesh.max_bytes_per_msg = mesh[msg::FLD_MAX_BYTES_MSG].as(); + + if (mesh.contains(msg::FLD_MAX_BYTES_MIN)) + msg.config.mesh.max_bytes_per_min = mesh[msg::FLD_MAX_BYTES_MIN].as(); + + if (mesh.contains(msg::FLD_MAX_BAD_MSG_MIN)) + msg.config.mesh.max_bad_msgs_per_min = mesh[msg::FLD_MAX_BAD_MSG_MIN].as(); + + if (mesh.contains(msg::FLD_MAX_BAD_MSG_SIG_MIN)) + msg.config.mesh.max_bad_msgsigs_per_min = mesh[msg::FLD_MAX_BAD_MSG_SIG_MIN].as(); + + if (mesh.contains(msg::FLD_MAX_DUP_MSG_MIN)) + msg.config.mesh.max_dup_msgs_per_min = mesh[msg::FLD_MAX_DUP_MSG_MIN].as(); + + if (mesh.contains(msg::FLD_PEER_DISCOVERY)) + { + const jsoncons::json &peer_discovery = mesh[msg::FLD_PEER_DISCOVERY]; + + if (peer_discovery.contains(msg::FLD_ENABLED)) + msg.config.mesh.peer_discovery.enabled = peer_discovery[msg::FLD_ENABLED].as(); + + if (peer_discovery.contains(msg::FLD_INTERVAL)) + msg.config.mesh.peer_discovery.interval = peer_discovery[msg::FLD_INTERVAL].as(); + } + } + + if (config.contains(msg::FLD_CONTRACT)) + { + const jsoncons::json &contract = config[msg::FLD_CONTRACT]; + if (contract.contains(msg::FLD_UNL)) + { + if (!contract[msg::FLD_UNL].empty() && !contract[msg::FLD_UNL].is_array()) + { + LOG_ERROR << "Invalid unl value."; + return -1; + } + else if (!contract[msg::FLD_UNL].empty() && contract[msg::FLD_UNL].size() > 0) + { + for (auto &val : contract[msg::FLD_UNL].array_range()) + { + if (!val.is()) + { + LOG_ERROR << "Invalid unl pubkey value."; + return -1; + } + + const std::string unl_pubkey = val.as(); + const std::string unl_pubkey_bin = util::to_bin(unl_pubkey); + if (unl_pubkey_bin.empty()) + { + LOG_ERROR << "Invalid unl pubkey value: " << unl_pubkey; + return -1; + } + + msg.config.contract.unl.emplace(unl_pubkey_bin); + } + } + } + if (contract.contains(msg::FLD_EXECUTE)) + msg.config.contract.execute = contract[msg::FLD_EXECUTE].as(); + + if (contract.contains(msg::FLD_LOG)) + { + const jsoncons::json &log = contract[msg::FLD_LOG]; + if (log.contains(msg::FLD_ENABLE)) + msg.config.contract.log.enable = log[msg::FLD_ENABLE].as(); + + if (log.contains(msg::FLD_MAX_MB_PER_FILE)) + msg.config.contract.log.max_mbytes_per_file = log[msg::FLD_MAX_MB_PER_FILE].as(); + + if (log.contains(msg::FLD_MAX_FILE_COUNT)) + msg.config.contract.log.max_file_count = log[msg::FLD_MAX_FILE_COUNT].as(); + } + } + + if (config.contains(msg::FLD_NODE)) + { + const jsoncons::json &node = config[msg::FLD_NODE]; + if (node.contains(msg::FLD_ROLE)) + { + if (!node[msg::FLD_ROLE].is()) + { + LOG_ERROR << "Invalid role value."; + return -1; + } + + msg.config.node.role = node[msg::FLD_ROLE].as(); + } + + if (node.contains(msg::FLD_HISTORY)) + { + if (!node[msg::FLD_HISTORY].is()) + { + LOG_ERROR << "Invalid history value."; + return -1; + } + + msg.config.node.history = node[msg::FLD_HISTORY].as(); + } + if (node.contains(msg::FLD_HISTORY_CONFIG)) + { + const jsoncons::json &history_config = node[msg::FLD_HISTORY_CONFIG]; + if (history_config.contains(msg::FLD_MAX_P_SHARDS)) + { + if (!history_config[msg::FLD_MAX_P_SHARDS].empty() && !history_config[msg::FLD_MAX_P_SHARDS].is()) + { + LOG_ERROR << "Invalid max_primary_shards value."; return -1; } + else if (!history_config[msg::FLD_MAX_P_SHARDS].empty()) + msg.config.node.history_config.max_primary_shards = history_config[msg::FLD_MAX_P_SHARDS].as(); + } - const std::string peer = val.as(); - util::split_string(splitted, peer, ":"); - if (splitted.size() != 2) + if (history_config.contains(msg::FLD_MAX_R_SHARDS)) + { + if (!history_config[msg::FLD_MAX_R_SHARDS].empty() && !history_config[msg::FLD_MAX_R_SHARDS].is()) { - LOG_ERROR << "Invalid peer value: " << peer; + LOG_ERROR << "Invalid max_raw_shards value."; return -1; } - - uint16_t port; - if (util::stoul(splitted.back(), port) == -1) - { - LOG_ERROR << "Invalid peer port value: " << peer; - return -1; - } - - msg.peers.emplace(conf::host_ip_port{splitted.front(), port}); - splitted.clear(); + else if (!history_config[msg::FLD_MAX_R_SHARDS].empty()) + msg.config.node.history_config.max_raw_shards = history_config[msg::FLD_MAX_R_SHARDS].as(); } } } - if (d.contains(msg::FLD_UNL)) + if (config.contains(msg::FLD_USER)) { - if (!d[msg::FLD_UNL].empty() && !d[msg::FLD_UNL].is_array()) + const jsoncons::json &user = config[msg::FLD_USER]; + if (user.contains(msg::FLD_IDLE_TIMEOUT)) + msg.config.user.idle_timeout = user[msg::FLD_IDLE_TIMEOUT].as(); + + if (user.contains(msg::FLD_MAX_BYTES_MSG)) + msg.config.user.max_bytes_per_msg = user[msg::FLD_MAX_BYTES_MSG].as(); + + if (user.contains(msg::FLD_MAX_BYTES_MIN)) + msg.config.user.max_bytes_per_min = user[msg::FLD_MAX_BYTES_MIN].as(); + + if (user.contains(msg::FLD_MAX_BAD_MSG_MIN)) + msg.config.user.max_bad_msgs_per_min = user[msg::FLD_MAX_BAD_MSG_MIN].as(); + + if (user.contains(msg::FLD_MAX_CONS)) + msg.config.user.max_connections = user[msg::FLD_MAX_CONS].as(); + + if (user.contains(msg::FLD_MAX_IN_CONS_HOST)) + msg.config.user.max_in_connections_per_host = user[msg::FLD_MAX_IN_CONS_HOST].as(); + + if (user.contains(msg::FLD_CON_READ_REQ)) + msg.config.user.concurrent_read_requests = user[msg::FLD_CON_READ_REQ].as(); + } + if (config.contains(msg::FLD_HPFS)) + { + const jsoncons::json &hpfs = config[msg::FLD_HPFS]; + if (hpfs.contains(msg::FLD_LOG) && hpfs[msg::FLD_LOG].contains(msg::FLD_LOG_LEVEL)) + msg.config.hpfs.log.log_level = hpfs[msg::FLD_LOG][msg::FLD_LOG_LEVEL].as(); + } + + if (config.contains(msg::FLD_LOG)) + { + const jsoncons::json &log = config[msg::FLD_LOG]; + if (log.contains(msg::FLD_LOG_LEVEL)) + msg.config.log.log_level = log[msg::FLD_LOG_LEVEL].as(); + + if (log.contains(msg::FLD_MAX_MB_PER_FILE)) + msg.config.log.max_mbytes_per_file = log[msg::FLD_MAX_MB_PER_FILE].as(); + + if (log.contains(msg::FLD_MAX_FILE_COUNT)) + msg.config.log.max_file_count = log[msg::FLD_MAX_FILE_COUNT].as(); + + if (log.contains(msg::FLD_LOGGERS)) { - LOG_ERROR << "Invalid unl value."; - return -1; - } - else if (!d[msg::FLD_UNL].empty() && d[msg::FLD_UNL].size() > 0) - { - for (auto &val : d[msg::FLD_UNL].array_range()) + if (!log[msg::FLD_LOGGERS].empty() && !log[msg::FLD_LOGGERS].is_array()) { - if (!val.is()) + LOG_ERROR << "Invalid loggers value."; + return -1; + } + else if (!log[msg::FLD_LOGGERS].empty() && log[msg::FLD_LOGGERS].size() > 0) + { + for (auto &val : log[msg::FLD_LOGGERS].array_range()) { - LOG_ERROR << "Invalid unl pubkey value."; - return -1; + if (!val.is()) + { + LOG_ERROR << "Invalid log value."; + return -1; + } + msg.config.log.loggers.emplace(val.as()); } - - const std::string unl_pubkey = val.as(); - const std::string unl_pubkey_bin = util::to_bin(unl_pubkey); - if (unl_pubkey_bin.empty()) - { - LOG_ERROR << "Invalid unl pubkey value: " << unl_pubkey; - return -1; - } - - msg.unl.emplace(unl_pubkey_bin); } } } - - if (d.contains(msg::FLD_ROLE)) - { - if (!d[msg::FLD_ROLE].is()) - { - LOG_ERROR << "Invalid role value."; - return -1; - } - - msg.role = d[msg::FLD_ROLE].as(); - } - - if (d.contains(msg::FLD_HISTORY)) - { - if (!d[msg::FLD_HISTORY].is()) - { - LOG_ERROR << "Invalid history value."; - return -1; - } - - msg.history = d[msg::FLD_HISTORY].as(); - } - - if (d.contains(msg::FLD_MAX_P_SHARDS)) - { - if (!d[msg::FLD_MAX_P_SHARDS].empty() && !d[msg::FLD_MAX_P_SHARDS].is()) - { - LOG_ERROR << "Invalid max_primary_shards value."; - return -1; - } - else if (!d[msg::FLD_MAX_P_SHARDS].empty()) - msg.max_primary_shards = d[msg::FLD_MAX_P_SHARDS].as(); - } - - if (d.contains(msg::FLD_MAX_R_SHARDS)) - { - if (!d[msg::FLD_MAX_R_SHARDS].empty() && !d[msg::FLD_MAX_R_SHARDS].is()) - { - LOG_ERROR << "Invalid max_raw_shards value."; - return -1; - } - else if (!d[msg::FLD_MAX_R_SHARDS].empty()) - msg.max_raw_shards = d[msg::FLD_MAX_R_SHARDS].as(); - } return 0; } @@ -304,7 +427,7 @@ namespace msg::json */ int extract_destroy_message(destroy_msg &msg, const jsoncons::json &d) { - if (extract_type_and_id(msg.type, msg.id, d) == -1) + if (extract_type(msg.type, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) @@ -337,7 +460,7 @@ namespace msg::json */ int extract_start_message(start_msg &msg, const jsoncons::json &d) { - if (extract_type_and_id(msg.type, msg.id, d) == -1) + if (extract_type(msg.type, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) @@ -370,7 +493,7 @@ namespace msg::json */ int extract_stop_message(stop_msg &msg, const jsoncons::json &d) { - if (extract_type_and_id(msg.type, msg.id, d) == -1) + if (extract_type(msg.type, d) == -1) return -1; if (!d.contains(msg::FLD_CONTAINER_NAME)) @@ -394,7 +517,6 @@ namespace msg::json * @param msg Buffer to construct the generated json message string into. * Message format: * { - * 'reply_for': '' * 'type': '', * "content": "" * } @@ -402,14 +524,10 @@ namespace msg::json * @param content Content inside the response. * @param json_content Whether content is a json string. */ - void build_response(std::string &msg, std::string_view response_type, std::string_view reply_for, std::string_view content, const bool json_content) + void build_response(std::string &msg, std::string_view response_type, std::string_view content, const bool json_content) { msg.reserve(1024); msg += "{\""; - msg += msg::FLD_REPLY_FOR; - msg += SEP_COLON; - msg += std::string(reply_for); - msg += SEP_COMMA; msg += msg::FLD_TYPE; msg += SEP_COLON; msg += response_type; diff --git a/src/msg/json/msg_json.hpp b/src/msg/json/msg_json.hpp index 61bcd6e..1927ac3 100644 --- a/src/msg/json/msg_json.hpp +++ b/src/msg/json/msg_json.hpp @@ -12,7 +12,7 @@ namespace msg::json { int parse_message(jsoncons::json &d, std::string_view message); - int extract_type_and_id(std::string &extracted_type, std::string &extracted_id, const jsoncons::json &d); + int extract_type(std::string &extracted_type, const jsoncons::json &d); int extract_create_message(create_msg &msg, const jsoncons::json &d); @@ -24,7 +24,7 @@ namespace msg::json int extract_stop_message(stop_msg &msg, const jsoncons::json &d); - void build_response(std::string &msg, std::string_view response_type, std::string_view reply_for, std::string_view content, const bool json_content = false); + void build_response(std::string &msg, std::string_view response_type, std::string_view content, const bool json_content = false); void build_create_response(std::string &msg, const hp::instance_info &info); diff --git a/src/msg/msg_common.hpp b/src/msg/msg_common.hpp index ccba83f..0136778 100644 --- a/src/msg/msg_common.hpp +++ b/src/msg/msg_common.hpp @@ -8,64 +8,168 @@ namespace msg { struct create_msg { - std::string id; std::string type; std::string pubkey; std::string contract_id; std::string image; }; - // Keep numerical config valus as optional so when updating the config if the value is empty - // We do nothing otherwise we take the value and update the config. - struct initiate_msg + struct history_configuration { - std::string id; - std::string type; - std::string container_name; - std::set peers; - std::set unl; - std::string role; - std::string history; std::optional max_primary_shards; std::optional max_raw_shards; }; + struct node_config + { + std::string role; + std::string history; + history_configuration history_config; + }; + + struct c_log_config + { + std::optional enable; + std::optional max_mbytes_per_file; + std::optional max_file_count; + }; + + struct contract_config + { + std::set unl; + std::optional execute; + c_log_config log; + }; + + struct peer_discovery_config + { + std::optional enabled; // Whether dynamic peer discovery is on/off. + std::optional interval; // Time interval in ms to find for known_peers dynamicpeerdiscovery should be on for this + }; + struct mesh_config + { + std::optional idle_timeout; // Idle connection timeout ms for peer connections. + std::set known_peers; + std::optional msg_forwarding; // Whether peer message forwarding is on/off. + std::optional max_connections; // Max peer connections. + std::optional max_known_connections; // Max known peer connections. + std::optional max_in_connections_per_host; // Max inbound peer connections per remote host (IP). + std::optional max_bytes_per_msg; // Peer message max size in bytes. + std::optional max_bytes_per_min; // Peer message rate (characters(bytes) per minute). + std::optional max_bad_msgs_per_min; // Peer bad messages per minute. + std::optional max_bad_msgsigs_per_min; // Peer bad signatures per minute. + std::optional max_dup_msgs_per_min; // Peer max duplicate messages per minute. + peer_discovery_config peer_discovery; // Peer discovery configs. + }; + + struct user_config + { + std::optional idle_timeout; // Idle connection timeout ms for user connections. + std::optional max_bytes_per_msg; // User message max size in bytes + std::optional max_bytes_per_min; // User message rate (characters(bytes) per minute) + std::optional max_bad_msgs_per_min; // User bad messages per minute + std::optional max_connections; // Max inbound user connections + std::optional max_in_connections_per_host; // Max inbound user connections per remote host (IP). + std::optional concurrent_read_requests; // Supported concurrent read requests count. + }; + + struct hpfs_log_config + { + std::string log_level; // Log severity level (dbg, inf, wrn, wrr) + }; + + struct hpfs_config + { + hpfs_log_config log; + }; + + struct log_config + { + std::string log_level; // Log severity level (dbg, inf, wrn, wrr) + std::unordered_set loggers; // List of enabled loggers (console, file) + std::optional max_mbytes_per_file; // Max MB size of a single log file. + std::optional max_file_count; // Max no. of log files to keep. + }; + + // Keep numerical config valus as optional so when updating the config if the value is empty + // We do nothing otherwise we take the value and update the config. + struct config_struct + { + node_config node; + contract_config contract; + mesh_config mesh; + user_config user; + hpfs_config hpfs; + log_config log; + }; + + struct initiate_msg + { + std::string type; + std::string container_name; + config_struct config; + }; + struct destroy_msg { - std::string id; std::string type; std::string container_name; }; struct start_msg { - std::string id; std::string type; std::string container_name; }; struct stop_msg { - std::string id; std::string type; std::string container_name; }; // Message field names constexpr const char *FLD_TYPE = "type"; - constexpr const char *FLD_REPLY_FOR = "reply_for"; constexpr const char *FLD_CONTENT = "content"; constexpr const char *FLD_PUBKEY = "owner_pubkey"; constexpr const char *FLD_CONTAINER_NAME = "container_name"; constexpr const char *FLD_CONTRACT_ID = "contract_id"; constexpr const char *FLD_IMAGE = "image"; - constexpr const char *FLD_ID = "id"; - constexpr const char *FLD_PEERS = "peers"; + constexpr const char *FLD_KNOWN_PEERS = "known_peers"; + constexpr const char *FLD_MESH = "mesh"; + constexpr const char *FLD_USER = "user"; + constexpr const char *FLD_EXECUTE = "execute"; + constexpr const char *FLD_LOG = "log"; + constexpr const char *FLD_LOG_LEVEL = "log_level"; + constexpr const char *FLD_ENABLE = "enable"; + constexpr const char *FLD_ENABLED = "enabled"; + constexpr const char *FLD_INTERVAL = "interval"; + constexpr const char *FLD_MAX_MB_PER_FILE = "max_mbytes_per_file"; + constexpr const char *FLD_MAX_FILE_COUNT = "max_file_count"; constexpr const char *FLD_UNL = "unl"; + constexpr const char *FLD_CONTRACT = "contract"; + constexpr const char *FLD_NODE = "node"; + constexpr const char *FLD_HPFS = "hpfs"; + constexpr const char *FLD_CONFIG = "config"; constexpr const char *FLD_ROLE = "role"; constexpr const char *FLD_HISTORY = "history"; constexpr const char *FLD_MAX_P_SHARDS = "max_primary_shards"; + constexpr const char *FLD_HISTORY_CONFIG = "history_config"; constexpr const char *FLD_MAX_R_SHARDS = "max_raw_shards"; + constexpr const char *FLD_LOGGERS = "loggers"; + + constexpr const char *FLD_IDLE_TIMEOUT = "idle_timeout"; + constexpr const char *FLD_MSG_FORWARDING = "msg_forwarding"; + constexpr const char *FLD_MAX_CONS = "max_connections"; + constexpr const char *FLD_MAX_KNOWN_CONS = "max_known_connections"; + constexpr const char *FLD_MAX_IN_CONS_HOST = "max_in_connections_per_host"; + constexpr const char *FLD_MAX_BYTES_MSG = "max_bytes_per_msg"; + constexpr const char *FLD_MAX_BYTES_MIN = "max_bytes_per_min"; + constexpr const char *FLD_MAX_BAD_MSG_MIN = "max_bad_msgs_per_min"; + constexpr const char *FLD_MAX_BAD_MSG_SIG_MIN = "max_bad_msgsigs_per_min"; + constexpr const char *FLD_MAX_DUP_MSG_MIN = "max_dup_msgs_per_min"; + constexpr const char *FLD_PEER_DISCOVERY = "peer_discovery"; + constexpr const char *FLD_CON_READ_REQ = "concurrent_read_requests"; // Message types constexpr const char *MSGTYPE_INIT = "init"; diff --git a/src/msg/msg_parser.cpp b/src/msg/msg_parser.cpp index 2bf5bf1..30309e4 100644 --- a/src/msg/msg_parser.cpp +++ b/src/msg/msg_parser.cpp @@ -8,9 +8,9 @@ namespace msg return json::parse_message(jdoc, message); } - int msg_parser::extract_type_and_id(std::string &extracted_type, std::string &extracted_id) const + int msg_parser::extract_type(std::string &extracted_type) const { - return json::extract_type_and_id(extracted_type, extracted_id, jdoc); + return json::extract_type(extracted_type, jdoc); } int msg_parser::extract_create_message(create_msg &msg) const @@ -38,9 +38,9 @@ namespace msg return json::extract_stop_message(msg, jdoc); } - void msg_parser::build_response(std::string &msg, std::string_view response_type, std::string_view reply_for, std::string_view content, const bool json_content) const + void msg_parser::build_response(std::string &msg, std::string_view response_type, std::string_view content, const bool json_content) const { - json::build_response(msg, response_type, reply_for, content, json_content); + json::build_response(msg, response_type, content, json_content); } void msg_parser::build_create_response(std::string &msg, const hp::instance_info &info) const diff --git a/src/msg/msg_parser.hpp b/src/msg/msg_parser.hpp index 44e556c..e5e6ffc 100644 --- a/src/msg/msg_parser.hpp +++ b/src/msg/msg_parser.hpp @@ -13,13 +13,13 @@ namespace msg public: int parse(std::string_view message); - int extract_type_and_id(std::string &extracted_type, std::string &extracted_id) const; + int extract_type(std::string &extracted_type) const; int extract_create_message(create_msg &msg) const; int extract_initiate_message(initiate_msg &msg) const; int extract_destroy_message(destroy_msg &msg) const; int extract_start_message(start_msg &msg) const; int extract_stop_message(stop_msg &msg) const; - void build_response(std::string &msg, std::string_view response_type, std::string_view reply_for, std::string_view content, const bool json_content = false) const; + void build_response(std::string &msg, std::string_view response_type, std::string_view content, const bool json_content = false) const; void build_create_response(std::string &msg, const hp::instance_info &info) const; };