Consensus deadlock fixes and reliability improvements. (#187)

* Consensus time vote improvements.
* Skipped self proposal in Observer mode.
* Added docker cluster script params.
* Added state/lcl sync abandon threshold.
* Added pubkey display for cluster script.
* Added proposal latency log.
* Added sync completion check to prevent deadlocks.
This commit is contained in:
Ravin Perera
2020-12-08 15:17:27 +05:30
committed by GitHub
parent 7bf0475b6f
commit fe9e276f8d
14 changed files with 318 additions and 199 deletions

View File

@@ -115,7 +115,7 @@ namespace conf
cfg.pubidletimeout = 0;
cfg.peeridletimeout = 120;
cfg.is_consensus_public = true;
cfg.is_consensus_public = false;
cfg.is_npl_public = false;
cfg.msgforwarding = false;

View File

@@ -103,19 +103,18 @@ namespace consensus
// A consensus round consists of 4 stages (0,1,2,3).
// For a given stage, this function may get visited multiple times due to time-wait conditions.
uint64_t stage_start = 0;
if (!wait_and_proceed_stage(stage_start))
if (!wait_and_proceed_stage())
return 0; // This means the stage has been reset.
LOG_DEBUG << "Started stage " << std::to_string(ctx.stage);
// We consider stage start time as the current discreet time throughout the stage.
ctx.time_now = stage_start;
// Throughout consensus, we continuously update and prune the candidate proposals for newly
// arrived ones and expired ones.
revise_candidate_proposals();
// If possible, switch back to proposer mode before stage processing.
check_sync_completion();
// Get current lcl and state.
std::string lcl = ledger::ctx.get_lcl();
const uint64_t lcl_seq_no = ledger::ctx.get_seq_no();
@@ -164,9 +163,7 @@ namespace consensus
// If we are in sync, vote and get the final winning votes.
// This is the consensus proposal which makes it into the ledger and contract execution
const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, unl_count, state, unl_hash);
// Update the unl with the unl changeset that subjected to the consensus.
unl::apply_changeset(p.unl_changeset.additions, p.unl_changeset.removals);
broadcast_proposal(p);
// Update the ledger and execute the contract using the consensus proposal.
if (update_ledger_and_execute_contract(p, lcl, state) == -1)
@@ -229,6 +226,16 @@ namespace consensus
return false;
}
/**
 * Switches this node back to PROPOSER mode once it is in OBSERVER mode and
 * neither the state sync nor the lcl sync reports that it is still syncing.
 * Does nothing in every other situation.
 */
void check_sync_completion()
{
    // Nothing to do unless we are currently observing.
    if (conf::cfg.operating_mode != conf::OPERATING_MODE::OBSERVER)
        return;

    // A sync that is still in progress keeps us in observer mode.
    if (state_sync::ctx.is_syncing || ledger::sync_ctx.is_syncing)
        return;

    conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER);
}
/**
* Moves proposals collected from the network into candidate proposals and
* cleans up any outdated proposals from the candidate set.
@@ -247,24 +254,17 @@ namespace consensus
// Add proposals of new nodes and replace proposals from old nodes to reflect current status of nodes.
for (const auto &proposal : collected_proposals)
{
auto prop_itr = ctx.candidate_proposals.find(proposal.pubkey);
if (prop_itr != ctx.candidate_proposals.end())
{
ctx.candidate_proposals.erase(prop_itr);
ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal));
}
else
{
ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal));
}
ctx.candidate_proposals.erase(proposal.pubkey); // Erase if already exists.
ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal));
}
// Prune any outdated proposals.
auto itr = ctx.candidate_proposals.begin();
const uint64_t time_now = util::get_epoch_milliseconds();
while (itr != ctx.candidate_proposals.end())
{
const p2p::proposal &cp = itr->second;
const uint64_t time_diff = (ctx.time_now > cp.sent_timestamp) ? (ctx.time_now - cp.sent_timestamp) : 0;
const uint64_t time_diff = (time_now > cp.sent_timestamp) ? (time_now - cp.sent_timestamp) : 0;
const int8_t stage_diff = ctx.stage - cp.stage;
// only consider recent proposals and proposals from previous stage and current stage.
@@ -277,7 +277,8 @@ namespace consensus
<< " ts:" << std::to_string(cp.time)
<< " lcl:" << cp.lcl.substr(0, 15)
<< " state:" << cp.state
<< " [from:" << ((cp.pubkey == conf::cfg.pubkey) ? "self" : util::get_hex(cp.pubkey, 1, 5)) << "]";
<< " [from:" << ((cp.pubkey == conf::cfg.pubkey) ? "self" : util::get_hex(cp.pubkey, 1, 5)) << "]"
<< "(" << std::to_string(cp.recv_timestamp > cp.sent_timestamp ? cp.recv_timestamp - cp.sent_timestamp : 0) << "ms)";
if (keep_candidate)
++itr;
@@ -290,7 +291,7 @@ namespace consensus
* Synchronise the stage/round time for fixed intervals and reset the stage.
* @return True if consensus can proceed in the current round. False if stage is reset.
*/
bool wait_and_proceed_stage(uint64_t &stage_start)
bool wait_and_proceed_stage()
{
// Here, nodes try to synchronise nodes stages using network clock.
// We divide universal time into windows of equal size of roundtime. Each round must be synced with the
@@ -299,23 +300,24 @@ namespace consensus
const uint64_t now = util::get_epoch_milliseconds();
// Rounds are discrete windows of roundtime.
// This gets the start time of current round window. Stage 0 must start in the next round window.
const uint64_t current_round_start = (((uint64_t)(now / conf::cfg.roundtime)) * conf::cfg.roundtime);
if (ctx.stage == 0)
{
// This gets the start time of current round window. Stage 0 must start in the window after that.
const uint64_t previous_round_start = (((uint64_t)(now / conf::cfg.roundtime)) * conf::cfg.roundtime);
// Stage 0 must start in the next round window.
// (This makes sure stage 3 gets whichever the remaining time in the round after stages 0,1,2)
stage_start = current_round_start + conf::cfg.roundtime;
const uint64_t to_wait = stage_start - now;
ctx.round_start_time = previous_round_start + conf::cfg.roundtime;
const uint64_t to_wait = ctx.round_start_time - now;
LOG_DEBUG << "Waiting " << to_wait << "ms for next round stage 0";
LOG_DEBUG << "Waiting " << to_wait << "ms for next round stage 0.";
util::sleep(to_wait);
return true;
}
else
{
stage_start = current_round_start + (ctx.stage * ctx.stage_time);
const uint64_t stage_start = ctx.round_start_time + (ctx.stage * ctx.stage_time);
// Compute stage time wait.
// Node wait between stages to collect enough proposals from previous stages from other nodes.
@@ -325,7 +327,7 @@ namespace consensus
// it will join in next round. Otherwise it will continue particapating in this round.
if (to_wait < ctx.stage_reset_wait_threshold) //todo: self claculating/adjusting network delay
{
LOG_DEBUG << "Missed stage " << std::to_string(ctx.stage) << " window. Resetting to stage 0";
LOG_DEBUG << "Missed stage " << std::to_string(ctx.stage) << " window. Resetting to stage 0.";
ctx.stage = 1;
return false;
}
@@ -515,51 +517,54 @@ namespace consensus
{
// This is the proposal that stage 0 votes on.
// We report our own values in stage 0.
p2p::proposal stg_prop;
stg_prop.time = ctx.time_now;
stg_prop.stage = 0;
stg_prop.lcl = lcl;
stg_prop.state = state;
stg_prop.unl_hash = unl_hash;
crypto::random_bytes(stg_prop.nonce, ROUND_NONCE_SIZE);
p2p::proposal p;
p.time = ctx.round_start_time;
p.stage = 0;
p.lcl = lcl;
p.state = state;
p.unl_hash = unl_hash;
crypto::random_bytes(p.nonce, ROUND_NONCE_SIZE);
// Populate the proposal with set of candidate user pubkeys.
stg_prop.users.swap(ctx.candidate_users);
p.users.swap(ctx.candidate_users);
// Populate the proposal with hashes of user inputs.
for (const auto &[hash, cand_input] : ctx.candidate_user_inputs)
stg_prop.hash_inputs.emplace(hash);
p.hash_inputs.emplace(hash);
// Populate the proposal with hashes of user outputs.
for (const auto &[hash, cand_output] : ctx.candidate_user_outputs)
stg_prop.hash_outputs.emplace(hash);
p.hash_outputs.emplace(hash);
// Populate the proposal wil unl changeset.
stg_prop.unl_changeset = ctx.candidate_unl_changeset;
// Populate the proposal with unl changeset.
p.unl_changeset = ctx.candidate_unl_changeset;
return stg_prop;
return p;
}
p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash)
{
// The proposal to be emited at the end of this stage.
p2p::proposal stg_prop;
stg_prop.stage = ctx.stage;
stg_prop.state = state;
p2p::proposal p;
p.stage = ctx.stage;
p.state = state;
// we always vote for our current lcl and state regardless of what other peers are saying
// if there's a fork condition we will either request history and state from
// We always vote for our current lcl and state regardless of what other peers are saying.
// If there's a fork condition we will either request history and state from
// our peers or we will halt depending on level of consensus on the sides of the fork.
stg_prop.lcl = lcl;
p.lcl = lcl;
stg_prop.unl_hash = unl_hash;
// We always vote for our current unl hash.
p.unl_hash = unl_hash;
const uint64_t time_now = util::get_epoch_milliseconds();
// Vote for rest of the proposal fields by looking at candidate proposals.
for (const auto &[pubkey, cp] : ctx.candidate_proposals)
{
// Vote for times.
// Everyone votes on an arbitrary time, as long as it's not in the future and within the round time.
if (ctx.time_now > cp.time && (ctx.time_now - cp.time) <= conf::cfg.roundtime)
// Everyone votes on the discrete time, as long as it's not in the future and within 2 round times.
if (time_now > cp.time && (time_now - cp.time) <= (conf::cfg.roundtime * 2))
increment(votes.time, cp.time);
// Vote for round nonce.
@@ -600,32 +605,32 @@ namespace consensus
// Add user pubkeys which have votes over stage threshold to proposal.
for (const auto &[pubkey, numvotes] : votes.users)
if (numvotes >= required_votes || (ctx.stage == 1 && numvotes > 0))
stg_prop.users.emplace(pubkey);
p.users.emplace(pubkey);
// Add inputs which have votes over stage threshold to proposal.
for (const auto &[hash, numvotes] : votes.inputs)
if (numvotes >= required_votes || (ctx.stage == 1 && numvotes > 0))
stg_prop.hash_inputs.emplace(hash);
p.hash_inputs.emplace(hash);
// Add outputs which have votes over stage threshold to proposal.
for (const auto &[hash, numvotes] : votes.outputs)
if (numvotes >= required_votes)
stg_prop.hash_outputs.emplace(hash);
p.hash_outputs.emplace(hash);
// For the unl changeset reset required votes for majority votes.
// For the unl changeset, reset required votes for majority votes.
required_votes = ceil(MAJORITY_THRESHOLD * unl_count);
// Add unl additions which have votes over majority threshold to proposal.
for (const auto &[pubkey, numvotes] : votes.unl_additions)
if (numvotes >= required_votes)
stg_prop.unl_changeset.additions.emplace(pubkey);
p.unl_changeset.additions.emplace(pubkey);
// Add unl removals which have votes over majority threshold to proposal.
for (const auto &[pubkey, numvotes] : votes.unl_removals)
if (numvotes >= required_votes)
stg_prop.unl_changeset.removals.emplace(pubkey);
p.unl_changeset.removals.emplace(pubkey);
// time is voted on a simple sorted (highest to lowest) and majority basis, since there will always be disagreement.
// time is voted on a simple sorted (highest to lowest) and majority basis.
uint32_t highest_time_vote = 0;
for (auto itr = votes.time.rbegin(); itr != votes.time.rend(); ++itr)
{
@@ -635,9 +640,12 @@ namespace consensus
if (numvotes > highest_time_vote)
{
highest_time_vote = numvotes;
stg_prop.time = time;
p.time = time;
}
}
// If final time happens to be 0 (this can happen if there were no proposals to vote for), we set the time manually.
if (p.time == 0)
p.time = ctx.round_start_time;
// Round nonce is voted on a simple sorted (highest to lowest) and majority basis, since there will always be disagreement.
uint32_t highest_nonce_vote = 0;
@@ -648,31 +656,29 @@ namespace consensus
if (numvotes > highest_nonce_vote)
{
highest_time_vote = numvotes;
stg_prop.nonce = nonce;
highest_nonce_vote = numvotes;
p.nonce = nonce;
}
}
return stg_prop;
return p;
}
/**
* Broadcasts the given proposal to all connected peers if in PROPOSER mode. Otherwise
* only send to self in OBSERVER mode.
* Broadcasts the given proposal to all connected peers if in PROPOSER mode. Does not send in OBSERVER mode.
* @return 0 on success. -1 if no peers to broadcast.
*/
void broadcast_proposal(const p2p::proposal &p)
{
// In observer mode, we do not send out proposals.
if (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER)
return;
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_proposal(fbuf, p);
p2p::broadcast_message(fbuf, true, false, !conf::cfg.is_consensus_public);
// In observer mode, we only send out the proposal to ourselves.
if (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER)
p2p::send_message_to_self(fbuf);
else
p2p::broadcast_message(fbuf, true, false, !conf::cfg.is_consensus_public);
LOG_DEBUG << "Proposed u/i/o:" << p.users.size()
LOG_DEBUG << "Proposed <s" << std::to_string(p.stage) << "> u/i/o:" << p.users.size()
<< "/" << p.hash_inputs.size()
<< "/" << p.hash_outputs.size()
<< " ts:" << std::to_string(p.time)
@@ -843,7 +849,8 @@ namespace consensus
}
}
// Clear candidate unl changset after consensus rounds are completed.
// Update the unl with the unl changeset that subjected to the consensus.
unl::apply_changeset(cons_prop.unl_changeset.additions, cons_prop.unl_changeset.removals);
ctx.candidate_unl_changeset.clear();
// Send any output from the previous consensus round to locally connected users.

View File

@@ -68,7 +68,7 @@ namespace consensus
p2p::contract_unl_changeset candidate_unl_changeset;
uint8_t stage = 1;
uint64_t time_now = 0;
uint64_t round_start_time = 0;
uint16_t stage_time = 0; // Time allocated to a consensus stage.
uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage.
@@ -105,9 +105,11 @@ namespace consensus
bool is_in_sync(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes);
void check_sync_completion();
void revise_candidate_proposals();
bool wait_and_proceed_stage(uint64_t &stage_start);
bool wait_and_proceed_stage();
void broadcast_nonunl_proposal();

View File

@@ -52,14 +52,14 @@ namespace hpfs
std::ostream &operator<<(std::ostream &output, const h32 &h)
{
const uint8_t *buf = reinterpret_cast<const uint8_t *>(&h);
for (int i = 0; i < 8; i++)
for (int i = 0; i < 5; i++) // Only print first 5 bytes in hex.
output << std::hex << std::setfill('0') << std::setw(2) << (int)buf[i];
return output;
}
// Helper class to support std::map/std::unordered_map custom hashing function.
// This is needed to use B2H as the std map container key.
// Helper func to support std::map/std::unordered_map custom hashing function.
// This is needed to use h32 as the std map container key.
size_t h32_std_key_hasher::operator()(const h32 h) const
{
// Compute individual hash values. http://stackoverflow.com/a/1646913/126995

View File

@@ -7,7 +7,7 @@
namespace hpfs
{
constexpr const char *HPFS_TRACE_ARG_ERROR = "trace=error";
constexpr const char *HPFS_TRACE_ARG_DEBUG = "trace=debug";
constexpr const char *HPFS_TRACE_ARG_DEBUG = "trace=error";
constexpr const char *HPFS_HMAP_HASH = "::hpfs.hmap.hash";
constexpr const char *HPFS_HMAP_CHILDREN = "::hpfs.hmap.children";
constexpr const char *HPFS_SESSION = "::hpfs.session";

View File

@@ -16,6 +16,12 @@ namespace ledger
constexpr uint64_t MAX_LEDGER_SEQUENCE = 256; // Max ledger block count to keep.
constexpr uint16_t SYNCER_IDLE_WAIT = 20; // lcl syncer loop sleep time (milliseconds).
// Max no. of repetitive request resubmissions before abandoning the sync.
constexpr uint16_t ABANDON_THRESHOLD = 10;
// No. of milliseconds to wait before resubmitting a request.
uint16_t REQUEST_RESUBMIT_TIMEOUT;
ledger_context ctx;
sync_context sync_ctx;
bool init_success = false;
@@ -25,6 +31,8 @@ namespace ledger
*/
int init()
{
REQUEST_RESUBMIT_TIMEOUT = conf::cfg.roundtime;
// Filename list of the history folder.
std::list<std::string> sorted_folder_entries = util::fetch_dir_entries(conf::ctx.hist_dir);
// Sorting to make filenames in seq_no order.
@@ -138,24 +146,17 @@ namespace ledger
return;
}
const std::string current_lcl = ctx.get_lcl();
{
std::scoped_lock<std::mutex> lock(sync_ctx.target_lcl_mutex);
if (sync_ctx.target_lcl == target_lcl)
return;
sync_ctx.target_lcl = target_lcl;
sync_ctx.target_lcl_seq_no = target_seq_no;
sync_ctx.target_requested_on = 0;
sync_ctx.request_submissions = 0;
sync_ctx.is_syncing = true;
LOG_INFO << "lcl sync: Syncing for target:" << sync_ctx.target_lcl.substr(0, 15) << " (current:" << current_lcl.substr(0, 15) << ")";
}
// Request history from a random peer if needed.
// We do not send a request if the target is GENESIS block (nothing to request).
if (target_lcl != GENESIS_LEDGER)
{
send_ledger_history_request(current_lcl, target_lcl);
LOG_INFO << "lcl sync: Syncing for target:" << sync_ctx.target_lcl.substr(0, 15) << " (current:" << ctx.get_lcl().substr(0, 15) << ")";
}
}
@@ -168,112 +169,154 @@ namespace ledger
LOG_INFO << "lcl sync: Worker started.";
std::list<std::pair<std::string, p2p::history_request>> history_requests;
std::list<p2p::history_response> history_responses;
// Indicates whether any requests/responses were processed in the previous loop iteration.
bool prev_processed = false;
while (!sync_ctx.is_shutting_down)
{
// Wait a small delay if there were no requests/responses processed during previous iteration.
if (!prev_processed)
util::sleep(SYNCER_IDLE_WAIT);
// Indicates whether any requests/responses were processed in the loop iteration.
bool processed = false;
const std::string current_lcl = ctx.get_lcl();
// Move over the collected sync items to the local lists.
{
std::scoped_lock<std::mutex>(sync_ctx.list_mutex);
history_requests.splice(history_requests.end(), sync_ctx.collected_history_requests);
history_responses.splice(history_responses.end(), sync_ctx.collected_history_responses);
}
prev_processed = !history_requests.empty() || !history_responses.empty();
// Process any target lcl sync activities.
// Perform lcl sync activities.
{
std::scoped_lock<std::mutex> lock(sync_ctx.target_lcl_mutex);
if (!sync_ctx.target_lcl.empty())
send_lcl_sync_request(); // Send lcl requests if needed (or abandon if sync timeout).
// Process any history responses from other nodes.
if (!sync_ctx.target_lcl.empty() && check_lcl_sync_responses() == 1)
processed = true;
}
// Serve any history requests from other nodes.
if (check_lcl_sync_requests() == 1)
processed = true;
// Wait a small delay if there were no requests/responses processed during previous iteration.
if (!processed)
util::sleep(SYNCER_IDLE_WAIT);
}
LOG_INFO << "lcl sync: Worker stopped.";
}
/**
* Submits/resubmits lcl history requests as needed. Abandons sync if threshold reached.
*/
void send_lcl_sync_request()
{
// If target lcl is genesis lcl, Clear the ledger history and reset target sequence number.
if (sync_ctx.target_lcl == GENESIS_LEDGER)
{
LOG_INFO << "lcl sync: Target is GENESIS. Clearing our history.";
clear_ledger();
sync_ctx.clear_target();
}
else
{
// Check whether we need to send any requests or abandon the sync due to timeout.
const uint64_t time_now = util::get_epoch_milliseconds();
if ((sync_ctx.target_requested_on == 0) || // Initial request.
(time_now - sync_ctx.target_requested_on) > REQUEST_RESUBMIT_TIMEOUT) // Request resubmission.
{
if (sync_ctx.request_submissions < ABANDON_THRESHOLD)
{
// If target lcl is genesis lcl, Clear the ledger history and reset target sequence number.
if (sync_ctx.target_lcl == GENESIS_LEDGER)
{
LOG_INFO << "lcl sync: Target is GENESIS. Clearing our history.";
clear_ledger();
sync_ctx.target_lcl.clear();
sync_ctx.target_lcl_seq_no = 0;
sync_ctx.is_syncing = false;
}
// If full history mode is not enabled check the target lcl seq no
// to see whether it's too far ahead. That means no one probably has our
// lcl in their ledgers. So we should clear our entire ledger history before requesting from peers.
else if (!conf::cfg.fullhistory && current_lcl != GENESIS_LEDGER && sync_ctx.target_lcl_seq_no > (ctx.get_seq_no() + MAX_LEDGER_SEQUENCE))
// Before first request, if full history mode is not enabled check the target lcl seq no to see whether
// it's too far ahead. That means no one probably has our lcl in their ledgers. So we should clear our
// entire ledger history before requesting from peers.
if (sync_ctx.target_requested_on == 0 && !conf::cfg.fullhistory && sync_ctx.target_lcl_seq_no > (ctx.get_seq_no() + MAX_LEDGER_SEQUENCE))
{
LOG_INFO << "lcl sync: Target " << sync_ctx.target_lcl.substr(0, 15) << " is too far ahead. Clearing our history.";
clear_ledger();
}
else
{
// Scan any queued lcl history responses.
// Only process the first successful item which matches with our current lcl.
for (const p2p::history_response &hr : history_responses)
{
if (hr.requester_lcl == current_lcl)
{
std::string new_lcl;
if (handle_ledger_history_response(hr, new_lcl) != -1)
{
LOG_INFO << "lcl sync: Sync complete. New lcl:" << new_lcl.substr(0, 15);
sync_ctx.target_lcl.clear();
sync_ctx.target_lcl_seq_no = 0;
sync_ctx.is_syncing = false;
break;
}
}
}
}
send_ledger_history_request(ctx.get_lcl(), sync_ctx.target_lcl);
sync_ctx.target_requested_on = time_now;
sync_ctx.request_submissions++;
}
history_responses.clear();
}
// Serve any history requests from other nodes.
{
// Acquire lock so consensus does not update the ledger while we are reading the ledger.
std::scoped_lock<std::mutex> ledger_lock(ctx.ledger_mutex);
for (const auto &[session_id, hr] : history_requests)
else
{
// First check whether we have the required lcl available.
if (!check_required_lcl_availability(hr.required_lcl))
continue;
p2p::history_response resp;
if (ledger::retrieve_ledger_history(hr, resp) != -1)
{
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_history_response(fbuf, resp);
std::string_view msg = msg::fbuf::flatbuff_bytes_to_sv(fbuf.GetBufferPointer(), fbuf.GetSize());
// Find the peer that we should send the state response to.
std::scoped_lock<std::mutex> lock(p2p::ctx.peer_connections_mutex);
const auto peer_itr = p2p::ctx.peer_connections.find(session_id);
if (peer_itr != p2p::ctx.peer_connections.end())
{
comm::comm_session *session = peer_itr->second;
session->send(msg);
}
}
LOG_INFO << "lcl sync: Resubmission threshold exceeded. Abandoning sync.";
sync_ctx.clear_target();
}
}
}
}
history_requests.clear();
/**
* Processes any lcl responses we have received from other peers.
* @return 0 if no respones were processed. 1 if at least one response was processed.
*/
int check_lcl_sync_responses()
{
// Move over the collected responses to the local list.
std::list<p2p::history_response> history_responses;
{
std::scoped_lock<std::mutex>(sync_ctx.list_mutex);
history_responses.splice(history_responses.end(), sync_ctx.collected_history_responses);
}
const std::string current_lcl = ctx.get_lcl();
// Scan any queued lcl history responses.
// Only process the first successful item which matches with our current lcl.
for (const p2p::history_response &hr : history_responses)
{
if (hr.requester_lcl == current_lcl)
{
std::string new_lcl;
if (handle_ledger_history_response(hr, new_lcl) != -1)
{
LOG_INFO << "lcl sync: Sync complete. New lcl:" << new_lcl.substr(0, 15);
sync_ctx.clear_target();
break;
}
}
}
LOG_INFO << "lcl sync: Worker stopped.";
return history_responses.empty() ? 0 : 1;
}
/**
 * Serves any lcl history requests we have received from other peers.
 * For every collected request whose required lcl is available locally, the ledger
 * history is retrieved and sent back on the peer session the request came from
 * (if that session is still present in the peer connection list).
 * @return 0 if no requests had been collected. 1 if at least one request had been
 *         collected (regardless of whether a response could be sent for it).
 */
int check_lcl_sync_requests()
{
    // Move over the collected requests to the local list.
    std::list<std::pair<std::string, p2p::history_request>> history_requests;
    {
        // The lock has to be a named object. An unnamed 'std::scoped_lock<std::mutex>(m);'
        // is a temporary that releases the mutex before the splice below runs.
        std::scoped_lock<std::mutex> list_lock(sync_ctx.list_mutex);
        history_requests.splice(history_requests.end(), sync_ctx.collected_history_requests);
    }

    // Nothing to serve, so there is no reason to take the ledger mutex.
    if (history_requests.empty())
        return 0;

    // Acquire lock so consensus does not update the ledger while we are reading the ledger.
    std::scoped_lock<std::mutex> ledger_lock(ctx.ledger_mutex);

    for (const auto &[session_id, hr] : history_requests)
    {
        // First check whether we have the required lcl available.
        if (!check_required_lcl_availability(hr.required_lcl))
            continue;

        p2p::history_response resp;
        if (ledger::retrieve_ledger_history(hr, resp) != -1)
        {
            flatbuffers::FlatBufferBuilder fbuf(1024);
            p2pmsg::create_msg_from_history_response(fbuf, resp);
            // 'msg' is a view over fbuf's buffer; fbuf stays alive until the end of this scope.
            std::string_view msg = msg::fbuf::flatbuff_bytes_to_sv(fbuf.GetBufferPointer(), fbuf.GetSize());

            // Find the peer that we should send the history response to.
            std::scoped_lock<std::mutex> lock(p2p::ctx.peer_connections_mutex);
            const auto peer_itr = p2p::ctx.peer_connections.find(session_id);
            if (peer_itr != p2p::ctx.peer_connections.end())
            {
                comm::comm_session *session = peer_itr->second;
                session->send(msg);
            }
        }
    }

    return 1;
}
/**

View File

@@ -14,7 +14,9 @@ namespace ledger
{
// The current target lcl that we are syncing towards.
std::string target_lcl;
uint64_t target_lcl_seq_no;
uint64_t target_lcl_seq_no = 0;
uint64_t target_requested_on = 0;
uint16_t request_submissions = 0;
std::mutex target_lcl_mutex;
// Lists holding history requests and responses collected from incoming p2p messages.
@@ -25,6 +27,15 @@ namespace ledger
std::thread lcl_sync_thread;
std::atomic<bool> is_syncing = false;
std::atomic<bool> is_shutting_down = false;
// Resets every field that describes the current sync target and marks the
// lcl sync as no longer in progress.
// NOTE(review): target_lcl_mutex is not taken here; presumably the caller holds it - confirm.
void clear_target()
{
    target_lcl.clear();
    target_lcl_seq_no = 0;
    target_requested_on = 0;
    request_submissions = 0;
    // Cleared last, after the target fields above have been reset.
    is_syncing.store(false);
}
};
struct ledger_context
@@ -70,9 +81,15 @@ namespace ledger
void deinit();
void set_sync_target(const std::string &target_lcl);
void lcl_syncer_loop();
void set_sync_target(const std::string &target_lcl);
void send_lcl_sync_request();
int check_lcl_sync_responses();
int check_lcl_sync_requests();
const std::pair<uint64_t, std::string> get_ledger_cache_top();

View File

@@ -226,6 +226,7 @@ namespace msg::fbuf::p2pmsg
p.pubkey = flatbuff_bytes_to_sv(pubkey);
p.sent_timestamp = timestamp;
p.recv_timestamp = util::get_epoch_milliseconds();
p.time = msg.time();
p.nonce = flatbuff_bytes_to_sv(msg.nonce());
p.stage = msg.stage();

View File

@@ -47,6 +47,7 @@ namespace p2p
std::string pubkey;
uint64_t sent_timestamp = 0; // The timestamp of the sender when this proposal was sent.
uint64_t recv_timestamp = 0; // The timestamp when we received the proposal. (used for statsitics)
uint64_t time = 0; // The time value that is voted on.
uint8_t stage = 0;
std::string nonce; // Random nonce that is used to reduce lcl predictability.

View File

@@ -214,7 +214,7 @@ namespace p2p
if (ctx.collected_msgs.state_responses.size() < p2p::STATE_RES_LIST_CAP)
{
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_responses.push_back(std::make_pair(session.pubkey, std::move(response)));
ctx.collected_msgs.state_responses.push_back(std::make_pair(session.uniqueid, std::move(response)));
}
else
{

View File

@@ -22,6 +22,9 @@ namespace state_sync
// Request loop sleep time (milliseconds).
constexpr uint16_t REQUEST_LOOP_WAIT = 10;
// Max no. of repetitive request resubmissions before abandoning the sync.
constexpr uint16_t ABANDON_THRESHOLD = 20;
constexpr int FILE_PERMS = 0644;
// No. of milliseconds to wait before resubmitting a request.
@@ -93,9 +96,9 @@ namespace state_sync
while (!ctx.is_shutting_down)
{
hpfs::h32 new_state = hpfs::h32_empty;
request_loop(ctx.target_state, new_state);
const int result = request_loop(ctx.target_state, new_state);
if (ctx.is_shutting_down)
if (result == -1 || ctx.is_shutting_down)
break;
ctx.pending_requests.clear();
@@ -133,13 +136,16 @@ namespace state_sync
LOG_INFO << "State sync: Worker stopped.";
}
void request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state)
int request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state)
{
std::string lcl = ledger::ctx.get_lcl();
// Indicates whether any responses were processed in the previous loop iteration.
bool prev_responses_processed = false;
// No. of repetitive resubmissions so far. (This is reset whenever we receive a state response)
uint16_t resubmissions_count = 0;
// Send the initial root state request.
submit_request(backlog_item{BACKLOG_ITEM_TYPE::DIR, "/", -1, current_target}, lcl);
@@ -162,12 +168,16 @@ namespace state_sync
prev_responses_processed = !ctx.candidate_state_responses.empty();
// Reset resubmissions counter whenever we have a response.
if (!ctx.candidate_state_responses.empty())
resubmissions_count = 0;
for (auto &response : ctx.candidate_state_responses)
{
if (should_stop_request_loop(current_target))
return;
return 0;
LOG_DEBUG << "State sync: Processing state response from [" << response.first.substr(0, 10) << "]";
LOG_DEBUG << "State sync: Processing state response from [" << response.first.substr(2, 10) << "]";
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(response.second.data());
const msg::fbuf::p2pmsg::State_Response_Message *resp_msg = content->message_as_State_Response_Message();
@@ -201,7 +211,7 @@ namespace state_sync
if (hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, "/") < 1)
{
LOG_ERROR << "State sync: exiting due to hash check error.";
return;
return -1;
}
// Update the central state tracker.
@@ -209,7 +219,7 @@ namespace state_sync
LOG_DEBUG << "State sync: current:" << updated_state << " | target:" << current_target;
if (updated_state == current_target)
return;
return 0;
}
ctx.candidate_state_responses.clear();
@@ -218,7 +228,7 @@ namespace state_sync
for (auto &[hash, request] : ctx.submitted_requests)
{
if (should_stop_request_loop(current_target))
return;
return 0;
if (request.waiting_time < REQUEST_RESUBMIT_TIMEOUT)
{
@@ -227,6 +237,12 @@ namespace state_sync
}
else
{
if (++resubmissions_count > ABANDON_THRESHOLD)
{
LOG_INFO << "State sync: Resubmission threshold exceeded. Abandoning sync.";
return -1;
}
// Reset the counter and re-submit request.
request.waiting_time = 0;
LOG_DEBUG << "State sync: Resubmitting request...";
@@ -241,7 +257,7 @@ namespace state_sync
for (int i = 0; i < available_slots && !ctx.pending_requests.empty(); i++)
{
if (should_stop_request_loop(current_target))
return;
return 0;
const backlog_item &request = ctx.pending_requests.front();
submit_request(request, lcl);
@@ -249,6 +265,8 @@ namespace state_sync
}
}
}
return 0;
}
/**
@@ -299,9 +317,10 @@ namespace state_sync
std::string target_pubkey;
request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl, target_pubkey);
LOG_DEBUG << "State sync: Requesting from [" << target_pubkey.substr(0, 10) << "]. type:" << request.type
<< " path:" << request.path << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
if (!target_pubkey.empty())
LOG_DEBUG << "State sync: Requesting from [" << target_pubkey.substr(2, 10) << "]. type:" << request.type
<< " path:" << request.path << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
}
/**

View File

@@ -62,7 +62,7 @@ namespace state_sync
void state_syncer_loop();
void request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state);
int request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state);
bool should_stop_request_loop(const hpfs::h32 current_target);

View File

@@ -1,7 +1,9 @@
#!/bin/bash
# Script to generate docker container clusters for local development testing.
# Generate contract sub-directories within "hpcluster" directory for the given no. of cluster nodes.
# Usage (to generate 5-node cluster): ./cluster-create.sh 5
# Usage: To generate 5-node cluster: ./cluster-create.sh 5
# Specify log level (default: inf): ./cluster-create.sh 5 dbg
# Specify round time (default: 1000): ./cluster-create.sh 5 inf 2000
# Validate the node count arg.
if [ -n "$1" ] && [ "$1" -eq "$1" ] 2>/dev/null; then
@@ -12,6 +14,8 @@ else
fi
ncount=$1
loglevel=$2
roundtime=$3
hpcore=$(realpath ../..)
# Contract can be set with 'export CONTRACT=<name>'. Defaults to nodejs echo contract.
@@ -39,6 +43,13 @@ else # nodejs echo contract (default)
binargs="/contract/bin/echo_contract.js"
fi
# Fall back to the default log level when the 2nd script argument was not given.
: "${loglevel:=inf}"
# Fall back to the default round time when the 3rd script argument was not given.
: "${roundtime:=1000}"
# Delete and recreate 'hpcluster' directory.
rm -rf hpcluster > /dev/null 2>&1
mkdir hpcluster
@@ -80,8 +91,8 @@ do
appbillargs: '', \
peerport: ${peerport}, \
pubport: ${pubport}, \
roundtime: 1000, \
loglevel: 'inf', \
roundtime: $roundtime, \
loglevel: '$loglevel', \
loggers:['console', 'file'] \
}, null, 2)" > hp.cfg
rm tmp.json

View File

@@ -51,12 +51,12 @@ let nodeid=$2-1
if [ "$mode" = "info" ] || [ "$mode" = "new" ] || [ "$mode" = "update" ] || [ "$mode" = "reconfig" ] || \
[ "$mode" = "start" ] || [ "$mode" = "stop" ] || [ "$mode" = "check" ] || [ "$mode" = "log" ] || [ "$mode" = "kill" ] || \
[ "$mode" = "ssh" ] || [ "$mode" = "reboot" ] || [ "$mode" = "dns" ] || [ "$mode" = "ssl" ] || [ "$mode" = "lcl" ]; then
[ "$mode" = "ssh" ] || [ "$mode" = "reboot" ] || [ "$mode" = "dns" ] || [ "$mode" = "ssl" ] || [ "$mode" = "lcl" ] || [ "$mode" = "pubkey" ]; then
echo "mode: $mode ($contdir)"
else
echo "Invalid command. [ info | new | update | reconfig" \
" | start [N] | stop [N] | check [N] | log <N> | kill [N] | reboot <N> | ssh <N>or<command>" \
" | dns <N> <zerossl file> | ssl <N> | lcl ] expected."
" | dns <N> <zerossl file> | ssl <N> | lcl | pubkey <N> ] expected."
exit 1
fi
@@ -75,6 +75,7 @@ fi
# dns - Uploads given zerossl domain verification file to vm and starts http server for DNS check.
# ssl - Uploads matching zerossl certificate bundle from ~/Downloads/ to the contract.
# lcl - Displays the lcls of all nodes.
# pubkey - Displays the pubkey on specified vm node or entire cluster.
if [ $mode = "info" ]; then
echo "${vmaddrs[*]}" | tr ' ' '\n'
@@ -246,6 +247,23 @@ if [ $mode = "lcl" ]; then
exit 0
fi
# 'pubkey' mode: print the pubkeyhex value found in hp.cfg on one node or on every node, then exit.
if [ $mode = "pubkey" ]; then
# Remote command: take the hp.cfg line(s) containing 'pubkeyhex' and print the 4th
# double-quote-delimited field of each.
command="cat $contdir/cfg/hp.cfg | grep pubkeyhex | cut -d '\"' -f4"
# nodeid is computed as $2-1 earlier in the script, so -1 presumably means no node number was given.
if [ $nodeid = -1 ]; then
# Query every vm. Each ssh runs as a background job, so the output lines may appear in any order.
for (( i=0; i<$vmcount; i++ ))
do
vmaddr=${vmaddrs[i]}
# 1-based node number, used only as the label of the printed line.
let nodeid=$i+1
echo "node"$nodeid":" $(sshpass -p $vmpass ssh $vmuser@$vmaddr $command) &
done
# Block until all background ssh jobs have finished.
wait
else
# Single node: run the command on the selected vm only.
vmaddr=${vmaddrs[$nodeid]}
sshpass -p $vmpass ssh $vmuser@$vmaddr $command
fi
exit 0
fi
# All code below this will only execute in 'new', 'update' or 'reconfig' mode.
# Run setup/configuration of entire cluster.
@@ -307,7 +325,7 @@ function joinarr {
arr=("${!arrname}")
skip=$2
let prevlast=$ncount-2
let prevlast=$vmcount-2
# Resetting prevlast if nothing is given to skip.
if [ $skip -lt 0 ]
then
@@ -331,7 +349,7 @@ function joinarr {
done
str="$str]"
echo $str
echo $str # This returns the result.
}
# Loop through all nodes hp.cfg.