Merge branch 'hpsh' into test-release

Conflicts:
	CMakeLists.txt
	test/docker/Dockerfile.ubt.20.04
	test/docker/build.sh
	test/local-cluster/Dockerfile
This commit is contained in:
kithminisg
2024-01-05 18:43:00 +05:30
42 changed files with 1092 additions and 156 deletions

View File

@@ -59,6 +59,7 @@ add_executable(hpcore
src/ledger/ledger_sync.cpp
src/ledger/ledger_serve.cpp
src/ledger/ledger.cpp
src/debug_shell/debug_shell.cpp
src/status.cpp
src/consensus.cpp
src/main.cpp
@@ -77,7 +78,7 @@ target_link_libraries(hpcore
add_custom_command(TARGET hpcore POST_BUILD
# COMMAND strip ./build/hpcore
COMMAND cp ./test/bin/hpws ./test/bin/hpfs ./evernode-license.pdf ./build/
COMMAND cp ./test/bin/hpws ./test/bin/hpfs ./test/bin/hpsh ./evernode-license.pdf ./build/
)
target_precompile_headers(hpcore PUBLIC src/pchheader.hpp)
@@ -87,9 +88,9 @@ target_precompile_headers(hpcore PUBLIC src/pchheader.hpp)
add_custom_target(docker
COMMAND mkdir -p ./test/local-cluster/bin
COMMAND cp ./build/hpcore ./test/local-cluster/bin/
COMMAND cp ./test/bin/libblake3.so ./test/bin/hpws ./test/bin/hpfs ./test/local-cluster/bin/
COMMAND cp ./test/bin/libblake3.so ./test/bin/hpws ./test/bin/hpfs ./test/bin/hpsh ./test/local-cluster/bin/
COMMAND cp ./evernode-license.pdf ./test/local-cluster/bin/
COMMAND docker build -t hpcore:latest -t hpcore:0.6.4 -f ./test/local-cluster/Dockerfile ./test/local-cluster/bin/
COMMAND docker build -t hpcore:latest -t hpcore:0.6.5 -f ./test/local-cluster/Dockerfile ./test/local-cluster/bin/
)
set_target_properties(docker PROPERTIES EXCLUDE_FROM_ALL TRUE)
add_dependencies(docker hpcore)

View File

@@ -67,22 +67,6 @@ async function main() {
});
})
// This will get fired when contract sends a read response.
hpc.on(HotPocket.events.contractReadResponse, (response) => {
const result = bson.deserialize(response);
if (result.type == "downloadResult") {
if (result.status == "ok") {
fs.writeFileSync(result.fileName, result.content.buffer);
console.log("File " + result.fileName + " downloaded to current directory.");
}
else {
console.log("File " + result.fileName + " download failed. reason: " + result.status);
}
}
else {
console.log("Unknown read request result.");
}
})
console.log("Ready to accept inputs.");
@@ -122,10 +106,16 @@ async function main() {
else if (inp.startsWith("download ")) {
const fileName = inp.substr(9);
hpc.sendContractReadRequest(bson.serialize({
hpc.submitContractReadRequest(bson.serialize({
type: "download",
fileName: fileName
}));
})).then(reply => {
const res = bson.deserialize(reply);
if (res && res.type === 'downloadResult' && res.status === 'ok')
console.log(res.content);
else
console.log(res);
});
}
else {
console.log("Invalid command. [upload <local path> | delete <filename> | download <filename>] expected.")

View File

@@ -123,6 +123,16 @@ async function main() {
else if (inp === "stat") {
hpc.getStatus().then(stat => console.log(stat));
}
else if (inp.startsWith("debug_shell ")) {
hpc.submitDebugShellRequest(inp.substr(12)).then(id => {
hpc.on(id, (reply) => {
if (reply.data)
console.log(reply.data);
else
console.error(reply.error);
})
});
}
else {
if (inp.startsWith("upload ")) {

View File

@@ -6,7 +6,7 @@ const filename = "file.dat";
const autofilePrefix = "autofile";
const autofileSize = 1 * 1024 * 1024;
const diagnosticContract = async (ctx) => {
const diagnosticContract = async (ctx, readOnly = false) => {
// Collection of per-user promises to wait for. Each promise completes when inputs for that user is processed.
const userHandlers = [];
@@ -134,5 +134,16 @@ const diagnosticContract = async (ctx) => {
}
const fallback = async (ctx) => {
console.log(`Fallback mode: Non consensus execution count: ${ctx.nonConsensusRounds}`);
}
const hpc = new HotPocket.Contract();
hpc.init(diagnosticContract);
hpc.init({
"consensus": async (ctx) => { await diagnosticContract(ctx); },
"consensus_fallback": async (ctx) => { await fallback(ctx); },
"read_req": async (ctx) => { await diagnosticContract(ctx, true); }
});

View File

@@ -5,10 +5,10 @@ const exectsFile = "exects.txt";
// HP smart contract is defined as a function which takes HP ExecutionContext as an argument.
// HP considers execution as complete, when this function completes and all the NPL message callbacks are complete.
const echoContract = async (ctx) => {
const contract = async (ctx, readonly = false) => {
// We just save execution timestamp as an example state file change.
if (!ctx.readonly) {
if (!readonly) {
fs.appendFileSync(exectsFile, "ts:" + ctx.timestamp + "\n");
const stats = fs.statSync(exectsFile);
@@ -56,7 +56,7 @@ const echoContract = async (ctx) => {
// ctx.unl.find("<public key hex>");
// NPL messages example.
// if (!ctx.readonly) {
// if (!readonly) {
// // Start listening to incoming NPL messages before we send ours.
// const promise = new Promise((resolve, reject) => {
// let timeout = setTimeout(() => {
@@ -84,5 +84,33 @@ const echoContract = async (ctx) => {
// await ctx.updateConfig(config);
}
const fallback = async (ctx) => {
console.log(`Fallback mode: Non consensus execution count: ${ctx.nonConsensusRounds}`);
// NPL messages example.
// Start listening to incoming NPL messages before we send ours.
const promise = new Promise((resolve, reject) => {
let timeout = setTimeout(() => {
reject('NPL timeout.');
}, 2000);
let list = [];
ctx.unl.onMessage((node, msg) => {
console.log(`${node.publicKey} said ${msg} to me.`);
list.push(msg);
if (list.length == ctx.unl.list().length) {
clearTimeout(timeout);
resolve();
}
});
});
await ctx.unl.send("Hello");
await promise;
}
const hpc = new HotPocket.Contract();
hpc.init(echoContract);
hpc.init({
"consensus": async (ctx) => { await contract(ctx, false); },
"consensus_fallback": async (ctx) => { await fallback(ctx); },
"read_req": async (ctx) => { await contract(ctx, true); }
});

View File

@@ -2,7 +2,7 @@ const HotPocket = require("hotpocket-nodejs-contract");
const fs = require('fs');
const bson = require('bson');
const fileContract = async (ctx) => {
const fileContract = async (ctx, readonly = false) => {
for (const user of ctx.users.list()) {
@@ -54,7 +54,7 @@ const fileContract = async (ctx) => {
}));
}
}
else if (msg.type == "download") {
else if (readonly && msg.type == "download") {
if (fs.existsSync(msg.fileName)) {
const fileContent = fs.readFileSync(msg.fileName);
await user.send(bson.serialize({
@@ -76,5 +76,14 @@ const fileContract = async (ctx) => {
}
};
const fallback = async (ctx) => {
console.log(`Fallback mode: Non consensus execution count: ${ctx.nonConsensusRounds}`);
}
const hpc = new HotPocket.Contract();
hpc.init(fileContract, HotPocket.clientProtocols.bson);
hpc.init({
"consensus": async (ctx) => { await fileContract(ctx); },
"consensus_fallback": async (ctx) => { await fallback(ctx); },
"read_req": async (ctx) => { await fileContract(ctx, true); }
}, HotPocket.clientProtocols.bson);

View File

@@ -6,7 +6,7 @@
"": {
"dependencies": {
"bson": "4.0.4",
"hotpocket-nodejs-contract": "0.5.10",
"hotpocket-nodejs-contract": "0.7.0",
"seedrandom": "3.0.5"
}
},
@@ -37,9 +37,9 @@
}
},
"node_modules/hotpocket-nodejs-contract": {
"version": "0.5.10",
"resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.5.10.tgz",
"integrity": "sha512-1Cw9WcyoQmJabGkjRbM4FZ1HMvUI3z6IVTbhi0BDiD+K1c6bHROPgNKbIHCSL15+Z5xvbHpeuOOPLGogGtdC7Q=="
"version": "0.7.0",
"resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.7.0.tgz",
"integrity": "sha512-MHIhGGzPBsKkmXMZrr8L3ze6cSqlSKQ+V2J59/fSS7FDeo5+Z983ZOwt1aBvaLmkN22YOvtLVdBUn1EEamryOg=="
},
"node_modules/ieee754": {
"version": "1.1.13",
@@ -82,9 +82,9 @@
}
},
"hotpocket-nodejs-contract": {
"version": "0.5.10",
"resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.5.10.tgz",
"integrity": "sha512-1Cw9WcyoQmJabGkjRbM4FZ1HMvUI3z6IVTbhi0BDiD+K1c6bHROPgNKbIHCSL15+Z5xvbHpeuOOPLGogGtdC7Q=="
"version": "0.7.0",
"resolved": "https://registry.npmjs.org/hotpocket-nodejs-contract/-/hotpocket-nodejs-contract-0.7.0.tgz",
"integrity": "sha512-MHIhGGzPBsKkmXMZrr8L3ze6cSqlSKQ+V2J59/fSS7FDeo5+Z983ZOwt1aBvaLmkN22YOvtLVdBUn1EEamryOg=="
},
"ieee754": {
"version": "1.1.13",

View File

@@ -5,7 +5,7 @@
"build-diag": "ncc build diagnostic_contract.js -o dist/diagnostic-contract"
},
"dependencies": {
"hotpocket-nodejs-contract": "0.5.10",
"hotpocket-nodejs-contract": "0.7.0",
"bson": "4.0.4",
"seedrandom": "3.0.5"
}

View File

@@ -194,6 +194,8 @@ namespace conf
cfg.log.loggers.emplace("console");
cfg.log.loggers.emplace("file");
cfg.debug_shell.enabled = false;
// Save the default settings into the config file.
if (write_config(cfg) != 0)
return -1;
@@ -242,6 +244,7 @@ namespace conf
ctx.hpws_exe_path = ctx.exe_dir + "/" + "hpws";
ctx.hpfs_exe_path = ctx.exe_dir + "/" + "hpfs";
ctx.hpsh_exe_path = ctx.exe_dir + "/" + "hpsh";
ctx.contract_dir = basedir;
ctx.config_dir = basedir + "/cfg";
@@ -511,6 +514,42 @@ namespace conf
}
}
// debug_shell
{
jpath = "debug_shell";
try
{
const jsoncons::ojson &debug_shell = d["debug_shell"];
cfg.debug_shell.enabled = debug_shell["enabled"].as<bool>();
if (cfg.debug_shell.run_as.from_string(debug_shell["run_as"].as<std::string>()) == -1)
{
std::cerr << "Invalid format for debug_shell run as config (\"uid>0:gid>0\" expected).\n";
return -1;
}
jpath = "debug_shell.users";
cfg.debug_shell.users.clear();
for (auto &userpk : debug_shell["users"].array_range())
{
// Convert the public key hex of each node to binary and store it.
const std::string bin_pubkey = util::to_bin(userpk.as<std::string_view>());
if (bin_pubkey.empty())
{
std::cerr << "Error decoding user pubkey list.\n";
return -1;
}
cfg.debug_shell.users.emplace(bin_pubkey);
}
}
catch (const std::exception &e)
{
print_missing_field_error(jpath, e);
return -1;
}
}
return 0;
}
@@ -623,6 +662,20 @@ namespace conf
d.insert_or_assign("log", log_config);
}
// debug_shell configs
{
jsoncons::ojson debug_shell_config;
debug_shell_config.insert_or_assign("enabled", cfg.debug_shell.enabled);
debug_shell_config.insert_or_assign("run_as", cfg.debug_shell.run_as.to_string());
jsoncons::ojson users(jsoncons::json_array_arg);
for (const auto &userpk : cfg.debug_shell.users)
{
users.push_back(util::to_hex(userpk));
}
debug_shell_config.insert_or_assign("users", users);
d.insert_or_assign("debug_shell", debug_shell_config);
}
return write_json_file(ctx.config_file, d);
}
@@ -709,7 +762,7 @@ namespace conf
*/
int validate_contract_dir_paths()
{
const std::string paths[8] = {
const std::string paths[9] = {
ctx.contract_dir,
ctx.config_file,
ctx.contract_hpfs_dir,
@@ -717,7 +770,8 @@ namespace conf
ctx.tls_key_file,
ctx.tls_cert_file,
ctx.hpfs_exe_path,
ctx.hpws_exe_path};
ctx.hpws_exe_path,
ctx.hpsh_exe_path};
for (const std::string &path : paths)
{
@@ -729,7 +783,7 @@ namespace conf
<< "openssl req -newkey rsa:2048 -new -nodes -x509 -days 365 -keyout tlskey.pem -out tlscert.pem\n"
<< "and add it to " + ctx.config_dir << std::endl;
}
else if (path == ctx.hpfs_exe_path || path == ctx.hpws_exe_path)
else if (path == ctx.hpfs_exe_path || path == ctx.hpws_exe_path || path == ctx.hpsh_exe_path)
{
std::cerr << path << " binary does not exist.\n";
}
@@ -952,10 +1006,14 @@ namespace conf
jdoc.insert_or_assign("max_input_ledger_offset", contract.max_input_ledger_offset);
jsoncons::ojson consensus;
jsoncons::ojson fallback;
fallback.insert_or_assign("execute", contract.consensus.fallback.execute);
consensus.insert_or_assign("mode", contract.consensus.mode == MODE::PUBLIC ? MODE_PUBLIC : MODE_PRIVATE);
consensus.insert_or_assign("roundtime", contract.consensus.roundtime.load());
consensus.insert_or_assign("stage_slice", contract.consensus.stage_slice.load());
consensus.insert_or_assign("threshold", contract.consensus.threshold);
consensus.insert_or_assign("fallback", fallback);
jdoc.insert_or_assign("consensus", consensus);
jsoncons::ojson npl;
@@ -1095,6 +1153,9 @@ namespace conf
}
contract.consensus.mode = jdoc["consensus"]["mode"].as<std::string>() == MODE_PUBLIC ? MODE::PUBLIC : MODE::PRIVATE;
jpath = "contract.consensus.fallback";
contract.consensus.fallback.execute = jdoc["consensus"]["fallback"]["execute"].as<bool>();
jpath = "contract.npl";
if (jdoc["npl"]["mode"].as<std::string>() != MODE_PUBLIC && jdoc["npl"]["mode"].as<std::string>() != MODE_PRIVATE)
{

View File

@@ -173,12 +173,18 @@ namespace conf
size_t max_file_count = 0; // Max no. of log files to keep.
};
struct fallback_config
{
bool execute = false; // Whether or not to execute the contract on fallback mode.
};
struct consensus_config
{
MODE mode; // If PUBLIC, consensus are broadcasted to non-unl nodes as well.
std::atomic<uint32_t> roundtime = 0; // Consensus round time in ms (max: 3,600,000).
std::atomic<uint32_t> stage_slice = 0; // Percentage slice of round time that stages 0,1,2 get (max: 33).
uint16_t threshold = 0;
uint16_t threshold = 0; // The minimum percentage of votes for accepting a stage 3 proposal to create a ledger.
fallback_config fallback; // Consensus fallback related configuration.
};
struct npl_config
@@ -208,6 +214,13 @@ namespace conf
std::vector<std::string> runtime_env_args; // Contract environment variables.
};
struct debug_shell_config
{
bool enabled = false; // Whether or not to enable debug_shell.
ugid run_as; // The user/groups id to execute the debug_shell as.
std::set<std::string> users; // List of users who are allowed to perform debug_shell (list of binary public keys).
};
struct user_config
{
uint16_t port = 0; // Listening port for public user connections
@@ -263,6 +276,7 @@ namespace conf
std::string exe_dir; // HotPocket executable dir.
std::string hpws_exe_path; // hpws executable file path.
std::string hpfs_exe_path; // hpfs executable file path.
std::string hpsh_exe_path; // debug_shell executable path file
std::string contract_dir; // Contract base directory full path.
std::string contract_hpfs_dir; // Contract hpfs metadata dir (The location of hpfs log file).
@@ -316,14 +330,15 @@ namespace conf
hpfs_config hpfs;
log_config log;
health_config health; // For debugging only. Not included in the config file.
debug_shell_config debug_shell;
};
// Global contract context struct exposed to the application.
// Other modeuls will access context values via this.
// Other modules will access context values via this.
extern contract_ctx ctx;
// Global configuration struct exposed to the application.
// Other modeuls will access config values via this.
// Other modules will access config values via this.
extern hp_config cfg;
int init();

View File

@@ -29,6 +29,7 @@ namespace consensus
// Max no. of time to get unreliable votes before we try heuristics to increase vote receiving reliability.
constexpr uint16_t MAX_UNRELIABLE_VOTES_ATTEMPTS = 5;
constexpr uint16_t FALLBACK_CONTRACT_TERMINATE_MARGIN = 100;
consensus_context ctx;
bool init_success = false;
@@ -140,6 +141,9 @@ namespace consensus
broadcast_nonunl_proposal();
}
// Keep copy of current stage since we might change the stage in following conditions.
uint8_t cur_stage = ctx.stage;
if (ctx.stage == 0)
{
// Prepare the consensus candidate user inputs that we have accumulated so far. (We receive them periodically via NUPs)
@@ -147,7 +151,7 @@ namespace consensus
if (verify_and_populate_candidate_user_inputs(lcl_id.seq_no) == -1)
return -1;
const p2p::proposal p = create_stage0_proposal(state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id);
const p2p::proposal p = create_stage0_proposal(state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id, lcl_id);
broadcast_proposal(p);
ctx.stage = 1; // Transition to next stage.
@@ -200,6 +204,7 @@ namespace consensus
{
ctx.stage = 0;
ctx.unreliable_votes_attempts++;
ctx.non_consensus_rounds++;
// If we get too many consecutive unreliable vote rounds, then we perform time config sniffing just in case the unreliable votes
// are caused because our roundtime config information is different from other nodes.
@@ -208,15 +213,19 @@ namespace consensus
refresh_time_config(true);
ctx.unreliable_votes_attempts = 0;
}
if (execute_contract_in_fallback(cur_stage, lcl_id))
apply_consensed_patch_file_changes();
}
else
{
ctx.unreliable_votes_attempts = 0;
ctx.non_consensus_rounds = 0;
if (new_vote_status == status::VOTE_STATUS::SYNCED)
{
// If we are in sync, vote and broadcast the winning votes to next stage.
const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id);
const p2p::proposal p = create_stage123_proposal(votes, unl_count, state_hash, patch_hash, last_primary_shard_id, last_raw_shard_id, lcl_id);
broadcast_proposal(p);
// This marks the moment we finish a sync cycle. We are in stage 1 and we just detected that our votes are in sync.
@@ -238,34 +247,33 @@ namespace consensus
}
/**
* Scan the stage 3 proposals from last round and create the ledger if all majority critertia are met.
* Evaluates stage 3 proposals and populates winning info.
* @param proposal_count Number of stage proposals.
* @param winning_votes Number of votes fo winning proposal.
* @param winning_proposal The winning proposal.
* @param stage Stage to evaluate.
*/
void attempt_ledger_close()
int evaluate_proposals(uint32_t &proposal_count, uint32_t &winning_votes, p2p::proposal &winning_proposal, const uint8_t stage)
{
std::map<util::h32, std::vector<p2p::proposal>> proposal_groups; // Stores sets of proposals against grouped by their root hashes.
uint32_t stage3_prop_count = 0; // Keep track of the number of stage 3 proposals received.
proposal_count = 0; // Keep track of the number of stage 3 proposals received.
// Count votes of all stage 3 proposal hashes.
for (const auto &[pubkey, cp] : ctx.candidate_proposals)
{
if (cp.stage == 3)
if (cp.stage == stage)
{
stage3_prop_count++;
proposal_count++;
proposal_groups[cp.root_hash].push_back(cp);
}
}
// Threshold is devided by 100 to convert average to decimal.
const uint32_t min_votes_required = ceil((conf::cfg.contract.consensus.threshold * unl::count()) / 100.0);
if (stage3_prop_count < min_votes_required)
{
// We don't have enough stage 3 proposals to create a ledger.
LOG_DEBUG << "Not enough stage 3 proposals to create a ledger. received:" << stage3_prop_count << " needed:" << min_votes_required;
return;
}
// Return -1 if there are no proposals to evaluate.
if (proposal_count == 0)
return -1;
// Find the winning hash and no. of votes for it.
uint32_t winning_votes = 0;
winning_votes = 0;
util::h32 winning_hash = util::h32_empty;
for (const auto [hash, proposals] : proposal_groups)
{
@@ -276,16 +284,44 @@ namespace consensus
}
}
if (winning_votes < min_votes_required)
// Return -1 if there are no winning proposal.
if (winning_votes == 0)
return -1;
// Consensus reached. This is the winning set of proposals.
std::vector<p2p::proposal> &winning_group = proposal_groups[winning_hash];
winning_proposal = std::move(winning_group.front());
return winning_votes;
}
/**
* Scan the stage 3 proposals from last round and create the ledger if all majority criteria are met.
* @return true on successful ledger closure and false on failure.
*/
void attempt_ledger_close()
{
uint32_t stage3_prop_count = 0;
uint32_t winning_votes = 0;
p2p::proposal winning_prop;
const uint32_t min_votes_required = ceil((conf::cfg.contract.consensus.threshold * unl::count()) / 100.0);
evaluate_proposals(stage3_prop_count, winning_votes, winning_prop, 3);
// Threshold is divided by 100 to convert average to decimal.
if (stage3_prop_count < min_votes_required)
{
// We don't have enough stage 3 proposals to create a ledger.
LOG_DEBUG << "Not enough stage 3 proposals to create a ledger. received:" << stage3_prop_count << " needed:" << min_votes_required;
return;
}
else if (winning_votes < min_votes_required)
{
LOG_INFO << "Cannot close ledger. Possible fork condition. won:" << winning_votes << " needed:" << min_votes_required;
return;
}
// Consensus reached. This is the winning set of proposals.
std::vector<p2p::proposal> &winning_group = proposal_groups[winning_hash];
p2p::proposal &winning_prop = winning_group.front();
LOG_DEBUG << "Closing ledger with proposal:" << winning_prop.root_hash;
// Upon successful ledger close condition, update the ledger and execute the contract using the consensus proposal.
@@ -293,16 +329,18 @@ namespace consensus
if (prepare_consensed_users(consensed_users, winning_prop) == -1 ||
commit_consensus_results(winning_prop, consensed_users) == -1)
{
LOG_ERROR << "Error occured when closing ledger";
LOG_ERROR << "Error occurred when closing ledger";
// Cleanup obsolete information before next round starts.
cleanup_output_collections();
cleanup_consensed_user_inputs(consensed_users);
}
return;
}
/**
* Performs the consensus finalalization activities with the provided consensused information.
* Performs the consensus finalization activities with the provided consensed information.
* @param cons_prop The proposal which reached consensus.
* @param consensed_users Set of consensed users and their consensed inputs and outputs.
*/
@@ -338,7 +376,8 @@ namespace consensus
// Apply consensed config patch file changes to the hpcore runtime and hp.cfg.
const util::h32 patch_hash = sc::contract_fs.get_parent_hash(sc::PATCH_FILE_PATH);
if (apply_consensed_patch_file_changes(cons_prop.patch_hash, patch_hash) == -1)
// Check whether is there any patch changes to be applied which reached consensus.
if (patch_hash == cons_prop.patch_hash && apply_consensed_patch_file_changes() == -1)
return -1;
// Execute the smart contract with the consensed user inputs.
@@ -795,7 +834,7 @@ namespace consensus
if (ctx.contract_ctx->args.lcl_id == npl_msg.lcl_id)
return ctx.contract_ctx->args.npl_messages.try_enqueue(std::move(npl_msg));
else
LOG_DEBUG << "Trying to add irrelevant NPL from " << util::to_hex(npl_msg.pubkey) << " | lcl-seq: " << npl_msg.lcl_id.seq_no;
LOG_DEBUG << "Trying to add irrelevant NPL from " << util::to_hex(npl_msg.pubkey) << " | lcl-seq: " << npl_msg.lcl_id.seq_no;
}
return false;
}
@@ -904,8 +943,8 @@ namespace consensus
return 0;
}
p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash,
const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id)
p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, const util::sequence_hash &last_primary_shard_id,
const util::sequence_hash &last_raw_shard_id, const util::sequence_hash &lcl_id)
{
// This is the proposal that stage 0 votes on.
// We report our own values in stage 0.
@@ -917,6 +956,7 @@ namespace consensus
p.last_primary_shard_id = last_primary_shard_id;
p.last_raw_shard_id = last_raw_shard_id;
p.time_config = CURRENT_TIME_CONFIG;
p.lcl_id = lcl_id;
// In stage 0 proposals, we calculate a random nonce from this node to contribute to the group nonce
std::string rand_bytes;
@@ -939,7 +979,7 @@ namespace consensus
}
p2p::proposal create_stage123_proposal(vote_counter &votes, const size_t unl_count, const util::h32 &state_hash, const util::h32 &patch_hash,
const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id)
const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id, const util::sequence_hash &lcl_id)
{
// The proposal to be emited at the end of this stage.
p2p::proposal p;
@@ -955,6 +995,7 @@ namespace consensus
p.time_config = CURRENT_TIME_CONFIG;
p.output_hash.resize(BLAKE3_OUT_LEN); // Default empty hash.
p.node_nonce = ctx.round_nonce;
p.lcl_id = lcl_id;
const uint64_t time_now = util::get_epoch_milliseconds();
@@ -1208,7 +1249,7 @@ namespace consensus
}
sc::contract_execution_args &args = ctx.contract_ctx->args;
args.readonly = false;
args.mode = sc::EXECUTION_MODE::CONSENSUS;
args.time = time;
// lcl to be passed to the contract.
@@ -1252,6 +1293,65 @@ namespace consensus
return 0;
}
/**
* Executes the contract on consensus failure.
* @param stage Stage the consensus are currently in.
* @param lcl_id Current lcl id of the node.
*/
bool execute_contract_in_fallback(const uint8_t stage, const util::sequence_hash &lcl_id)
{
// Do not execute if fallback is not enabled, HotPocket is shutting down.
if (!conf::cfg.contract.consensus.fallback.execute || ctx.is_shutting_down)
return false;
// Pass new user store for inputs, We are not handling inputs in fallback mode right now.
util::buffer_store fallback_store;
{
std::scoped_lock lock(ctx.contract_ctx_mutex);
ctx.contract_ctx.emplace(fallback_store);
}
sc::contract_execution_args &args = ctx.contract_ctx->args;
args.mode = sc::EXECUTION_MODE::CONSENSUS_FALLBACK;
// lcl to be passed to the contract.
args.lcl_id = lcl_id;
args.non_consensus_rounds = ctx.non_consensus_rounds;
uint32_t stage3_prop_count = 0;
uint32_t winning_votes = 0;
p2p::proposal winning_prop;
// Evaluate proposals and take tike from winning proposal.
if (evaluate_proposals(stage3_prop_count, winning_votes, winning_prop, (stage + 3) % 4) > 0)
args.time = winning_prop.time;
// We should end the contact before this round end, Otherwise we'll trap inside a round reset loop.
// We keep a margin of 100 milliseconds to avoid contract monitor thread hangs.
args.end_before = (ctx.round_start_time + conf::cfg.contract.consensus.roundtime - FALLBACK_CONTRACT_TERMINATE_MARGIN);
bool executed = false;
// Execute contract in fallback mode without user inputs or outputs.
if (sc::execute_contract(ctx.contract_ctx.value()) == -1)
{
LOG_ERROR << "Contract execution for fallback mode failed.";
}
else
{
// Get the new state hash after contract execution.
const util::h32 &new_state_hash = args.post_execution_state_hash;
// Update state hash in contract fs global hash tracker.
sc::contract_fs.set_parent_hash(sc::STATE_DIR_PATH, new_state_hash);
executed = true;
}
{
std::scoped_lock lock(ctx.contract_ctx_mutex);
ctx.contract_ctx.reset();
}
return executed;
}
/**
* Dispatch acceptence status responses to consensed user inputs, if the recipients are connected to us locally.
* @param consensed_users The map of consensed users containing their inputs.
@@ -1417,14 +1517,12 @@ namespace consensus
/**
* Apply patch file changes after verification from consensus.
* @param prop_patch_hash Hash of patch file which reached consensus.
* @param current_patch_hash Hash of the current patch file.
* @return 0 on success. -1 on failure.
*/
int apply_consensed_patch_file_changes(const util::h32 &prop_patch_hash, const util::h32 &current_patch_hash)
int apply_consensed_patch_file_changes()
{
// Check whether is there any patch changes to be applied which reached consensus.
if (is_patch_update_pending && current_patch_hash == prop_patch_hash)
// Check whether is there any patch changes to be applied.
if (is_patch_update_pending)
{
if (sc::contract_fs.start_ro_session(HPFS_SESSION_NAME, false) != -1)
{

View File

@@ -113,6 +113,7 @@ namespace consensus
uint64_t round_boundry_offset = 0; // Time window boundry offset based on contract id.
uint16_t unreliable_votes_attempts = 0; // No. of times we failed to get reliable votes continously.
util::h32 round_nonce; // The random nonce generated by this node for this consensus round.
uint16_t non_consensus_rounds = 0; // No. of times we failed reach the consensus.
// Indicates whether we are inside a sync cycle or not. Sync cycle is considered to being when we first detect that we are out of sync
// and considered to end when we detect to be in sync inside stage 1 of a round for the first time after we began a sync.
@@ -167,6 +168,8 @@ namespace consensus
int consensus();
int evaluate_proposals(uint32_t &proposal_count, uint32_t &winning_votes, p2p::proposal &winning_proposal, const uint8_t stage);
void attempt_ledger_close();
int commit_consensus_results(const p2p::proposal &cons_prop, const consensus::consensed_user_map &consensed_users);
@@ -191,11 +194,11 @@ namespace consensus
int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no);
p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash,
const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id);
p2p::proposal create_stage0_proposal(const util::h32 &state_hash, const util::h32 &patch_hash, const util::sequence_hash &last_primary_shard_id,
const util::sequence_hash &last_raw_shard_id, const util::sequence_hash &lcl_id);
p2p::proposal create_stage123_proposal(vote_counter &votes, const size_t unl_count, const util::h32 &state_hash, const util::h32 &patch_hash,
const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id);
const util::sequence_hash &last_primary_shard_id, const util::sequence_hash &last_raw_shard_id, const util::sequence_hash &lcl_id);
void broadcast_proposal(const p2p::proposal &p);
@@ -215,6 +218,8 @@ namespace consensus
int execute_contract(const uint64_t time, const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id);
bool execute_contract_in_fallback(const uint8_t stage, const util::sequence_hash &lcl_id);
void dispatch_consensed_user_input_responses(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id);
void dispatch_consensed_user_outputs(const consensed_user_map &consensed_users, const util::sequence_hash &lcl_id);
@@ -232,7 +237,7 @@ namespace consensus
bool push_control_message(const std::string &control_msg);
int apply_consensed_patch_file_changes(const util::h32 &prop_patch_hash, const util::h32 &current_patch_hash);
int apply_consensed_patch_file_changes();
void refresh_time_config(const bool perform_detection);

View File

@@ -0,0 +1,313 @@
#include "debug_shell.hpp"
namespace debug_shell
{
constexpr uint8_t DEBUG_SHELL_CTRL_TERMINATE = 0;
constexpr uint8_t DEBUG_SHELL_CTRL_SH = 1;
constexpr uint32_t POLL_TIMEOUT = 1000;
constexpr uint32_t READ_BUFFER_SIZE = 128 * 1024;
debug_shell_context ctx;
int init()
{
// Do not initialize if disabled in config.
if (!conf::cfg.debug_shell.enabled)
return 0;
// Create a socket pair for the control channel.
if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, ctx.control_fds) == -1)
{
LOG_ERROR << errno << ": Error initializing socket pair.";
return -1;
}
// Create a child process for debug_shell process
ctx.debug_shell_pid = fork();
if (ctx.debug_shell_pid == -1)
{
LOG_ERROR << errno << ": Error forking hpfs process.";
close(ctx.control_fds[0]);
close(ctx.control_fds[1]);
return -1;
}
else if (ctx.debug_shell_pid > 0)
{
// Close child end of socket and start the watcher thread.
close(ctx.control_fds[0]);
ctx.watcher_thread = std::thread(response_watcher);
}
else if (ctx.debug_shell_pid == 0)
{
util::fork_detach();
close(ctx.control_fds[1]);
std::string fd_str;
fd_str.resize(10);
snprintf(fd_str.data(), 10, "%d", ctx.control_fds[0]);
char *argv[] = {(char *)conf::ctx.hpsh_exe_path.data(), fd_str.data(), NULL};
// Just before we execv the debug_shell binary, we set user execution user/group if specified in hp config.
// (Must set gid before setting uid)
if (!conf::cfg.debug_shell.run_as.empty() && (setgid(conf::cfg.debug_shell.run_as.gid) == -1 || setuid(conf::cfg.debug_shell.run_as.uid) == -1))
{
std::cerr << errno << ": DebugShell process setgid/uid failed."
<< "\n";
exit(1);
}
execv(argv[0], argv);
std::cerr << errno << ": Error executing debug_shell."
<< "\n";
close(ctx.control_fds[0]);
exit(1);
}
ctx.is_initialized = true;
LOG_INFO << "DebugShell handler started.";
return 0;
}
void deinit()
{
// This is not initialized if disabled in config.
if (!conf::cfg.debug_shell.enabled)
return;
ctx.is_shutting_down = true;
if (ctx.debug_shell_pid > 0)
send_terminate_message();
// Joining consensus processing thread.
if (ctx.watcher_thread.joinable())
ctx.watcher_thread.join();
// close sockets.
close(ctx.control_fds[0]);
for (const auto &command : ctx.commands)
{
close(command.out_fd);
}
if (ctx.debug_shell_pid > 0)
{
// Check if the debug_shell has exited voluntarily.
if (check_debug_shell_exited(false) == 0)
{
// Issue kill signal to kill the debug_shell process.
kill(ctx.debug_shell_pid, SIGKILL);
check_debug_shell_exited(true); // Blocking wait until exit.
}
}
LOG_INFO << "DebugShell handler stopped.";
}
int check_debug_shell_exited(const bool block)
{
int scstatus = 0;
const int wait_res = waitpid(ctx.debug_shell_pid, &scstatus, block ? 0 : WNOHANG);
if (wait_res == 0) // Child still running.
{
return 0;
}
if (wait_res == -1)
{
LOG_ERROR << errno << ": DebugShell process waitpid error. pid:" << ctx.debug_shell_pid;
ctx.debug_shell_pid = 0;
return -1;
}
else // Child has exited
{
ctx.debug_shell_pid = 0;
if (WIFEXITED(scstatus))
{
LOG_DEBUG << "DebugShell process ended normally.";
return 1;
}
else
{
LOG_WARNING << "DebugShell process ended prematurely. Exit code " << WEXITSTATUS(scstatus);
return -1;
}
}
}
int send_terminate_message()
{
return (write(ctx.control_fds[1], &DEBUG_SHELL_CTRL_TERMINATE, 1) < 0) ? -1 : 0;
}
void remove_user_commands(std::string_view user_pubkey)
{
std::scoped_lock lock(ctx.command_mutex);
// Loop for all the child commands and delete the commands belongs to the user.
auto itr = ctx.commands.begin();
while (itr != ctx.commands.end())
{
if (itr->user_pubkey == user_pubkey)
{
// Close the file descriptor and remove the command from context.
close(itr->out_fd);
itr = ctx.commands.erase(itr);
}
else
{
itr++;
}
}
}
int execute(std::string_view id, std::string_view user_pubkey, std::string_view message)
{
if (ctx.is_shutting_down)
return -1;
if (conf::cfg.debug_shell.users.find(std::string(user_pubkey)) == conf::cfg.debug_shell.users.end())
{
LOG_ERROR << "This user is not allowed to perform debug_shell operations.";
return -2;
}
std::string buffer;
buffer.resize(message.size() + 1);
buffer[0] = DEBUG_SHELL_CTRL_SH;
memcpy(buffer.data() + 1, message.data(), message.size());
// Send the debug_shell request header.
if (write(ctx.control_fds[1], buffer.data(), message.size() + 1) < 0)
{
LOG_ERROR << errno << ": Error writing header message to control fd.";
return -1;
}
// Read the control message which will contain the socket file descriptor.
struct msghdr child_msg = {0};
memset(&child_msg, 0, sizeof(child_msg));
char cmsgbuf[CMSG_SPACE(sizeof(int))];
child_msg.msg_control = cmsgbuf;
child_msg.msg_controllen = sizeof(cmsgbuf);
recvmsg(ctx.control_fds[1], &child_msg, 0);
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&child_msg);
// Skip if the message does not has file descriptor scm rights.
if (cmsg == NULL || cmsg->cmsg_type != SCM_RIGHTS)
{
LOG_ERROR << "Message sent on control line from debug_shell has non-scm_rights.";
return -1;
}
int out_fd = -1;
memcpy(&out_fd, CMSG_DATA(cmsg), sizeof(out_fd));
if (out_fd <= 0)
{
LOG_ERROR << "Invalid file descriptor receives on control line from debug_shell";
return -1;
}
// Add the command to the context.
{
std::scoped_lock lock(ctx.command_mutex);
ctx.commands.push_back(command_context{std::string(id), std::string(user_pubkey), out_fd});
}
return 0;
}
void response_watcher()
{
util::mask_signal();
while (!ctx.is_shutting_down)
{
// Iterate through received commands and check for outputs.
if (ctx.commands.size() > 0)
{
std::scoped_lock<std::mutex> lock(ctx.command_mutex);
auto itr = ctx.commands.begin();
while (itr != ctx.commands.end())
{
if (ctx.is_shutting_down)
break;
struct pollfd pfd;
pfd.fd = itr->out_fd;
pfd.events = POLLIN;
bool remove = false;
// If child fd has data to read handle them.
const int poll_res = poll(&pfd, 1, POLL_TIMEOUT);
if (poll_res == -1)
{
LOG_ERROR << errno << ": Error in poll";
remove = true;
}
else if (poll_res > 0 && (pfd.revents & POLLIN))
{
// Read the response and send to the user.
std::string response;
response.resize(READ_BUFFER_SIZE);
const int res = read(pfd.fd, response.data(), READ_BUFFER_SIZE);
if (res > 0)
{
response.resize(res);
// If response contains trailing new line, Remove it.
if (response[res - 1] == '\n')
response[res - 1] = '\0';
std::scoped_lock<std::mutex> lock(usr::ctx.users_mutex);
// Find the user session by user pubkey.
const auto user_itr = usr::ctx.users.find(itr->user_pubkey);
if (user_itr != usr::ctx.users.end()) // match found
{
const usr::connected_user &user = user_itr->second;
msg::usrmsg::usrmsg_parser parser(user.protocol);
usr::send_debug_shell_response(std::move(parser), user.session, itr->id, msg::usrmsg::STATUS_ACCEPTED, response);
response.clear();
}
}
else if (res == -1)
{
LOG_ERROR << errno << ": Error reading from fd.";
remove = true;
}
else
{
LOG_DEBUG << "DebugShell has closed the connection.";
remove = true;
}
}
if (remove)
{
// Close the file descriptor and remove the command from context.
close(itr->out_fd);
itr = ctx.commands.erase(itr);
}
else
{
itr++;
}
}
}
util::sleep(100);
}
}
}

View File

@@ -0,0 +1,46 @@
#ifndef _HP_DEBUG_SHELL_
#define _HP_DEBUG_SHELL_
#include "../pchheader.hpp"
#include "../conf.hpp"
#include "../usr/usr.hpp"
#include "../msg/usrmsg_common.hpp"
namespace debug_shell
{
struct command_context
{
std::string id;
std::string user_pubkey;
int out_fd;
};
struct debug_shell_context
{
std::mutex command_mutex;
std::list<command_context> commands;
int control_fds[2];
int debug_shell_pid;
std::thread watcher_thread;
bool is_shutting_down;
bool is_initialized = false;
};
extern debug_shell_context ctx;
int init();
void deinit();
int check_debug_shell_exited(const bool block);
int send_terminate_message();
void remove_user_commands(std::string_view user_pubkey);
int execute(std::string_view id, std::string_view user_pubkey, std::string_view message);
void response_watcher();
}
#endif

View File

@@ -16,6 +16,7 @@
#include "ledger/ledger.hpp"
#include "unl.hpp"
#include "killswitch/killswitch.h"
#include "debug_shell/debug_shell.hpp"
/**
* Parses CLI args and extracts HotPocket command and parameters given.
@@ -75,6 +76,7 @@ void deinit()
sc::deinit();
ledger::deinit();
conf::deinit();
debug_shell::deinit();
}
void sig_exit_handler(int signum)
@@ -213,7 +215,8 @@ int main(int argc, char **argv)
consensus::init() == -1 ||
read_req::init() == -1 ||
p2p::init() == -1 ||
usr::init() == -1)
usr::init() == -1 ||
debug_shell::init() == -1)
{
deinit();
return -1;

View File

@@ -161,6 +161,43 @@ namespace msg::usrmsg::bson
encoder.flush();
}
/**
* Constructs a debug_shell response message.
* @param msg Buffer to construct the generated bson message into.
* Message format:
* {
* "type": "debug_shell_response",
* "reply_for": "<corresponding request id>",
* "status": "<accepted|rejected>",
* "content": "<response>"
* "reason": "<reason>",
* }
* @param content The contract binary output content to be put in the message.
*/
void create_debug_shell_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_TYPE);
encoder.string_value(msg::usrmsg::MSGTYPE_DEBUG_SHELL_RESPONSE);
encoder.key(msg::usrmsg::FLD_REPLY_FOR);
encoder.string_value(reply_for);
encoder.key(msg::usrmsg::FLD_STATUS);
encoder.string_value(status);
encoder.key(msg::usrmsg::FLD_CONTENT);
encoder.byte_string_value(content);
// Reject reason is only included for rejected inputs.
if (!reason.empty())
{
encoder.key(msg::usrmsg::FLD_REASON);
encoder.string_value(reason);
}
encoder.end_object();
encoder.flush();
}
/**
* Constructs a contract read response message.
* @param msg Buffer to construct the generated bson message into.
@@ -196,7 +233,7 @@ namespace msg::usrmsg::bson
* "ledger_hash": <binary lcl hash>,
* "outputs": [<binary output 1>, <binary output 2>, ...], // The output order is the hash generation order.
* "output_hash": <binary hash of user's outputs>, [output hash = hash(pubkey+all outputs for the user)]
* "hash_tree": [<binary merkle hash tree for this round>], // Collapsed merkle tree with user's hash element marked as null.
* "hash_tree": [<binary merkle hash tree for this round>], // Collapsed merkle tree with user's hash element marked as null.
* "unl_sig": [["<pubkey>", "<sig>"], ...] // Binary UNL pubkeys and signatures of root hash.
* }
* @param hash This user's combined output hash. [output hash = hash(pubkey+all outputs for the user)]
@@ -466,7 +503,7 @@ namespace msg::usrmsg::bson
/**
* Extracts a contract read request message sent by user.
*
*
* @param extracted_content The content to be passed to the contract, extracted from the message.
* @param d The bson document holding the read request message.
* Accepted signed input container format:
@@ -497,11 +534,44 @@ namespace msg::usrmsg::bson
return 0;
}
/**
* Extracts a debug_shell input message sent by user.
*
* @param extracted_content The content to be passed to the debug_shell, extracted from the message.
* @param d The bson document holding the debug_shell input message.
* Accepted signed input container format:
* {
* "type": "debug_shell_request",
* "id": "<any string>",
* "content": <binary buffer>
* }
* @return 0 on successful extraction. -1 for failure.
*/
int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d)
{
if (!d.contains(msg::usrmsg::FLD_ID) || !d[msg::usrmsg::FLD_ID].is<std::string>())
{
LOG_DEBUG << "DebugShell input 'id' field missing or invalid.";
return -1;
}
if (!d.contains(msg::usrmsg::FLD_CONTENT) || !d[msg::usrmsg::FLD_CONTENT].is_byte_string_view())
{
LOG_DEBUG << "DebugShell input 'content' field missing or invalid.";
return -1;
}
extracted_id = d[msg::usrmsg::FLD_ID].as<std::string>();
const jsoncons::byte_string_view &bsv = d[msg::usrmsg::FLD_CONTENT].as_byte_string_view();
extracted_content = std::string_view(reinterpret_cast<const char *>(bsv.data()), bsv.size());
return 0;
}
/**
* Extracts a signed input container message sent by user.
*
*
* @param extracted_input_container The input container extracted from the message.
* @param extracted_sig The binary signature extracted from the message.
* @param extracted_sig The binary signature extracted from the message.
* @param d The bson document holding the input container.
* Accepted signed input container format:
* {

View File

@@ -17,6 +17,8 @@ namespace msg::usrmsg::bson
void create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status, std::string_view reason,
std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash);
void create_debug_shell_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason);
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content);
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view hash, const ::std::vector<std::string> &outputs,
@@ -43,6 +45,8 @@ namespace msg::usrmsg::bson
int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d);
int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::ojson &d);
int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig,
const jsoncons::ojson &d);

View File

@@ -68,6 +68,7 @@ table ProposalMsg {
output_sig:[ubyte];
state_hash: [ubyte];
patch_hash: [ubyte];
lcl_id:SequenceHash;
last_primary_shard_id:SequenceHash;
last_raw_shard_id: SequenceHash;

View File

@@ -157,6 +157,7 @@ namespace msg::fbuf::p2pmsg
p.stage = msg.stage();
p.state_hash = flatbuf_bytes_to_sv(msg.state_hash());
p.patch_hash = flatbuf_bytes_to_sv(msg.patch_hash());
p.lcl_id = flatbuf_seqhash_to_seqhash(msg.lcl_id());
p.last_primary_shard_id = flatbuf_seqhash_to_seqhash(msg.last_primary_shard_id());
p.last_raw_shard_id = flatbuf_seqhash_to_seqhash(msg.last_raw_shard_id());
@@ -432,6 +433,7 @@ namespace msg::fbuf::p2pmsg
sv_to_flatbuf_bytes(builder, p.output_sig),
hash_to_flatbuf_bytes(builder, p.state_hash),
hash_to_flatbuf_bytes(builder, p.patch_hash),
seqhash_to_flatbuf_seqhash(builder, p.lcl_id),
seqhash_to_flatbuf_seqhash(builder, p.last_primary_shard_id),
seqhash_to_flatbuf_seqhash(builder, p.last_raw_shard_id));

View File

@@ -1006,8 +1006,9 @@ struct ProposalMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
VT_OUTPUT_SIG = 24,
VT_STATE_HASH = 26,
VT_PATCH_HASH = 28,
VT_LAST_PRIMARY_SHARD_ID = 30,
VT_LAST_RAW_SHARD_ID = 32
VT_LCL_ID = 30,
VT_LAST_PRIMARY_SHARD_ID = 32,
VT_LAST_RAW_SHARD_ID = 34
};
const flatbuffers::Vector<uint8_t> *pubkey() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_PUBKEY);
@@ -1087,6 +1088,12 @@ struct ProposalMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
flatbuffers::Vector<uint8_t> *mutable_patch_hash() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_PATCH_HASH);
}
const msg::fbuf::p2pmsg::SequenceHash *lcl_id() const {
return GetPointer<const msg::fbuf::p2pmsg::SequenceHash *>(VT_LCL_ID);
}
msg::fbuf::p2pmsg::SequenceHash *mutable_lcl_id() {
return GetPointer<msg::fbuf::p2pmsg::SequenceHash *>(VT_LCL_ID);
}
const msg::fbuf::p2pmsg::SequenceHash *last_primary_shard_id() const {
return GetPointer<const msg::fbuf::p2pmsg::SequenceHash *>(VT_LAST_PRIMARY_SHARD_ID);
}
@@ -1126,6 +1133,8 @@ struct ProposalMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
verifier.VerifyVector(state_hash()) &&
VerifyOffset(verifier, VT_PATCH_HASH) &&
verifier.VerifyVector(patch_hash()) &&
VerifyOffset(verifier, VT_LCL_ID) &&
verifier.VerifyTable(lcl_id()) &&
VerifyOffset(verifier, VT_LAST_PRIMARY_SHARD_ID) &&
verifier.VerifyTable(last_primary_shard_id()) &&
VerifyOffset(verifier, VT_LAST_RAW_SHARD_ID) &&
@@ -1177,6 +1186,9 @@ struct ProposalMsgBuilder {
void add_patch_hash(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> patch_hash) {
fbb_.AddOffset(ProposalMsg::VT_PATCH_HASH, patch_hash);
}
void add_lcl_id(flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> lcl_id) {
fbb_.AddOffset(ProposalMsg::VT_LCL_ID, lcl_id);
}
void add_last_primary_shard_id(flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> last_primary_shard_id) {
fbb_.AddOffset(ProposalMsg::VT_LAST_PRIMARY_SHARD_ID, last_primary_shard_id);
}
@@ -1210,12 +1222,14 @@ inline flatbuffers::Offset<ProposalMsg> CreateProposalMsg(
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> output_sig = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> state_hash = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> patch_hash = 0,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> lcl_id = 0,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> last_primary_shard_id = 0,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> last_raw_shard_id = 0) {
ProposalMsgBuilder builder_(_fbb);
builder_.add_time(time);
builder_.add_last_raw_shard_id(last_raw_shard_id);
builder_.add_last_primary_shard_id(last_primary_shard_id);
builder_.add_lcl_id(lcl_id);
builder_.add_patch_hash(patch_hash);
builder_.add_state_hash(state_hash);
builder_.add_output_sig(output_sig);
@@ -1246,6 +1260,7 @@ inline flatbuffers::Offset<ProposalMsg> CreateProposalMsgDirect(
const std::vector<uint8_t> *output_sig = nullptr,
const std::vector<uint8_t> *state_hash = nullptr,
const std::vector<uint8_t> *patch_hash = nullptr,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> lcl_id = 0,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> last_primary_shard_id = 0,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> last_raw_shard_id = 0) {
auto pubkey__ = pubkey ? _fbb.CreateVector<uint8_t>(*pubkey) : 0;
@@ -1273,6 +1288,7 @@ inline flatbuffers::Offset<ProposalMsg> CreateProposalMsgDirect(
output_sig__,
state_hash__,
patch_hash__,
lcl_id,
last_primary_shard_id,
last_raw_shard_id);
}

View File

@@ -34,7 +34,7 @@ namespace msg::usrmsg::json
* Constructs user challenge message json and the challenge string required for
* initial user challenge handshake. This gets called when a user establishes
* a web socket connection to HP.
*
*
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
@@ -84,7 +84,7 @@ namespace msg::usrmsg::json
/**
* Constructs server challenge response message json. This gets sent when we receive
* a challenge from the user.
*
*
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
@@ -272,7 +272,7 @@ namespace msg::usrmsg::json
* {
* "type": "contract_input_status",
* "status": "<accepted|rejected>",
* "reason": "<reson>",
* "reason": "<reason>",
* "input_hash": "<hex hash of original input signature>",
* "ledger_seq_no": <sequence no of the ledger that the input got included in>,
* "ledger_hash": "<hex hash no of the ledger that the input got included in>"
@@ -325,6 +325,68 @@ namespace msg::usrmsg::json
msg += "\"}";
}
/**
* Constructs a debug_shell response message.
* @param msg Buffer to construct the generated json message string into.
* Message format:
* {
* "type": "debug_shell_response",
* "reply_for": "<corresponding request id>",
* "status": "<accepted|rejected>",
* "content": "<response>"
* "reason": "<reason>",
* }
* @param content The contract binary output content to be put in the message.
*/
void create_debug_shell_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason)
{
msg.reserve(content.size() + 256);
msg += "{\"";
msg += msg::usrmsg::FLD_TYPE;
msg += SEP_COLON;
msg += msg::usrmsg::MSGTYPE_DEBUG_SHELL_RESPONSE;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_REPLY_FOR;
msg += SEP_COLON;
msg += reply_for;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_STATUS;
msg += SEP_COLON;
msg += status;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_CONTENT;
msg += SEP_COLON_NOQUOTE;
if (is_json_string(content))
{
// Process the final string using jsoncons.
jsoncons::json jstring = content;
jsoncons::json_options options;
options.escape_all_non_ascii(true);
std::string escaped_content;
jstring.dump(escaped_content);
msg += escaped_content;
}
else
{
msg += content;
}
// Reject reason is only included for rejected inputs.
if (!reason.empty())
{
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_REASON;
msg += SEP_COLON;
msg += reason;
msg += DOUBLE_QUOTE;
}
msg += "}";
}
/**
* Constructs a contract read response message.
* @param msg Buffer to construct the generated json message string into.
@@ -381,7 +443,7 @@ namespace msg::usrmsg::json
* "ledger_hash": "<lcl hash hex>",
* "outputs": ["<output string 1>", "<output string 2>", ...], // The output order is the hash generation order.
* "output_hash": "<hex hash of user's outputs>", [output hash = hash(pubkey+all outputs for the user)]
* "hash_tree": [<hex merkle hash tree>], // Collapsed merkle tree with user's hash element marked as null.
* "hash_tree": [<hex merkle hash tree>], // Collapsed merkle tree with user's hash element marked as null.
* "unl_sig": [["<pubkey hex>", "<sig hex>"], ...] // UNL pubkeys and signatures of root hash.
* }
* @param hash This user's combined output hash. [output hash = hash(pubkey+all outputs for the user)]
@@ -566,12 +628,12 @@ namespace msg::usrmsg::json
* {
* "type": "health_event",
* "event": "proposal" | "connectivity",
*
*
* // proposal
* "comm_latency": {min:0, max:0, avg:0},
* "read_latency": {min:0, max:0, avg:0}
* "batch_size": 0
*
*
* // connectivity
* "peer_count": 0,
* "weakly_connected": true | false
@@ -697,7 +759,7 @@ namespace msg::usrmsg::json
/**
* Verifies the user handshake response with the original challenge issued to the user
* and the user public key contained in the response.
*
*
* @param extracted_pubkeyhex The hex public key extracted from the response.
* @param extracted_protocol The protocol code extracted from the response.
* @param extracted_server_challenge Any server challenge issued by user.
@@ -842,7 +904,7 @@ namespace msg::usrmsg::json
/**
* Extracts a contract read request message sent by user.
*
*
* @param extracted_content The content to be passed to the contract, extracted from the message.
* @param d The json document holding the read request message.
* Accepted signed input container format:
@@ -872,11 +934,43 @@ namespace msg::usrmsg::json
return 0;
}
/**
* Extracts a debug_shell input message sent by user.
*
* @param extracted_content The content to be passed to the, extracted from the message.
* @param d The json document holding the debug_shell input message.
* Accepted signed input container format:
* {
* "type": "debug_shell_request",
* "id": "<any string>",
* "content": "<any string>"
* }
* @return 0 on successful extraction. -1 for failure.
*/
int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d)
{
if (!d.contains(msg::usrmsg::FLD_ID) || !d[msg::usrmsg::FLD_ID].is<std::string>())
{
LOG_DEBUG << "DebugShell input 'id' field missing or invalid.";
return -1;
}
if (!d.contains(msg::usrmsg::FLD_CONTENT) || !d[msg::usrmsg::FLD_CONTENT].is<std::string>())
{
LOG_DEBUG << "DebugShell input 'content' field missing or invalid.";
return -1;
}
extracted_id = d[msg::usrmsg::FLD_ID].as<std::string>();
extracted_content = d[msg::usrmsg::FLD_CONTENT].as<std::string>();
return 0;
}
/**
* Extracts a signed input container message sent by user.
*
*
* @param extracted_input_container The input container extracted from the message.
* @param extracted_sig The binary signature extracted from the message.
* @param extracted_sig The binary signature extracted from the message.
* @param d The json document holding the input container.
* Accepted signed input container format:
* {

View File

@@ -21,6 +21,8 @@ namespace msg::usrmsg::json
void create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status, std::string_view reason,
std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash);
void create_debug_shell_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason);
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content);
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view hash, const ::std::vector<std::string> &outputs,
@@ -47,6 +49,8 @@ namespace msg::usrmsg::json
int extract_read_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d);
int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content, const jsoncons::json &d);
int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig,
const jsoncons::json &d);

View File

@@ -80,6 +80,8 @@ namespace msg::usrmsg
constexpr const char *MSGTYPE_SERVER_CHALLENGE_RESPONSE = "server_challenge_response";
constexpr const char *MSGTYPE_CONTRACT_READ_REQUEST = "contract_read_request";
constexpr const char *MSGTYPE_CONTRACT_READ_RESPONSE = "contract_read_response";
constexpr const char *MSGTYPE_DEBUG_SHELL_REQUEST = "debug_shell_request";
constexpr const char *MSGTYPE_DEBUG_SHELL_RESPONSE = "debug_shell_response";
constexpr const char *MSGTYPE_CONTRACT_INPUT = "contract_input";
constexpr const char *MSGTYPE_CONTRACT_INPUT_STATUS = "contract_input_status";
constexpr const char *MSGTYPE_CONTRACT_OUTPUT = "contract_output";
@@ -106,6 +108,9 @@ namespace msg::usrmsg
constexpr const char *REASON_NONCE_EXPIRED = "nonce_expired";
constexpr const char *REASON_ALREADY_SUBMITTED = "already_submitted";
constexpr const char *REASON_ROUND_INPUTS_OVERFLOW = "round_inputs_overflow";
constexpr const char *REASON_USER_NOT_ALLOWED = "user_not_allowed";
constexpr const char *REASON_NOT_INITIALIZED = "not_initialized";
constexpr const char *REASON_INTERNAL_ERROR = "internal_error";
constexpr const char *QUERY_FILTER_BY_SEQ_NO = "seq_no";
constexpr const char *STR_TRUE = "true";
constexpr const char *STR_FALSE = "false";

View File

@@ -38,6 +38,14 @@ namespace msg::usrmsg
busrmsg::create_contract_input_status(msg, status, reason, input_hash, ledger_seq_no, ledger_hash);
}
void usrmsg_parser::create_debug_shell_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason) const
{
if (protocol == util::PROTOCOL::JSON)
jusrmsg::create_debug_shell_response_container(msg, reply_for, status, content, reason);
else
busrmsg::create_debug_shell_response_container(msg, reply_for, status, content, reason);
}
void usrmsg_parser::create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content) const
{
if (protocol == util::PROTOCOL::JSON)
@@ -121,6 +129,14 @@ namespace msg::usrmsg
return busrmsg::extract_read_request(extracted_id, extracted_content, bdoc);
}
int usrmsg_parser::extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content) const
{
if (protocol == util::PROTOCOL::JSON)
return jusrmsg::extract_debug_shell_request(extracted_id, extracted_content, jdoc);
else
return busrmsg::extract_debug_shell_request(extracted_id, extracted_content, bdoc);
}
int usrmsg_parser::extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const
{
if (protocol == util::PROTOCOL::JSON)

View File

@@ -26,6 +26,8 @@ namespace msg::usrmsg
void create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status, std::string_view reason,
std::string_view input_hash, const uint64_t ledger_seq_no, const util::h32 &ledger_hash) const;
void create_debug_shell_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view status, std::string_view content, std::string_view reason) const;
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view reply_for, std::string_view content) const;
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view hash, const ::std::vector<std::string> &outputs,
@@ -49,6 +51,8 @@ namespace msg::usrmsg
int extract_read_request(std::string &extracted_id, std::string &extracted_content) const;
int extract_debug_shell_request(std::string &extracted_id, std::string &extracted_content) const;
int extract_signed_input_container(std::string &extracted_input_container, std::string &extracted_sig) const;
int extract_input_container(std::string &input, uint64_t &nonce,

View File

@@ -62,6 +62,7 @@ namespace p2p
std::set<std::string> input_ordered_hashes;
std::string output_hash;
std::string output_sig;
util::sequence_hash lcl_id;
};
struct nonunl_proposal

View File

@@ -91,7 +91,7 @@ namespace sc
ctx.working_dir = contract_fs.physical_path(ctx.args.hpfs_session_name, STATE_DIR_PATH);
// Setup contract output log file paths (for consensus execution only).
if (conf::cfg.contract.log.enable && !ctx.args.readonly)
if (conf::cfg.contract.log.enable && ctx.args.mode == EXECUTION_MODE::CONSENSUS)
{
// We keep appending logs to the same out/err files (Rollout log files are maintained according to the hp config settings).
const std::string prefix = ctx.args.hpfs_session_name;
@@ -122,14 +122,17 @@ namespace sc
// (Note: User socket will only be used for contract output only. For feeding user inputs we are using a memfd.)
if (create_iosockets_for_fdmap(ctx.user_fds, ctx.args.userbufs) == -1 ||
create_iosockets(ctx.control_fds, SOCK_SEQPACKET) == -1 ||
(!ctx.args.readonly && create_iosockets(ctx.npl_fds, SOCK_SEQPACKET) == -1))
(ctx.args.mode != EXECUTION_MODE::READ_REQUEST && create_iosockets(ctx.npl_fds, SOCK_SEQPACKET) == -1))
{
cleanup_fds(ctx);
stop_hpfs_session(ctx);
return -1;
}
LOG_DEBUG << "Starting contract process..." << (ctx.args.readonly ? " (rdonly)" : "");
std::string_view mode = ctx.args.get_exec_mode_str();
LOG_DEBUG << "Starting contract process..."
<< "(" << mode << ")";
int ret = 0;
const pid_t pid = fork();
@@ -157,14 +160,18 @@ namespace sc
if (insert_demarkation_line(ctx) == -1)
{
std::cerr << errno << ": Contract process inserting demarkation line failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
std::cerr << errno << ": Contract process inserting demarkation line failed."
<< "(" << mode << ")"
<< "\n";
exit(1);
}
// Set process resource limits.
if (set_process_rlimits() == -1)
{
std::cerr << errno << ": Failed to set contract process resource limits." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
std::cerr << errno << ": Failed to set contract process resource limits."
<< "(" << mode << ")"
<< "\n";
exit(1);
}
@@ -195,7 +202,9 @@ namespace sc
if (chdir(ctx.working_dir.c_str()) == -1)
{
std::cerr << errno << ": Contract process chdir failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
std::cerr << errno << ": Contract process chdir failed."
<< "(" << mode << ")"
<< "\n";
exit(1);
}
@@ -203,27 +212,32 @@ namespace sc
// (Must set gid before setting uid)
if (!conf::cfg.contract.run_as.empty() && (setgid(conf::cfg.contract.run_as.gid) == -1 || setuid(conf::cfg.contract.run_as.uid) == -1))
{
std::cerr << errno << ": Contract process setgid/uid failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
std::cerr << errno << ": Contract process setgid/uid failed."
<< "(" << mode << ")"
<< "\n";
exit(1);
}
// We do not create logs files in readonly execution due to the difficulty in managing the log file limits.
(conf::cfg.contract.log.enable && !ctx.args.readonly)
(conf::cfg.contract.log.enable && ctx.args.mode == EXECUTION_MODE::CONSENSUS)
? execv_and_redirect_logs(execv_len - 1, (const char **)execv_args, ctx.stdout_file, ctx.stderr_file, (const char **)env_args)
: execve(execv_args[0], execv_args, env_args);
std::cerr << errno << ": Contract process execve() failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
std::cerr << errno << ": Contract process execve() failed."
<< "(" << mode << ")"
<< "\n";
exit(1);
}
else
{
LOG_ERROR << errno << ": fork() failed when starting contract process." << (ctx.args.readonly ? " (rdonly)" : "");
LOG_ERROR << errno << ": fork() failed when starting contract process."
<< "(" << mode << ")";
ret = -1;
}
cleanup_fds(ctx);
// If the consensus contact finished executing successfully, run the post-exec.sh script if it exists.
if (ctx.exit_success && !ctx.args.readonly && run_post_exec_script(ctx) == -1)
if (ctx.exit_success && ctx.args.mode == EXECUTION_MODE::CONSENSUS && run_post_exec_script(ctx) == -1)
ret = -1;
if (stop_hpfs_session(ctx) == -1)
@@ -283,16 +297,20 @@ namespace sc
else // Child has exited
{
ctx.contract_pid = 0;
std::string_view mode = ctx.args.get_exec_mode_str();
if (WIFEXITED(scstatus))
{
ctx.exit_success = true;
LOG_DEBUG << "Contract process" << (ctx.args.readonly ? " (rdonly)" : "") << " ended normally.";
LOG_DEBUG << "Contract process ended normally. "
<< "(" << mode << ")";
return 1;
}
else
{
LOG_WARNING << "Contract process" << (ctx.args.readonly ? " (rdonly)" : "") << " ended prematurely. Exit code " << WEXITSTATUS(scstatus);
LOG_WARNING << "Contract process ended prematurely. "
<< "(" << mode << ") "
<< "Exit code " << WEXITSTATUS(scstatus);
return -1;
}
}
@@ -303,11 +321,11 @@ namespace sc
*/
int start_hpfs_session(execution_context &ctx)
{
if (!ctx.args.readonly)
if (ctx.args.mode != EXECUTION_MODE::READ_REQUEST)
ctx.args.hpfs_session_name = hpfs::RW_SESSION_NAME;
return ctx.args.readonly ? contract_fs.start_ro_session(ctx.args.hpfs_session_name, false)
: contract_fs.acquire_rw_session();
return ctx.args.mode == EXECUTION_MODE::READ_REQUEST ? contract_fs.start_ro_session(ctx.args.hpfs_session_name, false)
: contract_fs.acquire_rw_session();
}
/**
@@ -315,7 +333,7 @@ namespace sc
*/
int stop_hpfs_session(execution_context &ctx)
{
if (ctx.args.readonly)
if (ctx.args.mode == EXECUTION_MODE::READ_REQUEST)
{
return contract_fs.stop_ro_session(ctx.args.hpfs_session_name);
}
@@ -357,7 +375,7 @@ namespace sc
* "public_key": "<this node's hex public key>",
* "private_key": "<this node's hex private key>",
* "timestamp": <this node's timestamp (unix milliseconds)>,
* "readonly": <true|false>,
* "mode": <consensus|consensus_fallback|read_req>,
* "lcl_seq_no": "<lcl sequence no>",
* "lcl_hex": "<lcl hash hex>",
* "control_fd": fd,
@@ -367,7 +385,7 @@ namespace sc
* "unl":[ "<pkhex>", ... ]
* }
*/
int write_contract_args(const execution_context &ctx, const int user_inputs_fd)
int write_contract_args(execution_context &ctx, const int user_inputs_fd)
{
// Populate the json string with contract args.
// We don't use a JSON parser here because it's lightweight to contrstuct the
@@ -379,15 +397,20 @@ namespace sc
<< "\",\"public_key\":\"" << conf::cfg.node.public_key_hex
<< "\",\"private_key\":\"" << conf::cfg.node.private_key_hex
<< "\",\"timestamp\":" << ctx.args.time
<< ",\"readonly\":" << (ctx.args.readonly ? "true" : "false");
<< ",\"mode\":\"" << ctx.args.get_exec_mode_str() << "\"";
if (!ctx.args.readonly)
if (ctx.args.mode != EXECUTION_MODE::READ_REQUEST)
{
os << ",\"lcl_seq_no\":" << ctx.args.lcl_id.seq_no
<< ",\"lcl_hash\":\"" << util::to_hex(ctx.args.lcl_id.hash.to_string_view())
<< "\",\"npl_fd\":" << ctx.npl_fds.scfd;
}
if (ctx.args.mode == EXECUTION_MODE::CONSENSUS_FALLBACK)
{
os << ",\"non_consensus_rounds\":" << ctx.args.non_consensus_rounds;
}
os << ",\"control_fd\":" << ctx.control_fds.scfd;
os << ",\"user_in_fd\":" << user_inputs_fd
@@ -435,11 +458,19 @@ namespace sc
// We record the start time of the monitoring thread to track the contract execution timeout.
const uint64_t start_time = util::get_epoch_milliseconds();
const uint64_t exec_timeout = conf::cfg.contract.round_limits.exec_timeout;
// If this is in fallback mode, we should terminate the contract before the given timeout.
const uint64_t exec_timeout = ctx.args.mode == EXECUTION_MODE::CONSENSUS_FALLBACK ? (ctx.args.end_before - start_time) : conf::cfg.contract.round_limits.exec_timeout;
// If this is in fallback mode, check wether we haven't passed the round end. If not, do not execute.
if (ctx.args.mode == EXECUTION_MODE::CONSENSUS_FALLBACK && start_time >= ctx.args.end_before)
{
LOG_INFO << "Contract process end time has passed.";
return;
}
// Prepare output poll fd list.
// User out fds + control fd + NPL fd (NPL fd not available in readonly mode)
const size_t out_fd_count = ctx.user_fds.size() + (ctx.args.readonly ? 1 : 2);
// User out fds + control fd + NPL fd (NPL fd not available in read request mode)
const size_t out_fd_count = ctx.user_fds.size() + (ctx.args.mode == EXECUTION_MODE::READ_REQUEST ? 1 : 2);
const size_t control_fd_idx = ctx.user_fds.size();
const size_t npl_fd_idx = control_fd_idx + 1;
struct pollfd out_fds[out_fd_count];
@@ -480,7 +511,7 @@ namespace sc
// Attempt to read messages from contract (regardless of contract terminated or not).
const int control_read_res = read_control_outputs(ctx, out_fds[control_fd_idx]);
const int npl_read_res = ctx.args.readonly ? 0 : read_npl_outputs(ctx, &out_fds[npl_fd_idx]);
const int npl_read_res = ctx.args.mode == EXECUTION_MODE::READ_REQUEST ? 0 : read_npl_outputs(ctx, &out_fds[npl_fd_idx]);
const int user_read_res = read_contract_fdmap_outputs(ctx.user_fds, out_fds, ctx.args.userbufs);
messages_read = (control_read_res + npl_read_res + user_read_res) > 0;
@@ -495,7 +526,7 @@ namespace sc
{
// Reaching here means the contract is still running. Attempt to write any queued messages to the contract.
const int npl_write_res = ctx.args.readonly ? 0 : write_npl_messages(ctx);
const int npl_write_res = ctx.args.mode == EXECUTION_MODE::READ_REQUEST ? 0 : write_npl_messages(ctx);
if (npl_write_res == -1)
break;
@@ -550,24 +581,31 @@ namespace sc
util::fork_detach();
const std::string script_args = std::to_string(ctx.args.lcl_id.seq_no) + " " + util::to_hex(ctx.args.lcl_id.hash.to_string_view());
std::string_view mode = ctx.args.get_exec_mode_str();
// We set current working dir and pass command line arg to the script.
char *argv[] = {(char *)POST_EXEC_SCRIPT, (char *)script_args.data(), (char *)NULL};
if (chdir(ctx.working_dir.c_str()) == -1)
{
std::cerr << errno << ": Post-exec script chdir failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
std::cerr << errno << ": Post-exec script chdir failed."
<< "(" << mode << ")"
<< "\n";
exit(1);
}
// Set user execution user/group if specified (Must set gid before setting uid).
if (!conf::cfg.contract.run_as.empty() && (setgid(conf::cfg.contract.run_as.gid) == -1 || setuid(conf::cfg.contract.run_as.uid) == -1))
{
std::cerr << errno << ": Post-exec script setgid/uid failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
std::cerr << errno << ": Post-exec script setgid/uid failed."
<< "(" << mode << ")"
<< "\n";
exit(1);
}
conf::cfg.contract.log.enable ? execv_and_redirect_logs(2, (const char **)argv, ctx.stdout_file, ctx.stderr_file)
: execv(argv[0], argv);
std::cerr << errno << ": Post-exec script execv() failed." << (ctx.args.readonly ? " (rdonly)" : "") << "\n";
std::cerr << errno << ": Post-exec script execv() failed."
<< "(" << mode << ")"
<< "\n";
exit(1);
}
else if (pid > 0)
@@ -904,7 +942,7 @@ namespace sc
*/
int insert_demarkation_line(execution_context &ctx)
{
if (!conf::cfg.contract.log.enable || ctx.args.readonly)
if (!conf::cfg.contract.log.enable || ctx.args.mode != EXECUTION_MODE::CONSENSUS)
return 0;
// The permissions of a created file are restricted by the process's current umask, so group and world write are always disabled by default.
@@ -1066,7 +1104,7 @@ namespace sc
void close_unused_fds(execution_context &ctx, const bool is_hp)
{
if (!ctx.args.readonly)
if (ctx.args.mode != EXECUTION_MODE::READ_REQUEST)
{
close_unused_socket_fds(is_hp, ctx.npl_fds);
}

View File

@@ -19,6 +19,14 @@ namespace sc
constexpr uint16_t MAX_NPL_MSG_QUEUE_SIZE = 255; // Maximum npl message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31)....
constexpr uint16_t MAX_CONTROL_MSG_QUEUE_SIZE = 255; // Maximum out message queue size, The size passed is rounded to next number in binary sequence 1(1),11(3),111(7),1111(15),11111(31)....
// Contract execution mode.
enum EXECUTION_MODE
{
CONSENSUS = 0,
CONSENSUS_FALLBACK = 1,
READ_REQUEST = 2
};
struct fd_pair
{
int hpfd = -1;
@@ -27,7 +35,7 @@ namespace sc
/**
* Stores contract output message length along with the message. Length is used to construct the message from the stream buffer.
*/
*/
struct contract_output
{
uint32_t message_len = 0;
@@ -67,8 +75,8 @@ namespace sc
*/
struct contract_execution_args
{
// Whether the contract should execute in read only mode (to serve read requests).
bool readonly = false;
// Contract execution mode.
EXECUTION_MODE mode;
// hpfs session name used for this execution.
std::string hpfs_session_name;
@@ -94,12 +102,32 @@ namespace sc
// State hash after execution will be copied to this (not applicable to read only mode).
util::h32 post_execution_state_hash = util::h32_empty;
uint64_t end_before;
// This is for fallback mode, No of times we failed reach the consensus.
uint16_t non_consensus_rounds = 0;
contract_execution_args(util::buffer_store &user_input_store)
: user_input_store(user_input_store),
npl_messages(MAX_NPL_MSG_QUEUE_SIZE),
control_messages(MAX_CONTROL_MSG_QUEUE_SIZE)
{
}
// Convert execution mode to string.
std::string_view get_exec_mode_str()
{
if (mode == EXECUTION_MODE::CONSENSUS_FALLBACK)
{
return "consensus_fallback";
}
else if (mode == EXECUTION_MODE::READ_REQUEST)
{
return "read_req";
}
return "consensus";
}
};
/**
@@ -164,7 +192,7 @@ namespace sc
int stop_hpfs_session(execution_context &ctx);
int write_contract_args(const execution_context &ctx, const int user_inputs_fd);
int write_contract_args(execution_context &ctx, const int user_inputs_fd);
void contract_monitor_loop(execution_context &ctx);

View File

@@ -12,8 +12,9 @@ namespace unl
{
struct node_stat
{
uint32_t time_config = 0; // Roundtime config of this node.
uint64_t active_on = 0; // Latest timestamp we received a proposal from this node.
uint32_t time_config = 0; // Roundtime config of this node.
uint64_t active_on = 0; // Latest timestamp we received a proposal from this node.
util::sequence_hash lcl_id; // Current HotPocket lcl (seq no. and ledger hash hex)
};
std::map<std::string, node_stat> list; // List of binary pubkeys of UNL nodes and their statistics.
@@ -61,7 +62,7 @@ namespace unl
* Check whether the given pubkey is in the unl list.
* @param bin_pubkey Pubkey to check for existence.
* @return Return true if the given pubkey is in the unl list.
*/
*/
bool exists(const std::string &bin_pubkey)
{
std::shared_lock lock(unl_mutex);
@@ -70,7 +71,7 @@ namespace unl
/**
* Replace the unl list from the latest unl list from patch file.
*/
*/
void update_unl_changes_from_patch()
{
bool is_unl_list_changed = false;
@@ -100,6 +101,7 @@ namespace unl
if (itr != list.end())
{
changes_made = true;
itr->second.lcl_id = p.lcl_id;
itr->second.active_on = p.recv_timestamp;
itr->second.time_config = p.time_config;
}
@@ -210,7 +212,9 @@ namespace unl
// Convert binary pubkey into hex.
os << "\"" << util::to_hex(node->first) << "\":{"
<< "\"active_on\":" << node->second.active_on
<< "\"active_on\":" << node->second.active_on << ","
<< "\"lcl_seq_no\":" << node->second.lcl_id.seq_no << ","
<< "\"lcl_hash\":\"" << util::to_hex(node->second.lcl_id.hash.to_string_view()) << "\""
<< "}";
}
os << "}";

View File

@@ -219,7 +219,7 @@ namespace read_req
void initialize_execution_context(const user_read_req &read_request, const pthread_t thread_id, sc::execution_context &contract_ctx)
{
contract_ctx.args.hpfs_session_name = "ro_" + std::to_string(thread_id);
contract_ctx.args.readonly = true;
contract_ctx.args.mode = sc::EXECUTION_MODE::READ_REQUEST;
sc::contract_iobufs user_bufs;
user_bufs.inputs.push_back(read_request.content);
contract_ctx.args.userbufs.try_emplace(read_request.pubkey, std::move(user_bufs));

View File

@@ -17,6 +17,7 @@
#include "user_input.hpp"
#include "read_req.hpp"
#include "input_nonce_map.hpp"
#include "../debug_shell/debug_shell.hpp"
namespace usr
{
@@ -204,7 +205,7 @@ namespace usr
const int nonce_status = nonce_map.check(user.pubkey, nonce, sig, max_ledger_seq_no, true);
if (nonce_status == 0)
{
//Add to the submitted input list.
// Add to the submitted input list.
user.submitted_inputs.push_back(submitted_user_input{
std::move(input_container),
std::move(sig),
@@ -272,6 +273,37 @@ namespace usr
user.session.send(resp);
return 0;
}
else if (msg_type == msg::usrmsg::MSGTYPE_DEBUG_SHELL_REQUEST)
{
std::string id, content;
if (parser.extract_debug_shell_request(id, content) == -1)
{
send_input_status(parser, user.session, msg::usrmsg::STATUS_REJECTED, msg::usrmsg::REASON_BAD_MSG_FORMAT, "");
return -1;
}
// If debug_shell is initialized, send status reject.
if (!debug_shell::ctx.is_initialized)
{
send_debug_shell_response(parser, user.session, id, msg::usrmsg::STATUS_REJECTED, "", msg::usrmsg::REASON_NOT_INITIALIZED);
return -1;
}
const int res = debug_shell::execute(id, user.pubkey, content);
// Send user npt allowed status if not allowed.
if (res == -1)
{
send_debug_shell_response(parser, user.session, id, msg::usrmsg::STATUS_REJECTED, "", msg::usrmsg::REASON_INTERNAL_ERROR);
return -1;
}
else if (res == -2)
{
send_debug_shell_response(parser, user.session, id, msg::usrmsg::STATUS_REJECTED, "", msg::usrmsg::REASON_USER_NOT_ALLOWED);
return -1;
}
return 0;
}
else
{
LOG_DEBUG << "Invalid user message type: " << msg_type;
@@ -333,6 +365,17 @@ namespace usr
}
}
/**
* Send the specified debug_shell request status result via the provided session.
*/
void send_debug_shell_response(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, std::string_view reply_for,
std::string_view status, std::string_view content, std::string_view reason)
{
std::vector<uint8_t> msg;
parser.create_debug_shell_response_container(msg, reply_for, status, content, reason);
session.send(msg);
}
/**
* Send the specified contract input status result via the provided session.
*/
@@ -348,7 +391,7 @@ namespace usr
/**
* Adds the user denoted by specified session id and public key to the global authed user list.
* This should get called after the challenge handshake is verified.
*
*
* @param session User socket session.
* @param user_pubkey_hex User's hex public key.
* @param protocol_code Messaging protocol used by user.
@@ -397,14 +440,20 @@ namespace usr
/**
* Removes the specified public key from the global user list.
* This must get called when an authenticated user disconnects from HP.
*
*
* @param pubkey User pubkey.
* @return 0 on successful removals. -1 on failure.
*/
int remove_user(const std::string &pubkey)
{
std::scoped_lock<std::mutex> lock(ctx.users_mutex);
ctx.users.erase(pubkey);
{
std::scoped_lock<std::mutex> lock(ctx.users_mutex);
ctx.users.erase(pubkey);
}
// Remove any debug_shell commands sent by the user.
if (debug_shell::ctx.is_initialized)
debug_shell::remove_user_commands(pubkey);
return 0;
}

View File

@@ -95,6 +95,9 @@ namespace usr
void send_input_status_responses(const std::unordered_map<std::string, std::vector<input_status_response>> &responses,
const uint64_t ledger_seq_no = 0, const util::h32 &ledger_hash = util::h32_empty);
void send_debug_shell_response(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session, std::string_view reply_for,
std::string_view status, std::string_view content, std::string_view reason = "");
void send_input_status(const msg::usrmsg::usrmsg_parser &parser, usr::user_comm_session &session,
std::string_view status, std::string_view reason, std::string_view input_hash,
const uint64_t ledger_seq_no = 0, const util::h32 &ledger_hash = util::h32_empty);

View File

@@ -6,10 +6,10 @@
namespace version
{
// HotPocket version. Written to new configs and p2p/user messages.
constexpr const char *HP_VERSION = "0.6.4";
constexpr const char *HP_VERSION = "0.6.5";
// Minimum compatible config version (this will be used to validate configs).
constexpr const char *MIN_CONFIG_VERSION = "0.6.3";
constexpr const char *MIN_CONFIG_VERSION = "0.6.5";
// Ledger file storage version. All nodes in a cluster MUST use the same ledger version.
constexpr const char *LEDGER_VERSION = "0.6.3";

BIN
test/bin/hpsh Executable file

Binary file not shown.

View File

@@ -13,6 +13,6 @@ RUN apt-get update \
&& rm -rf /var/lib/apt/lists/* \
&& mkdir /usr/local/bin/hotpocket
COPY hpcore hpfs hpws evernode-license.pdf /usr/local/bin/hotpocket/
COPY hpcore hpfs hpws hpsh evernode-license.pdf /usr/local/bin/hotpocket/
ENTRYPOINT ["/usr/local/bin/hotpocket/hpcore"]

View File

@@ -8,7 +8,7 @@ njsfile="Dockerfile.ubt.20.04-njs"
# Prepare build context
tmp=$(mktemp -d)
cp $hpcoredir/build/hpcore $hpcoredir/test/bin/{hpfs,hpws,libblake3.so} $hpcoredir/evernode-license.pdf $tmp/
cp $hpcoredir/build/hpcore $hpcoredir/test/bin/{hpfs,hpws,hpsh,libblake3.so} $hpcoredir/evernode-license.pdf $tmp/
strip $tmp/hpcore
# Remove the revision component from hp version to make up the image version.

View File

@@ -2,7 +2,7 @@
FROM evernode/hotpocket:latest-ubt.20.04-njs.20
# Copy (overwrite) the local build outputs into the docker image.
COPY hpcore hpfs hpws evernode-license.pdf /usr/local/bin/hotpocket/
COPY hpcore hpfs hpws hpsh evernode-license.pdf /usr/local/bin/hotpocket/
ENTRYPOINT ["/usr/local/bin/hotpocket/hpcore"]

View File

@@ -16,6 +16,7 @@ fi
ncount=$1
loglevel=$2
roundtime=$3
fallback_enabled=$4
hpcore=$(realpath ../..)
iprange="172.1.1"
@@ -51,7 +52,7 @@ elif [ "$CONTRACT" = "diag" ]; then # Diagnostic contract
else # nodejs echo contract (default)
echo "Using nodejs echo contract."
pushd $hpcore/examples/nodejs_contract/ > /dev/null 2>&1
npm install
npm install
npm run build-echo
popd > /dev/null 2>&1
copyfiles="$hpcore/examples/nodejs_contract/dist/echo-contract/index.js"
@@ -65,6 +66,9 @@ fi
if [ "$roundtime" = "" ]; then
roundtime=1000
fi
if [ "$fallback_enabled" != "true" ]; then
fallback_enabled=false
fi
# Delete and recreate 'hpcluster' directory.
sudo rm -rf hpcluster > /dev/null 2>&1
@@ -115,7 +119,10 @@ do
consensus: { \
...require('./tmp.json').contract.consensus, \
mode: 'public', \
roundtime: $roundtime \
roundtime: $roundtime,\
fallback: { \
execute: $fallback_enabled \
} \
}, \
npl: { \
mode: 'public' \

View File

@@ -14,7 +14,7 @@ fi
clusterloc=$(pwd)/hpcluster
n=$1
hpversion=0.6.4
hpversion=0.6.5
let pubport=8080+$n
let peerport=22860+$n

View File

@@ -5,7 +5,7 @@ WINDOWSIZE=60 # size of window in seconds to examine for successful consensus ro
PIPE=concon.pipe
clusterloc=$(pwd)/hpcluster
n=1
hpversion=0.6.4
hpversion=0.6.5
let pubport=8080+$n
while true; do

View File

@@ -3,7 +3,7 @@
clusterloc=$(pwd)/hpcluster
n=1
hpversion=0.6.4
hpversion=0.6.5
let pubport=8080+$n
while true; do
CONSENSUS="0"

View File

@@ -15,7 +15,7 @@ fi
dir=$(realpath $1)
dirname=$(basename $dir)
n=$1
hpversion=0.6.4
hpversion=0.6.5
let pubport=8080