Support additional hp config. (#39)

This commit is contained in:
Savinda Senevirathne
2021-07-26 12:14:41 +05:30
committed by GitHub
parent adbb1c8839
commit 4820f9660e
13 changed files with 735 additions and 266 deletions

View File

@@ -44,9 +44,9 @@ namespace msg::json
}
/**
* Extracts the message 'type' and 'id' values from the json document.
* Extracts the message 'type' values from the json document.
*/
int extract_type_and_id(std::string &extracted_type, std::string &extracted_id, const jsoncons::json &d)
int extract_type(std::string &extracted_type, const jsoncons::json &d)
{
if (!d.contains(msg::FLD_TYPE))
{
@@ -61,19 +61,6 @@ namespace msg::json
}
extracted_type = d[msg::FLD_TYPE].as<std::string>();
if (!d.contains(msg::FLD_ID))
{
LOG_ERROR << "Field id is missing.";
return -1;
}
if (!d[msg::FLD_ID].is<std::string>())
{
LOG_ERROR << "Invalid id value.";
return -1;
}
extracted_id = d[msg::FLD_ID].as<std::string>();
return 0;
}
@@ -92,10 +79,9 @@ namespace msg::json
*/
int extract_create_message(create_msg &msg, const jsoncons::json &d)
{
if (extract_type_and_id(msg.type, msg.id, d) == -1)
if (extract_type(msg.type, d) == -1)
return -1;
if (!d.contains(msg::FLD_PUBKEY))
{
LOG_ERROR << "Field owner_pubkey is missing.";
@@ -145,25 +131,19 @@ namespace msg::json
* Accepted signed input container format:
* {
* "type": "initiate",
* "owner_pubkey": "<pubkey of the owner>",
* "container_name": "<container name>",
* "peers": [<'ip:port' peer list>],
* "unl": [<hex unl pubkey list>],
* "role": <role>,
* "history": <history mode>,
* "max_primary_shards": <number of max primary shards>,
* "max_raw_shards": <number of max raw shards>
* "config": {---config overrides----}
* }
* @return 0 on successful extraction. -1 for failure.
*/
int extract_initiate_message(initiate_msg &msg, const jsoncons::json &d)
{
if (extract_type_and_id(msg.type, msg.id, d) == -1)
if (extract_type(msg.type, d) == -1)
return -1;
if (!d.contains(msg::FLD_CONTAINER_NAME))
{
LOG_ERROR << "Field contract_name is missing.";
LOG_ERROR << "Field container_name is missing.";
return -1;
}
@@ -174,119 +154,262 @@ namespace msg::json
}
msg.container_name = d[msg::FLD_CONTAINER_NAME].as<std::string>();
if (d.contains(msg::FLD_PEERS))
if (!d.contains(msg::FLD_CONFIG))
{
if (!d[msg::FLD_PEERS].empty() && !d[msg::FLD_PEERS].is_array())
LOG_ERROR << "Field config is missing.";
return -1;
}
const jsoncons::json &config = d[msg::FLD_CONFIG];
if (config.contains(msg::FLD_MESH))
{
const jsoncons::json &mesh = config[msg::FLD_MESH];
if (mesh.contains(msg::FLD_IDLE_TIMEOUT))
msg.config.mesh.idle_timeout = mesh[msg::FLD_IDLE_TIMEOUT].as<uint32_t>();
if (mesh.contains(msg::FLD_KNOWN_PEERS))
{
LOG_ERROR << "Invalid peers value.";
return -1;
}
else if (!d[msg::FLD_PEERS].empty() && d[msg::FLD_PEERS].size() > 0)
{
std::vector<std::string> splitted;
for (auto &val : d[msg::FLD_PEERS].array_range())
if (!mesh[msg::FLD_KNOWN_PEERS].empty() && !mesh[msg::FLD_KNOWN_PEERS].is_array())
{
if (!val.is<std::string>())
LOG_ERROR << "Invalid known_peers value.";
return -1;
}
else if (!mesh[msg::FLD_KNOWN_PEERS].empty() && mesh[msg::FLD_KNOWN_PEERS].size() > 0)
{
std::vector<std::string> splitted;
for (auto &val : mesh[msg::FLD_KNOWN_PEERS].array_range())
{
LOG_ERROR << "Invalid peer value.";
if (!val.is<std::string>())
{
LOG_ERROR << "Invalid peer value.";
return -1;
}
const std::string peer = val.as<std::string>();
util::split_string(splitted, peer, ":");
if (splitted.size() != 2)
{
LOG_ERROR << "Invalid peer value: " << peer;
return -1;
}
uint16_t port;
if (util::stoul(splitted.back(), port) == -1)
{
LOG_ERROR << "Invalid peer port value: " << peer;
return -1;
}
msg.config.mesh.known_peers.emplace(conf::host_ip_port{splitted.front(), port});
splitted.clear();
}
}
}
if (mesh.contains(msg::FLD_MSG_FORWARDING))
msg.config.mesh.msg_forwarding = mesh[msg::FLD_MSG_FORWARDING].as<bool>();
if (mesh.contains(msg::FLD_MAX_CONS))
msg.config.mesh.max_connections = mesh[msg::FLD_MAX_CONS].as<uint16_t>();
if (mesh.contains(msg::FLD_MAX_KNOWN_CONS))
msg.config.mesh.max_known_connections = mesh[msg::FLD_MAX_KNOWN_CONS].as<uint16_t>();
if (mesh.contains(msg::FLD_MAX_IN_CONS_HOST))
msg.config.mesh.max_in_connections_per_host = mesh[msg::FLD_MAX_IN_CONS_HOST].as<uint16_t>();
if (mesh.contains(msg::FLD_MAX_BYTES_MSG))
msg.config.mesh.max_bytes_per_msg = mesh[msg::FLD_MAX_BYTES_MSG].as<uint64_t>();
if (mesh.contains(msg::FLD_MAX_BYTES_MIN))
msg.config.mesh.max_bytes_per_min = mesh[msg::FLD_MAX_BYTES_MIN].as<uint64_t>();
if (mesh.contains(msg::FLD_MAX_BAD_MSG_MIN))
msg.config.mesh.max_bad_msgs_per_min = mesh[msg::FLD_MAX_BAD_MSG_MIN].as<uint64_t>();
if (mesh.contains(msg::FLD_MAX_BAD_MSG_SIG_MIN))
msg.config.mesh.max_bad_msgsigs_per_min = mesh[msg::FLD_MAX_BAD_MSG_SIG_MIN].as<uint64_t>();
if (mesh.contains(msg::FLD_MAX_DUP_MSG_MIN))
msg.config.mesh.max_dup_msgs_per_min = mesh[msg::FLD_MAX_DUP_MSG_MIN].as<uint64_t>();
if (mesh.contains(msg::FLD_PEER_DISCOVERY))
{
const jsoncons::json &peer_discovery = mesh[msg::FLD_PEER_DISCOVERY];
if (peer_discovery.contains(msg::FLD_ENABLED))
msg.config.mesh.peer_discovery.enabled = peer_discovery[msg::FLD_ENABLED].as<bool>();
if (peer_discovery.contains(msg::FLD_INTERVAL))
msg.config.mesh.peer_discovery.interval = peer_discovery[msg::FLD_INTERVAL].as<uint16_t>();
}
}
if (config.contains(msg::FLD_CONTRACT))
{
const jsoncons::json &contract = config[msg::FLD_CONTRACT];
if (contract.contains(msg::FLD_UNL))
{
if (!contract[msg::FLD_UNL].empty() && !contract[msg::FLD_UNL].is_array())
{
LOG_ERROR << "Invalid unl value.";
return -1;
}
else if (!contract[msg::FLD_UNL].empty() && contract[msg::FLD_UNL].size() > 0)
{
for (auto &val : contract[msg::FLD_UNL].array_range())
{
if (!val.is<std::string>())
{
LOG_ERROR << "Invalid unl pubkey value.";
return -1;
}
const std::string unl_pubkey = val.as<std::string>();
const std::string unl_pubkey_bin = util::to_bin(unl_pubkey);
if (unl_pubkey_bin.empty())
{
LOG_ERROR << "Invalid unl pubkey value: " << unl_pubkey;
return -1;
}
msg.config.contract.unl.emplace(unl_pubkey_bin);
}
}
}
if (contract.contains(msg::FLD_EXECUTE))
msg.config.contract.execute = contract[msg::FLD_EXECUTE].as<bool>();
if (contract.contains(msg::FLD_LOG))
{
const jsoncons::json &log = contract[msg::FLD_LOG];
if (log.contains(msg::FLD_ENABLE))
msg.config.contract.log.enable = log[msg::FLD_ENABLE].as<bool>();
if (log.contains(msg::FLD_MAX_MB_PER_FILE))
msg.config.contract.log.max_mbytes_per_file = log[msg::FLD_MAX_MB_PER_FILE].as<size_t>();
if (log.contains(msg::FLD_MAX_FILE_COUNT))
msg.config.contract.log.max_file_count = log[msg::FLD_MAX_FILE_COUNT].as<size_t>();
}
}
if (config.contains(msg::FLD_NODE))
{
const jsoncons::json &node = config[msg::FLD_NODE];
if (node.contains(msg::FLD_ROLE))
{
if (!node[msg::FLD_ROLE].is<std::string>())
{
LOG_ERROR << "Invalid role value.";
return -1;
}
msg.config.node.role = node[msg::FLD_ROLE].as<std::string>();
}
if (node.contains(msg::FLD_HISTORY))
{
if (!node[msg::FLD_HISTORY].is<std::string>())
{
LOG_ERROR << "Invalid history value.";
return -1;
}
msg.config.node.history = node[msg::FLD_HISTORY].as<std::string>();
}
if (node.contains(msg::FLD_HISTORY_CONFIG))
{
const jsoncons::json &history_config = node[msg::FLD_HISTORY_CONFIG];
if (history_config.contains(msg::FLD_MAX_P_SHARDS))
{
if (!history_config[msg::FLD_MAX_P_SHARDS].empty() && !history_config[msg::FLD_MAX_P_SHARDS].is<uint64_t>())
{
LOG_ERROR << "Invalid max_primary_shards value.";
return -1;
}
else if (!history_config[msg::FLD_MAX_P_SHARDS].empty())
msg.config.node.history_config.max_primary_shards = history_config[msg::FLD_MAX_P_SHARDS].as<uint64_t>();
}
const std::string peer = val.as<std::string>();
util::split_string(splitted, peer, ":");
if (splitted.size() != 2)
if (history_config.contains(msg::FLD_MAX_R_SHARDS))
{
if (!history_config[msg::FLD_MAX_R_SHARDS].empty() && !history_config[msg::FLD_MAX_R_SHARDS].is<uint64_t>())
{
LOG_ERROR << "Invalid peer value: " << peer;
LOG_ERROR << "Invalid max_raw_shards value.";
return -1;
}
uint16_t port;
if (util::stoul(splitted.back(), port) == -1)
{
LOG_ERROR << "Invalid peer port value: " << peer;
return -1;
}
msg.peers.emplace(conf::host_ip_port{splitted.front(), port});
splitted.clear();
else if (!history_config[msg::FLD_MAX_R_SHARDS].empty())
msg.config.node.history_config.max_raw_shards = history_config[msg::FLD_MAX_R_SHARDS].as<uint64_t>();
}
}
}
if (d.contains(msg::FLD_UNL))
if (config.contains(msg::FLD_USER))
{
if (!d[msg::FLD_UNL].empty() && !d[msg::FLD_UNL].is_array())
const jsoncons::json &user = config[msg::FLD_USER];
if (user.contains(msg::FLD_IDLE_TIMEOUT))
msg.config.user.idle_timeout = user[msg::FLD_IDLE_TIMEOUT].as<uint32_t>();
if (user.contains(msg::FLD_MAX_BYTES_MSG))
msg.config.user.max_bytes_per_msg = user[msg::FLD_MAX_BYTES_MSG].as<uint64_t>();
if (user.contains(msg::FLD_MAX_BYTES_MIN))
msg.config.user.max_bytes_per_min = user[msg::FLD_MAX_BYTES_MIN].as<uint64_t>();
if (user.contains(msg::FLD_MAX_BAD_MSG_MIN))
msg.config.user.max_bad_msgs_per_min = user[msg::FLD_MAX_BAD_MSG_MIN].as<uint64_t>();
if (user.contains(msg::FLD_MAX_CONS))
msg.config.user.max_connections = user[msg::FLD_MAX_CONS].as<uint16_t>();
if (user.contains(msg::FLD_MAX_IN_CONS_HOST))
msg.config.user.max_in_connections_per_host = user[msg::FLD_MAX_IN_CONS_HOST].as<uint16_t>();
if (user.contains(msg::FLD_CON_READ_REQ))
msg.config.user.concurrent_read_requests = user[msg::FLD_CON_READ_REQ].as<uint64_t>();
}
if (config.contains(msg::FLD_HPFS))
{
const jsoncons::json &hpfs = config[msg::FLD_HPFS];
if (hpfs.contains(msg::FLD_LOG) && hpfs[msg::FLD_LOG].contains(msg::FLD_LOG_LEVEL))
msg.config.hpfs.log.log_level = hpfs[msg::FLD_LOG][msg::FLD_LOG_LEVEL].as<std::string>();
}
if (config.contains(msg::FLD_LOG))
{
const jsoncons::json &log = config[msg::FLD_LOG];
if (log.contains(msg::FLD_LOG_LEVEL))
msg.config.log.log_level = log[msg::FLD_LOG_LEVEL].as<std::string>();
if (log.contains(msg::FLD_MAX_MB_PER_FILE))
msg.config.log.max_mbytes_per_file = log[msg::FLD_MAX_MB_PER_FILE].as<uint64_t>();
if (log.contains(msg::FLD_MAX_FILE_COUNT))
msg.config.log.max_file_count = log[msg::FLD_MAX_FILE_COUNT].as<uint64_t>();
if (log.contains(msg::FLD_LOGGERS))
{
LOG_ERROR << "Invalid unl value.";
return -1;
}
else if (!d[msg::FLD_UNL].empty() && d[msg::FLD_UNL].size() > 0)
{
for (auto &val : d[msg::FLD_UNL].array_range())
if (!log[msg::FLD_LOGGERS].empty() && !log[msg::FLD_LOGGERS].is_array())
{
if (!val.is<std::string>())
LOG_ERROR << "Invalid loggers value.";
return -1;
}
else if (!log[msg::FLD_LOGGERS].empty() && log[msg::FLD_LOGGERS].size() > 0)
{
for (auto &val : log[msg::FLD_LOGGERS].array_range())
{
LOG_ERROR << "Invalid unl pubkey value.";
return -1;
if (!val.is<std::string>())
{
LOG_ERROR << "Invalid log value.";
return -1;
}
msg.config.log.loggers.emplace(val.as<std::string>());
}
const std::string unl_pubkey = val.as<std::string>();
const std::string unl_pubkey_bin = util::to_bin(unl_pubkey);
if (unl_pubkey_bin.empty())
{
LOG_ERROR << "Invalid unl pubkey value: " << unl_pubkey;
return -1;
}
msg.unl.emplace(unl_pubkey_bin);
}
}
}
if (d.contains(msg::FLD_ROLE))
{
if (!d[msg::FLD_ROLE].is<std::string>())
{
LOG_ERROR << "Invalid role value.";
return -1;
}
msg.role = d[msg::FLD_ROLE].as<std::string>();
}
if (d.contains(msg::FLD_HISTORY))
{
if (!d[msg::FLD_HISTORY].is<std::string>())
{
LOG_ERROR << "Invalid history value.";
return -1;
}
msg.history = d[msg::FLD_HISTORY].as<std::string>();
}
if (d.contains(msg::FLD_MAX_P_SHARDS))
{
if (!d[msg::FLD_MAX_P_SHARDS].empty() && !d[msg::FLD_MAX_P_SHARDS].is<uint64_t>())
{
LOG_ERROR << "Invalid max_primary_shards value.";
return -1;
}
else if (!d[msg::FLD_MAX_P_SHARDS].empty())
msg.max_primary_shards = d[msg::FLD_MAX_P_SHARDS].as<uint64_t>();
}
if (d.contains(msg::FLD_MAX_R_SHARDS))
{
if (!d[msg::FLD_MAX_R_SHARDS].empty() && !d[msg::FLD_MAX_R_SHARDS].is<uint64_t>())
{
LOG_ERROR << "Invalid max_raw_shards value.";
return -1;
}
else if (!d[msg::FLD_MAX_R_SHARDS].empty())
msg.max_raw_shards = d[msg::FLD_MAX_R_SHARDS].as<uint64_t>();
}
return 0;
}
@@ -304,7 +427,7 @@ namespace msg::json
*/
int extract_destroy_message(destroy_msg &msg, const jsoncons::json &d)
{
if (extract_type_and_id(msg.type, msg.id, d) == -1)
if (extract_type(msg.type, d) == -1)
return -1;
if (!d.contains(msg::FLD_CONTAINER_NAME))
@@ -337,7 +460,7 @@ namespace msg::json
*/
int extract_start_message(start_msg &msg, const jsoncons::json &d)
{
if (extract_type_and_id(msg.type, msg.id, d) == -1)
if (extract_type(msg.type, d) == -1)
return -1;
if (!d.contains(msg::FLD_CONTAINER_NAME))
@@ -370,7 +493,7 @@ namespace msg::json
*/
int extract_stop_message(stop_msg &msg, const jsoncons::json &d)
{
if (extract_type_and_id(msg.type, msg.id, d) == -1)
if (extract_type(msg.type, d) == -1)
return -1;
if (!d.contains(msg::FLD_CONTAINER_NAME))
@@ -394,7 +517,6 @@ namespace msg::json
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* 'reply_for': '<reply_for>'
* 'type': '<message type>',
* "content": "<any string>"
* }
@@ -402,14 +524,10 @@ namespace msg::json
* @param content Content inside the response.
* @param json_content Whether content is a json string.
*/
void build_response(std::string &msg, std::string_view response_type, std::string_view reply_for, std::string_view content, const bool json_content)
void build_response(std::string &msg, std::string_view response_type, std::string_view content, const bool json_content)
{
msg.reserve(1024);
msg += "{\"";
msg += msg::FLD_REPLY_FOR;
msg += SEP_COLON;
msg += std::string(reply_for);
msg += SEP_COMMA;
msg += msg::FLD_TYPE;
msg += SEP_COLON;
msg += response_type;