Consensus reliability enhancements (#62)

* Implemented going observer mode, fixed genesis lcl retrieval issue and stage closing time.

* Fixed clearing all user output instead of consensed outputs

* Modified waiting time to improve performance.

* Fixed deadlock of waiting for insufficient peers because of recent changes.

* Removed initial waiting time for peer connections to start consensus.
This commit is contained in:
Asanka Indrajith
2019-12-06 05:08:51 -05:00
committed by GitHub
parent 7428d42aad
commit b506b34b4f
6 changed files with 85 additions and 49 deletions

View File

@@ -14,8 +14,8 @@ contract_ctx ctx;
// Global configuration struct exposed to the application.
contract_config cfg;
const static char *MODE_PASSIVE = "passive";
const static char *MODE_ACTIVE = "active";
const static char *MODE_OBSERVING = "observing";
const static char *MODE_PROPOSING = "proposing";
/**
* Loads and initializes the contract config for execution. Must be called once during application startup.
@@ -31,7 +31,7 @@ int init()
if (validate_contract_dir_paths() != 0 || load_config() != 0 || validate_config() != 0)
return -1;
if (cfg.mode == OPERATING_MODE::ACTIVE)
if (cfg.mode == OPERATING_MODE::PROPOSING)
{
// Append self peer to peer list.
const std::string portstr = std::to_string(cfg.peerport);
@@ -91,7 +91,7 @@ int create_contract()
crypto::generate_signing_keys(cfg.pubkey, cfg.seckey);
binpair_to_hex();
cfg.mode = OPERATING_MODE::ACTIVE;
cfg.mode = OPERATING_MODE::PROPOSING;
cfg.listenip = "0.0.0.0";
cfg.peerport = 22860;
cfg.roundtime = 1000;
@@ -202,13 +202,13 @@ int load_config()
// Load up the values into the struct.
if (d["mode"] == MODE_PASSIVE)
cfg.mode = OPERATING_MODE::PASSIVE;
else if (d["mode"] == MODE_ACTIVE)
cfg.mode = OPERATING_MODE::ACTIVE;
if (d["mode"] == MODE_OBSERVING)
cfg.mode = OPERATING_MODE::OBSERVING;
else if (d["mode"] == MODE_PROPOSING)
cfg.mode = OPERATING_MODE::PROPOSING;
else
{
std::cout << "Invalid mode. 'passive' or 'active' expected.\n";
std::cout << "Invalid mode. 'observing' or 'proposing' expected.\n";
return -1;
}
@@ -304,7 +304,7 @@ int save_config()
d.SetObject();
rapidjson::Document::AllocatorType &allocator = d.GetAllocator();
d.AddMember("version", rapidjson::StringRef(util::HP_VERSION), allocator);
d.AddMember("mode", rapidjson::StringRef(cfg.mode == OPERATING_MODE::PASSIVE ? MODE_PASSIVE : MODE_ACTIVE),
d.AddMember("mode", rapidjson::StringRef(cfg.mode == OPERATING_MODE::OBSERVING ? MODE_OBSERVING : MODE_PROPOSING),
allocator);
d.AddMember("pubkeyhex", rapidjson::StringRef(cfg.pubkeyhex.data()), allocator);
@@ -594,4 +594,9 @@ int is_schema_valid(const rapidjson::Document &d)
return 0;
}
void change_operating_mode(const OPERATING_MODE mode)
{
cfg.mode = mode;
}
} // namespace conf

View File

@@ -16,8 +16,8 @@ typedef std::pair<std::string, std::string> ip_port_pair;
// The operating mode of the contract node.
enum OPERATING_MODE
{
PASSIVE = 0, // Observer mode. Only emits NUPs. Does not participate in voting.
ACTIVE = 1 // Consensus participant mode.
OBSERVING = 0, // Observer mode. Only emits NUPs. Does not participate in voting.
PROPOSING = 1 // Consensus participant mode.
};
// Holds contextual information about the currently loaded contract.
@@ -49,7 +49,7 @@ struct contract_config
// Config elements which are loaded from the config file.
OPERATING_MODE mode; // Operating mode of the contract (Passive/Active).
OPERATING_MODE mode; // Operating mode of the contract (Observing/Proposing).
std::string pubkeyhex; // Contract hex public key
std::string seckeyhex; // Contract hex secret key
std::string keytype; // Key generation algorithm used by libsodium
@@ -110,6 +110,8 @@ int binpair_to_hex();
int hexpair_to_bin();
void change_operating_mode(const OPERATING_MODE mode);
} // namespace conf
#endif

View File

@@ -91,11 +91,11 @@ void consensus()
p2p::ctx.collected_msgs.npl_messages.clear();
}
if (ctx.stage == 0) // Stage 0 means begining of a consensus round.
{
if (ctx.stage == 0) // Stage 0 means begining of a consensus round.
{
// Broadcast non-unl proposals (NUP) containing inputs from locally connected users.
broadcast_nonunl_proposal();
util::sleep(conf::cfg.roundtime / 10);
//util::sleep(conf::cfg.roundtime / 10);
// Verify and transfer user inputs from incoming NUPs onto consensus candidate data.
verify_and_populate_candidate_user_inputs();
@@ -106,7 +106,7 @@ void consensus()
}
else // Stage 1, 2, 3
{
std::cout << "Started stage " << std::to_string(ctx.stage) << "\n";
LOG_DBG << "Started stage " << std::to_string(ctx.stage) << "\n";
for (auto &[pubkey, proposal] : ctx.candidate_proposals)
{
bool self = proposal.pubkey == conf::cfg.pubkey;
@@ -128,7 +128,7 @@ void consensus()
check_majority_stage(is_stage_desync, reset_to_stage0, majority_stage, votes);
if (is_stage_desync)
{
timewait_stage(reset_to_stage0, floor(conf::cfg.roundtime / 10));
timewait_stage(reset_to_stage0, floor(conf::cfg.roundtime / 20));
return;
}
@@ -145,13 +145,17 @@ void consensus()
}
if (is_lcl_desync)
{
//We are resetting to stage 0 to avoid possible deadlock situations.
//Also we try to converge consensus by trying to reset every node in same time(close time range)
//by resetting node to max close time of candidate list of unl list peers.
timewait_stage(true, (time_off - ctx.time_now));
//We are resetting to stage 0 to avoid possible deadlock situations by resetting every node in random time using max time.
//this might not make sense now after stage 1 now since we are applying a stage time resolution?.
timewait_stage(true, time_off - ctx.time_now);
//LOG_DBG << "time off: " << std::to_string(time_off);
return;
}
else
{
//Node is in sync with current lcl ->switch to proposing mode.
conf::change_operating_mode(conf::OPERATING_MODE::PROPOSING);
}
// In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds.
const p2p::proposal stg_prop = create_stage123_proposal(votes);
@@ -172,11 +176,8 @@ void consensus()
// Transition to next stage.
ctx.stage = (ctx.stage + 1) % 4;
// after a stage 0 novel proposal we will just busy wait for proposals
if (ctx.stage == 0)
util::sleep(conf::cfg.roundtime / 10);
else
util::sleep(conf::cfg.roundtime / 4);
// after a stage proposal we will just busy wait for proposals.
util::sleep(conf::cfg.roundtime / 4);
}
/**
@@ -367,11 +368,14 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
stg_prop.time = time;
}
}
//todo:apply a round time resolution to increase close time reliability(for stage 1,2)
if (ctx.stage == 3)
get_ledger_time_resolution(stg_prop.time);
else
get_stage_time_resolution(stg_prop.time);
return stg_prop;
}
@@ -381,8 +385,8 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
*/
void broadcast_proposal(const p2p::proposal &p)
{
// In passive mode, we do not send out any propopsals.
if (conf::cfg.mode == conf::OPERATING_MODE::PASSIVE)
// In observing mode, we do not send out any proposals.
if (conf::cfg.mode == conf::OPERATING_MODE::OBSERVING)
return;
p2p::peer_outbound_message msg(std::make_shared<flatbuffers::FlatBufferBuilder>(1024));
@@ -447,7 +451,7 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti
total_lcl_votes++;
}
//keep track of max time of peers, so we can reset nodes in a close time range to increase reliability.
//keep track of max time of peers, so we can reset nodes in a random time range to increase reliability.
//This is very usefull especially boostrapping a node cluster.
if (cp.time > time_off)
time_off = cp.time;
@@ -460,6 +464,10 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti
{
LOG_DBG << "Not enough peers proposing to perform consensus" << std::to_string(total_lcl_votes) << " needed " << std::to_string(0.8 * conf::cfg.unl.size());
is_desync = true;
//Not enough nodes are propsing. So Node is switching to Proposing if it's in observing mode.
conf::change_operating_mode(conf::OPERATING_MODE::PROPOSING);
return;
}
@@ -480,6 +488,10 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti
{
LOG_DBG << "We are not on the consensus ledger, requesting history from a random peer";
is_desync = true;
//Node is in not sync with current lcl ->switch to observing mode.
conf::change_operating_mode(conf::OPERATING_MODE::OBSERVING);
should_request_history = true;
return;
}
@@ -487,7 +499,6 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti
if (winning_votes < 0.8 * ctx.candidate_proposals.size())
{
// potential fork condition.
// critical!!!
LOG_WARN << "No consensus on lcl. Possible fork condition. " << std::to_string(winning_votes) << std::to_string(ctx.candidate_proposals.size());
is_desync = true;
return;
@@ -521,7 +532,6 @@ void timewait_stage(const bool reset, const uint64_t time)
{
if (reset)
{
ctx.candidate_proposals.clear();
ctx.stage = 0;
}
@@ -534,18 +544,33 @@ void timewait_stage(const bool reset, const uint64_t time)
* also ensure it is sufficiently separated from the prior close time.
* @param close_time voted/agreed closed time
*/
const uint64_t get_ledger_time_resolution(uint64_t close_time)
uint64_t get_ledger_time_resolution(const uint64_t time)
{
uint64_t closeResolution = conf::cfg.roundtime / 4;
//todo: change time resolution dynamically.
uint64_t closeResolution = conf::cfg.roundtime / 4;
//todo: change time resolution dynamically.
//When nodes agree often reduce resolution time and increase if they don't.
uint64_t close_time = time;
close_time += (closeResolution / 2);
close_time -= (close_time % closeResolution);
return std::max(close_time, (ctx.prev_close_time + conf::cfg.roundtime));
}
/**
* Calculate the stage time
* Adjusting the stage time based on the current resolution.
* @param stage_time voted/agreed closed time
*/
uint64_t get_stage_time_resolution(const uint64_t time)
{
uint64_t closeResolution = conf::cfg.roundtime / 8;
uint64_t stage_time = time;
stage_time += (closeResolution / 2);
stage_time -= (stage_time % closeResolution);
return stage_time;
}
/**
* Finalize the ledger after consensus.
* @param cons_prop The proposal that reached consensus.
@@ -631,11 +656,11 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop)
user.session->send(usr::user_outbound_message(std::move(msg)));
}
}
// now we can safely delete this candidate output.
ctx.candidate_user_outputs.erase(cu_itr);
}
}
// now we can safely clear our candidate outputs.
ctx.candidate_user_outputs.clear();
}
/**
@@ -673,6 +698,7 @@ void feed_user_inputs_to_contract_bufmap(proc::contract_bufmap_t &bufmap, const
bufpair.inputs.push_back(std::move(inputtofeed));
// Remove the input from the candidate set because we no longer need it.
//LOG_DBG << "candidate input deleted.";
ctx.candidate_user_inputs.erase(itr);
}
}

View File

@@ -119,7 +119,9 @@ float_t get_stage_threshold(const uint8_t stage);
void timewait_stage(const bool reset, const uint64_t time);
const uint64_t get_ledger_time_resolution(uint64_t close_time);
uint64_t get_ledger_time_resolution(const uint64_t time);
uint64_t get_stage_time_resolution(const uint64_t time);
void apply_ledger(const p2p::proposal &proposal);

View File

@@ -256,6 +256,10 @@ bool check_required_lcl_availability(const p2p::history_request &hr)
return false;
}
}
else
{
return false; //Very rare case: node asking for the genisis lcl.
}
return true;
}
@@ -285,14 +289,14 @@ const p2p::history_response retrieve_ledger_history(const p2p::history_request &
//eventhough sequence number are same, lcl hash can be changed if one of node is in a fork condition.
if (hr.minimum_lcl != itr->second)
{
LOG_DBG << "Invalid minimum ledger. Recieved min hash: "<< min_lcl_hash << " Node hash: " << itr->second;
LOG_DBG << "Invalid minimum ledger. Recieved min hash: " << min_lcl_hash << " Node hash: " << itr->second;
history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
return history_response;
}
}
else if (min_seq_no > cons::ctx.lcl_list.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence.
else if (min_seq_no > cons::ctx.lcl_list.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence.
{
LOG_DBG << "Invalid minimum ledger. Recieved minimum sequence number is ahead of node current lcl sequence. hash: "<< min_lcl_hash;
LOG_DBG << "Invalid minimum ledger. Recieved minimum sequence number is ahead of node current lcl sequence. hash: " << min_lcl_hash;
history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
return history_response;
}
@@ -313,7 +317,7 @@ const p2p::history_response retrieve_ledger_history(const p2p::history_request &
lcl_list.begin(),
lcl_list.lower_bound(min_seq_no));
//Get raw content of lcls that going to be send.
//Get raw content of lcls that going to be send.
for (auto &[seq_no, lcl_hash] : lcl_list)
{
p2p::history_ledger ledger;

View File

@@ -168,7 +168,7 @@ int main(int argc, char **argv)
hplog::init();
LOG_INFO << "Operating mode: "
<< (conf::cfg.mode == conf::OPERATING_MODE::PASSIVE ? "Passive" : "Active");
<< (conf::cfg.mode == conf::OPERATING_MODE::OBSERVING ? "Observing" : "Proposing");
if (p2p::init() != 0 || usr::init() != 0 || cons::init() != 0)
return -1;
@@ -178,9 +178,6 @@ int main(int argc, char **argv)
// After initializing primary subsystems, register the SIGINT handler.
signal(SIGINT, signal_handler);
//we are waiting for peer to estasblish peer connections.
sleep(10); //todo: replace waiting with a check to peer check.
while (true)
{
cons::consensus();