From 4fefb7ca714c0d753db7d0a4eadd78f47e2eb17a Mon Sep 17 00:00:00 2001 From: Ravin Perera <33562092+ravinsp@users.noreply.github.com> Date: Mon, 10 Feb 2020 14:27:37 +0100 Subject: [PATCH] Refactored stage sync logic. (#86) * Cleaned up stage time sync logic and avoided extra missed rounds. * Moved stage sync time logic to beginning of consensus stage. * Removed check_majority_stage. * Re-organised lcl sync flow. --- examples/random_contract/.gitignore | 1 + examples/random_contract/rnd_contract | Bin 16656 -> 0 bytes .../{rnd_contrac.c => rnd_contract.c} | 6 +- src/conf.cpp | 12 +- src/conf.hpp | 1 + src/cons/cons.cpp | 292 +++++++----------- src/cons/cons.hpp | 15 +- src/cons/ledger_handler.cpp | 71 ++--- src/cons/ledger_handler.hpp | 9 +- src/p2p/p2p.cpp | 39 ++- src/p2p/p2p.hpp | 6 +- src/p2p/peer_session_handler.cpp | 4 +- src/proc.cpp | 4 +- src/sock/socket_session.cpp | 2 +- 14 files changed, 216 insertions(+), 246 deletions(-) create mode 100644 examples/random_contract/.gitignore delete mode 100755 examples/random_contract/rnd_contract rename examples/random_contract/{rnd_contrac.c => rnd_contract.c} (90%) diff --git a/examples/random_contract/.gitignore b/examples/random_contract/.gitignore new file mode 100644 index 00000000..eceb15dd --- /dev/null +++ b/examples/random_contract/.gitignore @@ -0,0 +1 @@ +rnd_contract \ No newline at end of file diff --git a/examples/random_contract/rnd_contract b/examples/random_contract/rnd_contract deleted file mode 100755 index 97e396402703687bfd8832960f90998d589b5f3b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16656 zcmeHOdvqMtdB3x($7&_5*4D!|hInjjupP9LjQqkjNPaO&WMjaPq=5{p-IcWUYFFKz zH8zPri6Y8Yc4{|-V>q-p4JCxbNl8y=eVjOzF#(h0G^S}&l8|!@HAI3?R1&~7!Rqh3 zbHCN>csJ=e?LR%o*VcUZ`+bl5-N)RWd*{wQTe`P;G)-{wifaXNt$rKnP=W(%R0gC& ztPxeXE)okx1^6lJ0Y8(~Lr{!f!?CUKK0uT;B?tN59!>WY>`yk}K=OCs8wh$pjq zn)j?()x4@blu3nF$a<51(!GBBEixo(R^>{{8^9Ar@-1Csuh0JUmQT#LF4?~PoqKNB zp8d&nq}zl)?l+myf*!)=;BVPw`z~Gk&KM65Acg69uzDdRxs|xiE`r}y0$*GL?<#@c zUjnBxiwtCGdBE>-cl0 zy8$e6gW&7&SFB#!fzJ|ig|6$!3BmVc1bMLNLMw&YwR4+^Vq$&qj1^1o+_ou^O2&4EdlKZXZy=Ra z?o3;?s3NtO2mieCN;$dVX{by*rm%*1yq50tZz(=| zNxX`Deg`}yo`1QYT4?|x8JFpjGCrk#k9@BFWB#QDwm!{)6c4A<4%|6EPB?JoTAEBb za2jKr&N^_KyEySUqu(T&n>g_}Bb??dPM=hMDgp3(NU}Q|IL(Wk`W-mW^&}f~;LhK} zJr118;56*OxlKuSp94q7 zc|h_W;%R734om)ABY5h%$wA4#Lp*iakne z9Xq@Cmp_EM8T)H5mt@21(3)<%7dF4GTdVN7Gr<;S3!-nXN}xz);x5<9}$Z# zsJ(sqbZ@9`!55|f+fPud&+xF1lj8oBJ<`px^RPFLpZ6NMDdYIWb%yqw@#1-FE*vgY z4r|C^DSyKoZiJf1wiqKD-fuwe7K*3F7~60T+(a#EJYnGST=_ESX%x;d1@@a0aEbbm z`t6gw(4d_0`CkuWDCCYkjHWVjPagn@8e?TkmcqoyZ8_UEJ|bmaYq>EtN@8Oyh*-am z!ceE(7$-*V`6E=q#48_8PfvayBgxo5N?m_+SUP#`AEoqbkSc+(--sgn4!j+ujx=)r z1SbzKR(>L*t!nvrqb4iw5PJCuoEkT5rTip)oj5fRx? zfi}foZpVL`qUHD(!g8{lki2iGila0(zejbTa4v;XZuHyaeatQd;FS_QG6No^`zdd; zltP-(e&KgA!k3^vu??aJ-#@@?qq@a++*B6vXkP3r63(F?aI)H;$hx&z(GecEPe|a;J8UN0%MXoiq-iVJ4*0 z$CcAJjfo#nS;vjYaU(wd`QhPVj7nXDWo7P}yo1~`$6xiq)5&G0a?gy7lC^Q@zWdAR zjvJAz#;CDY7IEj-jj{a{mofGgD$>~hG%1gbQY)d!7Rwqw3ufdPU4?ZAnn)TPpJ>XB zo+O(wSu^0$gLVp`LG&Yt#-u`S^hxC3Y(j@N%+_W9Joy1dL@**EvQFyX{qmdIff3o3 zz^O^s$(F9$_XP5e&UyLV^t2q###r`j)VR7`-U?Rv#7`i2;lzQ?PjueemHW5OTRJg! zEcsqQP7TdOZ|=-}*p+*)d)aGpjz0dOcj5-v7^81ldfPAfe(uhl>(0HmIrnzwbi>QW z$O+9@`^)SbG>3os_RiZnZ}0p}r+MPQ%>3~iOzqO2n&mb1tDDV)`(pZ%jJ{qzmxTvn zx|P!D(JwWS*sI5)aqC*YNcVhHL{iyAR8OWXogN8wQimN4YHhxE9W9G#^8T0qnx3W^ z>kR>0%39Y=Mg{aIpn0w!p;} zxYz<0Ti{{~Tx@}hE%5)X1$e)j_orzIMu{o!kxr>a7w?UpQJbOjZ1wy;_g0CQtD9qsl0X@ zR`$Fu<-O)Pwz?Qp0OP|d5RQ9F>G7U3m;VukvmTbEvVJH|LRdaZzC-!La+>ARa!TQlL6^)j`64NBLmY520B0A?4>*%_W`ll-H|1 zMLN@zU+Qlp9ib7^>VJwDO)_iz-y#c-WIFs4Bq@`O;eUvja>?xQFDGXelKG_nK4N?t zy>qz3znRqhS`wt+zn+*Htp^H&{$s=hC1d$lP&u=;K}h!a|AQPfXt{XZkMIoeJz zU+{l{)aGgyuo3?$QoB_9BAEO9t4Oj$`z){n{*%NsYt#&f{68ipB$>nhpAyp|naBOV zB&Jm|Px;RfvqEcz%@IHCa+h5#V;lEBK$4Z(p8XdA8Uuc_v~swXM`SP!g%qQbWtScQ*#)KH&N4Qgp55!wnyqSU|= ztrr15%!AGP?{ww&ym4^G&t^OXHtQmDHd7yly9KDemxk^3Oc z&ee-$7CBM9LYjPyva4m*1KsL%l%0JHL20#=b>FYr!`-;G>$~#`-h=yw>aUb673MuS zJk-#W?G;)BDS8i{zPs^T@T1MSO|q{JHCP#;$)@riJUr9@%>@Z58Xsz`^+5A-D%X4P zQOKe~TVivM4>jPGy3m%=pzTgy zwF}{=1;Ri*OxMrD@NBnz!05Vp1Oyd;a&2JNLR4pVNV{&H9rcy-y6UMa9djSpIFFpx zm*>OqU>TuU~qVN!tmB=KI`>^p5;xxQO~0K ztZ&;7Jd5ie@q`*YS2Xy#eIvfxef{73mG8@*tL6w#nfE(%@pR4t(JHJ8Uz2BP{a^U% zeUA|SGrGK@L~nz7yeVC=-phQtZv%$6rXiX*&hvnT(|sa_mgz~PBD-)5<_44MEbEx0 z13H9c!n#z)r(_I7P>YQ?@Pv_dZa&`CZ&iYvb<1+I8$0YX%}YK7+$F@sZu7n zUDZyFmrZ6fvFHjJhHMSlh=VCRFeuy2Lx-QtWNZ&L!C*MrW)qkZcePF0ZL-om?CmZr z2y!62$3&AmDo=gg(w`cLwWPzl*<@@(UkrmN9swIpNBTF|=OOjxKE1hDo*ru+OsA|^#7d=`ac&H~D}(64xw7n_ zDZ7B~JG(rxjVtlT(5cdjtgmfn_CJSDtI%Qs*=p8Srk3}428h7%FO2O+p zr_75bBCiVz@hZXV&qCavU#Ax0)q>aSg?J4f_??KI_ljDf7g=|CMIe8U+G*t#b%NLb zg?La*(Zhl(dGXx1zRm@L_2LBNo}zetem}u!<;7cI-bX0J8*#o_cS17n#fdgv_dA7N zI?<*V)1NtaS*VX+^CHPBcz2^v=oP#R;>LOR!;P!M*@ZCLhj$ZZ1oSgrCjL{N2$@0g zU;@U2{<_1}kJKH6A9mpj70&xU?2DG68qS#-s(q+DO0?M0inRX#aPrUlNTf??Fa9)~ za^>@p62#>Y{))w4s$O50erAi$1=+(4E#~KGDXza@oIEG}TsTgCtn}S+zAEtx$L+h) z&xPZv8Wot0b~vrZKTikqf$P#&;e{T~i=Fd*S?XUnK0glJc|MT6nZSb>&q2Mgl-?5h zjS|1`{=KV&J}qvGmHT)JoSr<3>Hi!!#p%B9epN#McO~#TG;Co_?Uwim@M7hz06xnz zNAUg-w;LVMFQ$JJ=_68}$LtLsA;Kl}vl0)8LDlauRlsNo{o{}aaewX5bq+<+)3UxZ z^X*5K^uZE-o&-*Q-2Hx}g#M2t9=Krb`(g?GGbM0Z`BQtk%cX^OVU6qx(BgHGs*97{ zBmH0aJ4IVF#rkCp{1iR^9!XmnE8E+Pb;8W~anl+wad&b#(B-d^&?~e-%)OnXzct3ac*x zTq4(31vAoi`7yx^FAw;;zX>Q5%a0ZcA#@yywvYYu2M2{3^6Q2|$n~v&{e^>DC7MZ@ z{WvLyZwu@%9cC2pfr5MxrAJwlp77OY4>KB$%1}#FN2)I`3U%aH6@`##_GB`uLiVQ; zg>w0+g#2)$5T{Qr3K7%97ZN;}sGEy^4^jAaMxh6Lu~vwQP-gD{9?C)SWr(ExoWl|+ zmL3!#oFj~dIyZJTTj4&%Vc#Isla1q3Dr{GUi0cpIJCsm#Zxa4&iWR1m?8XW*j@6`- zF(FIG5@9k>xxs`bLb7Q?$cFk-U@cljh3KAyQc4cXP^@3wh5b=DVa_(SZ%5mL^C%%a z5RV`x8r30mhX`RR7{KifEY_3lgJw9{hj~M>@nmm`^WmPJbZj?s_--u5d9;#qa{FK8 z>B$jOAfBo0#OoHO?k7mnb*9U~(5@)k^ZJIV?o>vef8Q8xL&j;(>ma6!Sy559{bu0r z;hhzi&+8?oOO+iji?BskkhX)vu3A3+apj0e%2H;Qb*#s97v%Jgk9l6t<*BlF$FE><7c$hQY|raHrc+AM z-F{sEFDm!p6z+PZm1yd9t%rkuj7>zx)=XE`Qzs%n$^Y~;t?nhc*QWe>r*Z&<003wac z>&%7inZ5y$(>}jm(QO+phdr \n", argv[0]); @@ -23,12 +22,14 @@ int main(int argc, char **argv) char buf[128]; read(STDIN_FILENO, buf, 128); + // Read timestamp mentioned in contract args json. char tsbuf[14]; memcpy(tsbuf, &buf[100], 13); tsbuf[13] = '\0'; int ts = atoi(tsbuf); //printf("args input: %.13s\n", tsbuf); + // Use contract args timestamp to initialize random seed. srand(ts); FILE *f = fopen(argv[1], "rb+"); @@ -71,7 +72,7 @@ int main(int argc, char **argv) int end_block = (offset + bytestowrite) / (4 * 1024 * 1024); for (int i = start_block; i <= end_block; ++i) { - //printf("@@@ pid %d wrote to block %d ... %d bytes\n", pid, i, n); + // printf("@@@ pid %d wrote to block %d ... %d bytes\n", pid, i, n); fflush(stdout); } } @@ -79,4 +80,5 @@ int main(int argc, char **argv) // done! fclose(f); + return 0; } diff --git a/src/conf.cpp b/src/conf.cpp index 0a37737a..7f9238d1 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -2,6 +2,7 @@ #include "conf.hpp" #include "crypto.hpp" #include "util.hpp" +#include "hplog.hpp" #include #include @@ -33,8 +34,8 @@ int init() // Append self peer to peer list. const std::string portstr = std::to_string(cfg.peerport); - const std::string peerid = "0.0.0.0:" + portstr; - cfg.peers.emplace(std::move(peerid), std::make_pair("0.0.0.0", portstr)); + cfg.self_peer_id = "0.0.0.0:" + portstr; + cfg.peers.emplace(cfg.self_peer_id, std::make_pair("0.0.0.0", portstr)); // Append self pubkey to unl list. cfg.unl.emplace(cfg.pubkey); @@ -608,10 +609,15 @@ int is_schema_valid(const rapidjson::Document &d) void change_operating_mode(const OPERATING_MODE mode) { // Do not allow to change the mode if the node was started as an observer. - if (cfg.startup_mode == OPERATING_MODE::OBSERVER) + if (cfg.startup_mode == OPERATING_MODE::OBSERVER || cfg.current_mode == mode) return; cfg.current_mode = mode; + + if (mode == OPERATING_MODE::OBSERVER) + LOG_DBG << "Switched to OBSERVER mode."; + else + LOG_DBG << "Switched back to PROPOSER mode."; } } // namespace conf diff --git a/src/conf.hpp b/src/conf.hpp index 1e90ea72..94e361b3 100644 --- a/src/conf.hpp +++ b/src/conf.hpp @@ -48,6 +48,7 @@ struct contract_config std::vector runtime_binexec_args; // Contract binary execution args used during runtime. std::vector runtime_appbill_args; // Appbill execution args used during runtime. OPERATING_MODE current_mode; // Current operating mode of the contract (Observer/Proposer) + std::string self_peer_id; // Peer session id of this node. (format: selfip:port) // Config elements which are loaded from the config file. diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 7193e44f..c399d832 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -30,7 +30,6 @@ constexpr float STAGE1_THRESHOLD = 0.5; constexpr float STAGE2_THRESHOLD = 0.65; constexpr float STAGE3_THRESHOLD = 0.8; constexpr float MAJORITY_THRESHOLD = 0.8; -constexpr uint16_t MAX_RESET_TIME = 200; consensus_context ctx; @@ -43,7 +42,7 @@ int init() ledger_history ldr_hist = load_ledger(); ctx.led_seq_no = ldr_hist.led_seq_no; ctx.lcl = ldr_hist.lcl; - ctx.cache.swap(ldr_hist.cache); + ctx.ledger_cache.swap(ldr_hist.cache); hasher::B2H root_hash = hasher::B2H_empty; if (statefs::compute_hash_tree(root_hash, true) == -1) @@ -54,9 +53,9 @@ int init() std::string str_root_hash(reinterpret_cast(&root_hash), hasher::HASH_SIZE); str_root_hash.swap(ctx.curr_hash_state); - if (!ctx.cache.empty()) + if (!ctx.ledger_cache.empty()) { - ctx.prev_hash_state = ctx.cache.rbegin()->second.state; + ctx.prev_hash_state = ctx.ledger_cache.rbegin()->second.state; } else { @@ -70,7 +69,13 @@ int init() }); ctx.prev_close_time = util::get_epoch_milliseconds(); - ctx.reset_time = MAX_RESET_TIME; + + // We allocate 1/5 of the round time to each stage expect stage 3. For stage 3 we allocate 2/5. + // Stage 3 is allocated an extra stage_time unit becayse a node needs enough time to + // catch up from lcl/state desync. + ctx.stage_time = conf::cfg.roundtime / 5; + ctx.stage_reset_wait_threshold = conf::cfg.roundtime / 10; + return 0; } @@ -79,6 +84,9 @@ void consensus() // A consensus round consists of 4 stages (0,1,2,3). // For a given stage, this function may get visited multiple times due to time-wait conditions. + if (!wait_and_proceed_stage()) + return; // This means the stage has been reset. + // Get the latest current time. ctx.time_now = util::get_epoch_milliseconds(); std::list collected_proposals; @@ -92,7 +100,7 @@ void consensus() } //Copy collected propsals to candidate set of proposals. - //Add mpropsals of new nodes and Replace messages from old nodes to reflect current status of nodes. + //Add propsals of new nodes and replace proposals from old nodes to reflect current status of nodes. for (const auto &proposal : collected_proposals) { auto prop_itr = ctx.candidate_proposals.find(proposal.pubkey); @@ -102,7 +110,9 @@ void consensus() ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal)); } else + { ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal)); + } } // Throughout consensus, we move over the incoming npl messages collected via the network so far into // the candidate npl message set (move and append). This is to have a private working set for the consensus @@ -121,6 +131,8 @@ void consensus() p2p::ctx.collected_msgs.npl_messages.clear(); } + LOG_DBG << "Started stage " << std::to_string(ctx.stage); + if (ctx.stage == 0) // Stage 0 means begining of a consensus round. { // Broadcast non-unl proposals (NUP) containing inputs from locally connected users. @@ -132,153 +144,137 @@ void consensus() // In stage 0 we create a novel proposal and broadcast it. const p2p::proposal stg_prop = create_stage0_proposal(); - broadcast_proposal(stg_prop); } else // Stage 1, 2, 3 { - LOG_DBG << "Started stage " << std::to_string(ctx.stage) << "\n"; for (auto &[pubkey, proposal] : ctx.candidate_proposals) { bool self = proposal.pubkey == conf::cfg.pubkey; - LOG_DBG << "[stage" << std::to_string(proposal.stage) + LOG_DBG << "Proposal [stage" << std::to_string(proposal.stage) << "] users:" << proposal.users.size() << " hinp:" << proposal.hash_inputs.size() << " hout:" << proposal.hash_outputs.size() - << " lcl:" << proposal.lcl + << " lcl:" << proposal.lcl.substr(0, 15) << " state:" << *reinterpret_cast(proposal.curr_hash_state.c_str()) - << " self:" << self - << "\n"; + << " self:" << self; } // Initialize vote counters vote_counter votes; - // check if we're ahead/behind of consensus stage - bool is_stage_desync, reset_to_stage0; - uint8_t majority_stage; - check_majority_stage(is_stage_desync, reset_to_stage0, majority_stage, votes); - if (is_stage_desync) - { - timewait_stage(reset_to_stage0, floor(conf::cfg.roundtime / 20)); - return; - } - // check if we're ahead/behind of consensus lcl bool is_lcl_desync, should_request_history; std::string majority_lcl; check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes); - if (should_request_history) - { - // TODO: If we are in a lcl fork condition try to rollback state with the help of - // state_restore to rollback state checkpoints before requesting new state. - - //handle minority going forward when boostrapping cluster. - //Here we are mimicking invalid min ledger scenario. - if (majority_lcl == GENESIS_LEDGER) - { - last_requested_lcl = majority_lcl; - p2p::history_response res; - res.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; - handle_ledger_history_response(std::move(res)); - } - else - { - //create history request message and request history from a random peer. - send_ledger_history_request(ctx.lcl, majority_lcl); - } - } if (is_lcl_desync) { - //Resetting to stage 0 to avoid possible deadlock situations by resetting every node in random time using max time. - //todo: This might not needed with current synchronization with network clock. - // LOG_DBG << "time off: " << std::to_string(ctx.reset_time); - timewait_stage(true, ctx.reset_time); - const uint16_t decrement = rand() % (conf::cfg.roundtime / 40); + ctx.is_lcl_syncing = true; - if (decrement > ctx.reset_time) - ctx.reset_time = MAX_RESET_TIME; - else - ctx.reset_time -= decrement; + if (should_request_history) + { + // TODO: If we are in a lcl fork condition try to rollback state with the help of + // state_restore to rollback state checkpoints before requesting new state. - return; + // Handle minority going forward when boostrapping cluster. + // Here we are mimicking invalid min ledger scenario. + if (majority_lcl == GENESIS_LEDGER) + { + ctx.last_requested_lcl = majority_lcl; + p2p::history_response res; + res.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; + handle_ledger_history_response(std::move(res)); + } + else + { + //create history request message and request history from a random peer. + send_ledger_history_request(ctx.lcl, majority_lcl); + } + } } else { - //Node is in sync with current lcl ->switch to proposer mode. - conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); - } + const bool lcl_syncing_just_finished = ctx.is_lcl_syncing; + ctx.is_lcl_syncing = false; - if (ctx.stage == 1 || (ctx.stage == 3 && ctx.is_state_syncing)) - check_state(votes); + if (lcl_syncing_just_finished || ctx.stage == 1 || (ctx.stage == 3 && ctx.is_state_syncing)) + check_state(votes); - if (!ctx.is_state_syncing) - { - // In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds. - const p2p::proposal stg_prop = create_stage123_proposal(votes); - - broadcast_proposal(stg_prop); - - if (ctx.stage == 3) + if (!ctx.is_state_syncing) { - ctx.prev_close_time = stg_prop.time; - apply_ledger(stg_prop); - ctx.reset_time = MAX_RESET_TIME; + conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); - // node has finished a consensus round (all 4 stages). - LOG_INFO << "****Stage 3 consensus reached**** (lcl:" << ctx.lcl - << " state:" << *reinterpret_cast(cons::ctx.curr_hash_state.c_str()) << ")"; + // In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds. + const p2p::proposal stg_prop = create_stage123_proposal(votes); + + broadcast_proposal(stg_prop); + + if (ctx.stage == 3) + { + ctx.prev_close_time = stg_prop.time; + apply_ledger(stg_prop); + + // node has finished a consensus round (all 4 stages). + LOG_INFO << "****Stage 3 consensus reached**** (lcl:" << ctx.lcl.substr(0, 15) + << " state:" << *reinterpret_cast(cons::ctx.curr_hash_state.c_str()) << ")"; + } } } } - // Node has finished a consensus stage. - // Transition to next stage. + // Node has finished a consensus stage. Transition to next stage. ctx.stage = (ctx.stage + 1) % 4; +} - //Here nodes try to synchronise nodes stages using network clock. - uint64_t now = util::get_epoch_milliseconds(); +/** + * Syncrhonise the stage/round time for fixed intervals and reset the stage. + * @return True if consensus can proceed in the current round. False if stage is reset. + */ +bool wait_and_proceed_stage() +{ + // Here, nodes try to synchronise nodes stages using network clock. + // We devide universal time to windows of equal size of roundtime. Each round must be synced with the + // start of a window. - // round start is the floor - uint64_t round_start = ((uint64_t)(now / conf::cfg.roundtime)) * conf::cfg.roundtime; + const uint64_t now = util::get_epoch_milliseconds(); - uint64_t next_stage_start = 0; + // Rrounds are divided into windows of roundtime. + // This gets the start time of current round window. Stage 0 must start in the next window. + const uint64_t current_round_start = (((uint64_t)(now / conf::cfg.roundtime)) * conf::cfg.roundtime); - // Compute start time of next stage. - // Last stage (stage 3) waiting twice as any other stage's waiting time. - // This is for a node to catch up from lcl/state desync, - // becuase we are waiting (round time + time to next stage) to sync. - if (ctx.stage == 3) - next_stage_start = round_start + conf::cfg.roundtime; - else - next_stage_start = round_start + (int64_t)(ctx.stage * ((double)conf::cfg.roundtime / 5.0)); - - // Compute stage time wait. - // Node wait between stages to collect enough proposals from previous stages from other nodes. - int64_t to_wait = next_stage_start - now; - - LOG_DBG << "now = " << now << ", roundtime = " << conf::cfg.roundtime << ", round_start = " << round_start << ", next_stage_start = " << next_stage_start << ", to_wait = " << to_wait; - - // If a node doesn't have enough time (due to network delay) to recieve/send reliable stage proposals for next stage, - // it will continue particapating in this round, otherwise will join in next round(s). - if (to_wait < floor(conf::cfg.roundtime / 10)) //todo: self claculating/adjusting network delay + if (ctx.stage == 0) { - uint64_t next_round = round_start; - while (to_wait < floor(conf::cfg.roundtime / 10)) - { - next_round += conf::cfg.roundtime; - to_wait = next_round - now; - } + // Stage 0 must start in the next round window. + const uint64_t stage_start = current_round_start + conf::cfg.roundtime; + const int64_t to_wait = stage_start - now; - LOG_INFO << "we missed a round, waiting " << to_wait << " and resetting to stage 0"; - ctx.stage = 0; + LOG_DBG << "Waiting " << std::to_string(to_wait) << "ms for next round stage 0"; util::sleep(to_wait); + return true; } else { - // after a stage proposal we will just busy wait for proposals. - util::sleep(to_wait); + const uint64_t stage_start = current_round_start + (ctx.stage * ctx.stage_time); + + // Compute stage time wait. + // Node wait between stages to collect enough proposals from previous stages from other nodes. + const int64_t to_wait = stage_start - now; + + // If a node doesn't have enough time (eg. due to network delay) to recieve/send reliable stage proposals for next stage, + // it will continue particapating in this round, otherwise will join in next round. + if (to_wait < ctx.stage_reset_wait_threshold) //todo: self claculating/adjusting network delay + { + LOG_DBG << "Missed stage " << std::to_string(ctx.stage) << " window. Resetting to stage 0"; + ctx.stage = 0; + return false; + } + else + { + LOG_DBG << "Waiting " << std::to_string(to_wait) << "ms for stage " << std::to_string(ctx.stage); + util::sleep(to_wait); + return true; + } } } @@ -478,7 +474,6 @@ p2p::proposal create_stage0_proposal() // The proposal we are going to emit in stage 0. p2p::proposal stg_prop; stg_prop.time = ctx.time_now; - ctx.novel_proposal_time = ctx.time_now; stg_prop.stage = 0; stg_prop.lcl = ctx.lcl; stg_prop.curr_hash_state = ctx.curr_hash_state; @@ -588,13 +583,14 @@ p2p::proposal create_stage123_proposal(vote_counter &votes) */ void broadcast_proposal(const p2p::proposal &p) { - // In observer mode, we do not send out any proposals. - if (conf::cfg.current_mode == conf::OPERATING_MODE::OBSERVER) - return; - p2p::peer_outbound_message msg(std::make_shared(1024)); p2pmsg::create_msg_from_proposal(msg.builder(), p); - p2p::broadcast_message(msg, true); + + // In observer mode, we only send out the proposal to ourselves. + if (conf::cfg.current_mode == conf::OPERATING_MODE::OBSERVER) + p2p::send_message_to_self(msg); + else + p2p::broadcast_message(msg, true); // LOG_DBG << "Proposed [stage" << std::to_string(p.stage) // << "] users:" << p.users.size() @@ -602,42 +598,6 @@ void broadcast_proposal(const p2p::proposal &p) // << " hout:" << p.hash_outputs.size(); } -/** - * Check whether our current stage is ahead or behind of the majority stage. - */ -void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes) -{ - // Stage votes. - for (const auto &[pubkey, cp] : ctx.candidate_proposals) - { - // Vote stages if only proposal lcl is match with node's last consensus lcl - if (cp.lcl == ctx.lcl) - increment(votes.stage, cp.stage); - } - - majority_stage = 0; - is_desync = false; - - int32_t highest_votes = 0; - for (const auto [stage, votes] : votes.stage) - { - if (votes > highest_votes) - { - highest_votes = votes; - majority_stage = stage; - } - } - - if (majority_stage < ctx.stage - 1) - { - should_reset = (ctx.time_now - ctx.novel_proposal_time) > (floor(conf::cfg.roundtime) + floor(rand() % conf::cfg.roundtime)); - is_desync = true; - - LOG_DBG << "Stage desync (Reset:" << should_reset << "). Node stage:" << std::to_string(ctx.stage) - << " is ahead of majority stage:" << std::to_string(majority_stage); - } -} - /** * Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes. */ @@ -721,21 +681,6 @@ float_t get_stage_threshold(const uint8_t stage) return -1; } -/** -* Awiat/Sleep consensus to time milliseconds and reset consensus. -* @param reset reset consensus stage to 0 or not. -* @param time milliseconds to sleep/await. -*/ -void timewait_stage(const bool reset, const uint64_t time) -{ - if (reset) - { - ctx.stage = 0; - } - - util::sleep(time); -} - /** * Calculate the effective ledger close time * After adjusting the ledger close time based on the current resolution, @@ -861,7 +806,7 @@ void dispatch_user_outputs(const p2p::proposal &cons_prop) /** * Check state against the winning and canonical state - * @param cons_prop The proposal that achieved consensus. + * @param votes The voting table. */ void check_state(vote_counter &votes) { @@ -882,19 +827,17 @@ void check_state(vote_counter &votes) } } - if (ctx.stage == 1 || ctx.stage == 3) + if (ctx.is_state_syncing) { - if (ctx.is_state_syncing) - { - std::lock_guard lock(cons::ctx.state_syncing_mutex); - hasher::B2H root_hash = hasher::B2H_empty; - int ret = statefs::compute_hash_tree(root_hash); - std::string str_root_hash(reinterpret_cast(&root_hash), hasher::HASH_SIZE); - str_root_hash.swap(ctx.curr_hash_state); - } + std::lock_guard lock(cons::ctx.state_syncing_mutex); + hasher::B2H root_hash = hasher::B2H_empty; + int ret = statefs::compute_hash_tree(root_hash); + std::string str_root_hash(reinterpret_cast(&root_hash), hasher::HASH_SIZE); + str_root_hash.swap(ctx.curr_hash_state); } - if (ctx.stage == 1 && majority_state != ctx.curr_hash_state) + // We do not initiate state sync in stage 3 because the majority state is likely to get changed soon. + if (ctx.stage < 3 && majority_state != ctx.curr_hash_state) { if (ctx.state_sync_lcl != ctx.lcl) { @@ -916,7 +859,6 @@ void check_state(vote_counter &votes) ctx.is_state_syncing = false; ctx.state_sync_lcl.clear(); - conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); } } diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index 53276572..2d9a3c47 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -70,7 +70,6 @@ struct consensus_context util::rollover_hashset recent_userinput_hashes; uint8_t stage = 0; - uint64_t novel_proposal_time = 0; uint64_t time_now = 0; std::string lcl; uint64_t led_seq_no = 0; @@ -81,11 +80,14 @@ struct consensus_context //contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE. //this is loaded when node started and updated throughout consensus - delete ledgers that falls behind MAX_LEDGER_SEQUENCE range. //We will use this to track lcls related logic.- track state, lcl request, response. - std::map cache; - //ledger close time of previous hash + std::map ledger_cache; + std::string last_requested_lcl; + bool is_lcl_syncing = false; + //ledger close time of previous hash uint64_t prev_close_time = 0; - uint16_t reset_time = 0; + uint16_t stage_time = 0; // Time allocated to a consensus stage. + uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage. bool is_state_syncing = false; std::string state_sync_lcl; @@ -99,7 +101,6 @@ struct consensus_context struct vote_counter { - std::map stage; std::map time; std::map lcl; std::map users; @@ -114,6 +115,8 @@ int init(); void consensus(); +bool wait_and_proceed_stage(); + void broadcast_nonunl_proposal(); void verify_and_populate_candidate_user_inputs(); @@ -126,8 +129,6 @@ p2p::proposal create_stage123_proposal(vote_counter &votes); void broadcast_proposal(const p2p::proposal &p); -void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes); - void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes); float_t get_stage_threshold(const uint8_t stage); diff --git a/src/cons/ledger_handler.cpp b/src/cons/ledger_handler.cpp index affea34d..c41ec637 100644 --- a/src/cons/ledger_handler.cpp +++ b/src/cons/ledger_handler.cpp @@ -13,7 +13,6 @@ namespace cons { namespace p2pmsg = fbschema::p2pmsg; -std::string last_requested_lcl; /** * Create and save ledger from the given proposal message. @@ -60,10 +59,10 @@ const std::tuple save_ledger(const p2p::proposal &p write_ledger(file_name, ledger_str.data(), ledger_str.size()); - ledger_cache c; + ledger_cache_entry c; c.lcl = file_name; c.state = proposal.curr_hash_state; - cons::ctx.cache.emplace(led_seq_no, std::move(c)); + cons::ctx.ledger_cache.emplace(led_seq_no, std::move(c)); //Remove old ledgers that exceeds max sequence range. if (led_seq_no > MAX_LEDGER_SEQUENCE) @@ -80,7 +79,7 @@ const std::tuple save_ledger(const p2p::proposal &p */ void remove_old_ledgers(const uint64_t led_seq_no) { - std::map::iterator itr; + std::map::iterator itr; std::string dir_path; @@ -88,8 +87,8 @@ void remove_old_ledgers(const uint64_t led_seq_no) dir_path.append(conf::ctx.hist_dir) .append("/"); - for (itr = cons::ctx.cache.begin(); - itr != cons::ctx.cache.lower_bound(led_seq_no + 1); + for (itr = cons::ctx.ledger_cache.begin(); + itr != cons::ctx.ledger_cache.lower_bound(led_seq_no + 1); itr++) { const std::string file_name = itr->second.lcl; @@ -102,9 +101,9 @@ void remove_old_ledgers(const uint64_t led_seq_no) if (boost::filesystem::exists(file_path)) boost::filesystem::remove(file_path); } - - if (!cons::ctx.cache.empty()) - cons::ctx.cache.erase(cons::ctx.cache.begin(), cons::ctx.cache.lower_bound(led_seq_no + 1)); + + if (!cons::ctx.ledger_cache.empty()) + cons::ctx.ledger_cache.erase(cons::ctx.ledger_cache.begin(), cons::ctx.ledger_cache.lower_bound(led_seq_no + 1)); } /** @@ -187,7 +186,7 @@ const ledger_history load_ledger() { const uint8_t *ledger_buf_ptr = reinterpret_cast(buffer.data()); const fbschema::ledger::Ledger *ledger = fbschema::ledger::GetLedger(ledger_buf_ptr); - ledger_cache c; + ledger_cache_entry c; c.lcl = file_name; c.state = fbschema::flatbuff_bytes_to_sv(ledger->state()); @@ -237,10 +236,9 @@ void send_ledger_history_request(const std::string &minimum_lcl, const std::stri p2pmsg::create_msg_from_history_request(msg.builder(), hr); p2p::send_message_to_random_peer(msg); - last_requested_lcl = required_lcl; + ctx.last_requested_lcl = required_lcl; - LOG_DBG << "Ledger history request sent." - << " lcl:" << required_lcl; + LOG_DBG << "Ledger history request sent. Required lcl:" << required_lcl.substr(0, 15); } /** @@ -261,8 +259,8 @@ bool check_required_lcl_availability(const p2p::history_request &hr) if (req_seq_no > 0) { - const auto itr = cons::ctx.cache.find(req_seq_no); - if (itr == cons::ctx.cache.end()) + const auto itr = cons::ctx.ledger_cache.find(req_seq_no); + if (itr == cons::ctx.ledger_cache.end()) { LOG_DBG << "Required lcl peer asked for is not in our lcl cache."; //either this node is also not in consesnsus ledger or other node requesting a lcl that is older than node's current @@ -278,7 +276,7 @@ bool check_required_lcl_availability(const p2p::history_request &hr) } else { - return false; //Very rare case: node asking for the genisis lcl. + return false; //Very rare case: node asking for the genisis lcl. } return true; } @@ -301,20 +299,20 @@ const p2p::history_response retrieve_ledger_history(const p2p::history_request & min_seq_no = std::stoull(hr.minimum_lcl.substr(0, pos)); //get required lcl sequence number } - const auto itr = cons::ctx.cache.find(min_seq_no); - if (itr != cons::ctx.cache.end()) //requested minimum lcl is not in our lcl history cache + const auto itr = cons::ctx.ledger_cache.find(min_seq_no); + if (itr != cons::ctx.ledger_cache.end()) //requested minimum lcl is not in our lcl history cache { min_seq_no = itr->first; //check whether minimum lcl node ask for is same as this node's. //eventhough sequence number are same, lcl hash can be changed if one of node is in a fork condition. if (hr.minimum_lcl != itr->second.lcl) { - LOG_DBG << "Invalid minimum ledger. Recieved min hash: "<< min_lcl_hash << " Node hash: " << itr->second.lcl; + LOG_DBG << "Invalid minimum ledger. Recieved min hash: " << min_lcl_hash << " Node hash: " << itr->second.lcl; history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; return history_response; } } - else if (min_seq_no > cons::ctx.cache.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence. + else if (min_seq_no > cons::ctx.ledger_cache.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence. { LOG_DBG << "Invalid minimum ledger. Recieved minimum sequence number is ahead of node current lcl sequence. hash: " << min_lcl_hash; history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER; @@ -323,21 +321,21 @@ const p2p::history_response retrieve_ledger_history(const p2p::history_request & else { LOG_DBG << "Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl"; - min_seq_no = cons::ctx.cache.begin()->first; + min_seq_no = cons::ctx.ledger_cache.begin()->first; } //LOG_DBG << "history request min seq: " << std::to_string(min_seq_no); //copy current history cache. - std::map led_cache = cons::ctx.cache; + std::map led_cache = cons::ctx.ledger_cache; //filter out cache and get raw files here. led_cache.erase( led_cache.begin(), led_cache.lower_bound(min_seq_no)); - //Get raw content of lcls that going to be send. - for (auto &[seq_no, cache] : led_cache) + //Get raw content of lcls that going to be send. + for (auto &[seq_no, cache] : led_cache) { p2p::history_ledger ledger; ledger.lcl = cache.lcl; @@ -388,7 +386,7 @@ p2p::peer_outbound_message send_ledger_history(const p2p::history_request &hr) void handle_ledger_history_response(const p2p::history_response &hr) { //check response object contains - if (last_requested_lcl.empty()) + if (ctx.last_requested_lcl.empty()) { LOG_DBG << "Peer sent us a history response but we never asked for one!"; return; @@ -399,7 +397,8 @@ void handle_ledger_history_response(const p2p::history_response &hr) // This means we are in a fork ledger.Remove/rollback current ledger. // Basically in the long run we'll rolback one by one untill we catch up to valid minimum ledger . remove_ledger(ctx.lcl); - cons::ctx.cache.erase(ctx.cache.rbegin()->first); + cons::ctx.ledger_cache.erase(ctx.ledger_cache.rbegin()->first); + LOG_DBG << "Invalid min ledger. Removed last ledger."; } else { @@ -407,7 +406,7 @@ void handle_ledger_history_response(const p2p::history_response &hr) bool have_requested_lcl = false; for (auto &[seq_no, ledger] : hr.hist_ledgers) { - if (last_requested_lcl == ledger.lcl) + if (ctx.last_requested_lcl == ledger.lcl) { have_requested_lcl = true; break; @@ -453,32 +452,34 @@ void handle_ledger_history_response(const p2p::history_response &hr) //Save recieved lcl in file system and update lcl history cache for (auto &[seq_no, ledger] : hr.hist_ledgers) { - auto prev_dup_itr = cons::ctx.cache.find(seq_no); - if (prev_dup_itr != cons::ctx.cache.end()) + auto prev_dup_itr = cons::ctx.ledger_cache.find(seq_no); + if (prev_dup_itr != cons::ctx.ledger_cache.end()) { remove_ledger(prev_dup_itr->second.lcl); - cons::ctx.cache.erase(prev_dup_itr); + cons::ctx.ledger_cache.erase(prev_dup_itr); } write_ledger(ledger.lcl, reinterpret_cast(&ledger.raw_ledger[0]), ledger.raw_ledger.size()); - ledger_cache l; + ledger_cache_entry l; l.lcl = ledger.lcl; l.state = ledger.state; - cons::ctx.cache.emplace(seq_no, std::move(l)); + cons::ctx.ledger_cache.emplace(seq_no, std::move(l)); } - last_requested_lcl = ""; + ctx.last_requested_lcl = ""; - if (cons::ctx.cache.empty()) + if (cons::ctx.ledger_cache.empty()) { cons::ctx.led_seq_no = 0; cons::ctx.lcl = GENESIS_LEDGER; } else { - const auto latest_lcl_itr = cons::ctx.cache.rbegin(); + const auto latest_lcl_itr = cons::ctx.ledger_cache.rbegin(); cons::ctx.lcl = latest_lcl_itr->second.lcl; cons::ctx.led_seq_no = latest_lcl_itr->first; } + + LOG_DBG << "Finished lcl sync. New lcl: " << cons::ctx.lcl.substr(0, 15); } } // namespace cons \ No newline at end of file diff --git a/src/cons/ledger_handler.hpp b/src/cons/ledger_handler.hpp index 8d79b22b..170e8158 100644 --- a/src/cons/ledger_handler.hpp +++ b/src/cons/ledger_handler.hpp @@ -10,24 +10,19 @@ namespace cons //max ledger count constexpr uint64_t MAX_LEDGER_SEQUENCE = 200; constexpr const char* GENESIS_LEDGER = "0-genesis"; -struct ledger_cache +struct ledger_cache_entry { std::string lcl; std::string state; }; -extern ledger_cache cache; - struct ledger_history { std::string lcl; uint64_t led_seq_no; - std::map cache; + std::map cache; }; - -extern std::string last_requested_lcl; - const std::tuple save_ledger(const p2p::proposal &proposal); void remove_old_ledgers(const uint64_t led_seq_no); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 4eb2ea8a..e3bf5c20 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -77,8 +77,10 @@ void peer_connection_watchdog() /** * Broadcasts the given message to all currently connected outbound peers. + * @param msg Peer outbound message to be broadcasted. + * @param send_to_self Whether to also send the message to self (this node). */ -void broadcast_message(const peer_outbound_message msg, bool send_to_self) +void broadcast_message(const peer_outbound_message msg, const bool send_to_self) { if (ctx.peer_connections.size() == 0) { @@ -89,6 +91,7 @@ void broadcast_message(const peer_outbound_message msg, bool send_to_self) //Broadcast while locking the peer_connections. std::lock_guard lock(ctx.peer_connections_mutex); + for (const auto &[k, session] : ctx.peer_connections) { if (!send_to_self && session->is_self) @@ -98,15 +101,33 @@ void broadcast_message(const peer_outbound_message msg, bool send_to_self) } /** - * Send the given message to a random peer from currently connected outbound peers. - * @param msg peer outbound message to be sent to peer + * Sends the given message to self (this node). + * @param msg Peer outbound message to be sent to self. */ -void send_message_to_random_peer(peer_outbound_message msg) +void send_message_to_self(const peer_outbound_message msg) { //Send while locking the peer_connections. std::lock_guard lock(p2p::ctx.peer_connections_mutex); - size_t connected_peers = ctx.peer_connections.size(); + // Find the peer session connected to self. + const auto peer_itr = ctx.peer_connections.find(conf::cfg.self_peer_id); + if (peer_itr != ctx.peer_connections.end()) + { + const auto session = peer_itr->second; + session->send(msg); + } +} + +/** + * Sends the given message to a random peer (except self). + * @param msg Peer outbound message to be sent to peer. + */ +void send_message_to_random_peer(const peer_outbound_message msg) +{ + //Send while locking the peer_connections. + std::lock_guard lock(p2p::ctx.peer_connections_mutex); + + const size_t connected_peers = ctx.peer_connections.size(); if (connected_peers == 0) { LOG_DBG << "No peers to send (not even self)."; @@ -121,13 +142,13 @@ void send_message_to_random_peer(peer_outbound_message msg) while (true) { // Initialize random number generator with current timestamp. - int random_peer_index = (rand() % connected_peers); // select a random peer index. + const int random_peer_index = (rand() % connected_peers); // select a random peer index. auto it = ctx.peer_connections.begin(); std::advance(it, random_peer_index); //move iterator to point to random selected peer. - //send message to selecte peer. - auto session = it->second; - if (!session->is_self) + //send message to selected peer. + const auto session = it->second; + if (!session->is_self) // Exclude self peer. { session->send(msg); break; diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index df50f882..30d92adf 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -141,11 +141,11 @@ void start_peer_connections(); void peer_connection_watchdog(); -void broadcast_message(const peer_outbound_message msg, bool send_to_self); +void broadcast_message(const peer_outbound_message msg, const bool send_to_self); -void send_message_to_random_peer(peer_outbound_message msg); +void send_message_to_self(const peer_outbound_message msg); -void send_message_to_peer(std::string peer_session_id, peer_outbound_message msg); +void send_message_to_random_peer(const peer_outbound_message msg); } // namespace p2p diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 16f44c86..c51fbb0b 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -136,8 +136,8 @@ void peer_session_handler::on_message(sock::socket_sessionmessage_as_History_Request_Message()); //first check node has the required lcl available. -> if so send lcl history accordingly. - bool req_lcl_avail = cons::check_required_lcl_availability(hr); - if (req_lcl_avail > 0) + const bool req_lcl_avail = cons::check_required_lcl_availability(hr); + if (req_lcl_avail) { p2p::peer_outbound_message hr_msg = cons::send_ledger_history(hr); session->send(hr_msg); diff --git a/src/proc.cpp b/src/proc.cpp index 79dc2f89..7c32889c 100644 --- a/src/proc.cpp +++ b/src/proc.cpp @@ -118,7 +118,7 @@ int exec_contract(const contract_exec_args &args) execv_args[len - 1] = NULL; int ret = execv(execv_args[0], execv_args); - LOG_ERR << "Contract process execv failed: " << ret; + LOG_ERR << errno << ": Contract process execv failed."; exit(1); } else @@ -187,7 +187,7 @@ int start_state_monitor() execv_args[3] = NULL; int ret = execv(execv_args[0], execv_args); - LOG_ERR << "State monitor execv failed: " << ret; + LOG_ERR << errno << ": State monitor execv failed."; exit(1); } else if (pid < 0) diff --git a/src/sock/socket_session.cpp b/src/sock/socket_session.cpp index 5a85441a..f4e7eedd 100644 --- a/src/sock/socket_session.cpp +++ b/src/sock/socket_session.cpp @@ -118,7 +118,7 @@ void socket_session::run(const std::string &&address, const std::string &&por this->uniqueid.append(address).append(":").append(port); // This indicates the connection is a self connection (node connects to the same node through server port) - if (address == "0.0.0.0") + if (this->uniqueid == conf::cfg.self_peer_id) this->is_self = true; // Set the timeout.