diff --git a/CMakeLists.txt b/CMakeLists.txt index 50ac2727..ec9c2884 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,6 +28,7 @@ add_executable(hpcore src/util/rollover_hashset.cpp src/util/ttl_set.cpp src/util/buffer_store.cpp + src/unl.cpp src/crypto.cpp src/conf.cpp src/hplog.cpp diff --git a/examples/c_contract/hotpocket_contract.h b/examples/c_contract/hotpocket_contract.h index 11b8a42f..ead3278a 100644 --- a/examples/c_contract/hotpocket_contract.h +++ b/examples/c_contract/hotpocket_contract.h @@ -131,12 +131,15 @@ int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const i int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, const uint32_t len); int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bufs, const int buf_count); -void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_context *ctx, const struct json_object_s *object); +int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count); + +void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_object_s *object); void __hp_free_contract_context(struct hp_contract_context *ctx); static void *__hp_peer_message_thread_func(void *arg); static void *__hp_control_message_thread_func(void *arg); +int __hp_control_write(const uint8_t *buf, const uint32_t len); void __hp_on_control_message(const void *buf, const uint32_t len); static struct __hp_global_context gctx = {}; @@ -162,7 +165,7 @@ int hp_init(hp_contract_func contract_func) { // Create and populate hotpocket context. struct hp_contract_context ctx = {}; - __hp_parse_args_json(&gctx, &ctx, object); + __hp_parse_args_json(&ctx, object); free(root); // Start control channel listener. @@ -196,7 +199,7 @@ int hp_init(hp_contract_func contract_func) __hp_free_contract_context(&ctx); // Send termination control message. - write(gctx.control_fd, "{\"type\":\"contract_end\"}", 24); + __hp_control_write("{\"type\":\"contract_end\"}", 23); close(gctx.control_fd); return 0; } @@ -323,7 +326,46 @@ int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bu return writev(ctx->peers.fd, bufs, buf_count); } -void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_context *ctx, const struct json_object_s *object) +int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count) +{ + // We assume 'add' and 'remove' are pointing to a char buffer containing 'count' no. of char[64] buffers. + + // Calculate total json message length and prepare the json buf. + // Format: {"type":"unl_changeset","add":["pubkey1",...],"remove":["pubkey2",...]} + + const size_t json_size = 45 + (67 * add_count - (add_count ? 1 : 0)) + (67 * remove_count - (remove_count ? 1 : 0)); + char json_buf[json_size]; + + strncpy(json_buf, "{\"type\":\"unl_changeset\",\"add\":[", 31); + size_t pos = 31; + for (int i = 0; i < add_count; i++) + { + if (i > 0) + json_buf[pos++] = ','; + json_buf[pos++] = '"'; + strncpy(json_buf + pos, add + (i * 64), 64); + pos += 64; + json_buf[pos++] = '"'; + } + + strncpy(json_buf + pos, "],\"remove\":[", 12); + pos += 12; + for (int i = 0; i < remove_count; i++) + { + if (i > 0) + json_buf[pos++] = ','; + json_buf[pos++] = '"'; + strncpy(json_buf + pos, remove + (i * 64), 64); + pos += 64; + json_buf[pos++] = '"'; + } + + strncpy(json_buf + pos, "]}", 2); + + return __hp_control_write(json_buf, json_size); +} + +void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_object_s *object) { const struct json_object_element_s *elem = object->start; do @@ -438,7 +480,7 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c } else if (strcmp(k->string, "hpfd") == 0) { - __HP_ASSIGN_INT(gctx->control_fd, elem); + __HP_ASSIGN_INT(gctx.control_fd, elem); } elem = elem->next; @@ -593,4 +635,15 @@ void __hp_free_contract_context(struct hp_contract_context *ctx) free(ctx->peers.list); } +int __hp_control_write(const uint8_t *buf, const uint32_t len) +{ + if (len > __HP_SEQPKT_BUF_SIZE) + { + fprintf(stderr, "Control message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE); + return -1; + } + + return write(gctx.control_fd, buf, len); +} + #endif \ No newline at end of file diff --git a/examples/nodejs_contract/hp-contract-lib.js b/examples/nodejs_contract/hp-contract-lib.js index 1688c399..7f3cc48b 100644 --- a/examples/nodejs_contract/hp-contract-lib.js +++ b/examples/nodejs_contract/hp-contract-lib.js @@ -2,6 +2,11 @@ const { EventEmitter } = require('events'); const fs = require('fs'); const MAX_SEQ_PACKET_SIZE = 128 * 1024; +const CONTROL_MESSAGE = { + CONTRACT_END: "contract_end", + UNL_CHANGESET: "unl_changeset" +} +Object.freeze(CONTROL_MESSAGE); class HotPocketContract { @@ -27,7 +32,7 @@ class HotPocketContract { const users = new UsersCollection(hpargs.userinfd, hpargs.users); const peers = new PeersCollection(hpargs.readonly, hpargs.unl, hpargs.nplfd, pendingTasks, this.events); - const executionContext = new ContractExecutionContext(hpargs, users, peers); + const executionContext = new ContractExecutionContext(hpargs, users, peers, this.#controlChannel); this.events.emit("session_start"); invokeCallback(contractFunc, executionContext).catch(errHandler).finally(() => { @@ -40,18 +45,21 @@ class HotPocketContract { } #terminate = () => { - this.#controlChannel.send(JSON.stringify({ type: "contract_end" })); + this.#controlChannel.send({ type: CONTROL_MESSAGE.CONTRACT_END }); this.#controlChannel.close(); } } class ContractExecutionContext { - constructor(hpargs, users, peers) { + #controlChannel = null; + + constructor(hpargs, users, peers, controlChannel) { this.readonly = hpargs.readonly; this.timestamp = hpargs.ts; this.users = users; this.peers = peers; + this.#controlChannel = controlChannel; if (!hpargs.readonly) { const lclParts = hpargs.lcl.split("-"); @@ -61,6 +69,12 @@ class ContractExecutionContext { }; } } + + async updateUnl(addArray, removeArray) { + if (this.readonly) + throw "UNL update not allowed in readonly mode." + await this.#controlChannel.send({ type: CONTROL_MESSAGE.UNL_CHANGESET, add: addArray, remove: removeArray }); + } } class UsersCollection { @@ -307,8 +321,8 @@ class ControlChannel { this.#readStream.on("error", (err) => { }); } - send(msg) { - const buf = Buffer.from(msg); + send(obj) { + const buf = Buffer.from(JSON.stringify(obj)); if (buf.length > MAX_SEQ_PACKET_SIZE) throw ("Control message exceeds max size " + MAX_SEQ_PACKET_SIZE); return writeAsync(this.#fd, buf); diff --git a/src/conf.cpp b/src/conf.cpp index 6e3fd983..41fe6a15 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -2,6 +2,7 @@ #include "conf.hpp" #include "crypto.hpp" #include "util/util.hpp" +#include "unl.hpp" namespace conf { @@ -29,9 +30,6 @@ namespace conf if (validate_contract_dir_paths() != 0 || load_config() != 0 || validate_config() != 0) return -1; - // Append self pubkey to unl list. - cfg.unl.emplace(cfg.pubkey); - return 0; } @@ -216,6 +214,9 @@ namespace conf cfg.pubkeyhex = d["pubkeyhex"].as(); cfg.seckeyhex = d["seckeyhex"].as(); + // Convert the hex keys to binary and keep for later use. + if (hexpair_to_bin() != 0) + return -1; cfg.binary = d["binary"].as(); cfg.binargs = d["binargs"].as(); @@ -278,9 +279,14 @@ namespace conf std::cerr << "Error decoding unl list.\n"; return -1; } - cfg.unl.emplace(bin_pubkey); + cfg.unl.push_back(bin_pubkey); } + cfg.unl.push_back(cfg.pubkey); // Add self pubkey. + unl::add(cfg.unl); + // Remove self pubkey after sending to unl list. This is so that it doesn't get saved in the config in "rekey" mode. + cfg.unl.pop_back(); + cfg.peerport = d["peerport"].as(); cfg.pubport = d["pubport"].as(); cfg.roundtime = d["roundtime"].as(); @@ -318,10 +324,6 @@ namespace conf for (auto &v : d["loggers"].array_range()) cfg.loggers.emplace(v.as()); - // Convert the hex keys to binary and keep for later use. - if (hexpair_to_bin() != 0) - return -1; - return 0; } /** diff --git a/src/conf.hpp b/src/conf.hpp index 6b80dbe0..8af32470 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -93,14 +93,14 @@ namespace conf std::string appbill; // binary to execute for appbill std::string appbillargs; // any arguments to supply to appbill binary by default std::vector peers; // Vector of peers with ip_port, timestamp, capacity - std::unordered_set unl; // Unique node list (list of binary public keys) + std::vector unl; // Unique node list (list of binary public keys) uint16_t peerport = 0; // Listening port for peer connections uint16_t roundtime = 0; // Consensus round time in ms uint16_t pubport = 0; // Listening port for public user connections uint16_t peerdiscoverytime = 0; // Time interval in ms to find for peers dynamicpeerdiscovery should be on for this - uint16_t peeridletimeout = 0; // Idle connection timeout for peer connections in seconds. - uint16_t pubidletimeout = 0; // Idle connection timeout for user connections in seconds. + uint16_t peeridletimeout = 0; // Idle connection timeout for peer connections in seconds. + uint16_t pubidletimeout = 0; // Idle connection timeout for user connections in seconds. uint64_t pubmaxsize = 0; // User message max size in bytes uint64_t pubmaxcpm = 0; // User message rate (characters(bytes) per minute) diff --git a/src/consensus.cpp b/src/consensus.cpp index c5f91f54..c6288d1a 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -15,6 +15,7 @@ #include "hpfs/hpfs.hpp" #include "state/state_common.hpp" #include "state/state_sync.hpp" +#include "unl.hpp" #include "ledger.hpp" #include "consensus.hpp" @@ -117,7 +118,8 @@ namespace consensus // Get current lcl and state. std::string lcl = ledger::ctx.get_lcl(); - uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); + const uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); + const size_t unl_count = unl::count(); hpfs::h32 state = state_common::ctx.get_state(); vote_counter votes; @@ -133,19 +135,19 @@ namespace consensus } else if (ctx.stage == 1) { - if (is_in_sync(lcl, votes)) + if (is_in_sync(lcl, unl_count, votes)) { // If we are in sync, vote and broadcast the winning votes to next stage. - const p2p::proposal p = create_stage123_proposal(STAGE1_THRESHOLD, votes, lcl, state); + const p2p::proposal p = create_stage123_proposal(STAGE1_THRESHOLD, votes, lcl, unl_count, state); broadcast_proposal(p); } } else if (ctx.stage == 2) { - if (is_in_sync(lcl, votes)) + if (is_in_sync(lcl, unl_count, votes)) { // If we are in sync, vote and broadcast the winning votes to next stage. - const p2p::proposal p = create_stage123_proposal(STAGE2_THRESHOLD, votes, lcl, state); + const p2p::proposal p = create_stage123_proposal(STAGE2_THRESHOLD, votes, lcl, unl_count, state); broadcast_proposal(p); } @@ -156,11 +158,11 @@ namespace consensus } else if (ctx.stage == 3) { - if (is_in_sync(lcl, votes)) + if (is_in_sync(lcl, unl_count, votes)) { // If we are in sync, vote and get the final winning votes. // This is the consensus proposal which makes it into the ledger and contract execution - const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, state); + const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, unl_count, state); // Update the ledger and execute the contract using the consensus proposal. if (update_ledger_and_execute_contract(p, lcl, state) == -1) @@ -173,12 +175,12 @@ namespace consensus return 0; } - bool is_in_sync(std::string_view lcl, vote_counter &votes) + bool is_in_sync(std::string_view lcl, const size_t unl_count, vote_counter &votes) { // Check if we're ahead/behind of consensus lcl. bool is_lcl_desync = false; std::string majority_lcl; - if (check_lcl_votes(is_lcl_desync, majority_lcl, votes, lcl)) + if (check_lcl_votes(is_lcl_desync, majority_lcl, votes, lcl, unl_count)) { // We proceed further only if lcl check was success (meaning lcl check could be reliably performed). @@ -514,7 +516,7 @@ namespace consensus return stg_prop; } - p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, hpfs::h32 state) + p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state) { // The proposal to be emited at the end of this stage. p2p::proposal stg_prop; @@ -552,7 +554,7 @@ namespace consensus increment(votes.outputs, hash); } - const uint32_t required_votes = ceil(vote_threshold * conf::cfg.unl.size()); + const uint32_t required_votes = ceil(vote_threshold * unl_count); // todo: check if inputs being proposed by another node are actually spoofed inputs // from a user locally connected to this node. @@ -637,7 +639,7 @@ namespace consensus * @param lcl Our lcl. * @return True if majority lcl could be calculated reliably. False if lcl check failed due to unreliable votes. */ - bool check_lcl_votes(bool &is_desync, std::string &majority_lcl, vote_counter &votes, std::string_view lcl) + bool check_lcl_votes(bool &is_desync, std::string &majority_lcl, vote_counter &votes, std::string_view lcl, const size_t unl_count) { uint32_t total_lcl_votes = 0; @@ -648,7 +650,7 @@ namespace consensus } // Check whether we have received enough votes in total. - const uint32_t min_required = ceil(MAJORITY_THRESHOLD * conf::cfg.unl.size()); + const uint32_t min_required = ceil(MAJORITY_THRESHOLD * unl_count); if (total_lcl_votes < min_required) { LOG_DEBUG << "Not enough peers proposing to perform consensus. votes:" << total_lcl_votes << " needed:" << min_required; diff --git a/src/consensus.hpp b/src/consensus.hpp index b5f96b07..4de8ae91 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -97,7 +97,7 @@ namespace consensus int consensus(); - bool is_in_sync(std::string_view lcl, vote_counter &votes); + bool is_in_sync(std::string_view lcl, const size_t unl_count, vote_counter &votes); void revise_candidate_proposals(); @@ -111,11 +111,11 @@ namespace consensus p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state); - p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, hpfs::h32 state); + p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state); void broadcast_proposal(const p2p::proposal &p); - bool check_lcl_votes(bool &is_desync, std::string &majority_lcl, vote_counter &votes, std::string_view lcl); + bool check_lcl_votes(bool &is_desync, std::string &majority_lcl, vote_counter &votes, std::string_view lcl, const size_t unl_count); void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes); diff --git a/src/msg/controlmsg_common.hpp b/src/msg/controlmsg_common.hpp index 48c85001..9d6785ae 100644 --- a/src/msg/controlmsg_common.hpp +++ b/src/msg/controlmsg_common.hpp @@ -7,9 +7,12 @@ namespace msg::controlmsg { // Message field names constexpr const char *FLD_TYPE = "type"; + constexpr const char *FLD_ADD = "add"; + constexpr const char *FLD_REMOVE = "remove"; // Message types constexpr const char *MSGTYPE_CONTRACT_END = "contract_end"; + constexpr const char *MSGTYPE_UNL_CHANGESET = "unl_changeset"; } // namespace msg::controlmsg diff --git a/src/msg/controlmsg_parser.cpp b/src/msg/controlmsg_parser.cpp index aa019382..c1d72827 100644 --- a/src/msg/controlmsg_parser.cpp +++ b/src/msg/controlmsg_parser.cpp @@ -8,12 +8,17 @@ namespace msg::controlmsg { int controlmsg_parser::parse(std::string_view message) { - return jctlmsg::parse_control_message(jsonDoc, message); + return jctlmsg::parse_control_message(jdoc, message); } int controlmsg_parser::extract_type(std::string &extracted_type) const { - return jctlmsg::extract_type(extracted_type, jsonDoc); + return jctlmsg::extract_type(extracted_type, jdoc); + } + + int controlmsg_parser::extract_unl_changeset(std::vector &additions, std::vector &removals) + { + return jctlmsg::extract_unl_changeset(additions, removals, jdoc); } } // namespace msg::controlmsg \ No newline at end of file diff --git a/src/msg/controlmsg_parser.hpp b/src/msg/controlmsg_parser.hpp index 1fdb6888..93082d21 100644 --- a/src/msg/controlmsg_parser.hpp +++ b/src/msg/controlmsg_parser.hpp @@ -7,12 +7,12 @@ namespace msg::controlmsg { class controlmsg_parser { - jsoncons::json jsonDoc; + jsoncons::json jdoc; public: int parse(std::string_view message); - int extract_type(std::string &extracted_type) const; + int extract_unl_changeset(std::vector &additions, std::vector &removals); }; } // namespace msg::controlmsg diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index db7af8d2..1edcf72a 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -5,6 +5,7 @@ #include "../../hplog.hpp" #include "../../hpfs/h32.hpp" #include "../../hpfs/hpfs.hpp" +#include "../../unl.hpp" #include "p2pmsg_container_generated.h" #include "p2pmsg_content_generated.h" #include "common_helpers.hpp" @@ -102,8 +103,8 @@ namespace msg::fbuf::p2pmsg return -1; } - //validate if the message is not from a node of current node's unl list. - if (!conf::cfg.unl.count(std::string(msg_pubkey))) + //validate if the message is not from a node listed in this node's unl list. + if (!unl::exists(std::string(msg_pubkey))) { LOG_DEBUG << "Peer message pubkey verification failed. Not in UNL."; return -1; diff --git a/src/msg/json/controlmsg_json.cpp b/src/msg/json/controlmsg_json.cpp index b08df865..252371f8 100644 --- a/src/msg/json/controlmsg_json.cpp +++ b/src/msg/json/controlmsg_json.cpp @@ -1,4 +1,6 @@ #include "../../pchheader.hpp" +#include "../../util/util.hpp" +#include "../../crypto.hpp" #include "../controlmsg_common.hpp" #include "controlmsg_json.hpp" @@ -32,14 +34,14 @@ namespace msg::controlmsg::json } catch (const std::exception &e) { - LOG_DEBUG << "User json message parsing failed."; + LOG_ERROR << "Control json message parsing failed. " << e.what(); return -1; } // Check existence of msg type field. if (!d.contains(msg::controlmsg::FLD_TYPE) || !d[msg::controlmsg::FLD_TYPE].is()) { - LOG_DEBUG << "User json message 'type' missing or invalid."; + LOG_ERROR << "Control json message 'type' missing or invalid."; return -1; } @@ -55,4 +57,40 @@ namespace msg::controlmsg::json return 0; } + /** + * Extracts unl additions and removals from the json document. + * Format: + * { + * "type": "unl_changeset", + * "add": ["pk1","pk2",...] + * "remove": ["pk1","pk2",...] + * } + */ + int extract_unl_changeset(std::vector &additions, std::vector &removals, const jsoncons::json &d) + { + extract_string_array(additions, d, FLD_ADD); + extract_string_array(removals, d, FLD_REMOVE); + return 0; + } + + void extract_string_array(std::vector &vec, const jsoncons::json &d, const char *field_name) + { + if (!d.contains(field_name) || !d[field_name].is_array()) + return; + + for (const auto &v : d[field_name].array_range()) + { + std::string hex_pubkey = "ed" + v.as(); + std::string bin_pubkey; + bin_pubkey.resize(crypto::PFXD_PUBKEY_BYTES); + if (util::hex2bin( + reinterpret_cast(bin_pubkey.data()), + bin_pubkey.length(), + hex_pubkey) != -1) + { + vec.push_back(bin_pubkey); + } + } + } + } // namespace msg::controlmsg::json \ No newline at end of file diff --git a/src/msg/json/controlmsg_json.hpp b/src/msg/json/controlmsg_json.hpp index 550762a7..8ea7c5cf 100644 --- a/src/msg/json/controlmsg_json.hpp +++ b/src/msg/json/controlmsg_json.hpp @@ -12,6 +12,10 @@ namespace msg::controlmsg::json int extract_type(std::string &extracted_type, const jsoncons::json &d); + int extract_unl_changeset(std::vector &additions, std::vector &removals, const jsoncons::json &d); + + void extract_string_array(std::vector &vec, const jsoncons::json &d, const char *field_name); + } // namespace msg::controlmsg::json #endif \ No newline at end of file diff --git a/src/msg/json/usrmsg_json.cpp b/src/msg/json/usrmsg_json.cpp index 66a37b73..58649b7a 100644 --- a/src/msg/json/usrmsg_json.cpp +++ b/src/msg/json/usrmsg_json.cpp @@ -303,7 +303,7 @@ namespace msg::usrmsg::json } catch (const std::exception &e) { - LOG_DEBUG << "User json message parsing failed."; + LOG_DEBUG << "User json message parsing failed. " << e.what();; return -1; } diff --git a/src/msg/usrmsg_parser.cpp b/src/msg/usrmsg_parser.cpp index 7b24c32f..65b0feff 100644 --- a/src/msg/usrmsg_parser.cpp +++ b/src/msg/usrmsg_parser.cpp @@ -49,33 +49,33 @@ namespace msg::usrmsg int usrmsg_parser::parse(std::string_view message) { if (protocol == util::PROTOCOL::JSON) - return jusrmsg::parse_user_message(jsonDoc, message); + return jusrmsg::parse_user_message(jdoc, message); else - return busrmsg::parse_user_message(bsonDoc, message); + return busrmsg::parse_user_message(bdoc, message); } int usrmsg_parser::extract_type(std::string &extracted_type) const { if (protocol == util::PROTOCOL::JSON) - return jusrmsg::extract_type(extracted_type, jsonDoc); + return jusrmsg::extract_type(extracted_type, jdoc); else - return busrmsg::extract_type(extracted_type, bsonDoc); + return busrmsg::extract_type(extracted_type, bdoc); } int usrmsg_parser::extract_read_request(std::string &extracted_content) const { if (protocol == util::PROTOCOL::JSON) - return jusrmsg::extract_read_request(extracted_content, jsonDoc); + return jusrmsg::extract_read_request(extracted_content, jdoc); else - return busrmsg::extract_read_request(extracted_content, bsonDoc); + return busrmsg::extract_read_request(extracted_content, bdoc); } int usrmsg_parser::extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const { if (protocol == util::PROTOCOL::JSON) - return jusrmsg::extract_signed_input_container(extracted_input_container, extracted_sig, jsonDoc); + return jusrmsg::extract_signed_input_container(extracted_input_container, extracted_sig, jdoc); else - return busrmsg::extract_signed_input_container(extracted_input_container, extracted_sig, bsonDoc); + return busrmsg::extract_signed_input_container(extracted_input_container, extracted_sig, bdoc); } int usrmsg_parser::extract_input_container(std::string &input, std::string &nonce, diff --git a/src/msg/usrmsg_parser.hpp b/src/msg/usrmsg_parser.hpp index de3e10d7..12174aa1 100644 --- a/src/msg/usrmsg_parser.hpp +++ b/src/msg/usrmsg_parser.hpp @@ -12,8 +12,8 @@ namespace msg::usrmsg class usrmsg_parser { const util::PROTOCOL protocol; - jsoncons::json jsonDoc; - jsoncons::ojson bsonDoc; + jsoncons::json jdoc; + jsoncons::ojson bdoc; public: usrmsg_parser(const util::PROTOCOL protocol); diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index 9eca33b0..98c22591 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -2,6 +2,7 @@ #include "../util/util.hpp" #include "../msg/fbuf/p2pmsg_helpers.hpp" #include "../ledger.hpp" +#include "../unl.hpp" #include "peer_comm_server.hpp" #include "peer_comm_session.hpp" #include "self_node.hpp" @@ -199,7 +200,7 @@ namespace p2p if (connected_status_check_counter == 600) { // One is added to session list size to reflect the loop back connection. - const bool current_state = (sessions.size() + 1) < (conf::cfg.unl.size() * WEAKLY_CONNECTED_THRESHOLD); + const bool current_state = (sessions.size() + 1) < (unl::count() * WEAKLY_CONNECTED_THRESHOLD); if (is_weakly_connected != current_state) { is_weakly_connected = !is_weakly_connected; diff --git a/src/sc.cpp b/src/sc.cpp index 14d4bb0b..302593e7 100644 --- a/src/sc.cpp +++ b/src/sc.cpp @@ -8,6 +8,7 @@ #include "msg/fbuf/p2pmsg_helpers.hpp" #include "msg/controlmsg_common.hpp" #include "msg/controlmsg_parser.hpp" +#include "unl.hpp" namespace sc { @@ -276,26 +277,9 @@ namespace sc user_json_to_stream(ctx.userfds, ctx.args.userbufs, os); - os << "},\"unl\":["; + os << "},\"unl\":" << unl::get_json() << "}"; - for (auto nodepk = conf::cfg.unl.begin(); nodepk != conf::cfg.unl.end(); nodepk++) - { - if (nodepk != conf::cfg.unl.begin()) - os << ","; // Trailing comma separator for previous element. - - // Convert binary nodepk into hex. - std::string pubkeyhex; - util::bin2hex( - pubkeyhex, - reinterpret_cast((*nodepk).data()) + 1, - (*nodepk).length() - 1); - - os << "\"" << pubkeyhex << "\""; - } - - os << "]}"; - - // Get the json string that should be written to contract input pipe. + // Get the final json string that should be written to contract input pipe. const std::string json = os.str(); // Establish contract input pipe. @@ -735,6 +719,7 @@ namespace sc const size_t bytes_to_read = is_stream_socket ? available_bytes : MIN(MAX_SEQ_PACKET_SIZE, available_bytes); output.resize(bytes_to_read); const int read_res = read(readfd, output.data(), bytes_to_read); + output.resize(read_res); if (read_res >= 0) { @@ -838,6 +823,12 @@ namespace sc { ctx.termination_signaled = true; } + else if (type == msg::controlmsg::MSGTYPE_UNL_CHANGESET && !ctx.args.readonly) + { + std::vector additions, removals; + parser.extract_unl_changeset(additions, removals); + unl::update(additions, removals); + } } } // namespace sc diff --git a/src/unl.cpp b/src/unl.cpp new file mode 100644 index 00000000..d8b7629b --- /dev/null +++ b/src/unl.cpp @@ -0,0 +1,92 @@ +#include "util/util.hpp" +#include "hplog.hpp" +#include "unl.hpp" + +/** + * Manages the UNL public keys of this node. + */ +namespace unl +{ + std::set list; // List of binary pubkeys of UNL. + std::string json_list; // Stringified json array of UNL. (To be fed into the contract args) + std::shared_mutex unl_mutex; + + size_t count() + { + std::shared_lock lock(unl_mutex); + return list.size(); + } + + std::set get() + { + std::shared_lock lock(unl_mutex); + return list; + } + + std::string get_json() + { + std::shared_lock lock(unl_mutex); + return json_list; + } + + bool exists(const std::string &bin_pubkey) + { + std::shared_lock lock(unl_mutex); + return list.find(bin_pubkey) != list.end(); + } + + void add(const std::vector &additions) + { + if (additions.empty()) + return; + + std::unique_lock lock(unl_mutex); + + for (const std::string &pubkey : additions) + list.emplace(pubkey); + + update_json_list(); + } + + void update(const std::vector &additions, const std::vector &removals) + { + if (additions.empty() && removals.empty()) + return; + + std::unique_lock lock(unl_mutex); + + for (const std::string &pubkey : additions) + list.emplace(pubkey); + + for (const std::string &pubkey : removals) + list.erase(pubkey); + + update_json_list(); + + LOG_INFO << "UNL updated. Count:" << list.size(); + ; + } + + void update_json_list() + { + std::ostringstream os; + os << "["; + for (auto pk = list.begin(); pk != list.end(); pk++) + { + if (pk != list.begin()) + os << ","; // Trailing comma separator for previous element. + + // Convert binary pubkey into hex. + std::string pubkeyhex; + util::bin2hex( + pubkeyhex, + reinterpret_cast(pk->data()) + 1, + pk->length() - 1); + + os << "\"" << pubkeyhex << "\""; + } + os << "]"; + json_list = os.str(); + } + +} // namespace unl diff --git a/src/unl.hpp b/src/unl.hpp new file mode 100644 index 00000000..d76e8084 --- /dev/null +++ b/src/unl.hpp @@ -0,0 +1,21 @@ +#ifndef _HP_UNL_ +#define _HP_UNL_ + +#include "pchheader.hpp" + +/** + * Manages the UNL public keys of this node. + */ +namespace unl +{ + size_t count(); + std::set get(); + std::string get_json(); + bool exists(const std::string &bin_pubkey); + void add(const std::vector &additions); + void update(const std::vector &additions, const std::vector &removals); + void update_json_list(); + +} // namespace unl + +#endif