Refactored consensus to run on a separate thread. (#123)

This commit is contained in:
Savinda Senevirathne
2020-09-18 15:53:15 +05:30
committed by GitHub
parent 180b1ec714
commit 37629471c5
4 changed files with 164 additions and 133 deletions

View File

@@ -22,8 +22,8 @@ namespace cons
{
/**
* Voting thresholds for consensus stages.
*/
* Voting thresholds for consensus stages.
*/
constexpr float STAGE1_THRESHOLD = 0.5;
constexpr float STAGE2_THRESHOLD = 0.65;
constexpr float STAGE3_THRESHOLD = 0.8;
@@ -33,6 +33,11 @@ namespace cons
bool init_success = false;
bool is_shutting_down = false;
// Consensus processing thread.
std::thread consensus_thread;
int init()
{
//load lcl details from lcl history.
@@ -58,28 +63,56 @@ namespace cons
ctx.contract_ctx.args.state_dir = conf::ctx.state_rw_dir;
ctx.contract_ctx.args.readonly = false;
// Starting consensus processing thread.
consensus_thread = std::thread(cons::run_consensus);
init_success = true;
return 0;
}
/**
* Cleanup any resources.
*/
* Cleanup any resources.
*/
void deinit()
{
// Stop the contract if running.
sc::stop(ctx.contract_ctx);
if (init_success)
{
// Making the consensus while loop stop.
is_shutting_down = true;
// Stop the contract if running.
sc::stop(ctx.contract_ctx);
// Joining consensus processing thread.
if (consensus_thread.joinable())
consensus_thread.join();
}
}
int run_consensus()
/**
* Joins the consensus processing thread.
*/
void wait()
{
while (true)
consensus_thread.join();
}
void run_consensus()
{
util::mask_signal();
LOG_INFO << "Consensus processor started.";
while (!is_shutting_down)
{
if (consensus() == -1)
return -1;
{
LOG_ERR << "Consensus thread exited due to an error.";
break;
}
}
return 0;
LOG_INFO << "Consensus processor stopped.";
}
int consensus()

View File

@@ -12,155 +12,156 @@
namespace cons
{
/**
* Represents a contract input that takes part in consensus.
*/
struct candidate_user_input
{
const std::string userpubkey;
const uint64_t maxledgerseqno = 0;
std::string input;
candidate_user_input(const std::string userpubkey, const std::string input, const uint64_t maxledgerseqno)
: userpubkey(std::move(userpubkey)), input(std::move(input)), maxledgerseqno(maxledgerseqno)
/**
* Represents a contract input that takes part in consensus.
*/
struct candidate_user_input
{
}
};
const std::string userpubkey;
const uint64_t maxledgerseqno = 0;
std::string input;
/**
* Represents a contract output that takes part in consensus.
*/
struct candidate_user_output
{
const std::string userpubkey;
std::string output;
candidate_user_input(const std::string userpubkey, const std::string input, const uint64_t maxledgerseqno)
: userpubkey(std::move(userpubkey)), input(std::move(input)), maxledgerseqno(maxledgerseqno)
{
}
};
candidate_user_output(const std::string userpubkey, const std::string output)
: userpubkey(std::move(userpubkey)), output(std::move(output))
/**
* Represents a contract output that takes part in consensus.
*/
struct candidate_user_output
{
}
};
const std::string userpubkey;
std::string output;
/**
* This is used to store consensus information
*/
struct consensus_context
{
// The map of proposals that are being collected as consensus stages are progressing.
// peer public key is the key.
// todo: having a queue of proposals against peer pubkey.
std::unordered_map<std::string, const p2p::proposal> candidate_proposals;
candidate_user_output(const std::string userpubkey, const std::string output)
: userpubkey(std::move(userpubkey)), output(std::move(output))
{
}
};
// The set of npl messages that are being collected as consensus stages are progressing.
std::list<p2p::npl_message> candidate_npl_messages;
// Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round.
std::unordered_set<std::string> candidate_users;
// Map of candidate user inputs with input hash as map key. Inputs will stay here until they
// achieve consensus or expire (due to maxledgerseqno). Input hash is globally unique among inputs
// from all users. We will use this map to feed inputs into the contract once consensus is achieved.
std::unordered_map<std::string, candidate_user_input> candidate_user_inputs;
// Map of outputs generated by the contract with output hash is the map key. Outputs will stay
// here until the end of the current consensus round. Output hash is globally unique among outputs for
// all users. We will use this map to distribute outputs back to connected users once consensus is achieved.
std::unordered_map<std::string, candidate_user_output> candidate_user_outputs;
util::rollover_hashset recent_userinput_hashes;
uint8_t stage = 0;
uint64_t time_now = 0;
std::string lcl;
uint64_t led_seq_no = 0;
hpfs::h32 state = hpfs::h32_empty;
//Map of closed ledgers(only lrdgername[sequnece_number-hash], state hash) with sequence number as map key.
//contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE.
//this is loaded when node started and updated throughout consensus - delete ledgers that falls behind MAX_LEDGER_SEQUENCE range.
//We will use this to track lcls related logic.- track state, lcl request, response.
std::map<uint64_t, ledger_cache_entry> ledger_cache;
std::string last_requested_lcl;
//ledger close time of previous hash
uint16_t stage_time = 0; // Time allocated to a consensus stage.
uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage.
std::mutex state_sync_lock;
sc::execution_context contract_ctx;
bool is_shutting_down = false;
consensus_context()
: recent_userinput_hashes(200)
/**
* This is used to store consensus information
*/
struct consensus_context
{
}
};
// The map of proposals that are being collected as consensus stages are progressing.
// peer public key is the key.
// todo: having a queue of proposals against peer pubkey.
std::unordered_map<std::string, const p2p::proposal> candidate_proposals;
struct vote_counter
{
std::map<uint64_t, int32_t> time;
std::map<std::string, int32_t> lcl;
std::map<std::string, int32_t> users;
std::map<std::string, int32_t> inputs;
std::map<std::string, int32_t> outputs;
std::map<hpfs::h32, int32_t> state;
};
// The set of npl messages that are being collected as consensus stages are progressing.
std::list<p2p::npl_message> candidate_npl_messages;
extern consensus_context ctx;
// Set of user pubkeys that is said to be connected to the cluster. This will be cleared in each round.
std::unordered_set<std::string> candidate_users;
int init();
// Map of candidate user inputs with input hash as map key. Inputs will stay here until they
// achieve consensus or expire (due to maxledgerseqno). Input hash is globally unique among inputs
// from all users. We will use this map to feed inputs into the contract once consensus is achieved.
std::unordered_map<std::string, candidate_user_input> candidate_user_inputs;
void deinit();
// Map of outputs generated by the contract with output hash is the map key. Outputs will stay
// here until the end of the current consensus round. Output hash is globally unique among outputs for
// all users. We will use this map to distribute outputs back to connected users once consensus is achieved.
std::unordered_map<std::string, candidate_user_output> candidate_user_outputs;
int run_consensus();
util::rollover_hashset recent_userinput_hashes;
int consensus();
uint8_t stage = 0;
uint64_t time_now = 0;
std::string lcl;
uint64_t led_seq_no = 0;
hpfs::h32 state = hpfs::h32_empty;
void purify_candidate_proposals();
//Map of closed ledgers(only lrdgername[sequnece_number-hash], state hash) with sequence number as map key.
//contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE.
//this is loaded when node started and updated throughout consensus - delete ledgers that falls behind MAX_LEDGER_SEQUENCE range.
//We will use this to track lcls related logic.- track state, lcl request, response.
std::map<uint64_t, ledger_cache_entry> ledger_cache;
std::string last_requested_lcl;
bool wait_and_proceed_stage(uint64_t &stage_start);
//ledger close time of previous hash
uint16_t stage_time = 0; // Time allocated to a consensus stage.
uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage.
void broadcast_nonunl_proposal();
std::mutex state_sync_lock;
sc::execution_context contract_ctx;
bool is_shutting_down = false;
void verify_and_populate_candidate_user_inputs();
consensus_context()
: recent_userinput_hashes(200)
{
}
};
bool verify_appbill_check(std::string_view pubkey, const size_t input_len);
struct vote_counter
{
std::map<uint64_t, int32_t> time;
std::map<std::string, int32_t> lcl;
std::map<std::string, int32_t> users;
std::map<std::string, int32_t> inputs;
std::map<std::string, int32_t> outputs;
std::map<hpfs::h32, int32_t> state;
};
p2p::proposal create_stage0_proposal();
extern consensus_context ctx;
p2p::proposal create_stage123_proposal(vote_counter &votes);
int init();
void broadcast_proposal(const p2p::proposal &p);
void deinit();
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes);
void wait();
void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes);
void run_consensus();
float_t get_stage_threshold(const uint8_t stage);
int consensus();
void timewait_stage(const bool reset, const uint64_t time);
void purify_candidate_proposals();
uint64_t get_ledger_time_resolution(const uint64_t time);
bool wait_and_proceed_stage(uint64_t &stage_start);
uint64_t get_stage_time_resolution(const uint64_t time);
void broadcast_nonunl_proposal();
int apply_ledger(const p2p::proposal &proposal);
void verify_and_populate_candidate_user_inputs();
void dispatch_user_outputs(const p2p::proposal &cons_prop);
bool verify_appbill_check(std::string_view pubkey, const size_t input_len);
void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop);
p2p::proposal create_stage0_proposal();
void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap);
p2p::proposal create_stage123_proposal(vote_counter &votes);
void broadcast_npl_output(std::string &output);
void broadcast_proposal(const p2p::proposal &p);
template <typename T>
void increment(std::map<T, int32_t> &counter, const T &candidate);
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes);
int get_initial_state_hash(hpfs::h32 &hash);
void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes);
void on_state_sync_completion(const hpfs::h32 new_state);
float_t get_stage_threshold(const uint8_t stage);
void timewait_stage(const bool reset, const uint64_t time);
uint64_t get_ledger_time_resolution(const uint64_t time);
uint64_t get_stage_time_resolution(const uint64_t time);
int apply_ledger(const p2p::proposal &proposal);
void dispatch_user_outputs(const p2p::proposal &cons_prop);
void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop);
void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap);
void broadcast_npl_output(std::string &output);
template <typename T>
void increment(std::map<T, int32_t> &counter, const T &candidate);
int get_initial_state_hash(hpfs::h32 &hash);
void on_state_sync_completion(const hpfs::h32 new_state);
} // namespace cons

View File

@@ -201,13 +201,11 @@ int main(int argc, char **argv)
// After initializing primary subsystems, register the SIGINT handler.
signal(SIGINT, &sigint_handler);
if (cons::run_consensus() == -1)
{
LOG_ERR << "Error occured in consensus.";
deinit();
return -1;
}
// Wait until consensus thread finishes.
cons::wait();
// deinit() here only gets called when there is an error in consensus.
// If not deinit in the sigint handler is called when a SIGINT is received.
deinit();
}
}

View File

@@ -153,9 +153,8 @@ namespace p2p
{
if (ctx.peer_connections.size() == 0)
{
LOG_DBG << "No peers to broadcast (not even self). Waiting until at least one peer connects.";
while (ctx.peer_connections.size() == 0)
util::sleep(100);
LOG_DBG << "No peers to broadcast (not even self). Cannot broadcast.";
return;
}
//Broadcast while locking the peer_connections.