diff --git a/reset.sh b/reset.sh index c43e756d..8eb87bf6 100755 --- a/reset.sh +++ b/reset.sh @@ -1,12 +1,13 @@ #!/bin/bash -sudo ./cluster-create.sh 5 -sudo mkdir -p /home/geveodev/hpcore/hpcluster/node1/statehist/0/data -sudo mkdir -p /home/geveodev/hpcore/hpcluster/node2/statehist/0/data/ -sudo mkdir -p /home/geveodev/hpcore/hpcluster/node3/statehist/0/data -sudo mkdir -p /home/geveodev/hpcore/hpcluster/node4/statehist/0/data/ +nodes=3 +sudo ./cluster-create.sh $nodes -sudo cp -r /home/geveodev/Desktop/Share/* /home/geveodev/hpcore/hpcluster/node1/statehist/0/data -sudo cp -r /home/geveodev/Desktop/Share/* /home/geveodev/hpcore/hpcluster/node2/statehist/0/data -sudo cp -r /home/geveodev/Desktop/Share/* /home/geveodev/hpcore/hpcluster/node3/statehist/0/data -sudo cp -r /home/geveodev/Desktop/Share/* /home/geveodev/hpcore/hpcluster/node4/statehist/0/data +# Setup initial state data for all nodes but one. +for (( i=1; i<$nodes; i++ )) +do + + sudo mkdir -p ~/hpcore/hpcluster/node$i/statehist/0/data/ + #sudo cp -r ~/Downloads/big.mkv ~/hpcore/hpcluster/node$i/statehist/0/data/ + +done \ No newline at end of file diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 01ce0940..9b909c2b 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -29,6 +29,7 @@ constexpr float STAGE1_THRESHOLD = 0.5; constexpr float STAGE2_THRESHOLD = 0.65; constexpr float STAGE3_THRESHOLD = 0.8; constexpr float MAJORITY_THRESHOLD = 0.8; +constexpr uint16_t MAX_RESET_TIME = 200; consensus_context ctx; @@ -68,6 +69,7 @@ int init() }); ctx.prev_close_time = util::get_epoch_milliseconds(); + ctx.reset_time = MAX_RESET_TIME; return 0; } @@ -142,6 +144,7 @@ void consensus() << " hinp:" << proposal.hash_inputs.size() << " hout:" << proposal.hash_outputs.size() << " lcl:" << proposal.lcl + << " state:" << *reinterpret_cast(proposal.curr_hash_state.c_str()) << " self:" << self << "\n"; } @@ -162,28 +165,37 @@ void consensus() // check if we're ahead/behind of consensus lcl bool is_lcl_desync, should_request_history; std::string majority_lcl; - uint64_t time_off = 0; - check_lcl_votes(is_lcl_desync, should_request_history, time_off, majority_lcl, votes); + check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes); if (should_request_history) { - //create history request message and request history from a random peer. - send_ledger_history_request(ctx.lcl, majority_lcl); + //handle minority going forward when boostrapping cluster. + //Here we are mimicking invalid min ledger scenario. + if (majority_lcl == GENESIS_LEDGER) + { + last_requested_lcl = majority_lcl; + p2p::history_response res; + res.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; + handle_ledger_history_response(std::move(res)); + } + else + { + //create history request message and request history from a random peer. + send_ledger_history_request(ctx.lcl, majority_lcl); + } } if (is_lcl_desync) { - int64_t diff = 0; - if (time_off > ctx.time_now) - diff = time_off - ctx.time_now; - - else - diff = ctx.time_now - time_off; //We are resetting to stage 0 to avoid possible deadlock situations by resetting every node in random time using max time. - //this might not make sense now after stage 1 now since we are applying a stage time resolution?. + LOG_DBG << "time off: " << std::to_string(ctx.reset_time); + timewait_stage(true, ctx.reset_time); + const uint16_t decrement = rand() % (conf::cfg.roundtime / 40); + + if (decrement > ctx.reset_time) + ctx.reset_time = MAX_RESET_TIME; + else + ctx.reset_time -= decrement; - LOG_DBG << "time off: " << std::to_string(diff); - timewait_stage(true, diff); - //LOG_DBG << "time off: " << std::to_string(time_off); return; } else @@ -205,9 +217,10 @@ void consensus() { ctx.prev_close_time = stg_prop.time; apply_ledger(stg_prop); + ctx.reset_time = MAX_RESET_TIME; // We have finished a consensus round (all 4 stages). - LOG_INFO << "****Stage 3 consensus reached****"; + LOG_INFO << "****Stage 3 consensus reached**** (state:" << *reinterpret_cast(cons::ctx.curr_hash_state.c_str()) << ")"; } } } @@ -396,10 +409,13 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) if (numvotes >= vote_threshold) stg_prop.hash_outputs.emplace(hash); - // time is voted on a simple sorted and majority basis, since there will always be disagreement. + // time is voted on a simple sorted (highest to lowest) and majority basis, since there will always be disagreement. int32_t highest_time_vote = 0; - for (const auto [time, numvotes] : votes.time) + for(auto itr = votes.time.rbegin(); itr != votes.time.rend(); ++itr) { + const uint64_t time = itr->first; + const int32_t numvotes = itr->second; + if (numvotes > highest_time_vote) { highest_time_vote = numvotes; @@ -407,12 +423,10 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) } } - //todo:apply a round time resolution to increase close time reliability(for stage 1,2) if (ctx.stage == 3) - get_ledger_time_resolution(stg_prop.time); - + stg_prop.time = get_ledger_time_resolution(stg_prop.time); else - get_stage_time_resolution(stg_prop.time); + stg_prop.time = get_stage_time_resolution(stg_prop.time); return stg_prop; } @@ -476,7 +490,7 @@ void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority /** * Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes. */ -void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &time_off, std::string &majority_lcl, vote_counter &votes) +void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes) { int32_t total_lcl_votes = 0; @@ -488,11 +502,6 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti increment(votes.lcl, cp.lcl); total_lcl_votes++; } - - //keep track of max time of peers, so we can reset nodes in a random time range to increase reliability. - //This is very useful especially boostrapping a node cluster. - if (cp.time > time_off) - time_off = cp.time; } is_desync = false; @@ -537,7 +546,7 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti if (winning_votes < MAJORITY_THRESHOLD * ctx.candidate_proposals.size()) { // potential fork condition. - LOG_WARN << "No consensus on lcl. Possible fork condition. won:" << std::to_string(winning_votes) << " total:" << std::to_string(ctx.candidate_proposals.size()); + LOG_DBG << "No consensus on lcl. Possible fork condition. won:" << std::to_string(winning_votes) << " total:" << std::to_string(ctx.candidate_proposals.size()); is_desync = true; return; } @@ -738,20 +747,22 @@ void check_state(vote_counter &votes) { if (ctx.state_sync_lcl != ctx.lcl) { - LOG_DBG << "State mismatch. Starting state sync..."; - // Change the mode to passive and not sending out proposals till the state is synced conf::change_operating_mode(conf::OPERATING_MODE::OBSERVING); const hasher::B2H majority_state_hash = *reinterpret_cast(majority_state.c_str()); + LOG_INFO << "Starting state sync. Curr state:" << *reinterpret_cast(ctx.curr_hash_state.c_str()) << " majority:" << majority_state_hash; + start_state_sync(majority_state_hash); ctx.is_state_syncing = true; ctx.state_sync_lcl = ctx.lcl; } } - else if (majority_state == ctx.curr_hash_state) + else if (majority_state == ctx.curr_hash_state && ctx.is_state_syncing) { + LOG_INFO << "State sync complete. state:" << *reinterpret_cast(ctx.curr_hash_state.c_str()); + ctx.is_state_syncing = false; ctx.state_sync_lcl.clear(); conf::change_operating_mode(conf::OPERATING_MODE::PROPOSING); diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index aa421331..7b99173f 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -69,11 +69,11 @@ struct consensus_context util::rollover_hashset recent_userinput_hashes; - uint8_t stage; - uint64_t novel_proposal_time; - uint64_t time_now; + uint8_t stage = 0; + uint64_t novel_proposal_time = 0; + uint64_t time_now = 0; std::string lcl; - uint64_t led_seq_no; + uint64_t led_seq_no = 0; std::string curr_hash_state; std::string prev_hash_state; @@ -83,9 +83,11 @@ struct consensus_context //We will use this to track lcls related logic.- track state, lcl request, response. std::map cache; //ledger close time of previous hash - uint64_t prev_close_time; - bool is_state_syncing; + uint64_t prev_close_time = 0; + uint16_t reset_time = 0; + + bool is_state_syncing = false; std::string state_sync_lcl; std::thread state_syncing_thread; std::mutex state_syncing_mutex; @@ -124,7 +126,7 @@ void broadcast_proposal(const p2p::proposal &p); void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes); -void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &time_off, std::string &majority_lcl, vote_counter &votes); +void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes); float_t get_stage_threshold(const uint8_t stage); diff --git a/src/cons/ledger_handler.cpp b/src/cons/ledger_handler.cpp index f0593a5d..3bb02c2c 100644 --- a/src/cons/ledger_handler.cpp +++ b/src/cons/ledger_handler.cpp @@ -206,7 +206,7 @@ const ledger_history load_ledger() if (ldg_hist.cache.empty()) { ldg_hist.led_seq_no = 0; - ldg_hist.lcl = "0-genesis"; + ldg_hist.lcl = GENESIS_LEDGER; } else { @@ -467,15 +467,11 @@ void handle_ledger_history_response(const p2p::history_response &hr) } last_requested_lcl = ""; - - const auto latest_lcl_itr = cons::ctx.cache.rbegin(); - cons::ctx.lcl = latest_lcl_itr->second.lcl; - cons::ctx.led_seq_no = latest_lcl_itr->first; if (cons::ctx.cache.empty()) { cons::ctx.led_seq_no = 0; - cons::ctx.lcl = "0-genesis"; + cons::ctx.lcl = GENESIS_LEDGER; } else { diff --git a/src/cons/ledger_handler.hpp b/src/cons/ledger_handler.hpp index 5da6efc3..8d79b22b 100644 --- a/src/cons/ledger_handler.hpp +++ b/src/cons/ledger_handler.hpp @@ -9,7 +9,7 @@ namespace cons //max ledger count constexpr uint64_t MAX_LEDGER_SEQUENCE = 200; - +constexpr const char* GENESIS_LEDGER = "0-genesis"; struct ledger_cache { std::string lcl; diff --git a/src/cons/state_handler.cpp b/src/cons/state_handler.cpp index b4f2256c..b25a42ea 100644 --- a/src/cons/state_handler.cpp +++ b/src/cons/state_handler.cpp @@ -12,7 +12,7 @@ namespace cons { constexpr uint16_t MAX_AWAITING_REQUESTS = 1; -constexpr uint16_t MAX_RESPONSE_WAIT_CYCLES = 100; +constexpr uint16_t MAX_RESPONSE_WAIT_CYCLES = 10; // List of state responses flatbuffer messages to be processed. std::list candidate_state_responses; @@ -80,8 +80,6 @@ int create_state_response(p2p::peer_outbound_message &msg, const p2p::state_requ void start_state_sync(const hasher::B2H state_hash_to_request) { - std::cout << "start_state_sync() " << state_hash_to_request << "\n"; - { std::lock_guard lock(p2p::ctx.collected_msgs.state_response_mutex); p2p::ctx.collected_msgs.state_response.clear(); @@ -127,10 +125,7 @@ int run_state_sync_iterator() hasher::B2H response_hash = fbschema::flatbuff_bytes_to_hash(resp_msg->hash()); const auto pending_resp_itr = submitted_requests.find(response_hash); if (pending_resp_itr == submitted_requests.end()) - { - std::cout << "Ignoring state response.\n"; continue; - } // Now that we have received matching hash, remove it from the waiting list. submitted_requests.erase(pending_resp_itr); @@ -169,7 +164,7 @@ int run_state_sync_iterator() { // Reset the counter and re-submit request. request.waiting_cycles = 0; - std::cout << "Resubmit state request\n"; + LOG_DBG << "Resubmitting state request..."; submit_request(request); } } @@ -192,7 +187,7 @@ int run_state_sync_iterator() void submit_request(const backlog_item &request) { - std::cout << "Submitting state request. type: " << request.type << " path:" << request.path << " blockid: " << request.block_id << "\n"; + LOG_DBG << "Submitting state request. type:" << request.type << " path:" << request.path << " blockid:" << request.block_id; submitted_requests.try_emplace(request.expected_hash, request); @@ -202,14 +197,9 @@ void submit_request(const backlog_item &request) int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp) { - std::cout << "Recieved state fs entry response\n"; - std::unordered_map state_fs_entry_list; fbschema::p2pmsg::flatbuf_statefshashentry_to_statefshashentry(state_fs_entry_list, fs_entry_resp->entries()); - for (const auto [a, b] : state_fs_entry_list) - std::cout << "Recieved fsentry: " << a << "\n"; - std::unordered_map existing_fs_entries; std::string_view root_path_sv = fbschema::flatbuff_str_to_sv(fs_entry_resp->path()); std::string root_path_str(root_path_sv.data(), root_path_sv.size()); @@ -227,12 +217,9 @@ int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry // Request more info on fs entries that exist on both sides but are different. for (const auto &[path, fs_entry] : existing_fs_entries) { - std::cout << "Existing path :" << path << std::endl; const auto fs_itr = state_fs_entry_list.find(path); if (fs_itr != state_fs_entry_list.end()) { - std::cout << "Existing fs_entry_hash :" << fs_entry.hash << std::endl; - std::cout << "Recieved fs_entry_hash :" << fs_itr->second.hash << std::endl; if (fs_itr->second.hash != fs_entry.hash) { if (fs_entry.is_file) @@ -276,8 +263,6 @@ int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response * std::string_view path_sv = fbschema::flatbuff_str_to_sv(file_resp->path()); const std::string path_str(path_sv.data(), path_sv.size()); - std::cout << "Recieved file hash map of " << path_str << std::endl; - std::vector existing_block_hashmap; if (statefs::get_block_hash_map(existing_block_hashmap, path_str, hasher::B2H_empty) == -1) return -1; @@ -288,9 +273,6 @@ int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response * const hasher::B2H *resp_hashes = reinterpret_cast(file_resp->hash_map()->data()); auto resp_hash_count = file_resp->hash_map()->size() / hasher::HASH_SIZE; - std::cout << "Reieved file hashmap size :" << file_resp->hash_map()->size() << std::endl; - std::cout << "Existing file hashmap size :" << existing_block_hashmap.size() << std::endl; - auto insert_itr = pending_requests.begin(); for (int block_id = 0; block_id < existing_hash_count; ++block_id) @@ -300,7 +282,6 @@ int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response * if (existing_hashes[block_id] != resp_hashes[block_id]) { - std::cout << "Mismatch in file block :" << block_id << std::endl; // Insert at front to give priority to block requests while preserving block order. pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, path_str, block_id, resp_hashes[block_id]}); } @@ -315,7 +296,6 @@ int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response * { for (int block_id = existing_hash_count; block_id < resp_hash_count; ++block_id) { - std::cout << "Missing block: " << block_id << "\n"; // Insert at front to give priority to block requests while preserving block order. pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, path_str, block_id, resp_hashes[block_id]}); } @@ -328,8 +308,6 @@ int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg { p2p::block_response block_resp = fbschema::p2pmsg::create_block_response_from_msg(*block_msg); - std::cout << "Recieved block " << block_resp.block_id << " of " << block_resp.path << "\n"; - if (statefs::write_block(block_resp.path, block_resp.block_id, block_resp.data.data(), block_resp.data.size()) == -1) return -1; diff --git a/src/main.cpp b/src/main.cpp index eee84aa7..eeecadd8 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -64,6 +64,7 @@ int parse_cmd(int argc, char **argv) */ void deinit() { + proc::deinit(); hplog::deinit(); } @@ -115,7 +116,7 @@ void std_terminate() noexcept int main(int argc, char **argv) { //seed rand - srand(time(0)); + srand(util::get_epoch_milliseconds()); // Register exception handler for std exceptions. std::set_terminate(&std_terminate); diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index db915603..16f44c86 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -128,7 +128,6 @@ void peer_session_handler::on_message(sock::socket_session lock(ctx.collected_msgs.state_response_mutex); // Insert state_response with lock. std::string response(reinterpret_cast(content_ptr), content_size); ctx.collected_msgs.state_response.push_back(std::move(response)); diff --git a/src/proc/proc.cpp b/src/proc/proc.cpp index fc9a2d19..835bc94a 100644 --- a/src/proc/proc.cpp +++ b/src/proc/proc.cpp @@ -72,7 +72,7 @@ int exec_contract(const contract_exec_args &args) // Wait for child process (contract process) to complete execution. const int presult = await_process_execution(contract_pid); - LOG_INFO << "Contract process ended."; + LOG_DBG << "Contract process ended."; contract_pid = 0; if (presult != 0) @@ -99,7 +99,7 @@ int exec_contract(const contract_exec_args &args) // Write the contract input message from HotPocket to the stdin (0) of the contract process. write_contract_args(args); - LOG_INFO << "Starting contract process..."; + LOG_DBG << "Starting contract process..."; // Fill process args. char *execv_args[conf::cfg.runtime_binexec_args.size() + 1]; @@ -209,11 +209,7 @@ int stop_state_monitor() if (htreebuilder.generate(statehash) != 0) return -1; - std::string root_hash(reinterpret_cast(&statehash), hasher::HASH_SIZE); - root_hash.swap(cons::ctx.curr_hash_state); - - LOG_DBG << "State hash: " << std::hex << statehash << std::dec; - + cons::ctx.curr_hash_state = std::string(reinterpret_cast(&statehash), hasher::HASH_SIZE); return 0; } @@ -710,4 +706,16 @@ void close_unused_vectorfds(const bool is_hp, std::vector &fds) } } +/** + * Cleanup any running processes. + */ +void deinit() +{ + if (contract_pid > 0) + kill(contract_pid, SIGINT); + + if (statemon_pid > 0) + kill(statemon_pid, SIGINT); +} + } // namespace proc \ No newline at end of file diff --git a/src/proc/proc.hpp b/src/proc/proc.hpp index 194641c1..80a8e9e4 100644 --- a/src/proc/proc.hpp +++ b/src/proc/proc.hpp @@ -78,6 +78,8 @@ struct contract_exec_args int exec_contract(const contract_exec_args &args); +void deinit(); + //------Internal-use functions for this namespace. int await_process_execution(pid_t pid); diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp index 392ca8f4..d64a18ea 100644 --- a/src/sock/socket_session.cpp +++ b/src/sock/socket_session.cpp @@ -277,6 +277,8 @@ void socket_session::close() template void socket_session::on_close(const error_code ec, const int8_t type) { + sess_handler.on_close(this); + if (type == 1) return; diff --git a/src/statefs/hasher.cpp b/src/statefs/hasher.cpp index 58134082..d9c0a1f3 100644 --- a/src/statefs/hasher.cpp +++ b/src/statefs/hasher.cpp @@ -37,7 +37,7 @@ void B2H::operator^=(const B2H rhs) std::ostream &operator<<(std::ostream &output, const B2H &h) { - output << h.data[0] << h.data[1] << h.data[2] << h.data[3]; + output << h.data[0];// << h.data[1] << h.data[2] << h.data[3]; return output; }