Consensus enhancements and eliminating deadlocks after state inclusion. (#69)

This commit is contained in:
Asanka Indrajith
2019-12-13 09:38:50 -05:00
committed by GitHub
parent fd2c960068
commit 7a7aa6d5b3
12 changed files with 89 additions and 89 deletions

View File

@@ -1,12 +1,13 @@
#!/bin/bash
sudo ./cluster-create.sh 5
sudo mkdir -p /home/geveodev/hpcore/hpcluster/node1/statehist/0/data
sudo mkdir -p /home/geveodev/hpcore/hpcluster/node2/statehist/0/data/
sudo mkdir -p /home/geveodev/hpcore/hpcluster/node3/statehist/0/data
sudo mkdir -p /home/geveodev/hpcore/hpcluster/node4/statehist/0/data/
nodes=3
sudo ./cluster-create.sh $nodes
sudo cp -r /home/geveodev/Desktop/Share/* /home/geveodev/hpcore/hpcluster/node1/statehist/0/data
sudo cp -r /home/geveodev/Desktop/Share/* /home/geveodev/hpcore/hpcluster/node2/statehist/0/data
sudo cp -r /home/geveodev/Desktop/Share/* /home/geveodev/hpcore/hpcluster/node3/statehist/0/data
sudo cp -r /home/geveodev/Desktop/Share/* /home/geveodev/hpcore/hpcluster/node4/statehist/0/data
# Setup initial state data for all nodes but one.
for (( i=1; i<$nodes; i++ ))
do
sudo mkdir -p ~/hpcore/hpcluster/node$i/statehist/0/data/
#sudo cp -r ~/Downloads/big.mkv ~/hpcore/hpcluster/node$i/statehist/0/data/
done

View File

@@ -29,6 +29,7 @@ constexpr float STAGE1_THRESHOLD = 0.5;
constexpr float STAGE2_THRESHOLD = 0.65;
constexpr float STAGE3_THRESHOLD = 0.8;
constexpr float MAJORITY_THRESHOLD = 0.8;
constexpr uint16_t MAX_RESET_TIME = 200;
consensus_context ctx;
@@ -68,6 +69,7 @@ int init()
});
ctx.prev_close_time = util::get_epoch_milliseconds();
ctx.reset_time = MAX_RESET_TIME;
return 0;
}
@@ -142,6 +144,7 @@ void consensus()
<< " hinp:" << proposal.hash_inputs.size()
<< " hout:" << proposal.hash_outputs.size()
<< " lcl:" << proposal.lcl
<< " state:" << *reinterpret_cast<const hasher::B2H *>(proposal.curr_hash_state.c_str())
<< " self:" << self
<< "\n";
}
@@ -162,28 +165,37 @@ void consensus()
// check if we're ahead/behind of consensus lcl
bool is_lcl_desync, should_request_history;
std::string majority_lcl;
uint64_t time_off = 0;
check_lcl_votes(is_lcl_desync, should_request_history, time_off, majority_lcl, votes);
check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes);
if (should_request_history)
{
//create history request message and request history from a random peer.
send_ledger_history_request(ctx.lcl, majority_lcl);
//handle minority going forward when boostrapping cluster.
//Here we are mimicking invalid min ledger scenario.
if (majority_lcl == GENESIS_LEDGER)
{
last_requested_lcl = majority_lcl;
p2p::history_response res;
res.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
handle_ledger_history_response(std::move(res));
}
else
{
//create history request message and request history from a random peer.
send_ledger_history_request(ctx.lcl, majority_lcl);
}
}
if (is_lcl_desync)
{
int64_t diff = 0;
if (time_off > ctx.time_now)
diff = time_off - ctx.time_now;
else
diff = ctx.time_now - time_off;
//We are resetting to stage 0 to avoid possible deadlock situations by resetting every node in random time using max time.
//this might not make sense now after stage 1 now since we are applying a stage time resolution?.
LOG_DBG << "time off: " << std::to_string(ctx.reset_time);
timewait_stage(true, ctx.reset_time);
const uint16_t decrement = rand() % (conf::cfg.roundtime / 40);
if (decrement > ctx.reset_time)
ctx.reset_time = MAX_RESET_TIME;
else
ctx.reset_time -= decrement;
LOG_DBG << "time off: " << std::to_string(diff);
timewait_stage(true, diff);
//LOG_DBG << "time off: " << std::to_string(time_off);
return;
}
else
@@ -205,9 +217,10 @@ void consensus()
{
ctx.prev_close_time = stg_prop.time;
apply_ledger(stg_prop);
ctx.reset_time = MAX_RESET_TIME;
// We have finished a consensus round (all 4 stages).
LOG_INFO << "****Stage 3 consensus reached****";
LOG_INFO << "****Stage 3 consensus reached**** (state:" << *reinterpret_cast<const hasher::B2H *>(cons::ctx.curr_hash_state.c_str()) << ")";
}
}
}
@@ -396,10 +409,13 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
if (numvotes >= vote_threshold)
stg_prop.hash_outputs.emplace(hash);
// time is voted on a simple sorted and majority basis, since there will always be disagreement.
// time is voted on a simple sorted (highest to lowest) and majority basis, since there will always be disagreement.
int32_t highest_time_vote = 0;
for (const auto [time, numvotes] : votes.time)
for(auto itr = votes.time.rbegin(); itr != votes.time.rend(); ++itr)
{
const uint64_t time = itr->first;
const int32_t numvotes = itr->second;
if (numvotes > highest_time_vote)
{
highest_time_vote = numvotes;
@@ -407,12 +423,10 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
}
}
//todo:apply a round time resolution to increase close time reliability(for stage 1,2)
if (ctx.stage == 3)
get_ledger_time_resolution(stg_prop.time);
stg_prop.time = get_ledger_time_resolution(stg_prop.time);
else
get_stage_time_resolution(stg_prop.time);
stg_prop.time = get_stage_time_resolution(stg_prop.time);
return stg_prop;
}
@@ -476,7 +490,7 @@ void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority
/**
* Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes.
*/
void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &time_off, std::string &majority_lcl, vote_counter &votes)
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes)
{
int32_t total_lcl_votes = 0;
@@ -488,11 +502,6 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti
increment(votes.lcl, cp.lcl);
total_lcl_votes++;
}
//keep track of max time of peers, so we can reset nodes in a random time range to increase reliability.
//This is very useful especially boostrapping a node cluster.
if (cp.time > time_off)
time_off = cp.time;
}
is_desync = false;
@@ -537,7 +546,7 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &ti
if (winning_votes < MAJORITY_THRESHOLD * ctx.candidate_proposals.size())
{
// potential fork condition.
LOG_WARN << "No consensus on lcl. Possible fork condition. won:" << std::to_string(winning_votes) << " total:" << std::to_string(ctx.candidate_proposals.size());
LOG_DBG << "No consensus on lcl. Possible fork condition. won:" << std::to_string(winning_votes) << " total:" << std::to_string(ctx.candidate_proposals.size());
is_desync = true;
return;
}
@@ -738,20 +747,22 @@ void check_state(vote_counter &votes)
{
if (ctx.state_sync_lcl != ctx.lcl)
{
LOG_DBG << "State mismatch. Starting state sync...";
// Change the mode to passive and not sending out proposals till the state is synced
conf::change_operating_mode(conf::OPERATING_MODE::OBSERVING);
const hasher::B2H majority_state_hash = *reinterpret_cast<const hasher::B2H *>(majority_state.c_str());
LOG_INFO << "Starting state sync. Curr state:" << *reinterpret_cast<const hasher::B2H *>(ctx.curr_hash_state.c_str()) << " majority:" << majority_state_hash;
start_state_sync(majority_state_hash);
ctx.is_state_syncing = true;
ctx.state_sync_lcl = ctx.lcl;
}
}
else if (majority_state == ctx.curr_hash_state)
else if (majority_state == ctx.curr_hash_state && ctx.is_state_syncing)
{
LOG_INFO << "State sync complete. state:" << *reinterpret_cast<const hasher::B2H *>(ctx.curr_hash_state.c_str());
ctx.is_state_syncing = false;
ctx.state_sync_lcl.clear();
conf::change_operating_mode(conf::OPERATING_MODE::PROPOSING);

View File

@@ -69,11 +69,11 @@ struct consensus_context
util::rollover_hashset recent_userinput_hashes;
uint8_t stage;
uint64_t novel_proposal_time;
uint64_t time_now;
uint8_t stage = 0;
uint64_t novel_proposal_time = 0;
uint64_t time_now = 0;
std::string lcl;
uint64_t led_seq_no;
uint64_t led_seq_no = 0;
std::string curr_hash_state;
std::string prev_hash_state;
@@ -83,9 +83,11 @@ struct consensus_context
//We will use this to track lcls related logic.- track state, lcl request, response.
std::map<uint64_t, ledger_cache> cache;
//ledger close time of previous hash
uint64_t prev_close_time;
bool is_state_syncing;
uint64_t prev_close_time = 0;
uint16_t reset_time = 0;
bool is_state_syncing = false;
std::string state_sync_lcl;
std::thread state_syncing_thread;
std::mutex state_syncing_mutex;
@@ -124,7 +126,7 @@ void broadcast_proposal(const p2p::proposal &p);
void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes);
void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &time_off, std::string &majority_lcl, vote_counter &votes);
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes);
float_t get_stage_threshold(const uint8_t stage);

View File

@@ -206,7 +206,7 @@ const ledger_history load_ledger()
if (ldg_hist.cache.empty())
{
ldg_hist.led_seq_no = 0;
ldg_hist.lcl = "0-genesis";
ldg_hist.lcl = GENESIS_LEDGER;
}
else
{
@@ -467,15 +467,11 @@ void handle_ledger_history_response(const p2p::history_response &hr)
}
last_requested_lcl = "";
const auto latest_lcl_itr = cons::ctx.cache.rbegin();
cons::ctx.lcl = latest_lcl_itr->second.lcl;
cons::ctx.led_seq_no = latest_lcl_itr->first;
if (cons::ctx.cache.empty())
{
cons::ctx.led_seq_no = 0;
cons::ctx.lcl = "0-genesis";
cons::ctx.lcl = GENESIS_LEDGER;
}
else
{

View File

@@ -9,7 +9,7 @@ namespace cons
//max ledger count
constexpr uint64_t MAX_LEDGER_SEQUENCE = 200;
constexpr const char* GENESIS_LEDGER = "0-genesis";
struct ledger_cache
{
std::string lcl;

View File

@@ -12,7 +12,7 @@ namespace cons
{
constexpr uint16_t MAX_AWAITING_REQUESTS = 1;
constexpr uint16_t MAX_RESPONSE_WAIT_CYCLES = 100;
constexpr uint16_t MAX_RESPONSE_WAIT_CYCLES = 10;
// List of state responses flatbuffer messages to be processed.
std::list<std::string> candidate_state_responses;
@@ -80,8 +80,6 @@ int create_state_response(p2p::peer_outbound_message &msg, const p2p::state_requ
void start_state_sync(const hasher::B2H state_hash_to_request)
{
std::cout << "start_state_sync() " << state_hash_to_request << "\n";
{
std::lock_guard<std::mutex> lock(p2p::ctx.collected_msgs.state_response_mutex);
p2p::ctx.collected_msgs.state_response.clear();
@@ -127,10 +125,7 @@ int run_state_sync_iterator()
hasher::B2H response_hash = fbschema::flatbuff_bytes_to_hash(resp_msg->hash());
const auto pending_resp_itr = submitted_requests.find(response_hash);
if (pending_resp_itr == submitted_requests.end())
{
std::cout << "Ignoring state response.\n";
continue;
}
// Now that we have received matching hash, remove it from the waiting list.
submitted_requests.erase(pending_resp_itr);
@@ -169,7 +164,7 @@ int run_state_sync_iterator()
{
// Reset the counter and re-submit request.
request.waiting_cycles = 0;
std::cout << "Resubmit state request\n";
LOG_DBG << "Resubmitting state request...";
submit_request(request);
}
}
@@ -192,7 +187,7 @@ int run_state_sync_iterator()
void submit_request(const backlog_item &request)
{
std::cout << "Submitting state request. type: " << request.type << " path:" << request.path << " blockid: " << request.block_id << "\n";
LOG_DBG << "Submitting state request. type:" << request.type << " path:" << request.path << " blockid:" << request.block_id;
submitted_requests.try_emplace(request.expected_hash, request);
@@ -202,14 +197,9 @@ void submit_request(const backlog_item &request)
int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp)
{
std::cout << "Recieved state fs entry response\n";
std::unordered_map<std::string, p2p::state_fs_hash_entry> state_fs_entry_list;
fbschema::p2pmsg::flatbuf_statefshashentry_to_statefshashentry(state_fs_entry_list, fs_entry_resp->entries());
for (const auto [a, b] : state_fs_entry_list)
std::cout << "Recieved fsentry: " << a << "\n";
std::unordered_map<std::string, p2p::state_fs_hash_entry> existing_fs_entries;
std::string_view root_path_sv = fbschema::flatbuff_str_to_sv(fs_entry_resp->path());
std::string root_path_str(root_path_sv.data(), root_path_sv.size());
@@ -227,12 +217,9 @@ int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry
// Request more info on fs entries that exist on both sides but are different.
for (const auto &[path, fs_entry] : existing_fs_entries)
{
std::cout << "Existing path :" << path << std::endl;
const auto fs_itr = state_fs_entry_list.find(path);
if (fs_itr != state_fs_entry_list.end())
{
std::cout << "Existing fs_entry_hash :" << fs_entry.hash << std::endl;
std::cout << "Recieved fs_entry_hash :" << fs_itr->second.hash << std::endl;
if (fs_itr->second.hash != fs_entry.hash)
{
if (fs_entry.is_file)
@@ -276,8 +263,6 @@ int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *
std::string_view path_sv = fbschema::flatbuff_str_to_sv(file_resp->path());
const std::string path_str(path_sv.data(), path_sv.size());
std::cout << "Recieved file hash map of " << path_str << std::endl;
std::vector<uint8_t> existing_block_hashmap;
if (statefs::get_block_hash_map(existing_block_hashmap, path_str, hasher::B2H_empty) == -1)
return -1;
@@ -288,9 +273,6 @@ int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *
const hasher::B2H *resp_hashes = reinterpret_cast<const hasher::B2H *>(file_resp->hash_map()->data());
auto resp_hash_count = file_resp->hash_map()->size() / hasher::HASH_SIZE;
std::cout << "Reieved file hashmap size :" << file_resp->hash_map()->size() << std::endl;
std::cout << "Existing file hashmap size :" << existing_block_hashmap.size() << std::endl;
auto insert_itr = pending_requests.begin();
for (int block_id = 0; block_id < existing_hash_count; ++block_id)
@@ -300,7 +282,6 @@ int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *
if (existing_hashes[block_id] != resp_hashes[block_id])
{
std::cout << "Mismatch in file block :" << block_id << std::endl;
// Insert at front to give priority to block requests while preserving block order.
pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, path_str, block_id, resp_hashes[block_id]});
}
@@ -315,7 +296,6 @@ int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *
{
for (int block_id = existing_hash_count; block_id < resp_hash_count; ++block_id)
{
std::cout << "Missing block: " << block_id << "\n";
// Insert at front to give priority to block requests while preserving block order.
pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, path_str, block_id, resp_hashes[block_id]});
}
@@ -328,8 +308,6 @@ int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg
{
p2p::block_response block_resp = fbschema::p2pmsg::create_block_response_from_msg(*block_msg);
std::cout << "Recieved block " << block_resp.block_id << " of " << block_resp.path << "\n";
if (statefs::write_block(block_resp.path, block_resp.block_id, block_resp.data.data(), block_resp.data.size()) == -1)
return -1;

View File

@@ -64,6 +64,7 @@ int parse_cmd(int argc, char **argv)
*/
void deinit()
{
proc::deinit();
hplog::deinit();
}
@@ -115,7 +116,7 @@ void std_terminate() noexcept
int main(int argc, char **argv)
{
//seed rand
srand(time(0));
srand(util::get_epoch_milliseconds());
// Register exception handler for std exceptions.
std::set_terminate(&std_terminate);

View File

@@ -128,7 +128,6 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
}
else if (content_message_type == p2pmsg::Message_State_Response_Message)
{
LOG_INFO << "Received State Response Message\n";
std::lock_guard<std::mutex> lock(ctx.collected_msgs.state_response_mutex); // Insert state_response with lock.
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_response.push_back(std::move(response));

View File

@@ -72,7 +72,7 @@ int exec_contract(const contract_exec_args &args)
// Wait for child process (contract process) to complete execution.
const int presult = await_process_execution(contract_pid);
LOG_INFO << "Contract process ended.";
LOG_DBG << "Contract process ended.";
contract_pid = 0;
if (presult != 0)
@@ -99,7 +99,7 @@ int exec_contract(const contract_exec_args &args)
// Write the contract input message from HotPocket to the stdin (0) of the contract process.
write_contract_args(args);
LOG_INFO << "Starting contract process...";
LOG_DBG << "Starting contract process...";
// Fill process args.
char *execv_args[conf::cfg.runtime_binexec_args.size() + 1];
@@ -209,11 +209,7 @@ int stop_state_monitor()
if (htreebuilder.generate(statehash) != 0)
return -1;
std::string root_hash(reinterpret_cast<const char*>(&statehash), hasher::HASH_SIZE);
root_hash.swap(cons::ctx.curr_hash_state);
LOG_DBG << "State hash: " << std::hex << statehash << std::dec;
cons::ctx.curr_hash_state = std::string(reinterpret_cast<const char *>(&statehash), hasher::HASH_SIZE);
return 0;
}
@@ -710,4 +706,16 @@ void close_unused_vectorfds(const bool is_hp, std::vector<int> &fds)
}
}
/**
* Cleanup any running processes.
*/
void deinit()
{
if (contract_pid > 0)
kill(contract_pid, SIGINT);
if (statemon_pid > 0)
kill(statemon_pid, SIGINT);
}
} // namespace proc

View File

@@ -78,6 +78,8 @@ struct contract_exec_args
int exec_contract(const contract_exec_args &args);
void deinit();
//------Internal-use functions for this namespace.
int await_process_execution(pid_t pid);

View File

@@ -277,6 +277,8 @@ void socket_session<T>::close()
template <class T>
void socket_session<T>::on_close(const error_code ec, const int8_t type)
{
sess_handler.on_close(this);
if (type == 1)
return;

View File

@@ -37,7 +37,7 @@ void B2H::operator^=(const B2H rhs)
std::ostream &operator<<(std::ostream &output, const B2H &h)
{
output << h.data[0] << h.data[1] << h.data[2] << h.data[3];
output << h.data[0];// << h.data[1] << h.data[2] << h.data[3];
return output;
}