mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Introduced UNL update control message. (#169)
This commit is contained in:
@@ -28,6 +28,7 @@ add_executable(hpcore
|
||||
src/util/rollover_hashset.cpp
|
||||
src/util/ttl_set.cpp
|
||||
src/util/buffer_store.cpp
|
||||
src/unl.cpp
|
||||
src/crypto.cpp
|
||||
src/conf.cpp
|
||||
src/hplog.cpp
|
||||
|
||||
@@ -131,12 +131,15 @@ int hp_user_writev(const struct hp_user *user, const struct iovec *bufs, const i
|
||||
int hp_peer_write(const struct hp_contract_context *ctx, const uint8_t *buf, const uint32_t len);
|
||||
int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bufs, const int buf_count);
|
||||
|
||||
void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_context *ctx, const struct json_object_s *object);
|
||||
int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count);
|
||||
|
||||
void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_object_s *object);
|
||||
void __hp_free_contract_context(struct hp_contract_context *ctx);
|
||||
|
||||
static void *__hp_peer_message_thread_func(void *arg);
|
||||
|
||||
static void *__hp_control_message_thread_func(void *arg);
|
||||
int __hp_control_write(const uint8_t *buf, const uint32_t len);
|
||||
void __hp_on_control_message(const void *buf, const uint32_t len);
|
||||
|
||||
static struct __hp_global_context gctx = {};
|
||||
@@ -162,7 +165,7 @@ int hp_init(hp_contract_func contract_func)
|
||||
{
|
||||
// Create and populate hotpocket context.
|
||||
struct hp_contract_context ctx = {};
|
||||
__hp_parse_args_json(&gctx, &ctx, object);
|
||||
__hp_parse_args_json(&ctx, object);
|
||||
free(root);
|
||||
|
||||
// Start control channel listener.
|
||||
@@ -196,7 +199,7 @@ int hp_init(hp_contract_func contract_func)
|
||||
__hp_free_contract_context(&ctx);
|
||||
|
||||
// Send termination control message.
|
||||
write(gctx.control_fd, "{\"type\":\"contract_end\"}", 24);
|
||||
__hp_control_write("{\"type\":\"contract_end\"}", 23);
|
||||
close(gctx.control_fd);
|
||||
return 0;
|
||||
}
|
||||
@@ -323,7 +326,46 @@ int hp_peer_writev(const struct hp_contract_context *ctx, const struct iovec *bu
|
||||
return writev(ctx->peers.fd, bufs, buf_count);
|
||||
}
|
||||
|
||||
void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_context *ctx, const struct json_object_s *object)
|
||||
int hp_update_unl(const char *add, const size_t add_count, const char *remove, const size_t remove_count)
|
||||
{
|
||||
// We assume 'add' and 'remove' are pointing to a char buffer containing 'count' no. of char[64] buffers.
|
||||
|
||||
// Calculate total json message length and prepare the json buf.
|
||||
// Format: {"type":"unl_changeset","add":["pubkey1",...],"remove":["pubkey2",...]}
|
||||
|
||||
const size_t json_size = 45 + (67 * add_count - (add_count ? 1 : 0)) + (67 * remove_count - (remove_count ? 1 : 0));
|
||||
char json_buf[json_size];
|
||||
|
||||
strncpy(json_buf, "{\"type\":\"unl_changeset\",\"add\":[", 31);
|
||||
size_t pos = 31;
|
||||
for (int i = 0; i < add_count; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
json_buf[pos++] = ',';
|
||||
json_buf[pos++] = '"';
|
||||
strncpy(json_buf + pos, add + (i * 64), 64);
|
||||
pos += 64;
|
||||
json_buf[pos++] = '"';
|
||||
}
|
||||
|
||||
strncpy(json_buf + pos, "],\"remove\":[", 12);
|
||||
pos += 12;
|
||||
for (int i = 0; i < remove_count; i++)
|
||||
{
|
||||
if (i > 0)
|
||||
json_buf[pos++] = ',';
|
||||
json_buf[pos++] = '"';
|
||||
strncpy(json_buf + pos, remove + (i * 64), 64);
|
||||
pos += 64;
|
||||
json_buf[pos++] = '"';
|
||||
}
|
||||
|
||||
strncpy(json_buf + pos, "]}", 2);
|
||||
|
||||
return __hp_control_write(json_buf, json_size);
|
||||
}
|
||||
|
||||
void __hp_parse_args_json(struct hp_contract_context *ctx, const struct json_object_s *object)
|
||||
{
|
||||
const struct json_object_element_s *elem = object->start;
|
||||
do
|
||||
@@ -438,7 +480,7 @@ void __hp_parse_args_json(struct __hp_global_context *gctx, struct hp_contract_c
|
||||
}
|
||||
else if (strcmp(k->string, "hpfd") == 0)
|
||||
{
|
||||
__HP_ASSIGN_INT(gctx->control_fd, elem);
|
||||
__HP_ASSIGN_INT(gctx.control_fd, elem);
|
||||
}
|
||||
|
||||
elem = elem->next;
|
||||
@@ -593,4 +635,15 @@ void __hp_free_contract_context(struct hp_contract_context *ctx)
|
||||
free(ctx->peers.list);
|
||||
}
|
||||
|
||||
int __hp_control_write(const uint8_t *buf, const uint32_t len)
|
||||
{
|
||||
if (len > __HP_SEQPKT_BUF_SIZE)
|
||||
{
|
||||
fprintf(stderr, "Control message exceeds max length %d.", __HP_SEQPKT_BUF_SIZE);
|
||||
return -1;
|
||||
}
|
||||
|
||||
return write(gctx.control_fd, buf, len);
|
||||
}
|
||||
|
||||
#endif
|
||||
@@ -2,6 +2,11 @@ const { EventEmitter } = require('events');
|
||||
const fs = require('fs');
|
||||
|
||||
const MAX_SEQ_PACKET_SIZE = 128 * 1024;
|
||||
const CONTROL_MESSAGE = {
|
||||
CONTRACT_END: "contract_end",
|
||||
UNL_CHANGESET: "unl_changeset"
|
||||
}
|
||||
Object.freeze(CONTROL_MESSAGE);
|
||||
|
||||
class HotPocketContract {
|
||||
|
||||
@@ -27,7 +32,7 @@ class HotPocketContract {
|
||||
|
||||
const users = new UsersCollection(hpargs.userinfd, hpargs.users);
|
||||
const peers = new PeersCollection(hpargs.readonly, hpargs.unl, hpargs.nplfd, pendingTasks, this.events);
|
||||
const executionContext = new ContractExecutionContext(hpargs, users, peers);
|
||||
const executionContext = new ContractExecutionContext(hpargs, users, peers, this.#controlChannel);
|
||||
|
||||
this.events.emit("session_start");
|
||||
invokeCallback(contractFunc, executionContext).catch(errHandler).finally(() => {
|
||||
@@ -40,18 +45,21 @@ class HotPocketContract {
|
||||
}
|
||||
|
||||
#terminate = () => {
|
||||
this.#controlChannel.send(JSON.stringify({ type: "contract_end" }));
|
||||
this.#controlChannel.send({ type: CONTROL_MESSAGE.CONTRACT_END });
|
||||
this.#controlChannel.close();
|
||||
}
|
||||
}
|
||||
|
||||
class ContractExecutionContext {
|
||||
|
||||
constructor(hpargs, users, peers) {
|
||||
#controlChannel = null;
|
||||
|
||||
constructor(hpargs, users, peers, controlChannel) {
|
||||
this.readonly = hpargs.readonly;
|
||||
this.timestamp = hpargs.ts;
|
||||
this.users = users;
|
||||
this.peers = peers;
|
||||
this.#controlChannel = controlChannel;
|
||||
|
||||
if (!hpargs.readonly) {
|
||||
const lclParts = hpargs.lcl.split("-");
|
||||
@@ -61,6 +69,12 @@ class ContractExecutionContext {
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
async updateUnl(addArray, removeArray) {
|
||||
if (this.readonly)
|
||||
throw "UNL update not allowed in readonly mode."
|
||||
await this.#controlChannel.send({ type: CONTROL_MESSAGE.UNL_CHANGESET, add: addArray, remove: removeArray });
|
||||
}
|
||||
}
|
||||
|
||||
class UsersCollection {
|
||||
@@ -307,8 +321,8 @@ class ControlChannel {
|
||||
this.#readStream.on("error", (err) => { });
|
||||
}
|
||||
|
||||
send(msg) {
|
||||
const buf = Buffer.from(msg);
|
||||
send(obj) {
|
||||
const buf = Buffer.from(JSON.stringify(obj));
|
||||
if (buf.length > MAX_SEQ_PACKET_SIZE)
|
||||
throw ("Control message exceeds max size " + MAX_SEQ_PACKET_SIZE);
|
||||
return writeAsync(this.#fd, buf);
|
||||
|
||||
18
src/conf.cpp
18
src/conf.cpp
@@ -2,6 +2,7 @@
|
||||
#include "conf.hpp"
|
||||
#include "crypto.hpp"
|
||||
#include "util/util.hpp"
|
||||
#include "unl.hpp"
|
||||
|
||||
namespace conf
|
||||
{
|
||||
@@ -29,9 +30,6 @@ namespace conf
|
||||
if (validate_contract_dir_paths() != 0 || load_config() != 0 || validate_config() != 0)
|
||||
return -1;
|
||||
|
||||
// Append self pubkey to unl list.
|
||||
cfg.unl.emplace(cfg.pubkey);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
@@ -216,6 +214,9 @@ namespace conf
|
||||
|
||||
cfg.pubkeyhex = d["pubkeyhex"].as<std::string>();
|
||||
cfg.seckeyhex = d["seckeyhex"].as<std::string>();
|
||||
// Convert the hex keys to binary and keep for later use.
|
||||
if (hexpair_to_bin() != 0)
|
||||
return -1;
|
||||
|
||||
cfg.binary = d["binary"].as<std::string>();
|
||||
cfg.binargs = d["binargs"].as<std::string>();
|
||||
@@ -278,9 +279,14 @@ namespace conf
|
||||
std::cerr << "Error decoding unl list.\n";
|
||||
return -1;
|
||||
}
|
||||
cfg.unl.emplace(bin_pubkey);
|
||||
cfg.unl.push_back(bin_pubkey);
|
||||
}
|
||||
|
||||
cfg.unl.push_back(cfg.pubkey); // Add self pubkey.
|
||||
unl::add(cfg.unl);
|
||||
// Remove self pubkey after sending to unl list. This is so that it doesn't get saved in the config in "rekey" mode.
|
||||
cfg.unl.pop_back();
|
||||
|
||||
cfg.peerport = d["peerport"].as<uint16_t>();
|
||||
cfg.pubport = d["pubport"].as<uint16_t>();
|
||||
cfg.roundtime = d["roundtime"].as<uint16_t>();
|
||||
@@ -318,10 +324,6 @@ namespace conf
|
||||
for (auto &v : d["loggers"].array_range())
|
||||
cfg.loggers.emplace(v.as<std::string>());
|
||||
|
||||
// Convert the hex keys to binary and keep for later use.
|
||||
if (hexpair_to_bin() != 0)
|
||||
return -1;
|
||||
|
||||
return 0;
|
||||
}
|
||||
/**
|
||||
|
||||
@@ -93,14 +93,14 @@ namespace conf
|
||||
std::string appbill; // binary to execute for appbill
|
||||
std::string appbillargs; // any arguments to supply to appbill binary by default
|
||||
std::vector<peer_properties> peers; // Vector of peers with ip_port, timestamp, capacity
|
||||
std::unordered_set<std::string> unl; // Unique node list (list of binary public keys)
|
||||
std::vector<std::string> unl; // Unique node list (list of binary public keys)
|
||||
uint16_t peerport = 0; // Listening port for peer connections
|
||||
uint16_t roundtime = 0; // Consensus round time in ms
|
||||
uint16_t pubport = 0; // Listening port for public user connections
|
||||
uint16_t peerdiscoverytime = 0; // Time interval in ms to find for peers dynamicpeerdiscovery should be on for this
|
||||
|
||||
uint16_t peeridletimeout = 0; // Idle connection timeout for peer connections in seconds.
|
||||
uint16_t pubidletimeout = 0; // Idle connection timeout for user connections in seconds.
|
||||
uint16_t peeridletimeout = 0; // Idle connection timeout for peer connections in seconds.
|
||||
uint16_t pubidletimeout = 0; // Idle connection timeout for user connections in seconds.
|
||||
|
||||
uint64_t pubmaxsize = 0; // User message max size in bytes
|
||||
uint64_t pubmaxcpm = 0; // User message rate (characters(bytes) per minute)
|
||||
|
||||
@@ -15,6 +15,7 @@
|
||||
#include "hpfs/hpfs.hpp"
|
||||
#include "state/state_common.hpp"
|
||||
#include "state/state_sync.hpp"
|
||||
#include "unl.hpp"
|
||||
#include "ledger.hpp"
|
||||
#include "consensus.hpp"
|
||||
|
||||
@@ -117,7 +118,8 @@ namespace consensus
|
||||
|
||||
// Get current lcl and state.
|
||||
std::string lcl = ledger::ctx.get_lcl();
|
||||
uint64_t lcl_seq_no = ledger::ctx.get_seq_no();
|
||||
const uint64_t lcl_seq_no = ledger::ctx.get_seq_no();
|
||||
const size_t unl_count = unl::count();
|
||||
hpfs::h32 state = state_common::ctx.get_state();
|
||||
vote_counter votes;
|
||||
|
||||
@@ -133,19 +135,19 @@ namespace consensus
|
||||
}
|
||||
else if (ctx.stage == 1)
|
||||
{
|
||||
if (is_in_sync(lcl, votes))
|
||||
if (is_in_sync(lcl, unl_count, votes))
|
||||
{
|
||||
// If we are in sync, vote and broadcast the winning votes to next stage.
|
||||
const p2p::proposal p = create_stage123_proposal(STAGE1_THRESHOLD, votes, lcl, state);
|
||||
const p2p::proposal p = create_stage123_proposal(STAGE1_THRESHOLD, votes, lcl, unl_count, state);
|
||||
broadcast_proposal(p);
|
||||
}
|
||||
}
|
||||
else if (ctx.stage == 2)
|
||||
{
|
||||
if (is_in_sync(lcl, votes))
|
||||
if (is_in_sync(lcl, unl_count, votes))
|
||||
{
|
||||
// If we are in sync, vote and broadcast the winning votes to next stage.
|
||||
const p2p::proposal p = create_stage123_proposal(STAGE2_THRESHOLD, votes, lcl, state);
|
||||
const p2p::proposal p = create_stage123_proposal(STAGE2_THRESHOLD, votes, lcl, unl_count, state);
|
||||
broadcast_proposal(p);
|
||||
}
|
||||
|
||||
@@ -156,11 +158,11 @@ namespace consensus
|
||||
}
|
||||
else if (ctx.stage == 3)
|
||||
{
|
||||
if (is_in_sync(lcl, votes))
|
||||
if (is_in_sync(lcl, unl_count, votes))
|
||||
{
|
||||
// If we are in sync, vote and get the final winning votes.
|
||||
// This is the consensus proposal which makes it into the ledger and contract execution
|
||||
const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, state);
|
||||
const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, unl_count, state);
|
||||
|
||||
// Update the ledger and execute the contract using the consensus proposal.
|
||||
if (update_ledger_and_execute_contract(p, lcl, state) == -1)
|
||||
@@ -173,12 +175,12 @@ namespace consensus
|
||||
return 0;
|
||||
}
|
||||
|
||||
bool is_in_sync(std::string_view lcl, vote_counter &votes)
|
||||
bool is_in_sync(std::string_view lcl, const size_t unl_count, vote_counter &votes)
|
||||
{
|
||||
// Check if we're ahead/behind of consensus lcl.
|
||||
bool is_lcl_desync = false;
|
||||
std::string majority_lcl;
|
||||
if (check_lcl_votes(is_lcl_desync, majority_lcl, votes, lcl))
|
||||
if (check_lcl_votes(is_lcl_desync, majority_lcl, votes, lcl, unl_count))
|
||||
{
|
||||
// We proceed further only if lcl check was success (meaning lcl check could be reliably performed).
|
||||
|
||||
@@ -514,7 +516,7 @@ namespace consensus
|
||||
return stg_prop;
|
||||
}
|
||||
|
||||
p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, hpfs::h32 state)
|
||||
p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state)
|
||||
{
|
||||
// The proposal to be emited at the end of this stage.
|
||||
p2p::proposal stg_prop;
|
||||
@@ -552,7 +554,7 @@ namespace consensus
|
||||
increment(votes.outputs, hash);
|
||||
}
|
||||
|
||||
const uint32_t required_votes = ceil(vote_threshold * conf::cfg.unl.size());
|
||||
const uint32_t required_votes = ceil(vote_threshold * unl_count);
|
||||
|
||||
// todo: check if inputs being proposed by another node are actually spoofed inputs
|
||||
// from a user locally connected to this node.
|
||||
@@ -637,7 +639,7 @@ namespace consensus
|
||||
* @param lcl Our lcl.
|
||||
* @return True if majority lcl could be calculated reliably. False if lcl check failed due to unreliable votes.
|
||||
*/
|
||||
bool check_lcl_votes(bool &is_desync, std::string &majority_lcl, vote_counter &votes, std::string_view lcl)
|
||||
bool check_lcl_votes(bool &is_desync, std::string &majority_lcl, vote_counter &votes, std::string_view lcl, const size_t unl_count)
|
||||
{
|
||||
uint32_t total_lcl_votes = 0;
|
||||
|
||||
@@ -648,7 +650,7 @@ namespace consensus
|
||||
}
|
||||
|
||||
// Check whether we have received enough votes in total.
|
||||
const uint32_t min_required = ceil(MAJORITY_THRESHOLD * conf::cfg.unl.size());
|
||||
const uint32_t min_required = ceil(MAJORITY_THRESHOLD * unl_count);
|
||||
if (total_lcl_votes < min_required)
|
||||
{
|
||||
LOG_DEBUG << "Not enough peers proposing to perform consensus. votes:" << total_lcl_votes << " needed:" << min_required;
|
||||
|
||||
@@ -97,7 +97,7 @@ namespace consensus
|
||||
|
||||
int consensus();
|
||||
|
||||
bool is_in_sync(std::string_view lcl, vote_counter &votes);
|
||||
bool is_in_sync(std::string_view lcl, const size_t unl_count, vote_counter &votes);
|
||||
|
||||
void revise_candidate_proposals();
|
||||
|
||||
@@ -111,11 +111,11 @@ namespace consensus
|
||||
|
||||
p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state);
|
||||
|
||||
p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, hpfs::h32 state);
|
||||
p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state);
|
||||
|
||||
void broadcast_proposal(const p2p::proposal &p);
|
||||
|
||||
bool check_lcl_votes(bool &is_desync, std::string &majority_lcl, vote_counter &votes, std::string_view lcl);
|
||||
bool check_lcl_votes(bool &is_desync, std::string &majority_lcl, vote_counter &votes, std::string_view lcl, const size_t unl_count);
|
||||
|
||||
void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes);
|
||||
|
||||
|
||||
@@ -7,9 +7,12 @@ namespace msg::controlmsg
|
||||
{
|
||||
// Message field names
|
||||
constexpr const char *FLD_TYPE = "type";
|
||||
constexpr const char *FLD_ADD = "add";
|
||||
constexpr const char *FLD_REMOVE = "remove";
|
||||
|
||||
// Message types
|
||||
constexpr const char *MSGTYPE_CONTRACT_END = "contract_end";
|
||||
constexpr const char *MSGTYPE_UNL_CHANGESET = "unl_changeset";
|
||||
|
||||
} // namespace msg::controlmsg
|
||||
|
||||
|
||||
@@ -8,12 +8,17 @@ namespace msg::controlmsg
|
||||
{
|
||||
int controlmsg_parser::parse(std::string_view message)
|
||||
{
|
||||
return jctlmsg::parse_control_message(jsonDoc, message);
|
||||
return jctlmsg::parse_control_message(jdoc, message);
|
||||
}
|
||||
|
||||
int controlmsg_parser::extract_type(std::string &extracted_type) const
|
||||
{
|
||||
return jctlmsg::extract_type(extracted_type, jsonDoc);
|
||||
return jctlmsg::extract_type(extracted_type, jdoc);
|
||||
}
|
||||
|
||||
int controlmsg_parser::extract_unl_changeset(std::vector<std::string> &additions, std::vector<std::string> &removals)
|
||||
{
|
||||
return jctlmsg::extract_unl_changeset(additions, removals, jdoc);
|
||||
}
|
||||
|
||||
} // namespace msg::controlmsg
|
||||
@@ -7,12 +7,12 @@ namespace msg::controlmsg
|
||||
{
|
||||
class controlmsg_parser
|
||||
{
|
||||
jsoncons::json jsonDoc;
|
||||
jsoncons::json jdoc;
|
||||
|
||||
public:
|
||||
int parse(std::string_view message);
|
||||
|
||||
int extract_type(std::string &extracted_type) const;
|
||||
int extract_unl_changeset(std::vector<std::string> &additions, std::vector<std::string> &removals);
|
||||
};
|
||||
|
||||
} // namespace msg::controlmsg
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
#include "../../hplog.hpp"
|
||||
#include "../../hpfs/h32.hpp"
|
||||
#include "../../hpfs/hpfs.hpp"
|
||||
#include "../../unl.hpp"
|
||||
#include "p2pmsg_container_generated.h"
|
||||
#include "p2pmsg_content_generated.h"
|
||||
#include "common_helpers.hpp"
|
||||
@@ -102,8 +103,8 @@ namespace msg::fbuf::p2pmsg
|
||||
return -1;
|
||||
}
|
||||
|
||||
//validate if the message is not from a node of current node's unl list.
|
||||
if (!conf::cfg.unl.count(std::string(msg_pubkey)))
|
||||
//validate if the message is not from a node listed in this node's unl list.
|
||||
if (!unl::exists(std::string(msg_pubkey)))
|
||||
{
|
||||
LOG_DEBUG << "Peer message pubkey verification failed. Not in UNL.";
|
||||
return -1;
|
||||
|
||||
@@ -1,4 +1,6 @@
|
||||
#include "../../pchheader.hpp"
|
||||
#include "../../util/util.hpp"
|
||||
#include "../../crypto.hpp"
|
||||
#include "../controlmsg_common.hpp"
|
||||
#include "controlmsg_json.hpp"
|
||||
|
||||
@@ -32,14 +34,14 @@ namespace msg::controlmsg::json
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
{
|
||||
LOG_DEBUG << "User json message parsing failed.";
|
||||
LOG_ERROR << "Control json message parsing failed. " << e.what();
|
||||
return -1;
|
||||
}
|
||||
|
||||
// Check existence of msg type field.
|
||||
if (!d.contains(msg::controlmsg::FLD_TYPE) || !d[msg::controlmsg::FLD_TYPE].is<std::string>())
|
||||
{
|
||||
LOG_DEBUG << "User json message 'type' missing or invalid.";
|
||||
LOG_ERROR << "Control json message 'type' missing or invalid.";
|
||||
return -1;
|
||||
}
|
||||
|
||||
@@ -55,4 +57,40 @@ namespace msg::controlmsg::json
|
||||
return 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Extracts unl additions and removals from the json document.
|
||||
* Format:
|
||||
* {
|
||||
* "type": "unl_changeset",
|
||||
* "add": ["pk1","pk2",...]
|
||||
* "remove": ["pk1","pk2",...]
|
||||
* }
|
||||
*/
|
||||
int extract_unl_changeset(std::vector<std::string> &additions, std::vector<std::string> &removals, const jsoncons::json &d)
|
||||
{
|
||||
extract_string_array(additions, d, FLD_ADD);
|
||||
extract_string_array(removals, d, FLD_REMOVE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void extract_string_array(std::vector<std::string> &vec, const jsoncons::json &d, const char *field_name)
|
||||
{
|
||||
if (!d.contains(field_name) || !d[field_name].is_array())
|
||||
return;
|
||||
|
||||
for (const auto &v : d[field_name].array_range())
|
||||
{
|
||||
std::string hex_pubkey = "ed" + v.as<std::string>();
|
||||
std::string bin_pubkey;
|
||||
bin_pubkey.resize(crypto::PFXD_PUBKEY_BYTES);
|
||||
if (util::hex2bin(
|
||||
reinterpret_cast<unsigned char *>(bin_pubkey.data()),
|
||||
bin_pubkey.length(),
|
||||
hex_pubkey) != -1)
|
||||
{
|
||||
vec.push_back(bin_pubkey);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace msg::controlmsg::json
|
||||
@@ -12,6 +12,10 @@ namespace msg::controlmsg::json
|
||||
|
||||
int extract_type(std::string &extracted_type, const jsoncons::json &d);
|
||||
|
||||
int extract_unl_changeset(std::vector<std::string> &additions, std::vector<std::string> &removals, const jsoncons::json &d);
|
||||
|
||||
void extract_string_array(std::vector<std::string> &vec, const jsoncons::json &d, const char *field_name);
|
||||
|
||||
} // namespace msg::controlmsg::json
|
||||
|
||||
#endif
|
||||
@@ -303,7 +303,7 @@ namespace msg::usrmsg::json
|
||||
}
|
||||
catch (const std::exception &e)
|
||||
{
|
||||
LOG_DEBUG << "User json message parsing failed.";
|
||||
LOG_DEBUG << "User json message parsing failed. " << e.what();;
|
||||
return -1;
|
||||
}
|
||||
|
||||
|
||||
@@ -49,33 +49,33 @@ namespace msg::usrmsg
|
||||
int usrmsg_parser::parse(std::string_view message)
|
||||
{
|
||||
if (protocol == util::PROTOCOL::JSON)
|
||||
return jusrmsg::parse_user_message(jsonDoc, message);
|
||||
return jusrmsg::parse_user_message(jdoc, message);
|
||||
else
|
||||
return busrmsg::parse_user_message(bsonDoc, message);
|
||||
return busrmsg::parse_user_message(bdoc, message);
|
||||
}
|
||||
|
||||
int usrmsg_parser::extract_type(std::string &extracted_type) const
|
||||
{
|
||||
if (protocol == util::PROTOCOL::JSON)
|
||||
return jusrmsg::extract_type(extracted_type, jsonDoc);
|
||||
return jusrmsg::extract_type(extracted_type, jdoc);
|
||||
else
|
||||
return busrmsg::extract_type(extracted_type, bsonDoc);
|
||||
return busrmsg::extract_type(extracted_type, bdoc);
|
||||
}
|
||||
|
||||
int usrmsg_parser::extract_read_request(std::string &extracted_content) const
|
||||
{
|
||||
if (protocol == util::PROTOCOL::JSON)
|
||||
return jusrmsg::extract_read_request(extracted_content, jsonDoc);
|
||||
return jusrmsg::extract_read_request(extracted_content, jdoc);
|
||||
else
|
||||
return busrmsg::extract_read_request(extracted_content, bsonDoc);
|
||||
return busrmsg::extract_read_request(extracted_content, bdoc);
|
||||
}
|
||||
|
||||
int usrmsg_parser::extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const
|
||||
{
|
||||
if (protocol == util::PROTOCOL::JSON)
|
||||
return jusrmsg::extract_signed_input_container(extracted_input_container, extracted_sig, jsonDoc);
|
||||
return jusrmsg::extract_signed_input_container(extracted_input_container, extracted_sig, jdoc);
|
||||
else
|
||||
return busrmsg::extract_signed_input_container(extracted_input_container, extracted_sig, bsonDoc);
|
||||
return busrmsg::extract_signed_input_container(extracted_input_container, extracted_sig, bdoc);
|
||||
}
|
||||
|
||||
int usrmsg_parser::extract_input_container(std::string &input, std::string &nonce,
|
||||
|
||||
@@ -12,8 +12,8 @@ namespace msg::usrmsg
|
||||
class usrmsg_parser
|
||||
{
|
||||
const util::PROTOCOL protocol;
|
||||
jsoncons::json jsonDoc;
|
||||
jsoncons::ojson bsonDoc;
|
||||
jsoncons::json jdoc;
|
||||
jsoncons::ojson bdoc;
|
||||
|
||||
public:
|
||||
usrmsg_parser(const util::PROTOCOL protocol);
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
#include "../util/util.hpp"
|
||||
#include "../msg/fbuf/p2pmsg_helpers.hpp"
|
||||
#include "../ledger.hpp"
|
||||
#include "../unl.hpp"
|
||||
#include "peer_comm_server.hpp"
|
||||
#include "peer_comm_session.hpp"
|
||||
#include "self_node.hpp"
|
||||
@@ -199,7 +200,7 @@ namespace p2p
|
||||
if (connected_status_check_counter == 600)
|
||||
{
|
||||
// One is added to session list size to reflect the loop back connection.
|
||||
const bool current_state = (sessions.size() + 1) < (conf::cfg.unl.size() * WEAKLY_CONNECTED_THRESHOLD);
|
||||
const bool current_state = (sessions.size() + 1) < (unl::count() * WEAKLY_CONNECTED_THRESHOLD);
|
||||
if (is_weakly_connected != current_state)
|
||||
{
|
||||
is_weakly_connected = !is_weakly_connected;
|
||||
|
||||
29
src/sc.cpp
29
src/sc.cpp
@@ -8,6 +8,7 @@
|
||||
#include "msg/fbuf/p2pmsg_helpers.hpp"
|
||||
#include "msg/controlmsg_common.hpp"
|
||||
#include "msg/controlmsg_parser.hpp"
|
||||
#include "unl.hpp"
|
||||
|
||||
namespace sc
|
||||
{
|
||||
@@ -276,26 +277,9 @@ namespace sc
|
||||
|
||||
user_json_to_stream(ctx.userfds, ctx.args.userbufs, os);
|
||||
|
||||
os << "},\"unl\":[";
|
||||
os << "},\"unl\":" << unl::get_json() << "}";
|
||||
|
||||
for (auto nodepk = conf::cfg.unl.begin(); nodepk != conf::cfg.unl.end(); nodepk++)
|
||||
{
|
||||
if (nodepk != conf::cfg.unl.begin())
|
||||
os << ","; // Trailing comma separator for previous element.
|
||||
|
||||
// Convert binary nodepk into hex.
|
||||
std::string pubkeyhex;
|
||||
util::bin2hex(
|
||||
pubkeyhex,
|
||||
reinterpret_cast<const unsigned char *>((*nodepk).data()) + 1,
|
||||
(*nodepk).length() - 1);
|
||||
|
||||
os << "\"" << pubkeyhex << "\"";
|
||||
}
|
||||
|
||||
os << "]}";
|
||||
|
||||
// Get the json string that should be written to contract input pipe.
|
||||
// Get the final json string that should be written to contract input pipe.
|
||||
const std::string json = os.str();
|
||||
|
||||
// Establish contract input pipe.
|
||||
@@ -735,6 +719,7 @@ namespace sc
|
||||
const size_t bytes_to_read = is_stream_socket ? available_bytes : MIN(MAX_SEQ_PACKET_SIZE, available_bytes);
|
||||
output.resize(bytes_to_read);
|
||||
const int read_res = read(readfd, output.data(), bytes_to_read);
|
||||
output.resize(read_res);
|
||||
|
||||
if (read_res >= 0)
|
||||
{
|
||||
@@ -838,6 +823,12 @@ namespace sc
|
||||
{
|
||||
ctx.termination_signaled = true;
|
||||
}
|
||||
else if (type == msg::controlmsg::MSGTYPE_UNL_CHANGESET && !ctx.args.readonly)
|
||||
{
|
||||
std::vector<std::string> additions, removals;
|
||||
parser.extract_unl_changeset(additions, removals);
|
||||
unl::update(additions, removals);
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace sc
|
||||
|
||||
92
src/unl.cpp
Normal file
92
src/unl.cpp
Normal file
@@ -0,0 +1,92 @@
|
||||
#include "util/util.hpp"
|
||||
#include "hplog.hpp"
|
||||
#include "unl.hpp"
|
||||
|
||||
/**
|
||||
* Manages the UNL public keys of this node.
|
||||
*/
|
||||
namespace unl
|
||||
{
|
||||
std::set<std::string> list; // List of binary pubkeys of UNL.
|
||||
std::string json_list; // Stringified json array of UNL. (To be fed into the contract args)
|
||||
std::shared_mutex unl_mutex;
|
||||
|
||||
size_t count()
|
||||
{
|
||||
std::shared_lock lock(unl_mutex);
|
||||
return list.size();
|
||||
}
|
||||
|
||||
std::set<std::string> get()
|
||||
{
|
||||
std::shared_lock lock(unl_mutex);
|
||||
return list;
|
||||
}
|
||||
|
||||
std::string get_json()
|
||||
{
|
||||
std::shared_lock lock(unl_mutex);
|
||||
return json_list;
|
||||
}
|
||||
|
||||
bool exists(const std::string &bin_pubkey)
|
||||
{
|
||||
std::shared_lock lock(unl_mutex);
|
||||
return list.find(bin_pubkey) != list.end();
|
||||
}
|
||||
|
||||
void add(const std::vector<std::string> &additions)
|
||||
{
|
||||
if (additions.empty())
|
||||
return;
|
||||
|
||||
std::unique_lock lock(unl_mutex);
|
||||
|
||||
for (const std::string &pubkey : additions)
|
||||
list.emplace(pubkey);
|
||||
|
||||
update_json_list();
|
||||
}
|
||||
|
||||
void update(const std::vector<std::string> &additions, const std::vector<std::string> &removals)
|
||||
{
|
||||
if (additions.empty() && removals.empty())
|
||||
return;
|
||||
|
||||
std::unique_lock lock(unl_mutex);
|
||||
|
||||
for (const std::string &pubkey : additions)
|
||||
list.emplace(pubkey);
|
||||
|
||||
for (const std::string &pubkey : removals)
|
||||
list.erase(pubkey);
|
||||
|
||||
update_json_list();
|
||||
|
||||
LOG_INFO << "UNL updated. Count:" << list.size();
|
||||
;
|
||||
}
|
||||
|
||||
void update_json_list()
|
||||
{
|
||||
std::ostringstream os;
|
||||
os << "[";
|
||||
for (auto pk = list.begin(); pk != list.end(); pk++)
|
||||
{
|
||||
if (pk != list.begin())
|
||||
os << ","; // Trailing comma separator for previous element.
|
||||
|
||||
// Convert binary pubkey into hex.
|
||||
std::string pubkeyhex;
|
||||
util::bin2hex(
|
||||
pubkeyhex,
|
||||
reinterpret_cast<const unsigned char *>(pk->data()) + 1,
|
||||
pk->length() - 1);
|
||||
|
||||
os << "\"" << pubkeyhex << "\"";
|
||||
}
|
||||
os << "]";
|
||||
json_list = os.str();
|
||||
}
|
||||
|
||||
} // namespace unl
|
||||
21
src/unl.hpp
Normal file
21
src/unl.hpp
Normal file
@@ -0,0 +1,21 @@
|
||||
#ifndef _HP_UNL_
|
||||
#define _HP_UNL_
|
||||
|
||||
#include "pchheader.hpp"
|
||||
|
||||
/**
|
||||
* Manages the UNL public keys of this node.
|
||||
*/
|
||||
namespace unl
|
||||
{
|
||||
size_t count();
|
||||
std::set<std::string> get();
|
||||
std::string get_json();
|
||||
bool exists(const std::string &bin_pubkey);
|
||||
void add(const std::vector<std::string> &additions);
|
||||
void update(const std::vector<std::string> &additions, const std::vector<std::string> &removals);
|
||||
void update_json_list();
|
||||
|
||||
} // namespace unl
|
||||
|
||||
#endif
|
||||
Reference in New Issue
Block a user