diff --git a/examples/c_contract/echo_contract.c b/examples/c_contract/echo_contract.c index 494f4ae4..f7a0115f 100644 --- a/examples/c_contract/echo_contract.c +++ b/examples/c_contract/echo_contract.c @@ -53,6 +53,10 @@ int main(int argc, char **argv) // printf("Received %.*s from %.*s", len, msg, HP_KEY_SIZE, sender); // free(msg); + // Peers update example: + // const char *add_peers[2] = {"localhost:4444", "localhost:3333"}; + // hp_update_peers(add_peers, 2, NULL, 0); + // Config update example: // struct hp_config *config = hp_get_config(); // hp_set_config_string(&config->version, "2.0", 4); diff --git a/examples/c_contract/hotpocket_contract.h b/examples/c_contract/hotpocket_contract.h index 8e6fc5d2..c47fdb3e 100644 --- a/examples/c_contract/hotpocket_contract.h +++ b/examples/c_contract/hotpocket_contract.h @@ -23,8 +23,9 @@ const char *__HP_PATCH_FILE_PATH = "../patch.cfg"; // Public constants. #define HP_NPL_MSG_MAX_SIZE __HP_SEQPKT_MAX_SIZE -#define HP_KEY_SIZE 66 // Hex pubkey size. (64 char key + 2 chars for key type prfix) -#define HP_HASH_SIZE 64 // Hex hash size. +#define HP_KEY_SIZE 66 // Hex pubkey size. (64 char key + 2 chars for key type prfix) +#define HP_HASH_SIZE 64 // Hex hash size. +#define HP_CONTRACT_ID_SIZE 36 // Contract Id UUIDv4 string length. const char *HP_POST_EXEC_SCRIPT_NAME = "post_exec.sh"; #define __HP_ASSIGN_STRING(dest, elem) \ @@ -170,9 +171,10 @@ struct hp_contract_context { bool readonly; uint64_t timestamp; - char pubkey[HP_KEY_SIZE + 1]; // +1 for null char. - uint64_t lcl_seq_no; // lcl sequence no. - char lcl_hash[HP_HASH_SIZE + 1]; // +1 for null char. + char contract_id[HP_CONTRACT_ID_SIZE + 1]; // +1 for null char. + char pubkey[HP_KEY_SIZE + 1]; // +1 for null char. + uint64_t lcl_seq_no; // lcl sequence no. + char lcl_hash[HP_HASH_SIZE + 1]; // +1 for null char. struct hp_users_collection users; struct hp_unl_collection unl; }; @@ -197,6 +199,7 @@ int hp_writev_npl_msg(const struct iovec *bufs, const int buf_count); int hp_read_npl_msg(void *msg_buf, char *pubkey_buf, const int timeout); struct hp_config *hp_get_config(); int hp_update_config(const struct hp_config *config); +int hp_update_peers(const char *add_peers[], const size_t add_peers_count, const char *remove_peers[], const size_t remove_peers_count); void hp_set_config_string(char **config_str, const char *value, const size_t value_size); void hp_set_config_unl(struct hp_config *config, const struct hp_unl_node *new_unl, const size_t new_unl_count); void hp_free_config(struct hp_config *config); @@ -206,6 +209,8 @@ int __hp_write_control_msg(const void *buf, const uint32_t len); void __hp_populate_patch_from_json_object(struct hp_config *config, const struct json_object_s *object); int __hp_write_to_patch_file(const int fd, const struct hp_config *config); struct hp_config *__hp_read_from_patch_file(const int fd); +size_t __hp_get_json_string_array_encoded_len(const char *elems[], const size_t count); +int __hp_encode_json_string_array(char *buf, const char *elems[], const size_t count); static struct __hp_contract __hpc = {}; @@ -278,9 +283,10 @@ int hp_deinit_contract() __HP_FREE(cctx); // Send termination control message. - __hp_write_control_msg("{\"type\":\"contract_end\"}", 23); + const int ret = __hp_write_control_msg("{\"type\":\"contract_end\"}", 23); + close(__hpc.control_fd); - return 0; + return ret; } const struct hp_contract_context *hp_get_context() @@ -357,7 +363,7 @@ int hp_write_npl_msg(const void *buf, const uint32_t len) { if (len > HP_NPL_MSG_MAX_SIZE) { - fprintf(stderr, "NPL message exceeds max length %d.", HP_NPL_MSG_MAX_SIZE); + fprintf(stderr, "NPL message exceeds max length %d.\n", HP_NPL_MSG_MAX_SIZE); return -1; } @@ -372,7 +378,7 @@ int hp_writev_npl_msg(const struct iovec *bufs, const int buf_count) if (len > HP_NPL_MSG_MAX_SIZE) { - fprintf(stderr, "NPL message exceeds max length %d.", HP_NPL_MSG_MAX_SIZE); + fprintf(stderr, "NPL message exceeds max length %d.\n", HP_NPL_MSG_MAX_SIZE); return -1; } @@ -585,6 +591,83 @@ void hp_free_config(struct hp_config *config) __HP_FREE(config); } +/** + * Updates the known-peers this node must attempt connections to. + * @param add_peers Array of strings containing peers to be added. Each string must be in the format of ":". + * @param add_peers_count No. of peers to be added. + * @param remove_peers Array of strings containing peers to be removed. Each string must be in the format of ":". + * @param remove_peers_count No. of peers to be removed. + */ +int hp_update_peers(const char *add_peers[], const size_t add_peers_count, const char *remove_peers[], const size_t remove_peers_count) +{ + const size_t add_json_len = __hp_get_json_string_array_encoded_len(add_peers, add_peers_count); + char add_json[add_json_len]; + if (__hp_encode_json_string_array(add_json, add_peers, add_peers_count) == -1) + { + fprintf(stderr, "Error when encoding peer update changeset 'add'.\n"); + return -1; + } + + const size_t remove_json_len = __hp_get_json_string_array_encoded_len(remove_peers, remove_peers_count); + char remove_json[remove_json_len]; + if (__hp_encode_json_string_array(remove_json, remove_peers, remove_peers_count) == -1) + { + fprintf(stderr, "Error when encoding peer update changeset 'remove'.\n"); + return -1; + } + + const size_t msg_len = 47 + (add_json_len - 1) + (remove_json_len - 1); + char msg[msg_len]; + sprintf(msg, "{\"type\":\"peer_changeset\",\"add\":[%s],\"remove\":[%s]}", add_json, remove_json); + + if (__hp_write_control_msg(msg, msg_len - 1) == -1) + return -1; + + return 0; +} + +/** + * Returns the null-terminated string length required to encode as a json string array without enclosing brackets. + * @param elems Array of strings. + * @param count No. of strings. + */ +size_t __hp_get_json_string_array_encoded_len(const char *elems[], const size_t count) +{ + size_t len = 1; // +1 for null terminator. + for (size_t i = 0; i < count; i++) + { + len += (strlen(elems[i]) + 2); // Quoted string. + if (i < count - 1) + len += 1; // Comma + } + + return len; +} + +/** + * Formats a string array in JSON notation without enclosing brackets. + * @param buf Buffer to populate the encoded output. + * @param elems Array of strings. + * @param count No. of strings. + */ +int __hp_encode_json_string_array(char *buf, const char *elems[], const size_t count) +{ + size_t pos = 0; + for (size_t i = 0; i < count; i++) + { + const char *elem = elems[i]; + buf[pos++] = '\"'; + strcpy((buf + pos), elem); + pos += strlen(elem); + buf[pos++] = '\"'; + + if (i < count - 1) + buf[pos++] = ','; + } + buf[pos] = '\0'; + return 0; +} + /** * Read the values from the existing patch file. * @param fd File discriptor of the patch.cfg file. @@ -848,7 +931,11 @@ void __hp_parse_args_json(const struct json_object_s *object) { const struct json_string_s *k = elem->name; - if (strcmp(k->string, "pubkey") == 0) + if (strcmp(k->string, "contract_id") == 0) + { + __HP_ASSIGN_STRING(cctx->contract_id, elem); + } + else if (strcmp(k->string, "pubkey") == 0) { __HP_ASSIGN_STRING(cctx->pubkey, elem); } @@ -959,7 +1046,7 @@ int __hp_write_control_msg(const void *buf, const uint32_t len) { if (len > __HP_SEQPKT_MAX_SIZE) { - fprintf(stderr, "Control message exceeds max length %d.", __HP_SEQPKT_MAX_SIZE); + fprintf(stderr, "Control message exceeds max length %d.\n", __HP_SEQPKT_MAX_SIZE); return -1; } diff --git a/examples/nodejs_contract/package-lock.json b/examples/nodejs_contract/package-lock.json index c9b3f11c..b7585579 100644 --- a/examples/nodejs_contract/package-lock.json +++ b/examples/nodejs_contract/package-lock.json @@ -26,9 +26,9 @@ } }, "hotpocket-nodejs-contract": { - "version": "0.5.0", - "resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.5.0.tgz", - "integrity": "sha512-KSxlnmgu9AAZEChom/Q8NxAiTUE47xDGgt5eHMEWf0U+95ezr8yxLjZEyoIWrhFZGNMvjQ+7ITjMmskSF3onQA==" + "version": "0.5.2", + "resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.5.2.tgz", + "integrity": "sha512-kBDpdWqA5x3i72wkhUVnhpp5Z5Qojw9MQD6NUbpjpzo4YYf14KNT5MZjX+ugRggE26UlueCaYdhKcBu+vaYNaQ==" }, "ieee754": { "version": "1.1.13", diff --git a/examples/nodejs_contract/package.json b/examples/nodejs_contract/package.json index d7397528..7c52fbb1 100644 --- a/examples/nodejs_contract/package.json +++ b/examples/nodejs_contract/package.json @@ -5,7 +5,7 @@ "build-diag": "ncc build diagnostic_contract.js -o dist/diagnostic-contract" }, "dependencies": { - "hotpocket-nodejs-contract": "0.5.0", + "hotpocket-nodejs-contract": "0.5.2", "bson": "4.0.4", "seedrandom": "3.0.5" } diff --git a/src/conf.cpp b/src/conf.cpp index 7bcd5c48..32aa2e70 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -406,20 +406,13 @@ namespace conf cfg.mesh.known_peers.clear(); for (auto &v : mesh["known_peers"].array_range()) { - const char *ipport_concat = v.as(); - // Split the address:port text into two - util::split_string(splitted_peers, ipport_concat, ":"); - - // Push the peer address and the port to peers set - if (splitted_peers.size() != 2) + peer_ip_port ipp; + std::string_view ipport_concat = v.as(); + if (ipp.from_string(ipport_concat) == -1) { std::cerr << "Invalid peer: " << ipport_concat << "\n"; return -1; } - - peer_ip_port ipp; - ipp.host_address = splitted_peers.front(); - ipp.port = std::stoi(splitted_peers.back()); cfg.mesh.known_peers.emplace(ipp); splitted_peers.clear(); } diff --git a/src/conf.hpp b/src/conf.hpp index 28589f89..e8e56d1e 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -20,6 +20,29 @@ namespace conf std::string host_address; uint16_t port = 0; + /** + * Populates ip/port pair from a string format of ":" + */ + int from_string(std::string_view str) + { + // Split the address:port text into two + std::vector split; + util::split_string(split, str, ":"); + + if (split.size() != 2) + return -1; + + host_address = split.front(); + if (host_address.empty()) + return -1; + + port = std::stoi(split.back()); + if (port == 0) + return -1; + + return 0; + } + bool operator==(const peer_ip_port &other) const { return host_address == other.host_address && port == other.port; diff --git a/src/consensus.cpp b/src/consensus.cpp index ee01865a..8b6d46e2 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -194,6 +194,7 @@ namespace consensus if (!was_in_sync && newly_in_sync && ctx.sync_ongoing) dispatch_synced_ledger_input_statuses(lcl_id); + LOG_DEBUG << "Vote status: " << new_vote_status; status::set_vote_status(new_vote_status); } diff --git a/src/msg/controlmsg_common.hpp b/src/msg/controlmsg_common.hpp index ddca4d6a..391f5d1d 100644 --- a/src/msg/controlmsg_common.hpp +++ b/src/msg/controlmsg_common.hpp @@ -12,6 +12,7 @@ namespace msg::controlmsg // Message types constexpr const char *MSGTYPE_CONTRACT_END = "contract_end"; + constexpr const char *MSGTYPE_PEER_CHANGESET = "peer_changeset"; } // namespace msg::controlmsg diff --git a/src/msg/controlmsg_parser.cpp b/src/msg/controlmsg_parser.cpp index 4299e6ee..461cef2c 100644 --- a/src/msg/controlmsg_parser.cpp +++ b/src/msg/controlmsg_parser.cpp @@ -16,4 +16,9 @@ namespace msg::controlmsg return jctlmsg::extract_type(extracted_type, jdoc); } + int controlmsg_parser::extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers) const + { + return jctlmsg::extract_peer_changeset(added_peers, removed_peers, jdoc); + } + } // namespace msg::controlmsg \ No newline at end of file diff --git a/src/msg/controlmsg_parser.hpp b/src/msg/controlmsg_parser.hpp index 170efbea..e21eeeb0 100644 --- a/src/msg/controlmsg_parser.hpp +++ b/src/msg/controlmsg_parser.hpp @@ -2,6 +2,7 @@ #define _HP_MSG_CONTROLMSG_PARSER_ #include "../pchheader.hpp" +#include "../p2p/p2p.hpp" namespace msg::controlmsg { @@ -12,6 +13,7 @@ namespace msg::controlmsg public: int parse(std::string_view message); int extract_type(std::string &extracted_type) const; + int extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers) const; }; } // namespace msg::controlmsg diff --git a/src/msg/json/controlmsg_json.cpp b/src/msg/json/controlmsg_json.cpp index 7b024a54..e9ceb129 100644 --- a/src/msg/json/controlmsg_json.cpp +++ b/src/msg/json/controlmsg_json.cpp @@ -12,14 +12,11 @@ namespace msg::controlmsg::json constexpr const char *SEP_COMMA_NOQUOTE = ",\""; constexpr const char *SEP_COLON_NOQUOTE = "\":"; - // Message types - constexpr const char *MSGTYPE_HANDSHAKE_CHALLENGE = "handshake_challenge"; - /** * Parses a json control message sent by the contract. * @param d Jsoncons document to which the parsed json should be loaded. * @param message The message to parse. - * Accepted message format: + * Message format: * { * 'type': '' * ... @@ -57,4 +54,54 @@ namespace msg::controlmsg::json return 0; } + /** + * Extracts the peer changes from a peer changeset message. + * Message format: + * { + * 'type': 'peer_changeset', + * 'add': ['','', ...], + * 'remove': ['','', ...] + * } + */ + int extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers, const jsoncons::json &d) + { + if (extract_peers_from_array(added_peers, msg::controlmsg::FLD_ADD, d) == -1 || + extract_peers_from_array(removed_peers, msg::controlmsg::FLD_REMOVE, d) == -1) + return -1; + + return 0; + } + + int extract_peers_from_array(std::vector &peers, std::string_view field, const jsoncons::json &d) + { + if (d.contains(field)) + { + if (!d[field].is_array()) + { + LOG_ERROR << "Extract peers: Invalid array field: " << field; + return -1; + } + + for (auto &peer : d[field].array_range()) + { + if (!peer.is()) + { + LOG_ERROR << "Extract peers: Invalid peer entry in field: " << field; + return -1; + } + + conf::peer_ip_port ipp; + if (ipp.from_string(peer.as()) == -1) + { + LOG_ERROR << "Extract peers: Invalid peer format in field: " << field; + return -1; + } + + peers.push_back(p2p::peer_properties{ipp, -1, 0, 0}); + } + } + + return 0; + } + } // namespace msg::controlmsg::json \ No newline at end of file diff --git a/src/msg/json/controlmsg_json.hpp b/src/msg/json/controlmsg_json.hpp index c839ed4b..3bb126c5 100644 --- a/src/msg/json/controlmsg_json.hpp +++ b/src/msg/json/controlmsg_json.hpp @@ -2,6 +2,7 @@ #define _HP_MSG_JSON_CONTROLMSG_JSON_ #include "../../pchheader.hpp" +#include "../../p2p/p2p.hpp" /** * Parser helpers for smart contract control messages. @@ -12,6 +13,9 @@ namespace msg::controlmsg::json int extract_type(std::string &extracted_type, const jsoncons::json &d); + int extract_peer_changeset(std::vector &added_peers, std::vector &removed_peers, const jsoncons::json &d); + + int extract_peers_from_array(std::vector &peers, std::string_view field, const jsoncons::json &d); } // namespace msg::controlmsg::json diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index a0da4abf..3dc866d4 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -454,62 +454,84 @@ namespace p2p p2pmsg::create_msg_from_peer_list_request(fbuf); std::string target_pubkey; send_message_to_random_peer(fbuf, target_pubkey); - LOG_DEBUG << "Peer list request: Requesting from [" << target_pubkey.substr(0, 10) << "]"; + + if (!target_pubkey.empty()) + LOG_DEBUG << "Peer list requested from [" << target_pubkey.substr(0, 10) << "]"; } /** * Merging the response peer list with the own known peer list. - * @param peers Incoming peer list. + * @param merge_peers Peers that must be merged with existing known peers. + * @param remove_peers Peers that must be removed from existing known peers. * @param from The session that sent us the peer list. */ - void merge_peer_list(const std::vector &peers, const p2p::peer_comm_session &from) + void merge_peer_list(const std::vector *merge_peers, const std::vector *remove_peers, const p2p::peer_comm_session *from) { std::scoped_lock lock(ctx.server->req_known_remotes_mutex); - for (const peer_properties &peer : peers) + if (merge_peers) { - // If the peer address is indicated as empty, that is the entry for the peer who sent us this. - // We then fill that up with the host address we see for that peer. - // if (peer.ip_port.host_address.empty()) - // { - // peer.ip_port.host_address = from.host_address; - // } - - // If the peer is self, we won't add to the known peer list. - if (self::ip_port.has_value() && self::ip_port == peer.ip_port) + for (const peer_properties &peer : *merge_peers) { - LOG_DEBUG << "Rejecting " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + ". Loopback connection."; - continue; - } + // If the peer address is indicated as empty, that is the entry for the peer who sent us this. + // We then fill that up with the host address we see for that peer. + // if (from && peer.ip_port.host_address.empty()) + // { + // peer.ip_port.host_address = from->host_address; + // } - const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) - { return p.ip_port == peer.ip_port; }); - - // If the new peer is not in the peer list then add to the req_known_remotes - // Otherwise if new peer is recently updated (timestamp >) replace with the current one. - if (itr == ctx.server->req_known_remotes.end()) - { - // If maximum number of peer list reached skip the rest of peers. - if (ctx.server->req_known_remotes.size() < p2p::PEER_LIST_CAP) + // If the peer is self, we won't add to the known peer list. + if (self::ip_port.has_value() && self::ip_port == peer.ip_port) { - ctx.server->req_known_remotes.push_back(peer); - LOG_DEBUG << "Adding " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " to the known peer list."; + LOG_DEBUG << "Rejecting " + peer.ip_port.to_string() + ". Loopback connection."; + continue; } - else + + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) + { return p.ip_port == peer.ip_port; }); + + // If the new peer is not in the peer list then add to the req_known_remotes + // Otherwise if new peer is recently updated (timestamp >) replace with the current one. + if (itr == ctx.server->req_known_remotes.end()) { - LOG_DEBUG << "Rejecting " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + ". Maximum peer count reached."; + // If maximum number of peer list reached skip the rest of peers. + if (ctx.server->req_known_remotes.size() < p2p::PEER_LIST_CAP) + { + ctx.server->req_known_remotes.push_back(peer); + LOG_DEBUG << "Adding " + peer.ip_port.to_string() + " to the known peer list."; + } + else + { + LOG_DEBUG << "Rejecting " + peer.ip_port.to_string() + ". Maximum peer count reached."; + } + } + else if (itr->timestamp < peer.timestamp) + { + itr->available_capacity = peer.available_capacity; + itr->timestamp = peer.timestamp; + LOG_DEBUG << "Replacing " + peer.ip_port.to_string() + " in the known peer list."; } } - else if (itr->timestamp < peer.timestamp) + } + + if (remove_peers) + { + for (const peer_properties &peer : *remove_peers) { - itr->available_capacity = peer.available_capacity; - itr->timestamp = peer.timestamp; - LOG_DEBUG << "Replacing " + peer.ip_port.host_address + ":" + std::to_string(peer.ip_port.port) + " in the known peer list."; + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) + { return p.ip_port == peer.ip_port; }); + + if (itr != ctx.server->req_known_remotes.end()) + { + LOG_DEBUG << "Removing " << peer.ip_port.to_string() << " from known peer list."; + ctx.server->req_known_remotes.erase(itr); + } } } // Sorting the known remote list according to the weight value after merging the peer list. - sort_known_remotes(); + if (merge_peers || remove_peers) + sort_known_remotes(); } /** diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 7549218e..9802a9d7 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -237,7 +237,7 @@ namespace p2p void update_known_peer_available_capacity(const conf::peer_ip_port &ip_port, const int16_t available_capacity, const uint64_t ×tamp); - void merge_peer_list(const std::vector &peers, const p2p::peer_comm_session &from); + void merge_peer_list(const std::vector *merge_peers, const std::vector *remove_peers, const p2p::peer_comm_session *from = NULL); void sort_known_remotes(); diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 0b9a38d8..409aa3de 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -149,7 +149,8 @@ namespace p2p if (mi.type == p2pmsg::P2PMsgContent_PeerListResponseMsg) { - p2p::merge_peer_list(p2pmsg::create_peer_list_response_from_msg(mi), session); + const std::vector merge_peers = p2pmsg::create_peer_list_response_from_msg(mi); + p2p::merge_peer_list(&merge_peers, NULL, &session); } else if (mi.type == p2pmsg::P2PMsgContent_PeerListRequestMsg) { diff --git a/src/sc/sc.cpp b/src/sc/sc.cpp index 4161d50a..ebe7c94c 100644 --- a/src/sc/sc.cpp +++ b/src/sc/sc.cpp @@ -8,6 +8,7 @@ #include "../msg/controlmsg_parser.hpp" #include "../unl.hpp" #include "../util/version.hpp" +#include "../p2p/p2p.hpp" #include "contract_serve.hpp" #include "sc.hpp" #include "hpfs_log_sync.hpp" @@ -1093,6 +1094,13 @@ namespace sc { ctx.termination_signaled = true; } + else if (type == msg::controlmsg::MSGTYPE_PEER_CHANGESET) + { + std::vector added_peers; + std::vector removed_peers; + if (parser.extract_peer_changeset(added_peers, removed_peers) != -1) + p2p::merge_peer_list(&added_peers, &removed_peers); + } } /**