diff --git a/src/consensus.cpp b/src/consensus.cpp index b0ec1a1b..8d89b9a2 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -346,7 +346,7 @@ namespace consensus */ void revise_candidate_proposals() { - // Move over the network proposal collection into a local list. This is to have a private working + // Move over the incoming proposal collection into a local list. This is to have a private working // set for candidate parsing and avoid threading conflicts with network incoming proposals. std::list collected_proposals; { @@ -354,42 +354,64 @@ namespace consensus collected_proposals.splice(collected_proposals.end(), p2p::ctx.collected_msgs.proposals); } + // Prune incoming proposals if they are older than existing proposal from same node. + { + auto itr = collected_proposals.begin(); + while (itr != collected_proposals.end()) + { + const auto ex_itr = ctx.candidate_proposals.find(itr->pubkey); + if (ex_itr != ctx.candidate_proposals.end()) // There is an existing proposal from same node. + { + const p2p::proposal &existing = ex_itr->second; + if ((itr->time == existing.time && itr->stage < existing.stage) || // Existing proposal is from a newer stage in same round. + (itr->time < existing.time)) // Existing proposal is from an newer round + { + collected_proposals.erase(itr++); // Erase the incoming proposal as it is older. + continue; + } + } + + ++itr; + } + } + // Provide latest roundtime information to unl statistics. unl::update_roundtime_stats(collected_proposals); // Move collected propsals to candidate set of proposals. - // Add propsals of new nodes and replace proposals from old nodes to reflect current status of nodes. - for (const auto &proposal : collected_proposals) + for (const auto &p : collected_proposals) { - ctx.candidate_proposals.erase(proposal.pubkey); // Erase if already exists. - ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal)); + ctx.candidate_proposals.erase(p.pubkey); // Erase if already exists. + ctx.candidate_proposals.emplace(p.pubkey, std::move(p)); } - auto itr = ctx.candidate_proposals.begin(); - const uint64_t time_now = util::get_epoch_milliseconds(); - while (itr != ctx.candidate_proposals.end()) + // Prune candidate proposals. { - const p2p::proposal &cp = itr->second; - const int8_t stage_diff = ctx.stage - cp.stage; + auto itr = ctx.candidate_proposals.begin(); + while (itr != ctx.candidate_proposals.end()) + { + const p2p::proposal &cp = itr->second; + const int8_t stage_diff = ctx.stage - cp.stage; - // Only consider this round's proposals which are from previous stage. - const bool keep_candidate = (ctx.round_start_time == cp.time) && (stage_diff == 1); - LOG_DEBUG << (keep_candidate ? "Prop--->" : "Erased") - << " [s" << std::to_string(cp.stage) - << "] u/i:" << cp.users.size() - << "/" << cp.input_ordered_hashes.size() - << " ts:" << cp.time - << " state:" << cp.state_hash - << " patch:" << cp.patch_hash - << " lps:" << cp.last_primary_shard_id - << " lbs:" << cp.last_raw_shard_id - << " [from:" << ((cp.pubkey == conf::cfg.node.public_key) ? "self" : util::to_hex(cp.pubkey).substr(2, 10)) << "]" - << "(" << (cp.recv_timestamp > cp.sent_timestamp ? (cp.recv_timestamp - cp.sent_timestamp) : 0) << "ms)"; + // Only consider this round's proposals which are from previous stage. + const bool keep_candidate = (ctx.round_start_time == cp.time) && (stage_diff == 1); + LOG_DEBUG << (keep_candidate ? "Prop--->" : "Erased") + << " [s" << std::to_string(cp.stage) + << "] u/i:" << cp.users.size() + << "/" << cp.input_ordered_hashes.size() + << " ts:" << cp.time + << " state:" << cp.state_hash + << " patch:" << cp.patch_hash + << " lps:" << cp.last_primary_shard_id + << " lbs:" << cp.last_raw_shard_id + << " [from:" << ((cp.pubkey == conf::cfg.node.public_key) ? "self" : util::to_hex(cp.pubkey).substr(2, 10)) << "]" + << "(" << (cp.recv_timestamp > cp.sent_timestamp ? (cp.recv_timestamp - cp.sent_timestamp) : 0) << "ms)"; - if (keep_candidate) - ++itr; - else - ctx.candidate_proposals.erase(itr++); + if (keep_candidate) + ++itr; + else + ctx.candidate_proposals.erase(itr++); + } } }