diff --git a/examples/rndcontract/rnd_contract b/examples/rndcontract/rnd_contract new file mode 100755 index 00000000..97e39640 Binary files /dev/null and b/examples/rndcontract/rnd_contract differ diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 9b909c2b..16162cde 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -186,8 +186,9 @@ void consensus() } if (is_lcl_desync) { - //We are resetting to stage 0 to avoid possible deadlock situations by resetting every node in random time using max time. - LOG_DBG << "time off: " << std::to_string(ctx.reset_time); + //Resetting to stage 0 to avoid possible deadlock situations by resetting every node in random time using max time. + //todo: This might not needed with current synchronization with network clock. + // LOG_DBG << "time off: " << std::to_string(ctx.reset_time); timewait_stage(true, ctx.reset_time); const uint16_t decrement = rand() % (conf::cfg.roundtime / 40); @@ -219,19 +220,60 @@ void consensus() apply_ledger(stg_prop); ctx.reset_time = MAX_RESET_TIME; - // We have finished a consensus round (all 4 stages). - LOG_INFO << "****Stage 3 consensus reached**** (state:" << *reinterpret_cast(cons::ctx.curr_hash_state.c_str()) << ")"; + // node has finished a consensus round (all 4 stages). + LOG_INFO << "****Stage 3 consensus reached**** (lcl:" << ctx.lcl + << " state:" << *reinterpret_cast(cons::ctx.curr_hash_state.c_str()) << ")"; } } } - // We have finished a consensus stage. - + // Node has finished a consensus stage. // Transition to next stage. ctx.stage = (ctx.stage + 1) % 4; - // after a stage proposal we will just busy wait for proposals. - util::sleep(conf::cfg.roundtime / 4); + //Here nodes try to synchronise nodes stages using network clock. + uint64_t now = util::get_epoch_milliseconds(); + + // round start is the floor + uint64_t round_start = ((uint64_t)(now / conf::cfg.roundtime)) * conf::cfg.roundtime; + + uint64_t next_stage_start = 0; + + // Compute start time of next stage. 
+ // The last stage (stage 3) waits twice as long as any other stage. + // This is for a node to catch up from lcl/state desync, + // because we are waiting (round time + time to next stage) to sync. + if (ctx.stage == 3) + next_stage_start = round_start + conf::cfg.roundtime; + else + next_stage_start = round_start + (int64_t)(ctx.stage * ((double)conf::cfg.roundtime / 5.0)); + + // Compute stage time wait. + // A node waits between stages to collect enough proposals from previous stages from other nodes. + int64_t to_wait = next_stage_start - now; + + LOG_DBG << "now = " << now << ", roundtime = " << conf::cfg.roundtime << ", round_start = " << round_start << ", next_stage_start = " << next_stage_start << ", to_wait = " << to_wait; + + // If a node has enough time (allowing for network delay) to receive/send reliable stage proposals for the next stage, + // it will continue participating in this round, otherwise it will join in the next round(s). + if (to_wait < floor(conf::cfg.roundtime / 10)) //todo: self calculating/adjusting network delay + { + uint64_t next_round = round_start; + while (to_wait < floor(conf::cfg.roundtime / 10)) + { + next_round += conf::cfg.roundtime; + to_wait = next_round - now; + } + + LOG_INFO << "we missed a round, waiting " << to_wait << " and resetting to stage 0"; + ctx.stage = 0; + util::sleep(to_wait); + } + else + { + // after a stage proposal we will just busy wait for proposals. + util::sleep(to_wait); + } } /** @@ -411,7 +453,7 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) // time is voted on a simple sorted (highest to lowest) and majority basis, since there will always be disagreement. 
int32_t highest_time_vote = 0; - for(auto itr = votes.time.rbegin(); itr != votes.time.rend(); ++itr) + for (auto itr = votes.time.rbegin(); itr != votes.time.rend(); ++itr) { const uint64_t time = itr->first; const int32_t numvotes = itr->second; @@ -872,4 +914,4 @@ void increment(std::map &counter, const T &candidate) counter.try_emplace(candidate, 1); } -} // namespace cons \ No newline at end of file +} // namespace cons diff --git a/src/cons/state_handler.cpp b/src/cons/state_handler.cpp index dca12bde..56442a22 100644 --- a/src/cons/state_handler.cpp +++ b/src/cons/state_handler.cpp @@ -12,11 +12,9 @@ namespace cons { // Max number of requests that can be awaiting response at any given time. -constexpr uint16_t MAX_AWAITING_REQUESTS = 4; +constexpr uint16_t MAX_AWAITING_REQUESTS = 1; // Syncing loop sleep delay. -constexpr uint16_t SYNC_LOOP_WAIT = 50; -// No of loop cycles to wait for a response before resubmitting request. -constexpr uint16_t MAX_AWAITING_CYCLES = 1000 / SYNC_LOOP_WAIT; +constexpr uint16_t SYNC_LOOP_WAIT = 100; // List of state responses flatbuffer messages to be processed. std::list candidate_state_responses; @@ -37,7 +35,7 @@ void request_state_from_peer(const std::string &path, const bool is_file, const p2p::peer_outbound_message msg(std::make_unique(1024)); fbschema::p2pmsg::create_msg_from_state_request(msg.builder(), sr, lcl); - p2p::send_message_to_random_peer(msg); + p2p::send_message_to_random_peer(msg); //todo: send to a node that hold the majority state to improve reliability of retrieving state. } int create_state_response(p2p::peer_outbound_message &msg, const p2p::state_request &sr) @@ -159,7 +157,8 @@ int run_state_sync_iterator() // Check for long-awaited responses and re-request them. for (auto &[hash, request] : submitted_requests) { - if (request.waiting_cycles < MAX_AWAITING_CYCLES) + // We wait for half of round time before each request is resubmitted. 
+ if (request.waiting_cycles < (conf::cfg.roundtime / (SYNC_LOOP_WAIT * 2))) { // Increment counter. request.waiting_cycles++; diff --git a/test/vm-cluster/cluster.sh b/test/vm-cluster/cluster.sh index 4ddbbdb7..02b4d1e6 100755 --- a/test/vm-cluster/cluster.sh +++ b/test/vm-cluster/cluster.sh @@ -7,17 +7,26 @@ mode=$1 hpcore=$(realpath ../..) -if [ "$mode" = "new" ] || [ "$mode" = "run" ] || [ "$mode" = "update" ]; then +if [ "$mode" = "new" ] || [ "$mode" = "run" ] || [ "$mode" = "update" ] || [ "$mode" = "kill" ]; then echo "" else - echo "Invalid command. new | run | update expected." + echo "Invalid command. new | run | update | kill expected." exit 1 fi if [ $mode = "run" ]; then let nodeid=$2-1 vmip=${vmips[$nodeid]} - sshpass -p $vmpass ssh geveo@$vmip 'sudo ./hpcore run contract' + sshpass -p $vmpass ssh geveo@$vmip 'nohup sudo ./hpcore run contract' + sshpass -p $vmpass ssh geveo@$vmip 'tail -f nohup.out' + exit 0 +fi + +if [ $mode = "kill" ]; then + let nodeid=$2-1 + vmip=${vmips[$nodeid]} + sshpass -p $vmpass ssh geveo@$vmip 'sudo kill $(pidof hpcore)' + sshpass -p $vmpass ssh geveo@$vmip 'sudo kill $(pidof hpstatemon)' exit 0 fi @@ -80,7 +89,7 @@ do mypeers=$(joinarr peers $j) myunl=$(joinarr pubkeys $j) - node -p "JSON.stringify({...require('./cfg/node$n.json'),binary:'/usr/bin/node',binargs:'/home/geveo/contract.js',peers:${mypeers},unl:${myunl}}, null, 2)" > ./cfg/node$n.cfg + node -p "JSON.stringify({...require('./cfg/node$n.json'),binary:'/usr/bin/node',binargs:'/home/geveo/contract.js',peers:${mypeers},unl:${myunl},loggers:['console', 'file']}, null, 2)" > ./cfg/node$n.cfg # Copy local cfg file back to remote vm. vmip=${vmips[j]} diff --git a/test/vm-cluster/consensus-test-continuous.sh b/test/vm-cluster/consensus-test-continuous.sh index e0c04084..df7ea450 100644 --- a/test/vm-cluster/consensus-test-continuous.sh +++ b/test/vm-cluster/consensus-test-continuous.sh @@ -20,7 +20,7 @@ while true; do echo 'starting ...' 
STARTTIME=`date +%s` - nohup ~/hpcore run ~/contract > $PIPE 2>> $PIPE 3>MARKER & + nohup sudo ~/hpcore run ~/contract > $PIPE 2>> $PIPE 3>MARKER & PID=$! sleep 1 diff --git a/test/vm-cluster/setup-vm.sh b/test/vm-cluster/setup-vm.sh index 1f8a6663..6f2a9322 100755 --- a/test/vm-cluster/setup-vm.sh +++ b/test/vm-cluster/setup-vm.sh @@ -13,6 +13,7 @@ if [ $mode = "new" ]; then sshpass -p $vmpass scp $hpcore/build/hpcore \ $hpcore/build/hpstatemon \ $hpcore/examples/echocontract/contract.js \ + $hpcore/examples/rndcontract/rnd_contract \ /usr/local/lib/x86_64-linux-gnu/libfuse3.so.3 \ /usr/local/bin/fusermount3 \ ./consensus-test-continuous.sh \ @@ -25,5 +26,7 @@ else sshpass -p $vmpass scp $hpcore/build/hpcore \ $hpcore/build/hpstatemon \ $hpcore/examples/echocontract/contract.js \ + $hpcore/examples/rndcontract/rnd_contract \ + ./consensus-test-continuous.sh \ geveo@$vmip:~/ fi \ No newline at end of file