From 7bf0475b6fefdfec7fb9391f95b96009e5d658b2 Mon Sep 17 00:00:00 2001 From: Savinda Senevirathne Date: Tue, 8 Dec 2020 15:05:38 +0530 Subject: [PATCH] Subject unl list to consensus. (#186) * Unl hash in consensus proposals. * Unl syncing and sync request serving. * Adding unl hash to the ledger block. --- src/conf.cpp | 4 - src/consensus.cpp | 66 +++++- src/consensus.hpp | 9 +- src/main.cpp | 21 +- src/msg/fbuf/ledger_helpers.cpp | 1 + src/msg/fbuf/ledger_schema.fbs | 1 + src/msg/fbuf/ledger_schema_generated.h | 25 +- src/msg/fbuf/p2pmsg_content.fbs | 14 +- src/msg/fbuf/p2pmsg_content_generated.h | 197 +++++++++++++++- src/msg/fbuf/p2pmsg_helpers.cpp | 75 ++++++ src/msg/fbuf/p2pmsg_helpers.hpp | 8 + src/p2p/p2p.hpp | 12 + src/p2p/peer_session_handler.cpp | 36 +++ src/unl.cpp | 291 ++++++++++++++++++++++-- src/unl.hpp | 45 +++- 15 files changed, 740 insertions(+), 65 deletions(-) diff --git a/src/conf.cpp b/src/conf.cpp index ef68d3f1..879daab3 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -2,7 +2,6 @@ #include "conf.hpp" #include "crypto.hpp" #include "util/util.hpp" -#include "unl.hpp" namespace conf { @@ -489,9 +488,6 @@ namespace conf return -1; } - // Populate unl. - unl::init(cfg.unl); - // Populate runtime contract execution args. if (!cfg.binargs.empty()) util::split_string(cfg.runtime_binexec_args, cfg.binargs, " "); diff --git a/src/consensus.cpp b/src/consensus.cpp index 025fb547..fd6e6e37 100644 --- a/src/consensus.cpp +++ b/src/consensus.cpp @@ -120,6 +120,7 @@ namespace consensus std::string lcl = ledger::ctx.get_lcl(); const uint64_t lcl_seq_no = ledger::ctx.get_seq_no(); const size_t unl_count = unl::count(); + std::string unl_hash = unl::get_hash(); hpfs::h32 state = state_common::ctx.get_state(); vote_counter votes; @@ -130,24 +131,24 @@ namespace consensus if (verify_and_populate_candidate_user_inputs(lcl_seq_no) == -1) return -1; - const p2p::proposal new_round_prop = create_stage0_proposal(lcl, state); + const p2p::proposal new_round_prop = create_stage0_proposal(lcl, state, unl_hash); broadcast_proposal(new_round_prop); } else if (ctx.stage == 1) { - if (is_in_sync(lcl, unl_count, votes)) + if (is_in_sync(lcl, unl_hash, unl_count, votes)) { // If we are in sync, vote and broadcast the winning votes to next stage. - const p2p::proposal p = create_stage123_proposal(STAGE1_THRESHOLD, votes, lcl, unl_count, state); + const p2p::proposal p = create_stage123_proposal(STAGE1_THRESHOLD, votes, lcl, unl_count, state, unl_hash); broadcast_proposal(p); } } else if (ctx.stage == 2) { - if (is_in_sync(lcl, unl_count, votes)) + if (is_in_sync(lcl, unl_hash, unl_count, votes)) { // If we are in sync, vote and broadcast the winning votes to next stage. - const p2p::proposal p = create_stage123_proposal(STAGE2_THRESHOLD, votes, lcl, unl_count, state); + const p2p::proposal p = create_stage123_proposal(STAGE2_THRESHOLD, votes, lcl, unl_count, state, unl_hash); broadcast_proposal(p); } @@ -158,14 +159,14 @@ namespace consensus } else if (ctx.stage == 3) { - if (is_in_sync(lcl, unl_count, votes)) + if (is_in_sync(lcl, unl_hash, unl_count, votes)) { // If we are in sync, vote and get the final winning votes. // This is the consensus proposal which makes it into the ledger and contract execution - const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, unl_count, state); + const p2p::proposal p = create_stage123_proposal(STAGE3_THRESHOLD, votes, lcl, unl_count, state, unl_hash); // Update the unl with the unl changeset that subjected to the consensus. - unl::update(p.unl_changeset.additions, p.unl_changeset.removals); + unl::apply_changeset(p.unl_changeset.additions, p.unl_changeset.removals); // Update the ledger and execute the contract using the consensus proposal. if (update_ledger_and_execute_contract(p, lcl, state) == -1) @@ -178,7 +179,7 @@ namespace consensus return 0; } - bool is_in_sync(std::string_view lcl, const size_t unl_count, vote_counter &votes) + bool is_in_sync(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes) { // Check if we're ahead/behind of consensus lcl. bool is_lcl_desync = false; @@ -206,8 +207,19 @@ namespace consensus state_sync::set_target(majority_state); } + // Check unl hash with the majority unl hash. + bool is_unl_desync = false; + std::string majority_unl; + check_unl_votes(is_unl_desync, majority_unl, votes, unl_hash); + // Start unl sync if we are out-of-sync with majority unl. + if (is_unl_desync) + { + conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER); + unl::set_sync_target(majority_unl); + } + // Proceed further only if both lcl and state are in sync with majority. - if (!is_lcl_desync && !is_state_desync) + if (!is_lcl_desync && !is_state_desync && !is_unl_desync) { conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER); return true; @@ -499,7 +511,7 @@ namespace consensus return 0; } - p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state) + p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state, std::string_view unl_hash) { // This is the proposal that stage 0 votes on. // We report our own values in stage 0. @@ -508,6 +520,7 @@ namespace consensus stg_prop.stage = 0; stg_prop.lcl = lcl; stg_prop.state = state; + stg_prop.unl_hash = unl_hash; crypto::random_bytes(stg_prop.nonce, ROUND_NONCE_SIZE); // Populate the proposal with set of candidate user pubkeys. @@ -527,7 +540,7 @@ namespace consensus return stg_prop; } - p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state) + p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash) { // The proposal to be emited at the end of this stage. p2p::proposal stg_prop; @@ -539,6 +552,8 @@ namespace consensus // our peers or we will halt depending on level of consensus on the sides of the fork. stg_prop.lcl = lcl; + stg_prop.unl_hash = unl_hash; + // Vote for rest of the proposal fields by looking at candidate proposals. for (const auto &[pubkey, cp] : ctx.candidate_proposals) { @@ -751,6 +766,33 @@ namespace consensus is_desync = (state_common::ctx.get_state() != majority_state); } + /** + * Check unl against the winning and canonical unl + * @param is_desync Is unl is in desync. + * @param majority_unl The majority unl. + * @param votes The voting table. + * @param unl_hash Hash of the current unl list. + */ + void check_unl_votes(bool &is_desync, std::string &majority_unl, vote_counter &votes, std::string_view unl_hash) + { + for (const auto &[pubkey, cp] : ctx.candidate_proposals) + { + increment(votes.unl, cp.unl_hash); + } + + uint32_t winning_votes = 0; + for (const auto [unl, votes] : votes.unl) + { + if (votes > winning_votes) + { + winning_votes = votes; + majority_unl = unl; + } + } + + is_desync = (unl_hash != majority_unl); + } + /** * Update the ledger and execute the contract after consensus. * @param cons_prop The proposal that reached consensus. diff --git a/src/consensus.hpp b/src/consensus.hpp index fd67d655..efeb9da3 100644 --- a/src/consensus.hpp +++ b/src/consensus.hpp @@ -88,6 +88,7 @@ namespace consensus std::map inputs; std::map outputs; std::map state; + std::map unl; std::map unl_additions; std::map unl_removals; }; @@ -102,7 +103,7 @@ namespace consensus int consensus(); - bool is_in_sync(std::string_view lcl, const size_t unl_count, vote_counter &votes); + bool is_in_sync(std::string_view lcl, std::string_view unl_hash, const size_t unl_count, vote_counter &votes); void revise_candidate_proposals(); @@ -114,9 +115,9 @@ namespace consensus int verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no); - p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state); + p2p::proposal create_stage0_proposal(std::string_view lcl, hpfs::h32 state, std::string_view unl_hash); - p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state); + p2p::proposal create_stage123_proposal(const float_t vote_threshold, vote_counter &votes, std::string_view lcl, const size_t unl_count, const hpfs::h32 state, std::string_view unl_hash); void broadcast_proposal(const p2p::proposal &p); @@ -124,6 +125,8 @@ namespace consensus void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes); + void check_unl_votes(bool &is_desync, std::string &majority_unl, vote_counter &votes, std::string_view unl_hash); + void timewait_stage(const bool reset, const uint64_t time); uint64_t get_ledger_time_resolution(const uint64_t time); diff --git a/src/main.cpp b/src/main.cpp index 323800c6..8723c474 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -17,6 +17,7 @@ #include "state/state_common.hpp" #include "state/state_sync.hpp" #include "state/state_serve.hpp" +#include "unl.hpp" /** * Parses CLI args and extracts hot pocket command and parameters given. @@ -76,6 +77,7 @@ void deinit() state_sync::deinit(); state_serve::deinit(); sc::deinit(); + unl::deinit(); ledger::deinit(); } @@ -197,15 +199,16 @@ int main(int argc, char **argv) << (conf::cfg.operating_mode == conf::OPERATING_MODE::OBSERVER ? "Observer" : "Proposer"); LOG_INFO << "Public key: " << conf::cfg.pubkeyhex.substr(2); // Public key without 'ed' prefix. - if (ledger::init() || - sc::init() || - state_common::init() != 0 || - state_serve::init() != 0 || - state_sync::init() != 0 || - consensus::init() != 0 || - read_req::init() != 0 || - p2p::init() != 0 || - usr::init() != 0) + if (ledger::init() == -1 || + unl::init() == -1 || + sc::init() == -1 || + state_common::init() == -1 || + state_serve::init() == -1 || + state_sync::init() == -1 || + consensus::init() == -1 || + read_req::init() == -1 || + p2p::init() == -1 || + usr::init() == -1) { deinit(); return -1; diff --git a/src/msg/fbuf/ledger_helpers.cpp b/src/msg/fbuf/ledger_helpers.cpp index ce37c34b..d22332de 100644 --- a/src/msg/fbuf/ledger_helpers.cpp +++ b/src/msg/fbuf/ledger_helpers.cpp @@ -25,6 +25,7 @@ namespace msg::fbuf::ledger p.time, sv_to_flatbuff_bytes(builder, p.lcl), hash_to_flatbuff_bytes(builder, p.state), + sv_to_flatbuff_bytes(builder, p.unl_hash), stringlist_to_flatbuf_bytearrayvector(builder, p.users), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs), diff --git a/src/msg/fbuf/ledger_schema.fbs b/src/msg/fbuf/ledger_schema.fbs index b1100a4e..5031be0d 100644 --- a/src/msg/fbuf/ledger_schema.fbs +++ b/src/msg/fbuf/ledger_schema.fbs @@ -7,6 +7,7 @@ table LedgerBlock { time:uint64; lcl:[ubyte]; state:[ubyte]; + unl:[ubyte]; users: [ByteArray]; inputs: [ByteArray]; outputs: [ByteArray]; diff --git a/src/msg/fbuf/ledger_schema_generated.h b/src/msg/fbuf/ledger_schema_generated.h index ed5a8076..0751b615 100644 --- a/src/msg/fbuf/ledger_schema_generated.h +++ b/src/msg/fbuf/ledger_schema_generated.h @@ -25,10 +25,11 @@ struct LedgerBlock FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_TIME = 6, VT_LCL = 8, VT_STATE = 10, - VT_USERS = 12, - VT_INPUTS = 14, - VT_OUTPUTS = 16, - VT_UNL_CHANGESET = 18 + VT_UNL = 12, + VT_USERS = 14, + VT_INPUTS = 16, + VT_OUTPUTS = 18, + VT_UNL_CHANGESET = 20 }; uint64_t seq_no() const { return GetField(VT_SEQ_NO, 0); @@ -54,6 +55,12 @@ struct LedgerBlock FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector *mutable_state() { return GetPointer *>(VT_STATE); } + const flatbuffers::Vector *unl() const { + return GetPointer *>(VT_UNL); + } + flatbuffers::Vector *mutable_unl() { + return GetPointer *>(VT_UNL); + } const flatbuffers::Vector> *users() const { return GetPointer> *>(VT_USERS); } @@ -86,6 +93,8 @@ struct LedgerBlock FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { verifier.VerifyVector(lcl()) && VerifyOffset(verifier, VT_STATE) && verifier.VerifyVector(state()) && + VerifyOffset(verifier, VT_UNL) && + verifier.VerifyVector(unl()) && VerifyOffset(verifier, VT_USERS) && verifier.VerifyVector(users()) && verifier.VerifyVectorOfTables(users()) && @@ -117,6 +126,9 @@ struct LedgerBlockBuilder { void add_state(flatbuffers::Offset> state) { fbb_.AddOffset(LedgerBlock::VT_STATE, state); } + void add_unl(flatbuffers::Offset> unl) { + fbb_.AddOffset(LedgerBlock::VT_UNL, unl); + } void add_users(flatbuffers::Offset>> users) { fbb_.AddOffset(LedgerBlock::VT_USERS, users); } @@ -146,6 +158,7 @@ inline flatbuffers::Offset CreateLedgerBlock( uint64_t time = 0, flatbuffers::Offset> lcl = 0, flatbuffers::Offset> state = 0, + flatbuffers::Offset> unl = 0, flatbuffers::Offset>> users = 0, flatbuffers::Offset>> inputs = 0, flatbuffers::Offset>> outputs = 0, @@ -157,6 +170,7 @@ inline flatbuffers::Offset CreateLedgerBlock( builder_.add_outputs(outputs); builder_.add_inputs(inputs); builder_.add_users(users); + builder_.add_unl(unl); builder_.add_state(state); builder_.add_lcl(lcl); return builder_.Finish(); @@ -168,12 +182,14 @@ inline flatbuffers::Offset CreateLedgerBlockDirect( uint64_t time = 0, const std::vector *lcl = nullptr, const std::vector *state = nullptr, + const std::vector *unl = nullptr, const std::vector> *users = nullptr, const std::vector> *inputs = nullptr, const std::vector> *outputs = nullptr, flatbuffers::Offset unl_changeset = 0) { auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; auto state__ = state ? _fbb.CreateVector(*state) : 0; + auto unl__ = unl ? _fbb.CreateVector(*unl) : 0; auto users__ = users ? _fbb.CreateVector>(*users) : 0; auto inputs__ = inputs ? _fbb.CreateVector>(*inputs) : 0; auto outputs__ = outputs ? _fbb.CreateVector>(*outputs) : 0; @@ -183,6 +199,7 @@ inline flatbuffers::Offset CreateLedgerBlockDirect( time, lcl__, state__, + unl__, users__, inputs__, outputs__, diff --git a/src/msg/fbuf/p2pmsg_content.fbs b/src/msg/fbuf/p2pmsg_content.fbs index 2a43aa8a..e6efa0dd 100644 --- a/src/msg/fbuf/p2pmsg_content.fbs +++ b/src/msg/fbuf/p2pmsg_content.fbs @@ -38,7 +38,9 @@ union Message { Peer_Requirement_Announcement_Message, Peer_List_Request_Message, Peer_List_Response_Message, - Available_Capacity_Announcement_Message + Available_Capacity_Announcement_Message, + Unl_Request_Message, + Unl_Response_Message } //message content type table Content { @@ -62,6 +64,7 @@ table Proposal_Message { //Proposal type message schema hash_inputs:[ByteArray]; hash_outputs:[ByteArray]; state: [ubyte]; + unl_hash: [ubyte]; // Hash of the current unl list. unl_changeset: Unl_Changeset; } @@ -73,6 +76,15 @@ table History_Request_Message { //Ledger History request type message schema required_lcl:[ubyte]; } +table Unl_Request_Message { + required_unl: [ubyte]; +} + +table Unl_Response_Message { + requester_unl:[ubyte]; // Unl hash of the sender. + unl_list: [ByteArray]; +} + enum Ledger_Response_Error : ubyte { None = 0, diff --git a/src/msg/fbuf/p2pmsg_content_generated.h b/src/msg/fbuf/p2pmsg_content_generated.h index 1a4c9821..fce65ac9 100644 --- a/src/msg/fbuf/p2pmsg_content_generated.h +++ b/src/msg/fbuf/p2pmsg_content_generated.h @@ -42,6 +42,12 @@ struct Npl_MessageBuilder; struct History_Request_Message; struct History_Request_MessageBuilder; +struct Unl_Request_Message; +struct Unl_Request_MessageBuilder; + +struct Unl_Response_Message; +struct Unl_Response_MessageBuilder; + struct History_Response_Message; struct History_Response_MessageBuilder; @@ -99,11 +105,13 @@ enum Message { Message_Peer_List_Request_Message = 11, Message_Peer_List_Response_Message = 12, Message_Available_Capacity_Announcement_Message = 13, + Message_Unl_Request_Message = 14, + Message_Unl_Response_Message = 15, Message_MIN = Message_NONE, - Message_MAX = Message_Available_Capacity_Announcement_Message + Message_MAX = Message_Unl_Response_Message }; -inline const Message (&EnumValuesMessage())[14] { +inline const Message (&EnumValuesMessage())[16] { static const Message values[] = { Message_NONE, Message_Peer_Challenge_Response_Message, @@ -118,13 +126,15 @@ inline const Message (&EnumValuesMessage())[14] { Message_Peer_Requirement_Announcement_Message, Message_Peer_List_Request_Message, Message_Peer_List_Response_Message, - Message_Available_Capacity_Announcement_Message + Message_Available_Capacity_Announcement_Message, + Message_Unl_Request_Message, + Message_Unl_Response_Message }; return values; } inline const char * const *EnumNamesMessage() { - static const char * const names[15] = { + static const char * const names[17] = { "NONE", "Peer_Challenge_Response_Message", "Peer_Challenge_Message", @@ -139,13 +149,15 @@ inline const char * const *EnumNamesMessage() { "Peer_List_Request_Message", "Peer_List_Response_Message", "Available_Capacity_Announcement_Message", + "Unl_Request_Message", + "Unl_Response_Message", nullptr }; return names; } inline const char *EnumNameMessage(Message e) { - if (flatbuffers::IsOutRange(e, Message_NONE, Message_Available_Capacity_Announcement_Message)) return ""; + if (flatbuffers::IsOutRange(e, Message_NONE, Message_Unl_Response_Message)) return ""; const size_t index = static_cast(e); return EnumNamesMessage()[index]; } @@ -206,6 +218,14 @@ template<> struct MessageTraits struct MessageTraits { + static const Message enum_value = Message_Unl_Request_Message; +}; + +template<> struct MessageTraits { + static const Message enum_value = Message_Unl_Response_Message; +}; + bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type); bool VerifyMessageVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); @@ -649,6 +669,12 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { const msg::fbuf::p2pmsg::Available_Capacity_Announcement_Message *message_as_Available_Capacity_Announcement_Message() const { return message_type() == msg::fbuf::p2pmsg::Message_Available_Capacity_Announcement_Message ? static_cast(message()) : nullptr; } + const msg::fbuf::p2pmsg::Unl_Request_Message *message_as_Unl_Request_Message() const { + return message_type() == msg::fbuf::p2pmsg::Message_Unl_Request_Message ? static_cast(message()) : nullptr; + } + const msg::fbuf::p2pmsg::Unl_Response_Message *message_as_Unl_Response_Message() const { + return message_type() == msg::fbuf::p2pmsg::Message_Unl_Response_Message ? static_cast(message()) : nullptr; + } void *mutable_message() { return GetPointer(VT_MESSAGE); } @@ -713,6 +739,14 @@ template<> inline const msg::fbuf::p2pmsg::Available_Capacity_Announcement_Messa return message_as_Available_Capacity_Announcement_Message(); } +template<> inline const msg::fbuf::p2pmsg::Unl_Request_Message *Content::message_as() const { + return message_as_Unl_Request_Message(); +} + +template<> inline const msg::fbuf::p2pmsg::Unl_Response_Message *Content::message_as() const { + return message_as_Unl_Response_Message(); +} + struct ContentBuilder { typedef Content Table; flatbuffers::FlatBufferBuilder &fbb_; @@ -882,7 +916,8 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_HASH_INPUTS = 12, VT_HASH_OUTPUTS = 14, VT_STATE = 16, - VT_UNL_CHANGESET = 18 + VT_UNL_HASH = 18, + VT_UNL_CHANGESET = 20 }; uint8_t stage() const { return GetField(VT_STAGE, 0); @@ -926,6 +961,12 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector *mutable_state() { return GetPointer *>(VT_STATE); } + const flatbuffers::Vector *unl_hash() const { + return GetPointer *>(VT_UNL_HASH); + } + flatbuffers::Vector *mutable_unl_hash() { + return GetPointer *>(VT_UNL_HASH); + } const msg::fbuf::p2pmsg::Unl_Changeset *unl_changeset() const { return GetPointer(VT_UNL_CHANGESET); } @@ -949,6 +990,8 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { verifier.VerifyVectorOfTables(hash_outputs()) && VerifyOffset(verifier, VT_STATE) && verifier.VerifyVector(state()) && + VerifyOffset(verifier, VT_UNL_HASH) && + verifier.VerifyVector(unl_hash()) && VerifyOffset(verifier, VT_UNL_CHANGESET) && verifier.VerifyTable(unl_changeset()) && verifier.EndTable(); @@ -980,6 +1023,9 @@ struct Proposal_MessageBuilder { void add_state(flatbuffers::Offset> state) { fbb_.AddOffset(Proposal_Message::VT_STATE, state); } + void add_unl_hash(flatbuffers::Offset> unl_hash) { + fbb_.AddOffset(Proposal_Message::VT_UNL_HASH, unl_hash); + } void add_unl_changeset(flatbuffers::Offset unl_changeset) { fbb_.AddOffset(Proposal_Message::VT_UNL_CHANGESET, unl_changeset); } @@ -1003,10 +1049,12 @@ inline flatbuffers::Offset CreateProposal_Message( flatbuffers::Offset>> hash_inputs = 0, flatbuffers::Offset>> hash_outputs = 0, flatbuffers::Offset> state = 0, + flatbuffers::Offset> unl_hash = 0, flatbuffers::Offset unl_changeset = 0) { Proposal_MessageBuilder builder_(_fbb); builder_.add_time(time); builder_.add_unl_changeset(unl_changeset); + builder_.add_unl_hash(unl_hash); builder_.add_state(state); builder_.add_hash_outputs(hash_outputs); builder_.add_hash_inputs(hash_inputs); @@ -1025,12 +1073,14 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( const std::vector> *hash_inputs = nullptr, const std::vector> *hash_outputs = nullptr, const std::vector *state = nullptr, + const std::vector *unl_hash = nullptr, flatbuffers::Offset unl_changeset = 0) { auto nonce__ = nonce ? _fbb.CreateVector(*nonce) : 0; auto users__ = users ? _fbb.CreateVector>(*users) : 0; auto hash_inputs__ = hash_inputs ? _fbb.CreateVector>(*hash_inputs) : 0; auto hash_outputs__ = hash_outputs ? _fbb.CreateVector>(*hash_outputs) : 0; auto state__ = state ? _fbb.CreateVector(*state) : 0; + auto unl_hash__ = unl_hash ? _fbb.CreateVector(*unl_hash) : 0; return msg::fbuf::p2pmsg::CreateProposal_Message( _fbb, stage, @@ -1040,6 +1090,7 @@ inline flatbuffers::Offset CreateProposal_MessageDirect( hash_inputs__, hash_outputs__, state__, + unl_hash__, unl_changeset); } @@ -1151,6 +1202,132 @@ inline flatbuffers::Offset CreateHistory_Request_Messag required_lcl__); } +struct Unl_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef Unl_Request_MessageBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_REQUIRED_UNL = 4 + }; + const flatbuffers::Vector *required_unl() const { + return GetPointer *>(VT_REQUIRED_UNL); + } + flatbuffers::Vector *mutable_required_unl() { + return GetPointer *>(VT_REQUIRED_UNL); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_REQUIRED_UNL) && + verifier.VerifyVector(required_unl()) && + verifier.EndTable(); + } +}; + +struct Unl_Request_MessageBuilder { + typedef Unl_Request_Message Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_required_unl(flatbuffers::Offset> required_unl) { + fbb_.AddOffset(Unl_Request_Message::VT_REQUIRED_UNL, required_unl); + } + explicit Unl_Request_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateUnl_Request_Message( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> required_unl = 0) { + Unl_Request_MessageBuilder builder_(_fbb); + builder_.add_required_unl(required_unl); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateUnl_Request_MessageDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *required_unl = nullptr) { + auto required_unl__ = required_unl ? _fbb.CreateVector(*required_unl) : 0; + return msg::fbuf::p2pmsg::CreateUnl_Request_Message( + _fbb, + required_unl__); +} + +struct Unl_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef Unl_Response_MessageBuilder Builder; + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_REQUESTER_UNL = 4, + VT_UNL_LIST = 6 + }; + const flatbuffers::Vector *requester_unl() const { + return GetPointer *>(VT_REQUESTER_UNL); + } + flatbuffers::Vector *mutable_requester_unl() { + return GetPointer *>(VT_REQUESTER_UNL); + } + const flatbuffers::Vector> *unl_list() const { + return GetPointer> *>(VT_UNL_LIST); + } + flatbuffers::Vector> *mutable_unl_list() { + return GetPointer> *>(VT_UNL_LIST); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_REQUESTER_UNL) && + verifier.VerifyVector(requester_unl()) && + VerifyOffset(verifier, VT_UNL_LIST) && + verifier.VerifyVector(unl_list()) && + verifier.VerifyVectorOfTables(unl_list()) && + verifier.EndTable(); + } +}; + +struct Unl_Response_MessageBuilder { + typedef Unl_Response_Message Table; + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_requester_unl(flatbuffers::Offset> requester_unl) { + fbb_.AddOffset(Unl_Response_Message::VT_REQUESTER_UNL, requester_unl); + } + void add_unl_list(flatbuffers::Offset>> unl_list) { + fbb_.AddOffset(Unl_Response_Message::VT_UNL_LIST, unl_list); + } + explicit Unl_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateUnl_Response_Message( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> requester_unl = 0, + flatbuffers::Offset>> unl_list = 0) { + Unl_Response_MessageBuilder builder_(_fbb); + builder_.add_unl_list(unl_list); + builder_.add_requester_unl(requester_unl); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateUnl_Response_MessageDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *requester_unl = nullptr, + const std::vector> *unl_list = nullptr) { + auto requester_unl__ = requester_unl ? _fbb.CreateVector(*requester_unl) : 0; + auto unl_list__ = unl_list ? _fbb.CreateVector>(*unl_list) : 0; + return msg::fbuf::p2pmsg::CreateUnl_Response_Message( + _fbb, + requester_unl__, + unl_list__); +} + struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef History_Response_MessageBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { @@ -2209,6 +2386,14 @@ inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Mess auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } + case Message_Unl_Request_Message: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } + case Message_Unl_Response_Message: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } default: return true; } } diff --git a/src/msg/fbuf/p2pmsg_helpers.cpp b/src/msg/fbuf/p2pmsg_helpers.cpp index 1bf738e9..30141a95 100644 --- a/src/msg/fbuf/p2pmsg_helpers.cpp +++ b/src/msg/fbuf/p2pmsg_helpers.cpp @@ -230,6 +230,7 @@ namespace msg::fbuf::p2pmsg p.nonce = flatbuff_bytes_to_sv(msg.nonce()); p.stage = msg.stage(); p.lcl = flatbuff_bytes_to_sv(lcl); + p.unl_hash = flatbuff_bytes_to_sv(msg.unl_hash()); p.state = flatbuff_bytes_to_sv(msg.state()); const auto unl_changeset = msg.unl_changeset(); @@ -388,6 +389,7 @@ namespace msg::fbuf::p2pmsg stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs), hash_to_flatbuff_bytes(builder, p.state), + sv_to_flatbuff_bytes(builder, p.unl_hash), unl_changeset); const flatbuffers::Offset message = CreateContent(builder, Message_Proposal_Message, proposal.Union()); @@ -683,6 +685,79 @@ namespace msg::fbuf::p2pmsg create_containermsg_from_content(container_builder, builder, lcl, false); } + /** + * Create unl request message from the given unl sync request struct. + * @param container_builder Flatbuffer builder for the container message. + * @param unl_sync_message The Unl sync request struct to be placed in the container message. + */ + void create_msg_from_unl_sync_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::unl_sync_request &unl_sync_message) + { + flatbuffers::FlatBufferBuilder builder(1024); + + flatbuffers::Offset unl_req_msg = + CreateUnl_Request_Message( + builder, + sv_to_flatbuff_bytes(builder, unl_sync_message.required_unl)); + + flatbuffers::Offset message = CreateContent(builder, Message_Unl_Request_Message, unl_req_msg.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + // we need to place it inside a container message. + create_containermsg_from_content(container_builder, builder, {}, false); + } + + /** + * Creates a unl sync request struct from the given unl request message. + * @param unl_req_message Flatbuffer unl request message received from the peer. + * @return A unl sync request struct representing the message. + */ + const p2p::unl_sync_request create_unl_sync_request_from_msg(const Unl_Request_Message &unl_req_message) + { + p2p::unl_sync_request unl_sync; + unl_sync.required_unl = flatbuff_bytes_to_sv(unl_req_message.required_unl()); + + return unl_sync; + } + + /** + * Create unl response message from the given unl sync response struct. + * @param container_builder Flatbuffer builder for the container message. + * @param unl_response The unl sync response struct to be placed in the container message. + */ + void create_msg_from_unl_sync_response(flatbuffers::FlatBufferBuilder &container_builder, const p2p::unl_sync_response &unl_response) + { + flatbuffers::FlatBufferBuilder builder(1024); + + flatbuffers::Offset unl_res_msg = + CreateUnl_Response_Message( + builder, + sv_to_flatbuff_bytes(builder, unl_response.requester_unl), + stringlist_to_flatbuf_bytearrayvector(builder, unl_response.unl_list)); + + flatbuffers::Offset message = CreateContent(builder, Message_Unl_Response_Message, unl_res_msg.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + // we need to place it inside a container message. + create_containermsg_from_content(container_builder, builder, {}, false); + } + + /** + * Creates a unl sync response struct from the given unl response message. + * @param unl_response_message Flatbuffer unl response message received from the peer. + * @return A unl sync response struct representing the message. + */ + const p2p::unl_sync_response create_unl_sync_response_from_msg(const Unl_Response_Message &unl_response_message) + { + p2p::unl_sync_response unl_sync_response; + + unl_sync_response.requester_unl = flatbuff_bytes_to_sv(unl_response_message.requester_unl()); + unl_sync_response.unl_list = flatbuf_bytearrayvector_to_stringlist(unl_response_message.unl_list()); + + return unl_sync_response; + } + /** * Creates a Flatbuffer container message from the given Content message. * @param container_builder The Flatbuffer builder to which the final container message should be written to. diff --git a/src/msg/fbuf/p2pmsg_helpers.hpp b/src/msg/fbuf/p2pmsg_helpers.hpp index dacc6c7b..a1d0147a 100644 --- a/src/msg/fbuf/p2pmsg_helpers.hpp +++ b/src/msg/fbuf/p2pmsg_helpers.hpp @@ -38,6 +38,10 @@ namespace msg::fbuf::p2pmsg const p2p::state_request create_state_request_from_msg(const State_Request_Message &msg); + const p2p::unl_sync_request create_unl_sync_request_from_msg(const Unl_Request_Message &unl_req_message); + + const p2p::unl_sync_response create_unl_sync_response_from_msg(const Unl_Response_Message &unl_response_message); + const std::vector create_peer_list_response_from_msg(const Peer_List_Response_Message &msg); //---Message creation helpers---// @@ -57,6 +61,10 @@ namespace msg::fbuf::p2pmsg void create_msg_from_state_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::state_request &hr, std::string_view lcl); + void create_msg_from_unl_sync_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::unl_sync_request &unl_sync_message); + + void create_msg_from_unl_sync_response(flatbuffers::FlatBufferBuilder &container_builder, const p2p::unl_sync_response &unl_response); + void create_msg_from_fsentry_response( flatbuffers::FlatBufferBuilder &container_builder, const std::string_view path, std::vector &hash_nodes, hpfs::h32 expected_hash, std::string_view lcl); diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 0443de8a..7b22f768 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -51,6 +51,7 @@ namespace p2p uint8_t stage = 0; std::string nonce; // Random nonce that is used to reduce lcl predictability. std::string lcl; + std::string unl_hash; // Hash of the current unl list. hpfs::h32 state; std::set users; std::set hash_inputs; @@ -69,6 +70,17 @@ namespace p2p std::string required_lcl; }; + struct unl_sync_request + { + std::string required_unl; + }; + + struct unl_sync_response + { + std::string requester_unl; // Unl hash of the sender. + std::set unl_list; + }; + struct history_ledger_block { std::string lcl; diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index d9df93c8..48ae58dc 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -13,6 +13,7 @@ #include "../ledger.hpp" #include "peer_comm_session.hpp" #include "p2p.hpp" +#include "../unl.hpp" namespace p2pmsg = msg::fbuf::p2pmsg; @@ -256,6 +257,41 @@ namespace p2p } } } + else if (content_message_type == p2pmsg::Message_Unl_Request_Message) //message is a unl request message. + { + // Check the cap and insert request with lock. + std::scoped_lock lock(unl::sync_ctx.list_mutex); + + // If max number of unl requests reached skip the rest. + if (unl::sync_ctx.collected_unl_sync_requests.size() < unl::UNL_REQ_LIST_CAP) + { + const p2p::unl_sync_request unl_request = p2pmsg::create_unl_sync_request_from_msg(*content->message_as_Unl_Request_Message()); + unl::sync_ctx.collected_unl_sync_requests.push_back(std::make_pair(session.pubkey, std::move(unl_request))); + } + else + { + LOG_DEBUG << "Unl request rejected. Maximum unl request count reached. " << session.display_name(); + } + } + else if (content_message_type == p2pmsg::Message_Unl_Response_Message) //message is a unl response message. + { + if (unl::sync_ctx.is_syncing) // Only accept history responses if ledger is syncing. + { + // Check the cap and insert response with lock. + std::scoped_lock lock(unl::sync_ctx.list_mutex); + + // If max number of unl responses reached skip the rest. + if (unl::sync_ctx.collected_unl_sync_responses.size() < unl::UNL_RES_LIST_CAP) + { + const p2p::unl_sync_response unl_response = p2pmsg::create_unl_sync_response_from_msg(*content->message_as_Unl_Response_Message()); + unl::sync_ctx.collected_unl_sync_responses.push_back(std::move(unl_response)); + } + else + { + LOG_DEBUG << "Unl response rejected. Maximum unl response count reached. " << session.display_name(); + } + } + } else { session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1); diff --git a/src/unl.cpp b/src/unl.cpp index 226fe226..b575ee15 100644 --- a/src/unl.cpp +++ b/src/unl.cpp @@ -4,6 +4,7 @@ #include "unl.hpp" #include "crypto.hpp" #include "p2p/p2p.hpp" +#include "./msg/fbuf/p2pmsg_helpers.hpp" /** * Manages the UNL public keys of this node. @@ -13,18 +14,43 @@ namespace unl std::set list; // List of binary pubkeys of UNL. std::string json_list; // Stringified json array of UNL. (To be fed into the contract args) std::shared_mutex unl_mutex; + std::string hash; + sync_context sync_ctx; + bool init_success = false; + constexpr uint16_t SYNCER_IDLE_WAIT = 20; // unl syncer loop sleep time (milliseconds). + + // Max no. of repetitive reqeust resubmissions before abandoning the sync. + constexpr uint16_t ABANDON_THRESHOLD = 10; + + // No. of milliseconds to wait before resubmitting a request. + uint16_t REQUEST_RESUBMIT_TIMEOUT; /** - * Called by conf during startup to populate configured unl list. + * Performs startup activitites related to unl list. + * @return 0 for successful initialization. -1 for failure. */ - void init(const std::set &init_list) + int init() { - if (init_list.empty()) - return; + if (conf::cfg.unl.empty()) + return -1; std::unique_lock lock(unl_mutex); - list = init_list; + list = conf::cfg.unl; update_json_list(); + hash = calculate_hash(list); + sync_ctx.unl_sync_thread = std::thread(unl_syncer_loop); + REQUEST_RESUBMIT_TIMEOUT = conf::cfg.roundtime; + init_success = true; + return 0; + } + + void deinit() + { + if (init_success) + { + sync_ctx.is_shutting_down = true; + sync_ctx.unl_sync_thread.join(); + } } size_t count() @@ -57,36 +83,65 @@ namespace unl } /** - * Called by contract to update unl at runtime. + * Called by consensus to apply unl changesets that reached consensus. */ - void update(const std::set &additions, const std::set &removals) + void apply_changeset(const std::set &additions, const std::set &removals) { if (additions.empty() && removals.empty()) return; - std::unique_lock lock(unl_mutex); - const size_t initial_count = list.size(); + bool is_updated = false; + { + std::unique_lock lock(unl_mutex); + for (const std::string &pubkey : additions) + { + const auto [ele, is_success] = list.emplace(pubkey); + if (is_success) + is_updated = true; + } - for (const std::string &pubkey : additions) - list.emplace(pubkey); + for (const std::string &pubkey : removals) + { + if (list.erase(pubkey)) + is_updated = true; + } - for (const std::string &pubkey : removals) - list.erase(pubkey); - - update_json_list(); - conf::persist_unl_update(list); - - const size_t updated_count = list.size(); - - // Unlock unique lock. A shared lock is applied to the list inside the update unl connection function - // because it use unl::exists function call. - lock.unlock(); + update_json_list(); + conf::persist_unl_update(list); + hash = calculate_hash(list); + LOG_INFO << "UNL updated. Count:" << list.size(); + } // Update the is_unl flag of peer sessions. - if (initial_count != updated_count) + if (is_updated) p2p::update_unl_connections(); + } - LOG_INFO << "UNL updated. Count:" << updated_count; + /** + * Replace the unl list from the received new unl list after verifying it. + * @param new_list The received unl list from a random peer. + * @return Returns -1 on verification failure and 0 on successful replacement. + */ + int verify_and_replace(const std::set &new_list) + { + const std::string new_unl_hash = calculate_hash(new_list); + if (new_unl_hash != sync_ctx.target_unl) + { + LOG_INFO << "Hash verification on received unl list failed."; + return -1; + } + + { + std::unique_lock lock(unl_mutex); + list = new_list; + update_json_list(); + conf::persist_unl_update(list); + hash = new_unl_hash; + } + + // Update the is_unl flag of peer sessions. + p2p::update_unl_connections(); + return 0; } void update_json_list() @@ -111,4 +166,192 @@ namespace unl json_list = os.str(); } + std::string get_hash() + { + std::shared_lock lock(unl_mutex); + return hash; + } + + /** + * Calculate hash of the given set. + * @param unl_list UNL list. + * @return Returns the generated hash of the given list. + */ + std::string calculate_hash(const std::set &new_list) + { + std::vector unl_vector(new_list.begin(), new_list.end()); + return crypto::get_hash(unl_vector); + } + + /** + * Set sync target to the given unl hash and start syncing. + * @param target_unl_hash The majority unl from the consensus. + */ + void set_sync_target(std::string_view target_unl_hash) + { + if (sync_ctx.is_shutting_down) + return; + + std::scoped_lock lock(sync_ctx.target_unl_mutex); + if (sync_ctx.target_unl != target_unl_hash) + { + sync_ctx.is_syncing = true; + sync_ctx.target_unl = target_unl_hash; + sync_ctx.target_requested_on = 0; + sync_ctx.request_submissions = 0; + LOG_INFO << "unl sync: Syncing for target:" << hash_bin2hex(sync_ctx.target_unl).substr(0, 10) << " (current:" << hash_bin2hex(get_hash()).substr(0, 10) << ")"; + } + } + + /** + * Create and send unl request to random node from the unl list. + */ + void send_unl_sync_request() + { + const uint64_t time_now = util::get_epoch_milliseconds(); + // Check whether we need to send any requests or abandon the sync due to timeout. + if ((sync_ctx.target_requested_on == 0) || // Initial request. + (time_now - sync_ctx.target_requested_on) > REQUEST_RESUBMIT_TIMEOUT) // Request resubmission. + { + if (sync_ctx.request_submissions < ABANDON_THRESHOLD) + { + p2p::unl_sync_request unl_sync_message; + unl_sync_message.required_unl = sync_ctx.target_unl; + + flatbuffers::FlatBufferBuilder fbuf(1024); + p2pmsg::create_msg_from_unl_sync_request(fbuf, unl_sync_message); + + std::string target_pubkey; + p2p::send_message_to_random_peer(fbuf, target_pubkey); + + LOG_DEBUG << "UNL list requested from [" << target_pubkey.substr(0, 10) << "]. Required unl hash:" << hash_bin2hex(sync_ctx.target_unl).substr(0, 10); + sync_ctx.target_requested_on = time_now; + sync_ctx.request_submissions++; + } + else + { + LOG_INFO << "unl sync: Resubmission threshold exceeded. Abandoning sync."; + sync_ctx.clear_target(); + } + } + } + + /** + * Perform unl syncing and serving. + */ + void unl_syncer_loop() + { + util::mask_signal(); + + LOG_INFO << "unl sync: Worker started."; + + while (!sync_ctx.is_shutting_down) + { + // Indicates whether any requests/responses were processed in the previous loop iteration. + bool prev_processed = false; + { + std::scoped_lock lock(sync_ctx.target_unl_mutex); + if (!sync_ctx.target_unl.empty()) + send_unl_sync_request(); + + if (!sync_ctx.target_unl.empty() && check_unl_sync_responses() == 1) + prev_processed = true; + } + + if (check_unl_sync_requests() == 1) + prev_processed = true; + + // Wait a small delay if there were no requests/responses processed during previous iteration. + if (!prev_processed) + util::sleep(SYNCER_IDLE_WAIT); + } + + LOG_INFO << "unl sync: Worker stopped."; + } + + std::string hash_bin2hex(std::string_view hash) + { + // Get hex from binary hash. + std::string unl_hash_hex; + util::bin2hex(unl_hash_hex, + reinterpret_cast(hash.data()), + hash.size()); + return unl_hash_hex; + } + + /** + * Process any unl sync requests received. + * @return Returns 0 if no requests were processed and returns 1 if atleast one request is served. + */ + int check_unl_sync_requests() + { + // Move over the collected sync requests to the local list. + std::list> unl_requests; + { + std::scoped_lock(sync_ctx.list_mutex); + unl_requests.splice(unl_requests.end(), sync_ctx.collected_unl_sync_requests); + } + + const std::string unl_hash = get_hash(); + + std::shared_lock lock(unl_mutex); + for (const auto &[session_id, unl_request] : unl_requests) + { + // First check whether we are at the required unl state. + if (unl_request.required_unl != unl_hash) + continue; + + p2p::unl_sync_response resp; + resp.requester_unl = unl_hash; + resp.unl_list = list; + + flatbuffers::FlatBufferBuilder fbuf(1024); + p2pmsg::create_msg_from_unl_sync_response(fbuf, resp); + + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + + // Find the peer that we should send the unl response to. + std::scoped_lock lock(p2p::ctx.peer_connections_mutex); + const auto peer_itr = p2p::ctx.peer_connections.find(session_id); + + if (peer_itr != p2p::ctx.peer_connections.end()) + { + comm::comm_session *session = peer_itr->second; + session->send(msg); + } + } + + return unl_requests.empty() ? 0 : 1; + } + + /** + * Check for any unl sync responses received. + * @return Returns 0 if no responses were processed and returns 1 if atleast one response was processed. + */ + int check_unl_sync_responses() + { + // Move over the collected sync response to the local list. + std::list unl_responses; + { + std::scoped_lock(sync_ctx.list_mutex); + unl_responses.splice(unl_responses.end(), sync_ctx.collected_unl_sync_responses); + } + + if (!sync_ctx.target_unl.empty()) + { + // Scan any queued unl sync responses. + // Only process the first successful item which matches with our target unl. + for (const p2p::unl_sync_response &unl : unl_responses) + { + if (unl.requester_unl == sync_ctx.target_unl && verify_and_replace(unl.unl_list) != -1) + { + LOG_INFO << "unl sync: Sync complete. New unl:" << hash_bin2hex(sync_ctx.target_unl).substr(0, 10); + sync_ctx.clear_target(); + } + } + } + return unl_responses.empty() ? 0 : 1; + } + } // namespace unl diff --git a/src/unl.hpp b/src/unl.hpp index edfab66e..cb3f1d47 100644 --- a/src/unl.hpp +++ b/src/unl.hpp @@ -2,19 +2,60 @@ #define _HP_UNL_ #include "pchheader.hpp" +#include "p2p/p2p.hpp" /** * Manages the UNL public keys of this node. */ namespace unl { + struct sync_context + { + // The current target unl that we are syncing towards. + std::string target_unl; + std::mutex target_unl_mutex; + + // Lists holding unl requests and responses collected from incoming p2p messages. + std::list> collected_unl_sync_requests; + std::list collected_unl_sync_responses; + std::mutex list_mutex; + + uint64_t target_requested_on = 0; + uint64_t request_submissions = 0; + + std::thread unl_sync_thread; + std::atomic is_syncing = false; + std::atomic is_shutting_down = false; + + void clear_target() + { + target_unl.clear(); + target_requested_on = 0; + request_submissions = 0; + is_syncing = false; + } + }; + extern sync_context sync_ctx; + constexpr uint16_t UNL_REQ_LIST_CAP = 64; // Maximum unl request count. + constexpr uint16_t UNL_RES_LIST_CAP = 64; // Maximum unl response count. + size_t count(); std::set get(); std::string get_json(); bool exists(const std::string &bin_pubkey); - void init(const std::set &init_list); - void update(const std::set &additions, const std::set &removals); + int init(); + void deinit(); + void apply_changeset(const std::set &additions, const std::set &removals); void update_json_list(); + std::string get_hash(); + std::string calculate_hash(const std::set &new_list); + void set_sync_target(std::string_view target_unl_hash); + void send_unl_sync_request(); + void unl_syncer_loop(); + std::string hash_bin2hex(std::string_view hash); + int verify_and_replace(const std::set &new_list); + int check_unl_sync_requests(); + int check_unl_sync_responses(); } // namespace unl