Code improvements.

This commit is contained in:
Ravin
2019-11-07 07:47:33 +05:30
committed by Ravin Perera
parent d16b43406b
commit 8d31075b7b
27 changed files with 155 additions and 171 deletions

View File

@@ -12,7 +12,7 @@ else
fi
# Delete and recreate 'hpcluster' directory.
rm -r hpcluster > /dev/null 2>&1
rm -rf hpcluster > /dev/null 2>&1
mkdir hpcluster
clusterloc="./hpcluster"
@@ -56,7 +56,7 @@ do
rm tmp.json
# Generate ssl certs
openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem \
openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout tlskey.pem -out tlscert.pem \
-subj "/C=AU/ST=ST/L=L/O=O/OU=OU/CN=localhost/emailAddress=hpnode${n}@example" > /dev/null 2>&1
popd > /dev/null 2>&1

View File

@@ -117,8 +117,8 @@ void set_contract_dir_paths(std::string basedir)
ctx.contractDir = basedir;
ctx.configDir = basedir + "/cfg";
ctx.configFile = ctx.configDir + "/hp.cfg";
ctx.tlsKeyFile = ctx.configDir + "/key.pem";
ctx.tlsCertFile = ctx.configDir + "/cert.pem";
ctx.tlsKeyFile = ctx.configDir + "/tlskey.pem";
ctx.tlsCertFile = ctx.configDir + "/tlscert.pem";
ctx.histDir = basedir + "/hist";
ctx.stateDir = basedir + "/state";
ctx.logDir = basedir + "/log";
@@ -459,7 +459,7 @@ int validate_contract_dir_paths()
if (path == ctx.tlsKeyFile || path == ctx.tlsCertFile)
{
std::cout << path << " does not exist. Please provide self-signed certificates. Can generate using command\n"
<< "openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout key.pem -out cert.pem\n"
<< "openssl req -newkey rsa:2048 -new -nodes -x509 -days 3650 -keyout tlskey.pem -out tlscert.pem\n"
<< "and add it to " + ctx.configDir << std::endl;
}
else

View File

@@ -52,9 +52,9 @@ void consensus()
}
LOG_DBG << "Started stage " << std::to_string(ctx.stage);
for (auto p : ctx.candidate_proposals)
for (const auto p : ctx.candidate_proposals)
{
bool self = p.pubkey == conf::cfg.pubkey;
const bool self = p.pubkey == conf::cfg.pubkey;
LOG_DBG << "[stage" << std::to_string(p.stage)
<< "] users:" << p.users.size()
<< " hinp:" << p.hash_inputs.size()
@@ -121,7 +121,7 @@ void consensus()
}
if (is_lcl_desync)
{
bool should_reset = (ctx.time_now - ctx.novel_proposal_time) < floor(conf::cfg.roundtime / 4);
const bool should_reset = (ctx.time_now - ctx.novel_proposal_time) < floor(conf::cfg.roundtime / 4);
//for now we are resetting to stage 0 to avoid possible deadlock situations
timewait_stage(true);
return;
@@ -206,7 +206,7 @@ void verify_and_populate_candidate_user_inputs()
for (const usr::user_submitted_message &umsg : umsgs)
{
std::string sig_hash = crypto::get_hash(umsg.sig);
const std::string sig_hash = crypto::get_hash(umsg.sig);
// Check for duplicate messages using hash of the signature.
if (ctx.recent_userinput_hashes.try_emplace(sig_hash))
@@ -311,7 +311,7 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
// todo: repeat above for state
}
float_t vote_threshold = get_stage_threshold(ctx.stage);
const float_t vote_threshold = get_stage_threshold(ctx.stage);
// todo: check if inputs being proposed by another node are actually spoofed inputs
// from a user locally connected to this node.
@@ -319,17 +319,17 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
// if we're at proposal stage 1 we'll accept any input and connection that has 1 or more vote.
// Add user pubkeys which have votes over stage threshold to proposal.
for (auto &[pubkey, numvotes] : votes.users)
for (const auto &[pubkey, numvotes] : votes.users)
if (numvotes >= vote_threshold || (ctx.stage == 1 && numvotes > 0))
stg_prop.users.emplace(pubkey);
// Add inputs which have votes over stage threshold to proposal.
for (auto &[hash, numvotes] : votes.inputs)
for (const auto &[hash, numvotes] : votes.inputs)
if (numvotes >= vote_threshold || (ctx.stage == 1 && numvotes > 0))
stg_prop.hash_inputs.emplace(hash);
// Add outputs which have votes over stage threshold to proposal.
for (auto &[hash, numvotes] : votes.outputs)
for (const auto &[hash, numvotes] : votes.outputs)
if (numvotes >= vote_threshold)
stg_prop.hash_outputs.emplace(hash);
@@ -337,7 +337,7 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
// time is voted on a simple sorted and majority basis, since there will always be disagreement.
int32_t highest_votes = 0;
for (auto [time, numvotes] : votes.time)
for (const auto [time, numvotes] : votes.time)
{
if (numvotes > highest_votes)
{
@@ -474,7 +474,7 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string
* Returns the consensus percentage threshold for the specified stage.
* @param stage The consensus stage [1, 2, 3]
*/
float_t get_stage_threshold(uint8_t stage)
float_t get_stage_threshold(const uint8_t stage)
{
switch (stage)
{
@@ -488,7 +488,7 @@ float_t get_stage_threshold(uint8_t stage)
return -1;
}
void timewait_stage(bool reset)
void timewait_stage(const bool reset)
{
if (reset)
ctx.stage = 0;
@@ -539,8 +539,8 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop)
for (const std::string &hash : cons_prop.hash_outputs)
{
auto cu_itr = ctx.candidate_user_outputs.find(hash);
bool hashfound = (cu_itr != ctx.candidate_user_outputs.end());
const auto cu_itr = ctx.candidate_user_outputs.find(hash);
const bool hashfound = (cu_itr != ctx.candidate_user_outputs.end());
if (!hashfound)
{
LOG_ERR << "Output required but wasn't in our candidate outputs map, this will potentially cause desync.";
@@ -553,10 +553,10 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop)
candidate_user_output &cand_output = cu_itr->second;
// Find the user session by user pubkey.
auto sess_itr = usr::ctx.sessionids.find(cand_output.userpubkey);
const auto sess_itr = usr::ctx.sessionids.find(cand_output.userpubkey);
if (sess_itr != usr::ctx.sessionids.end()) // match found
{
auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id.
const auto user_itr = usr::ctx.users.find(sess_itr->second); // sess_itr->second is the session id.
if (user_itr != usr::ctx.users.end()) // match found
{
std::string outputtosend;
@@ -589,8 +589,8 @@ void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::
for (const std::string &hash : cons_prop.hash_inputs)
{
// For each consensus input hash, we need to find the actual input content to feed the contract.
auto itr = ctx.candidate_user_inputs.find(hash);
bool hashfound = (itr != ctx.candidate_user_inputs.end());
const auto itr = ctx.candidate_user_inputs.find(hash);
const bool hashfound = (itr != ctx.candidate_user_inputs.end());
if (!hashfound)
{
LOG_ERR << "input required but wasn't in our candidate inputs map, this will potentially cause desync.";
@@ -628,7 +628,7 @@ void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap)
std::string output;
output.swap(bufpair.output);
std::string hash = crypto::get_hash(pubkey, output);
const std::string hash = crypto::get_hash(pubkey, output);
ctx.candidate_user_outputs.try_emplace(
std::move(hash),
candidate_user_output(pubkey, std::move(output)));
@@ -641,14 +641,14 @@ void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap)
* @param time_now The time that must be passed on to the contract.
* @param useriobufmap The contract bufmap which holds user I/O buffers.
*/
void run_contract_binary(int64_t time_now, proc::contract_bufmap_t &useriobufmap)
void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap)
{
// todo:implement exchange of npl and hpsc bufs
proc::contract_bufmap_t nplbufmap;
proc::contract_iobuf_pair hpscbufpair;
proc::contract_exec_args eargs(time_now, useriobufmap, nplbufmap, hpscbufpair);
proc::exec_contract(eargs);
proc::exec_contract(
proc::contract_exec_args(time_now, useriobufmap, nplbufmap, hpscbufpair));
}
/**

View File

@@ -22,15 +22,13 @@ static const float STAGE3_THRESHOLD = 0.8;
*/
struct candidate_user_input
{
std::string userpubkey;
const std::string userpubkey;
const uint64_t maxledgerseqno;
std::string input;
uint64_t maxledgerseqno;
candidate_user_input(std::string userpubkey, std::string input, uint64_t maxledgerseqno)
candidate_user_input(const std::string userpubkey, const std::string input, const uint64_t maxledgerseqno)
: userpubkey(std::move(userpubkey)), input(std::move(input)), maxledgerseqno(maxledgerseqno)
{
this->userpubkey = std::move(userpubkey);
this->input = std::move(input);
this->maxledgerseqno = maxledgerseqno;
}
};
@@ -39,13 +37,12 @@ struct candidate_user_input
*/
struct candidate_user_output
{
std::string userpubkey;
const std::string userpubkey;
std::string output;
candidate_user_output(std::string userpubkey, std::string output)
candidate_user_output(const std::string userpubkey, const std::string output)
: userpubkey(std::move(userpubkey)), output(std::move(output))
{
this->userpubkey = std::move(userpubkey);
this->output = std::move(output);
}
};
@@ -77,9 +74,6 @@ struct consensus_context
uint64_t time_now;
std::string lcl;
uint64_t led_seq_no;
std::string novel_proposal;
int32_t next_sleep;
consensus_context() : recent_userinput_hashes(200)
{
@@ -116,9 +110,9 @@ void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes);
float_t get_stage_threshold(uint8_t stage);
float_t get_stage_threshold(const uint8_t stage);
void timewait_stage(bool reset);
void timewait_stage(const bool reset);
void apply_ledger(const p2p::proposal &proposal);
@@ -128,7 +122,7 @@ void feed_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const p2p::
void extract_outputs_from_contract_bufmap(proc::contract_bufmap_t &bufmap);
void run_contract_binary(int64_t time_now, proc::contract_bufmap_t &useriobufmap);
void run_contract_binary(const int64_t time_now, proc::contract_bufmap_t &useriobufmap);
template <typename T>
void increment(std::map<T, int32_t> &counter, const T &candidate);

View File

@@ -9,7 +9,7 @@
namespace cons
{
const std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no)
std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no)
{
//Serialize lcl using flatbuffer ledger schema.
flatbuffers::FlatBufferBuilder builder(1024);
@@ -27,7 +27,7 @@ const std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_
//create file path to save lcl.
//file name -> [ledger sequnce numer]-[lcl hex]
std::string path;
std::string seq_no = std::to_string(led_seq_no);
const std::string seq_no = std::to_string(led_seq_no);
path.reserve(conf::ctx.histDir.size() + lcl_hash.size() + seq_no.size() + 6);
path.append(conf::ctx.histDir);
path.append("/");
@@ -41,10 +41,10 @@ const std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_
ofs.write(ledger_str.data(), ledger_str.size());
ofs.close();
return (lcl_hash);
return lcl_hash;
}
const ledger_history load_ledger()
ledger_history load_ledger()
{
ledger_history ldg_hist;
ldg_hist.led_seq_no = 0;
@@ -53,8 +53,8 @@ const ledger_history load_ledger()
//Get all records at lcl history direcory and find the last closed ledger.
std::string latest_file_name;
std::string::size_type latest_pos = 0;
for (auto &entry : boost::filesystem::directory_iterator(conf::ctx.histDir))
size_t latest_pos = 0;
for (const auto &entry : boost::filesystem::directory_iterator(conf::ctx.histDir))
{
const boost::filesystem::path file_path = entry.path();
const std::string file_name = entry.path().filename().string();
@@ -69,7 +69,7 @@ const ledger_history load_ledger()
}
else
{
std::string::size_type pos = file_name.find("-");
const size_t pos = file_name.find("-");
uint64_t seq_no;
if (pos != std::string::npos)

View File

@@ -12,9 +12,9 @@ struct ledger_history
uint64_t led_seq_no;
};
const std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no);
std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no);
const ledger_history load_ledger();
ledger_history load_ledger();
}

View File

@@ -8,7 +8,7 @@ namespace fbschema
/**
* Returns string_view from flat buffer data pointer and length.
*/
std::string_view flatbuff_bytes_to_sv(const uint8_t *data, flatbuffers::uoffset_t length)
std::string_view flatbuff_bytes_to_sv(const uint8_t *data, const flatbuffers::uoffset_t length)
{
const char *signature_content_str = reinterpret_cast<const char *>(data);
return std::string_view(signature_content_str, length);

View File

@@ -13,7 +13,7 @@ namespace fbschema
//---Conversion helpers from flatbuffers data types to std data types---//
std::string_view flatbuff_bytes_to_sv(const uint8_t *data, flatbuffers::uoffset_t length);
std::string_view flatbuff_bytes_to_sv(const uint8_t *data, const flatbuffers::uoffset_t length);
std::string_view flatbuff_bytes_to_sv(const flatbuffers::Vector<uint8_t> *buffer);

View File

@@ -42,7 +42,7 @@ int validate_and_extract_container(const Container **container_ref, std::string_
{
//Accessing message buffer
const uint8_t *container_buf_ptr = reinterpret_cast<const uint8_t *>(container_buf.data());
size_t container_buf_size = container_buf.length();
const size_t container_buf_size = container_buf.length();
//Defining Flatbuffer verifier (default max depth = 64, max_tables = 1000000,)
flatbuffers::Verifier container_verifier(container_buf_ptr, container_buf_size);
@@ -66,7 +66,7 @@ int validate_and_extract_container(const Container **container_ref, std::string_
}
//check message timestamp.
int64_t time_now = util::get_epoch_milliseconds();
const int64_t time_now = util::get_epoch_milliseconds();
if (container->timestamp() < (time_now - conf::cfg.roundtime * 4))
{
LOG_DBG << "Peer message is too old.";
@@ -122,7 +122,7 @@ int validate_container_trust(const Container *container)
* @param content_size Data buffer size.
* @return 0 on successful verification. -1 for failure.
*/
int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, flatbuffers::uoffset_t content_size)
int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, const flatbuffers::uoffset_t content_size)
{
//Defining Flatbuffer verifier for message content verification.
//Since content is also serialised by using Flatbuffer we can verify it using Flatbuffer.
@@ -144,7 +144,7 @@ int validate_and_extract_content(const Content **content_ref, const uint8_t *con
* @param The Flatbuffer non-unl poporal received from the peer.
* @return A non-unl proposal struct representing the message.
*/
const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, uint64_t timestamp)
const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, const uint64_t timestamp)
{
p2p::nonunl_proposal nup;
@@ -159,7 +159,7 @@ const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal
* @param The Flatbuffer poporal received from the peer.
* @return A proposal struct representing the message.
*/
const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector<uint8_t> *pubkey, uint64_t timestamp)
const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector<uint8_t> *pubkey, const uint64_t timestamp)
{
p2p::proposal p;
@@ -189,12 +189,12 @@ void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_b
{
flatbuffers::FlatBufferBuilder builder(1024);
flatbuffers::Offset<NonUnl_Proposal_Message> nupmsg =
const flatbuffers::Offset<NonUnl_Proposal_Message> nupmsg =
CreateNonUnl_Proposal_Message(
builder,
usermsgsmap_to_flatbuf_usermsgsmap(builder, nup.user_messages));
flatbuffers::Offset<Content> message = CreateContent(builder, Message_NonUnl_Proposal_Message, nupmsg.Union());
const flatbuffers::Offset<Content> message = CreateContent(builder, Message_NonUnl_Proposal_Message, nupmsg.Union());
builder.Finish(message); // Finished building message content to get serialised content.
// Now that we have built the content message,
@@ -212,7 +212,7 @@ void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder,
// todo:get a average propsal message size and allocate content builder based on that.
flatbuffers::FlatBufferBuilder builder(1024);
flatbuffers::Offset<Proposal_Message> proposal =
const flatbuffers::Offset<Proposal_Message> proposal =
CreateProposal_Message(
builder,
p.stage,
@@ -222,7 +222,7 @@ void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder,
stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs),
stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs));
flatbuffers::Offset<Content> message = CreateContent(builder, Message_Proposal_Message, proposal.Union());
const flatbuffers::Offset<Content> message = CreateContent(builder, Message_Proposal_Message, proposal.Union());
builder.Finish(message); // Finished building message content to get serialised content.
// Now that we have built the content message,
@@ -238,13 +238,13 @@ void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder,
* @param sign Whether to sign the message content.
*/
void create_containermsg_from_content(
flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, bool sign)
flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, const bool sign)
{
uint8_t *content_buf = content_builder.GetBufferPointer();
flatbuffers::uoffset_t content_size = content_builder.GetSize();
const uint8_t *content_buf = content_builder.GetBufferPointer();
const flatbuffers::uoffset_t content_size = content_builder.GetSize();
// Create container message content from serialised content from previous step.
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> content = container_builder.CreateVector(content_buf, content_size);
const flatbuffers::Offset<flatbuffers::Vector<uint8_t>> content = container_builder.CreateVector(content_buf, content_size);
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> pubkey_offset = 0;
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> sig_offset = 0;
@@ -258,7 +258,7 @@ void create_containermsg_from_content(
pubkey_offset = sv_to_flatbuff_bytes(container_builder, conf::cfg.pubkey);
}
flatbuffers::Offset<Container> container_message = CreateContainer(
const flatbuffers::Offset<Container> container_message = CreateContainer(
container_builder,
util::PEERMSG_VERSION,
util::get_epoch_milliseconds(),
@@ -281,7 +281,7 @@ flatbuf_usermsgsmap_to_usermsgsmap(const flatbuffers::Vector<flatbuffers::Offset
{
std::list<usr::user_submitted_message> msglist;
for (auto msg : *group->messages())
for (const auto msg : *group->messages())
{
msglist.push_back(usr::user_submitted_message(
flatbuff_bytes_to_sv(msg->content()),
@@ -302,7 +302,7 @@ usermsgsmap_to_flatbuf_usermsgsmap(flatbuffers::FlatBufferBuilder &builder, cons
{
std::vector<flatbuffers::Offset<UserSubmittedMessageGroup>> fbvec;
fbvec.reserve(map.size());
for (auto const &[pubkey, msglist] : map)
for (const auto &[pubkey, msglist] : map)
{
std::vector<flatbuffers::Offset<UserSubmittedMessage>> fbmsgsvec;
for (const usr::user_submitted_message &msg : msglist)

View File

@@ -19,11 +19,11 @@ int validate_and_extract_container(const Container **container_ref, std::string_
int validate_container_trust(const Container *container);
int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, flatbuffers::uoffset_t content_size);
int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, const flatbuffers::uoffset_t content_size);
const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, uint64_t timestamp);
const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal_Message &msg, const uint64_t timestamp);
const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector<uint8_t> *pubkey, uint64_t timestamp);
const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector<uint8_t> *pubkey, const uint64_t timestamp);
//---Message creation helpers---//
@@ -32,7 +32,7 @@ void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_b
void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::proposal &p);
void create_containermsg_from_content(
flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, bool sign);
flatbuffers::FlatBufferBuilder &container_builder, const flatbuffers::FlatBufferBuilder &content_builder, const bool sign);
//---Conversion helpers from flatbuffers data types to std data types---//

View File

@@ -39,7 +39,7 @@ void init()
severity = LOG_SEVERITY::ERROR;
// Log line format expression.
auto format_expr = (expr::stream
const auto format_expr = (expr::stream
<< expr::format_date_time<boost::posix_time::ptime>("TimeStamp", "%Y-%m-%d %H:%M:%S")
//<< ":" << expr::attr<boost::log::attributes::current_thread_id::value_type>("ThreadID")
<< " [" << expr::attr<std::string>("Channel")

View File

@@ -26,7 +26,7 @@ static const size_t CHALLENGE_LEN = 16;
* "type": "public_challenge",
* "challenge": "<hex challenge string>"
* }
* @param challenge String reference to copy the generated hex challenge string into.
* @param challengehex String reference to copy the generated hex challenge string into.
*/
void create_user_challenge(std::string &msg, std::string &challengehex)
{
@@ -207,7 +207,7 @@ int extract_input_container(std::string &nonce, std::string &input, uint64_t &ma
return -1;
}
rapidjson::Value &inputval = d[FLD_INPUT];
const rapidjson::Value &inputval = d[FLD_INPUT];
std::string_view inputhex(inputval.GetString(), inputval.GetStringLength());
// Convert hex input to binary.

View File

@@ -113,7 +113,9 @@ void std_terminate() noexcept
int main(int argc, char **argv)
{
// Register exception handler for std exceptions.
std::set_terminate(&std_terminate);
// Extract the CLI args
// This call will populate conf::ctx
if (parse_cmd(argc, argv) != 0)
@@ -157,15 +159,9 @@ int main(int argc, char **argv)
hplog::init();
if (p2p::init() != 0)
if (p2p::init() != 0 || usr::init() != 0 || cons::init() != 0)
return -1;
if (usr::init() != 0)
return -1;
if (cons::init() != 0)
return 1;
// After initializing primary subsystems, register the SIGINT handler.
signal(SIGINT, signal_handler);

View File

@@ -65,7 +65,7 @@ int init()
void start_peer_connections()
{
auto address = net::ip::make_address(conf::cfg.listenip);
boost::asio::ip::address address = net::ip::make_address(conf::cfg.listenip);
// Setting up the message max size. Retrieve it from config
default_sess_opts.max_socket_read_len = conf::cfg.peermaxsize;
@@ -98,24 +98,24 @@ void peer_connection_watchdog()
//todo: implement exit gracefully.
while (true)
{
for (auto &v : conf::cfg.peers)
for (const auto &[peerid, ipport] : conf::cfg.peers)
{
if (peer_connections.find(v.first) == peer_connections.end())
if (peer_connections.find(peerid) == peer_connections.end())
{
LOG_DBG << "Trying to connect :" << v.second.first << ":" << v.second.second;
LOG_DBG << "Trying to connect : " << peerid;
std::make_shared<sock::socket_client<peer_outbound_message>>(ioc, ctx, global_peer_session_handler, default_sess_opts)
->run(v.second.first, v.second.second);
->run(ipport.first, ipport.second);
}
}
util::sleep(200);
util::sleep(conf::cfg.roundtime * 4);
}
}
/**
* Broadcasts the given message to all currently connected outbound peers.
*/
void broadcast_message(peer_outbound_message msg)
void broadcast_message(const peer_outbound_message msg)
{
if (p2p::peer_connections.size() == 0)
{
@@ -126,7 +126,7 @@ void broadcast_message(peer_outbound_message msg)
//Broadcast while locking the peer_connections.
std::lock_guard<std::mutex> lock(p2p::peer_connections_mutex);
for (auto &[k, session] : p2p::peer_connections)
for (const auto &[k, session] : p2p::peer_connections)
session->send(msg);
}

View File

@@ -53,7 +53,7 @@ void start_peer_connections();
void peer_connection_watchdog();
void broadcast_message(peer_outbound_message msg);
void broadcast_message(const peer_outbound_message msg);
} // namespace p2p

View File

@@ -45,7 +45,7 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
//Accessing message content and size.
const uint8_t *content_ptr = container_content->Data();
flatbuffers::uoffset_t content_size = container_content->size();
const flatbuffers::uoffset_t content_size = container_content->size();
const p2pmsg::Content *content;
if (p2pmsg::validate_and_extract_content(&content, content_ptr, content_size) != 0)
@@ -58,7 +58,7 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
return;
}
p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc
const p2pmsg::Message content_message_type = content->message_type(); //i.e - proposal, npl, state request, state response, etc
if (content_message_type == p2pmsg::Message_Proposal_Message) //message is a proposal message
{

View File

@@ -61,7 +61,7 @@ int exec_contract(const contract_exec_args &args)
return -1;
}
__pid_t pid = fork();
const __pid_t pid = fork();
if (pid > 0)
{
// HotPocket process.
@@ -73,7 +73,7 @@ int exec_contract(const contract_exec_args &args)
// Wait for child process (contract process) to complete execution.
LOG_INFO << "Contract process started.";
int presult = await_contract_execution();
const int presult = await_contract_execution();
LOG_INFO << "Contract process ended.";
contract_pid = 0;
@@ -201,7 +201,7 @@ int write_contract_args(const contract_exec_args &args)
os << "]}";
// Get the json string that should be written to contract input pipe.
std::string json = os.str();
const std::string json = os.str();
// Establish contract input pipe.
int stdinpipe[2];
@@ -383,7 +383,7 @@ int create_and_write_iopipes(std::vector<int> &fds, std::list<std::string> &inpu
// Write the inputs (if any) into the contract and close the writefd.
int writefd = fds[FDTYPE::HPWRITE];
const int writefd = fds[FDTYPE::HPWRITE];
bool vmsplice_error = false;
for (std::string &input : inputs)
@@ -420,7 +420,7 @@ int read_iopipe(std::vector<int> &fds, std::string &output)
// from the output pipe and store in the output buffer.
// Outputs will be read by the consensus process later when it wishes so.
int readfd = fds[FDTYPE::HPREAD];
const int readfd = fds[FDTYPE::HPREAD];
int bytes_available = 0;
ioctl(readfd, FIONREAD, &bytes_available);
bool vmsplice_error = false;
@@ -446,7 +446,7 @@ int read_iopipe(std::vector<int> &fds, std::string &output)
return vmsplice_error ? -1 : 0;
}
void close_unused_fds(bool is_hp)
void close_unused_fds(const bool is_hp)
{
close_unused_vectorfds(is_hp, hpscfds);
@@ -464,7 +464,7 @@ void close_unused_fds(bool is_hp)
* @param is_hp Specify 'true' when calling from HP process. 'false' from SC process.
* @param fds Vector of fds to close.
*/
void close_unused_vectorfds(bool is_hp, std::vector<int> &fds)
void close_unused_vectorfds(const bool is_hp, std::vector<int> &fds)
{
if (is_hp)
{

View File

@@ -91,9 +91,9 @@ int create_and_write_iopipes(std::vector<int> &fds, std::list<std::string> &inpu
int read_iopipe(std::vector<int> &fds, std::string &output);
void close_unused_fds(bool is_hp);
void close_unused_fds(const bool is_hp);
void close_unused_vectorfds(bool is_hp, std::vector<int> &fds);
void close_unused_vectorfds(const bool is_hp, std::vector<int> &fds);
} // namespace proc

View File

@@ -85,15 +85,14 @@ void socket_server<T>::on_accept(error_code ec, tcp::socket socket)
}
else
{
std::string port = std::to_string(socket.remote_endpoint().port());
std::string address = socket.remote_endpoint().address().to_string();
const std::string port = std::to_string(socket.remote_endpoint().port());
const std::string address = socket.remote_endpoint().address().to_string();
//Creating websocket stream required to pass to initiate a new session
websocket::stream<beast::ssl_stream<beast::tcp_stream>> ws(std::move(socket), ctx);
// Launch a new session for this connection
std::make_shared<socket_session<T>>(
std::move(ws), sess_handler)
std::make_shared<socket_session<T>>(std::move(ws), sess_handler)
->run(std::move(port), std::move(address), true, sess_opts);
}

View File

@@ -20,7 +20,7 @@ socket_session<T>::socket_session(websocket::stream<beast::ssl_stream<beast::tcp
* to handle this.
*/
template <class T>
void socket_session<T>::set_max_socket_read_len(uint64_t size)
void socket_session<T>::set_max_socket_read_len(const uint64_t size)
{
ws.read_message_max(size);
}
@@ -29,11 +29,12 @@ void socket_session<T>::set_max_socket_read_len(uint64_t size)
* Set thresholds to the socket session
*/
template <class T>
void socket_session<T>::set_threshold(SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint32_t intervalms)
void socket_session<T>::set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms)
{
thresholds[threshold_type].counter_value = 0;
thresholds[threshold_type].intervalms = intervalms;
thresholds[threshold_type].threshold_limit = threshold_limit;
session_threshold &t = thresholds[threshold_type];
t.counter_value = 0;
t.intervalms = intervalms;
t.threshold_limit = threshold_limit;
}
/*
@@ -41,15 +42,15 @@ void socket_session<T>::set_threshold(SESSION_THRESHOLDS threshold_type, uint64_
* configured threshold limit.
*/
template <class T>
void socket_session<T>::increment_metric(SESSION_THRESHOLDS threshold_type, uint64_t amount)
void socket_session<T>::increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount)
{
sock::session_threshold &t = thresholds[threshold_type];
session_threshold &t = thresholds[threshold_type];
// Ignore the counter if limit is set as 0.
if (t.threshold_limit == 0)
return;
uint64_t time_now = util::get_epoch_milliseconds();
const uint64_t time_now = util::get_epoch_milliseconds();
t.counter_value += amount;
if (t.timestamp == 0)
@@ -60,7 +61,7 @@ void socket_session<T>::increment_metric(SESSION_THRESHOLDS threshold_type, uint
else
{
// Check whether we have exceeded the threshold within the monitering interval.
auto elapsed_time = time_now - t.timestamp;
const uint64_t elapsed_time = time_now - t.timestamp;
if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit)
{
t.timestamp = 0;
@@ -79,7 +80,7 @@ void socket_session<T>::increment_metric(SESSION_THRESHOLDS threshold_type, uint
//port and address will be used to identify from which remote party the message recieved in the handler
template <class T>
void socket_session<T>::run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts)
void socket_session<T>::run(const std::string &&address, const std::string &&port, const bool is_server_session, const session_options &sess_opts)
{
if (sess_opts.max_socket_read_len > 0)
{
@@ -96,17 +97,14 @@ void socket_session<T>::run(const std::string &&address, const std::string &&por
thresholds.push_back(session_threshold(sess_opts.max_badsigmsgs_per_minute, 60000));
thresholds.push_back(session_threshold(sess_opts.max_badmsgs_per_minute, 60000));
ssl::stream_base::handshake_type handshake_type = ssl::stream_base::client;
const ssl::stream_base::handshake_type handshake_type =
is_server_session ? ssl::stream_base::server : ssl::stream_base::client;
// Set this flag to identify whether this socket session created when node acts as a server
// INBOUND true - when node acts as server
// INBOUND false (OUTBOUND) - when node acts as client
if (is_server_session)
{
/**
* Set this flag to identify whether this socket session created when node acts as a server
* INBOUND true - when node acts as server
* INBOUND false (OUTBOUND) - when node acts as client
*/
flags.set(SESSION_FLAG::INBOUND);
handshake_type = ssl::stream_base::server;
}
this->port = std::move(port);
this->address = std::move(address);
@@ -127,7 +125,7 @@ void socket_session<T>::run(const std::string &&address, const std::string &&por
* Close an active websocket connection gracefully
*/
template <class T>
void socket_session<T>::on_ssl_handshake(error_code ec)
void socket_session<T>::on_ssl_handshake(const error_code ec)
{
if (ec)
return fail(ec, "handshake");
@@ -161,7 +159,7 @@ void socket_session<T>::on_ssl_handshake(error_code ec)
* Executes on acceptance of new connection
*/
template <class T>
void socket_session<T>::on_accept(error_code ec)
void socket_session<T>::on_accept(const error_code ec)
{
// Handle the error, if any
if (ec)
@@ -177,7 +175,7 @@ void socket_session<T>::on_accept(error_code ec)
* Executes on completion of recieiving a new message
*/
template <class T>
void socket_session<T>::on_read(error_code ec, std::size_t)
void socket_session<T>::on_read(const error_code ec, const std::size_t)
{
//if something goes wrong when trying to read, socket connection will be closed and calling this to inform it to the handler
// read may get called when operation_aborted as well.
@@ -213,7 +211,7 @@ void socket_session<T>::on_read(error_code ec, std::size_t)
* Send message through an active websocket connection
*/
template <class T>
void socket_session<T>::send(T msg)
void socket_session<T>::send(const T msg)
{
// Always add to queue
queue.push_back(std::move(msg));
@@ -232,7 +230,7 @@ void socket_session<T>::send(T msg)
* Executes on completion of write operation to a socket
*/
template <class T>
void socket_session<T>::on_write(error_code ec, std::size_t)
void socket_session<T>::on_write(const error_code ec, const std::size_t)
{
// Handle the error, if any
if (ec)
@@ -264,7 +262,7 @@ void socket_session<T>::close()
*/
//type will be used identify whether the error is due to failure in closing the web socket or transfer of another exception to this method
template <class T>
void socket_session<T>::on_close(error_code ec, int8_t type)
void socket_session<T>::on_close(const error_code ec, const int8_t type)
{
if (type == 1)
return;
@@ -277,7 +275,7 @@ void socket_session<T>::on_close(error_code ec, int8_t type)
* Executes on error
*/
template <class T>
void socket_session<T>::fail(error_code ec, char const *what)
void socket_session<T>::fail(const error_code ec, char const *what)
{
LOG_ERR << what << ": " << ec.message();

View File

@@ -92,23 +92,23 @@ class socket_session : public std::enable_shared_from_this<socket_session<T>>
socket_session_handler<T> &sess_handler; // handler passed to gain access to websocket events
std::vector<session_threshold> thresholds; // track down various communication thresholds
void fail(error_code ec, char const *what);
void fail(const error_code ec, char const *what);
void on_ssl_handshake(error_code ec);
void on_ssl_handshake(const error_code ec);
void on_accept(error_code ec);
void on_accept(const error_code ec);
void on_read(error_code ec, std::size_t bytes_transferred);
void on_read(const error_code ec, const std::size_t bytes_transferred);
void on_write(error_code ec, std::size_t bytes_transferred);
void on_write(const error_code ec, const std::size_t bytes_transferred);
void on_close(error_code ec, int8_t type);
void on_close(const error_code ec, const int8_t type);
// Websocket lambda expression helpers.
// Implementation of these are separated to a different .cpp to reduce regular compile time.
void ws_next_layer_async_handshake(ssl::stream_base::handshake_type handshake_type);
void ws_next_layer_async_handshake(const ssl::stream_base::handshake_type handshake_type);
void ws_async_accept();
@@ -145,15 +145,15 @@ public:
// Setting and reading flags to this is completely managed by user-code.
std::bitset<8> flags;
void run(const std::string &&address, const std::string &&port, bool is_server_session, const session_options &sess_opts);
void set_max_socket_read_len(const uint64_t size);
void send(T msg);
void set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms);
void set_max_socket_read_len(uint64_t size);
void increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount);
void set_threshold(SESSION_THRESHOLDS threshold_type, uint64_t threshold_limit, uint32_t intervalms);
void run(const std::string &&address, const std::string &&port, const bool is_server_session, const session_options &sess_opts);
void increment_metric(SESSION_THRESHOLDS threshold_type, uint64_t amount);
void send(const T msg);
void close();
};

View File

@@ -16,7 +16,7 @@ namespace sock
// This reduces lambda expression compilation time in regular code changes as long as this file is not touched.
template <class T>
void socket_session<T>::ws_next_layer_async_handshake(ssl::stream_base::handshake_type handshake_type)
void socket_session<T>::ws_next_layer_async_handshake(const ssl::stream_base::handshake_type handshake_type)
{
// Perform the SSL handshake
ws.next_layer().async_handshake(

View File

@@ -11,19 +11,17 @@ namespace usr
*/
struct user_submitted_message
{
std::string content;
std::string sig;
const std::string content;
const std::string sig;
user_submitted_message(std::string content, std::string sig)
user_submitted_message(const std::string content, const std::string sig)
: content(std::move(content)), sig(std::move(sig))
{
this->content = std::move(content);
this->sig = std::move(sig);
}
user_submitted_message(std::string_view content, std::string_view sig)
: content(content), sig(sig)
{
this->content = content;
this->sig = sig;
}
};

View File

@@ -21,9 +21,8 @@ void user_session_handler::on_connect(sock::socket_session<user_outbound_message
// As soon as a user connects, we issue them a challenge message. We remember the
// challenge we issued and later verifies the user's response with it.
user_outbound_message outmsg(issue_challenge(session->uniqueid));
session->send(std::move(outmsg));
session->send(
user_outbound_message(issue_challenge(session->uniqueid)));
// Set the challenge-issued flag to help later checks in on_message.
session->flags.set(sock::SESSION_FLAG::USER_CHALLENGE_ISSUED);
@@ -49,7 +48,7 @@ void user_session_handler::on_message(
// Check whether this user is among authenticated users
// and perform authenticated msg processing.
auto itr = ctx.users.find(session->uniqueid);
const auto itr = ctx.users.find(session->uniqueid);
if (itr != ctx.users.end())
{
// This is an authed user.

View File

@@ -54,7 +54,7 @@ std::string issue_challenge(const std::string sessionid)
int verify_challenge(std::string_view message, sock::socket_session<user_outbound_message> *session)
{
// The received message must be the challenge response. We need to verify it.
auto itr = ctx.pending_challenges.find(session->uniqueid);
const auto itr = ctx.pending_challenges.find(session->uniqueid);
if (itr == ctx.pending_challenges.end())
{
LOG_DBG << "No challenge found for the session " << session->uniqueid;
@@ -178,7 +178,7 @@ int add_user(sock::socket_session<user_outbound_message> *session, const std::st
*/
int remove_user(const std::string &sessionid)
{
auto itr = ctx.users.find(sessionid);
const auto itr = ctx.users.find(sessionid);
if (itr == ctx.users.end())
{

View File

@@ -6,7 +6,7 @@ namespace util
// rollover_hashset class methods
rollover_hashset::rollover_hashset(uint32_t maxsize)
rollover_hashset::rollover_hashset(const uint32_t maxsize)
{
this->maxsize = maxsize == 0 ? 1 : maxsize;
}
@@ -15,13 +15,13 @@ rollover_hashset::rollover_hashset(uint32_t maxsize)
* Inserts the given hash to the list.
* @return True on succesful insertion. False if hash already exists.
*/
bool rollover_hashset::try_emplace(std::string hash)
bool rollover_hashset::try_emplace(const std::string hash)
{
auto itr = recent_hashes.find(hash);
const auto itr = recent_hashes.find(hash);
if (itr == recent_hashes.end()) // Not found
{
// Add the new message hash to the set.
auto [newitr, success] = recent_hashes.emplace(hash);
const auto [newitr, success] = recent_hashes.emplace(std::move(hash));
// Insert a pointer to the stored hash value to the back of the ordered list of hashes.
recent_hashes_list.push_back(&(*newitr));
@@ -48,7 +48,7 @@ bool rollover_hashset::try_emplace(std::string hash)
* @param bin_len Bytes length.
* @return Always returns 0.
*/
int bin2hex(std::string &encoded_string, const unsigned char *bin, size_t bin_len)
int bin2hex(std::string &encoded_string, const unsigned char *bin, const size_t bin_len)
{
// Allocate the target string.
encoded_string.resize(bin_len * 2);
@@ -70,7 +70,7 @@ int bin2hex(std::string &encoded_string, const unsigned char *bin, size_t bin_le
* @param decodedbuf_len Decoded buffer size.
* @param hex_str hex string to decode.
*/
int hex2bin(unsigned char *decodedbuf, size_t decodedbuf_len, std::string_view hex_str)
int hex2bin(unsigned char *decodedbuf, const size_t decodedbuf_len, std::string_view hex_str)
{
const char *hex_end;
size_t bin_len;
@@ -99,7 +99,7 @@ int64_t get_epoch_milliseconds()
/**
* Sleeps the current thread for specified no. of milliseconds.
*/
void sleep(uint64_t milliseconds)
void sleep(const uint64_t milliseconds)
{
std::this_thread::sleep_for(std::chrono::milliseconds(milliseconds));
}

View File

@@ -39,17 +39,17 @@ private:
uint32_t maxsize;
public:
rollover_hashset(uint32_t maxsize);
bool try_emplace(std::string hash);
rollover_hashset(const uint32_t maxsize);
bool try_emplace(const std::string hash);
};
int bin2hex(std::string &encoded_string, const unsigned char *bin, size_t bin_len);
int bin2hex(std::string &encoded_string, const unsigned char *bin, const size_t bin_len);
int hex2bin(unsigned char *decoded, size_t decoded_len, std::string_view hex_str);
int hex2bin(unsigned char *decoded, const size_t decoded_len, std::string_view hex_str);
int64_t get_epoch_milliseconds();
void sleep(uint64_t milliseconds);
void sleep(const uint64_t milliseconds);
int version_compare(const std::string &x, const std::string &y);