mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Consensus with network clock (#71)
This commit is contained in:
BIN
examples/rndcontract/rnd_contract
Executable file
BIN
examples/rndcontract/rnd_contract
Executable file
Binary file not shown.
@@ -186,8 +186,9 @@ void consensus()
|
||||
}
|
||||
if (is_lcl_desync)
|
||||
{
|
||||
//We are resetting to stage 0 to avoid possible deadlock situations by resetting every node in random time using max time.
|
||||
LOG_DBG << "time off: " << std::to_string(ctx.reset_time);
|
||||
//Resetting to stage 0 to avoid possible deadlock situations by resetting every node in random time using max time.
|
||||
//todo: This might not needed with current synchronization with network clock.
|
||||
// LOG_DBG << "time off: " << std::to_string(ctx.reset_time);
|
||||
timewait_stage(true, ctx.reset_time);
|
||||
const uint16_t decrement = rand() % (conf::cfg.roundtime / 40);
|
||||
|
||||
@@ -219,19 +220,60 @@ void consensus()
|
||||
apply_ledger(stg_prop);
|
||||
ctx.reset_time = MAX_RESET_TIME;
|
||||
|
||||
// We have finished a consensus round (all 4 stages).
|
||||
LOG_INFO << "****Stage 3 consensus reached**** (state:" << *reinterpret_cast<const hasher::B2H *>(cons::ctx.curr_hash_state.c_str()) << ")";
|
||||
// node has finished a consensus round (all 4 stages).
|
||||
LOG_INFO << "****Stage 3 consensus reached**** (lcl:" << ctx.lcl
|
||||
<< " state:" << *reinterpret_cast<const hasher::B2H *>(cons::ctx.curr_hash_state.c_str()) << ")";
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// We have finished a consensus stage.
|
||||
|
||||
// Node has finished a consensus stage.
|
||||
// Transition to next stage.
|
||||
ctx.stage = (ctx.stage + 1) % 4;
|
||||
|
||||
// after a stage proposal we will just busy wait for proposals.
|
||||
util::sleep(conf::cfg.roundtime / 4);
|
||||
//Here nodes try to synchronise nodes stages using network clock.
|
||||
uint64_t now = util::get_epoch_milliseconds();
|
||||
|
||||
// round start is the floor
|
||||
uint64_t round_start = ((uint64_t)(now / conf::cfg.roundtime)) * conf::cfg.roundtime;
|
||||
|
||||
uint64_t next_stage_start = 0;
|
||||
|
||||
// Compute start time of next stage.
|
||||
// Last stage (stage 3) waiting twice as any other stage's waiting time.
|
||||
// This is for a node to catch up from lcl/state desync,
|
||||
// becuase we are waiting (round time + time to next stage) to sync.
|
||||
if (ctx.stage == 3)
|
||||
next_stage_start = round_start + conf::cfg.roundtime;
|
||||
else
|
||||
next_stage_start = round_start + (int64_t)(ctx.stage * ((double)conf::cfg.roundtime / 5.0));
|
||||
|
||||
// Compute stage time wait.
|
||||
// Node wait between stages to collect enough proposals from previous stages from other nodes.
|
||||
int64_t to_wait = next_stage_start - now;
|
||||
|
||||
LOG_DBG << "now = " << now << ", roundtime = " << conf::cfg.roundtime << ", round_start = " << round_start << ", next_stage_start = " << next_stage_start << ", to_wait = " << to_wait;
|
||||
|
||||
// If a node doesn't have enough time (due to network delay) to recieve/send reliable stage proposals for next stage,
|
||||
// it will continue particapating in this round, otherwise will join in next round(s).
|
||||
if (to_wait < floor(conf::cfg.roundtime / 10)) //todo: self claculating/adjusting network delay
|
||||
{
|
||||
uint64_t next_round = round_start;
|
||||
while (to_wait < floor(conf::cfg.roundtime / 10))
|
||||
{
|
||||
next_round += conf::cfg.roundtime;
|
||||
to_wait = next_round - now;
|
||||
}
|
||||
|
||||
LOG_INFO << "we missed a round, waiting " << to_wait << " and resetting to stage 0";
|
||||
ctx.stage = 0;
|
||||
util::sleep(to_wait);
|
||||
}
|
||||
else
|
||||
{
|
||||
// after a stage proposal we will just busy wait for proposals.
|
||||
util::sleep(to_wait);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -411,7 +453,7 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
|
||||
|
||||
// time is voted on a simple sorted (highest to lowest) and majority basis, since there will always be disagreement.
|
||||
int32_t highest_time_vote = 0;
|
||||
for(auto itr = votes.time.rbegin(); itr != votes.time.rend(); ++itr)
|
||||
for (auto itr = votes.time.rbegin(); itr != votes.time.rend(); ++itr)
|
||||
{
|
||||
const uint64_t time = itr->first;
|
||||
const int32_t numvotes = itr->second;
|
||||
@@ -872,4 +914,4 @@ void increment(std::map<T, int32_t> &counter, const T &candidate)
|
||||
counter.try_emplace(candidate, 1);
|
||||
}
|
||||
|
||||
} // namespace cons
|
||||
} // namespace cons
|
||||
|
||||
@@ -12,11 +12,9 @@ namespace cons
|
||||
{
|
||||
|
||||
// Max number of requests that can be awaiting response at any given time.
|
||||
constexpr uint16_t MAX_AWAITING_REQUESTS = 4;
|
||||
constexpr uint16_t MAX_AWAITING_REQUESTS = 1;
|
||||
// Syncing loop sleep delay.
|
||||
constexpr uint16_t SYNC_LOOP_WAIT = 50;
|
||||
// No of loop cycles to wait for a response before resubmitting request.
|
||||
constexpr uint16_t MAX_AWAITING_CYCLES = 1000 / SYNC_LOOP_WAIT;
|
||||
constexpr uint16_t SYNC_LOOP_WAIT = 100;
|
||||
|
||||
// List of state responses flatbuffer messages to be processed.
|
||||
std::list<std::string> candidate_state_responses;
|
||||
@@ -37,7 +35,7 @@ void request_state_from_peer(const std::string &path, const bool is_file, const
|
||||
|
||||
p2p::peer_outbound_message msg(std::make_unique<flatbuffers::FlatBufferBuilder>(1024));
|
||||
fbschema::p2pmsg::create_msg_from_state_request(msg.builder(), sr, lcl);
|
||||
p2p::send_message_to_random_peer(msg);
|
||||
p2p::send_message_to_random_peer(msg); //todo: send to a node that hold the majority state to improve reliability of retrieving state.
|
||||
}
|
||||
|
||||
int create_state_response(p2p::peer_outbound_message &msg, const p2p::state_request &sr)
|
||||
@@ -159,7 +157,8 @@ int run_state_sync_iterator()
|
||||
// Check for long-awaited responses and re-request them.
|
||||
for (auto &[hash, request] : submitted_requests)
|
||||
{
|
||||
if (request.waiting_cycles < MAX_AWAITING_CYCLES)
|
||||
// We wait for half of round time before each request is resubmitted.
|
||||
if (request.waiting_cycles < (conf::cfg.roundtime / (SYNC_LOOP_WAIT * 2)))
|
||||
{
|
||||
// Increment counter.
|
||||
request.waiting_cycles++;
|
||||
|
||||
@@ -7,17 +7,26 @@ mode=$1
|
||||
|
||||
hpcore=$(realpath ../..)
|
||||
|
||||
if [ "$mode" = "new" ] || [ "$mode" = "run" ] || [ "$mode" = "update" ]; then
|
||||
if [ "$mode" = "new" ] || [ "$mode" = "run" ] || [ "$mode" = "update" ] || [ "$mode" = "kill" ]; then
|
||||
echo ""
|
||||
else
|
||||
echo "Invalid command. new | run | update expected."
|
||||
echo "Invalid command. new | run | update | kill expected."
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if [ $mode = "run" ]; then
|
||||
let nodeid=$2-1
|
||||
vmip=${vmips[$nodeid]}
|
||||
sshpass -p $vmpass ssh geveo@$vmip 'sudo ./hpcore run contract'
|
||||
sshpass -p $vmpass ssh geveo@$vmip 'nohup sudo ./hpcore run contract'
|
||||
sshpass -p $vmpass ssh geveo@$vmip 'tail -f nohup.out'
|
||||
exit 0
|
||||
fi
|
||||
|
||||
if [ $mode = "kill" ]; then
|
||||
let nodeid=$2-1
|
||||
vmip=${vmips[$nodeid]}
|
||||
sshpass -p $vmpass ssh geveo@$vmip 'sudo kill $(pidof hpcore)'
|
||||
sshpass -p $vmpass ssh geveo@$vmip 'sudo kill $(pidof hpstatemon)'
|
||||
exit 0
|
||||
fi
|
||||
|
||||
@@ -80,7 +89,7 @@ do
|
||||
mypeers=$(joinarr peers $j)
|
||||
myunl=$(joinarr pubkeys $j)
|
||||
|
||||
node -p "JSON.stringify({...require('./cfg/node$n.json'),binary:'/usr/bin/node',binargs:'/home/geveo/contract.js',peers:${mypeers},unl:${myunl}}, null, 2)" > ./cfg/node$n.cfg
|
||||
node -p "JSON.stringify({...require('./cfg/node$n.json'),binary:'/usr/bin/node',binargs:'/home/geveo/contract.js',peers:${mypeers},unl:${myunl},loggers:['console', 'file']}, null, 2)" > ./cfg/node$n.cfg
|
||||
|
||||
# Copy local cfg file back to remote vm.
|
||||
vmip=${vmips[j]}
|
||||
|
||||
@@ -20,7 +20,7 @@ while true; do
|
||||
|
||||
echo 'starting ...'
|
||||
STARTTIME=`date +%s`
|
||||
nohup ~/hpcore run ~/contract > $PIPE 2>> $PIPE 3>MARKER &
|
||||
nohup sudo ~/hpcore run ~/contract > $PIPE 2>> $PIPE 3>MARKER &
|
||||
PID=$!
|
||||
sleep 1
|
||||
|
||||
|
||||
@@ -13,6 +13,7 @@ if [ $mode = "new" ]; then
|
||||
sshpass -p $vmpass scp $hpcore/build/hpcore \
|
||||
$hpcore/build/hpstatemon \
|
||||
$hpcore/examples/echocontract/contract.js \
|
||||
$hpcore/examples/rndcontract/rnd_contract \
|
||||
/usr/local/lib/x86_64-linux-gnu/libfuse3.so.3 \
|
||||
/usr/local/bin/fusermount3 \
|
||||
./consensus-test-continuous.sh \
|
||||
@@ -25,5 +26,7 @@ else
|
||||
sshpass -p $vmpass scp $hpcore/build/hpcore \
|
||||
$hpcore/build/hpstatemon \
|
||||
$hpcore/examples/echocontract/contract.js \
|
||||
$hpcore/examples/rndcontract/rnd_contract \
|
||||
./consensus-test-continuous.sh \
|
||||
geveo@$vmip:~/
|
||||
fi
|
||||
Reference in New Issue
Block a user