From 95683035b98cd0fcfe745766025148de829045ce Mon Sep 17 00:00:00 2001 From: Asanka Indrajith Date: Tue, 19 Nov 2019 07:50:33 -0500 Subject: [PATCH] LCL history request and response. (#59) Detect and request missing lcl history from another random node. Sending lcl history response to a asked node. Getting lcl history response and applying it. Delete lcl that exceeds max ledger sequence. --- src/cons/cons.cpp | 54 ++-- src/cons/cons.hpp | 1 + src/cons/ledger_handler.cpp | 395 +++++++++++++++++++++--- src/cons/ledger_handler.hpp | 27 +- src/crypto.cpp | 21 ++ src/crypto.hpp | 2 + src/fbschema/ledger_helpers.cpp | 9 +- src/fbschema/ledger_helpers.hpp | 2 +- src/fbschema/ledger_schema.fbs | 3 +- src/fbschema/ledger_schema_generated.h | 47 ++- src/fbschema/p2pmsg_content.fbs | 21 +- src/fbschema/p2pmsg_content_generated.h | 303 +++++++++++++++++- src/fbschema/p2pmsg_helpers.cpp | 120 ++++++- src/fbschema/p2pmsg_helpers.hpp | 14 + src/main.cpp | 3 + src/p2p/p2p.cpp | 37 ++- src/p2p/p2p.hpp | 24 +- src/p2p/peer_session_handler.cpp | 21 ++ 18 files changed, 1006 insertions(+), 98 deletions(-) diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 32f71de5..67e14752 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -36,9 +36,10 @@ int init() ctx.stage = 0; //load lcl details from lcl history. - const ledger_history ldr_hist = load_ledger(); + ledger_history ldr_hist = load_ledger(); ctx.led_seq_no = ldr_hist.led_seq_no; ctx.lcl = ldr_hist.lcl; + ctx.lcl_list.swap(ldr_hist.lcl_list); return 0; } @@ -60,19 +61,6 @@ void consensus() ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::ctx.collected_msgs.proposals); } - LOG_DBG << "Started stage " << std::to_string(ctx.stage); - for (const auto p : ctx.candidate_proposals) - { - const bool self = p.pubkey == conf::cfg.pubkey; - LOG_DBG << "[stage" << std::to_string(p.stage) - << "] users:" << p.users.size() - << " hinp:" << p.hash_inputs.size() - << " hout:" << p.hash_outputs.size() - << " lcl:" << p.lcl - << " self:" << self; - } - LOG_DBG << "timenow: " << std::to_string(ctx.time_now); - // Throughout consensus, we move over the incoming npl messages collected via the network so far into // the candidate npl message set (move and append). This is to have a private working set for the consensus // and avoid threading conflicts with network incoming npl messages. @@ -93,7 +81,6 @@ void consensus() if (ctx.stage == 0) { // Stage 0 means begining of a consensus round. - { // Remove any useless candidate proposals so we'll have a cleaner proposal set to look at // when we transition to stage 1. @@ -123,6 +110,21 @@ void consensus() } else // Stage 1, 2, 3 { + + std::cout << "Started stage " << std::to_string(ctx.stage) << "\n"; + for (auto p : ctx.candidate_proposals) + { + bool self = p.pubkey == conf::cfg.pubkey; + LOG_DBG << "[stage" << std::to_string(p.stage) + << "] users:" << p.users.size() + << " hinp:" << p.hash_inputs.size() + << " hout:" << p.hash_outputs.size() + << " lcl:" << p.lcl + << " self:" << self + << "\n"; + } + + LOG_DBG << "timenow:" << std::to_string(ctx.time_now) << "\n"; // Initialize vote counters vote_counter votes; @@ -143,13 +145,14 @@ void consensus() if (should_request_history) { - //todo:create history request message and request request history from a random peer. + //create history request message and request history from a random peer. + send_ledger_history_request(ctx.lcl, majority_lcl); } if (is_lcl_desync) { - const bool should_reset = (ctx.time_now - ctx.novel_proposal_time) < floor(conf::cfg.roundtime / 4); + bool should_reset = (ctx.time_now - ctx.novel_proposal_time) > (floor(conf::cfg.roundtime) + floor(rand() % conf::cfg.roundtime)); //for now we are resetting to stage 0 to avoid possible deadlock situations - timewait_stage(true); + timewait_stage(should_reset); return; } @@ -166,13 +169,14 @@ void consensus() else ++itr; } + //ctx.candidate_proposals.clear(); if (ctx.stage == 3) { apply_ledger(stg_prop); // We have finished a consensus round (all 4 stages). - LOG_DBG << "****Stage 3 consensus reached****"; + LOG_INFO << "****Stage 3 consensus reached****"; } } @@ -407,8 +411,6 @@ void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority // Vote stages if only proposal lcl is match with node's last consensus lcl if (cp.lcl == ctx.lcl) increment(votes.stage, cp.stage); - - // todo:vote for lcl checking condtion } majority_stage = 0; @@ -426,7 +428,7 @@ void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority if (majority_stage < ctx.stage - 1) { - should_reset = (ctx.time_now - ctx.novel_proposal_time) < floor(conf::cfg.roundtime / 4); + should_reset = (ctx.time_now - ctx.novel_proposal_time) > (floor(conf::cfg.roundtime) + floor(rand() % conf::cfg.roundtime)); is_desync = true; LOG_DBG << "Stage desync (Reset:" << should_reset << "). Node stage:" << std::to_string(ctx.stage) @@ -496,11 +498,12 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string { LOG_DBG << "We are not on the consensus ledger, requesting history from a random peer"; is_desync = true; - //todo:create history request message and request request history from a random peer. + should_request_history = true; return; } } + /** * Returns the consensus percentage threshold for the specified stage. * @param stage The consensus stage [1, 2, 3] @@ -533,8 +536,9 @@ void timewait_stage(const bool reset) */ void apply_ledger(const p2p::proposal &cons_prop) { - ctx.led_seq_no++; - ctx.lcl = cons::save_ledger(cons_prop, ctx.led_seq_no); + const std::tuple new_lcl = save_ledger(cons_prop); + ctx.led_seq_no = std::get<0>(new_lcl); + ctx.lcl = std::get<1>(new_lcl); // After the current ledger seq no is updated, we remove any newly expired inputs from candidate set. { diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index d9c72f6f..b46bb242 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -70,6 +70,7 @@ struct consensus_context uint64_t time_now; std::string lcl; uint64_t led_seq_no; + std::map lcl_list; consensus_context() : recent_userinput_hashes(200) { diff --git a/src/cons/ledger_handler.cpp b/src/cons/ledger_handler.cpp index 746e9113..690117cb 100644 --- a/src/cons/ledger_handler.cpp +++ b/src/cons/ledger_handler.cpp @@ -4,60 +4,156 @@ #include "../crypto.hpp" #include "../p2p/p2p.hpp" #include "../fbschema/ledger_helpers.hpp" +#include "../fbschema/p2pmsg_helpers.hpp" #include "ledger_handler.hpp" +#include "cons.hpp" namespace cons { -std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no) +namespace p2pmsg = fbschema::p2pmsg; +std::string last_requested_lcl; + +/** + * Create and save ledger from the given proposal message. + * @param proposal consensus reached Satge 3 proposal. + * @return tuple of current lcl sequence number and file name of the saved lcl. + */ +const std::tuple save_ledger(const p2p::proposal &proposal) { + const size_t pos = proposal.lcl.find("-"); + uint64_t led_seq_no = 0; + + if (pos != std::string::npos) + { + led_seq_no = std::stoull(proposal.lcl.substr(0, pos)); //get lcl sequence number. + led_seq_no++; //current lcl sequence number. + } + else + { + //lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format. + LOG_ERR << "Invalid lcl name: " << proposal.lcl << " when saving ledger."; + } + //Serialize lcl using flatbuffer ledger schema. flatbuffers::FlatBufferBuilder builder(1024); - const std::string_view ledger_str = fbschema::ledger::create_ledger_from_proposal(builder, proposal); + const std::string_view ledger_str = fbschema::ledger::create_ledger_from_proposal(builder, proposal, led_seq_no); //Get binary hash of the the serialized lcl. const std::string lcl = crypto::get_hash(ledger_str); - //Get hex from binary hash + //Get hex from binary hash. std::string lcl_hash; util::bin2hex(lcl_hash, reinterpret_cast(lcl.data()), lcl.size()); - //create file path to save lcl. - //file name -> [ledger sequnce numer]-[lcl hex] - std::string path; - const std::string seq_no = std::to_string(led_seq_no); - path.reserve(conf::ctx.histdir.size() + lcl_hash.size() + seq_no.size() + 6); - path.append(conf::ctx.histdir); - path.append("/"); - path.append(seq_no); - path.append("-"); - path.append(lcl_hash); - path.append(".lcl"); + //construct lcl file name. + //lcl file name should follow [ledger sequnce numer]-lcl[lcl hex] format. + const std::string seq_no_str = std::to_string(led_seq_no); + std::string file_name; + file_name.reserve(lcl_hash.size() + seq_no_str.size() + 1); + file_name.append(seq_no_str) + .append("-") + .append(lcl_hash); - //write lcl to file system - std::ofstream ofs(std::move(path)); - ofs.write(ledger_str.data(), ledger_str.size()); - ofs.close(); + write_ledger(file_name, ledger_str.data(), ledger_str.size()); - return lcl_hash; + cons::ctx.lcl_list.emplace(led_seq_no, file_name); + + //Remove old ledgers that exceeds max sequence range. + if (led_seq_no > MAX_LEDGER_SEQUENCE) + { + remove_old_ledgers(led_seq_no - MAX_LEDGER_SEQUENCE); + } + + return std::make_tuple(led_seq_no, std::move(file_name)); } -ledger_history load_ledger() +/** + * Remove old ledgers that exceeds max sequence range from file system and ledger history cache. + * @param led_seq_no minimum sequence number to be in history. + */ +void remove_old_ledgers(const uint64_t led_seq_no) +{ + std::map::iterator itr; + + std::string dir_path; + + dir_path.reserve(conf::ctx.histdir.size() + 1); + dir_path.append(conf::ctx.histdir) + .append("/"); + + for (itr = cons::ctx.lcl_list.begin(); + itr != cons::ctx.lcl_list.lower_bound(led_seq_no + 1); + itr++) + { + const std::string file_name = itr->second; + std::string file_path; + file_path.reserve(dir_path.size() + itr->second.size() + 4); + file_path.append(dir_path) + .append(file_name) + .append(".lcl"); + + if (boost::filesystem::exists(file_path)) + boost::filesystem::remove(file_path); + } + cons::ctx.lcl_list.erase(cons::ctx.lcl_list.begin(), cons::ctx.lcl_list.lower_bound(led_seq_no + 1)); +} + +/** + * Write ledger to file system. + * @param file_name current ledger sequence number. + * @param ledger_raw raw lcl data. + * @param ledger_size size of the raw lcl data. + */ +void write_ledger(const std::string &file_name, const char *ledger_raw, size_t ledger_size) +{ + //create file path to save ledger. + //file name -> [ledger sequnce numer]-[lcl hex] + + std::string path; + + path.reserve(file_name.size() + conf::ctx.histdir.size() + 5); + path.append(conf::ctx.histdir) + .append("/") + .append(file_name) + .append(".lcl"); + + //write ledger to file system + std::ofstream ofs(std::move(path)); + ofs.write(ledger_raw, ledger_size); + ofs.close(); +} + +/** + * Delete ledger from file system. + * @param file_name name of ledger to be deleted. + */ +void remove_ledger(const std::string &file_name) +{ + std::string file_path; + file_path.reserve(conf::ctx.histdir.size() + file_name.size() + 5); + file_path.append(conf::ctx.histdir) + .append("/") + .append(file_name) + .append(".lcl"); + boost::filesystem::remove(file_path); +} + +/** + * Retrieve lcl(last closed ledger) information from ledger history. + * @return A ledger_history struct representing the lcl. + */ +const ledger_history load_ledger() { ledger_history ldg_hist; - ldg_hist.led_seq_no = 0; - // might need to load history in order to request response lcl history - //std::unordered_map lcl_history_files; - //Get all records at lcl history direcory and find the last closed ledger. - std::string latest_file_name; size_t latest_pos = 0; for (const auto &entry : boost::filesystem::directory_iterator(conf::ctx.histdir)) { const boost::filesystem::path file_path = entry.path(); - const std::string file_name = entry.path().filename().string(); + const std::string file_name = entry.path().stem().string(); if (boost::filesystem::is_directory(file_path)) { @@ -70,36 +166,253 @@ ledger_history load_ledger() else { const size_t pos = file_name.find("-"); - uint64_t seq_no; + uint64_t seq_no = 0; if (pos != std::string::npos) { seq_no = std::stoull(file_name.substr(0, pos)); + ldg_hist.lcl_list.emplace(seq_no, file_name); //lcl -> [seq_no-hash] } else { //lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format. - LOG_ERR << "Invalid file name: " << file_name; - } - - if (seq_no > ldg_hist.led_seq_no) - { - ldg_hist.led_seq_no = seq_no; - latest_pos = pos; - latest_file_name = file_name; + LOG_ERR << "Invalid lcl file name: " << file_name << " in " << conf::ctx.histdir; } } } - + //check if there is a saved lcl file -> if no send genesis lcl. - if (latest_file_name.empty()) - ldg_hist.lcl = "genesis"; - else if ((latest_file_name.size() - 6) > latest_pos) //validation to check position is not the end of the file name. - ldg_hist.lcl = latest_file_name.substr(latest_pos + 1, (latest_file_name.size() - 6)); + if (ldg_hist.lcl_list.empty()) + { + ldg_hist.led_seq_no = 0; + ldg_hist.lcl = "0-genesis"; + } else - LOG_ERR << "Invalid latest file name: " << latest_file_name; + { + ldg_hist.led_seq_no = ldg_hist.lcl_list.rbegin()->first; + ldg_hist.lcl = ldg_hist.lcl_list.rbegin()->second; + + //Remove old ledgers that exceeds max sequence range. + if (ldg_hist.led_seq_no > MAX_LEDGER_SEQUENCE) + { + remove_old_ledgers(ldg_hist.led_seq_no - MAX_LEDGER_SEQUENCE); + } + } return ldg_hist; } +/** + * Create and send ledger history request to random node from unl list. + * @param minimum_lcl hash of the minimum lcl from which node need lcl history. + * @param required_lcl hash of the required lcl. + */ +void send_ledger_history_request(const std::string &minimum_lcl, const std::string &required_lcl) +{ + p2p::history_request hr; + hr.required_lcl = required_lcl; + hr.minimum_lcl = minimum_lcl; + p2p::peer_outbound_message msg(std::make_unique(1024)); + p2pmsg::create_msg_from_history_request(msg.builder(), hr); + p2p::send_message_to_random_peer(msg); + + last_requested_lcl = required_lcl; + + LOG_DBG << "Ledger history request sent." + << " lcl:" << required_lcl; +} + +/** + * Check requested lcl is in node's lcl history cache. + * @param hr lcl history request information. + * @return true if requested lcl is in lcl history cache. + */ +bool check_required_lcl_availability(const p2p::history_request &hr) +{ + size_t pos = hr.required_lcl.find("-"); + uint64_t req_seq_no = 0; + + //get sequence number of required lcl + if (pos != std::string::npos) + { + req_seq_no = std::stoull(hr.required_lcl.substr(0, pos)); //get required lcl sequence number + } + + if (req_seq_no > 0) + { + const auto itr = cons::ctx.lcl_list.find(req_seq_no); + if (itr == cons::ctx.lcl_list.end()) + { + LOG_DBG << "Required lcl peer asked for is not in our lcl cache."; + //either this node is also not in consesnsus ledger or other node requesting a lcl that is older than maximum ledger range. + return false; + } + else if (itr->second != hr.required_lcl) + { + LOG_DBG << "Required lcl peer asked for is not in our lcl cache."; + //either this node or requesting node is in a fork condition. + return false; + } + } + return true; +} + +/** + * Retrieve lcl(last closed ledger) information from ledger history. + * @param hr lcl history request information. + * @return A ledger history response containing requested ledger details. + */ +const p2p::history_response retrieve_ledger_history(const p2p::history_request &hr) +{ + p2p::history_response history_response; + + size_t pos = hr.minimum_lcl.find("-"); + uint64_t min_seq_no = 0; + + //get sequence number of minimum lcl required + if (pos != std::string::npos) + { + min_seq_no = std::stoull(hr.minimum_lcl.substr(0, pos)); //get required lcl sequence number + } + + const auto itr = cons::ctx.lcl_list.find(min_seq_no); + if (itr != cons::ctx.lcl_list.end()) //requested minimum lcl is not in our lcl history cache + { + LOG_DBG << "Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl"; + min_seq_no = itr->first; + } + else + { + min_seq_no = cons::ctx.lcl_list.begin()->first; + } + + //copy current history cache. + std::map lcl_list = cons::ctx.lcl_list; + + //filter out cache and get raw files here. + lcl_list.erase( + lcl_list.begin(), + lcl_list.lower_bound(min_seq_no)); + + for (auto &[seq_no, lcl_hash] : lcl_list) + { + p2p::history_ledger ledger; + ledger.lcl = lcl_hash; + + std::string path; + + path.reserve(conf::ctx.histdir.size() + lcl_hash.size() + 5); + path.append(conf::ctx.histdir) + .append("/") + .append(lcl_hash) + .append(".lcl"); + + //read lcl file + std::ifstream file(path, std::ios::binary | std::ios::ate); + std::streamsize size = file.tellg(); + file.seekg(0, std::ios::beg); + + std::vector buffer(size); + if (file.read(buffer.data(), size)) + { + ledger.raw_ledger = reinterpret_cast &>(buffer); + history_response.hist_ledgers.emplace(seq_no, ledger); + } + } + + return history_response; +} + +/** + * Send ledger history response for history request. + * @param hr lcl history request information. + * @return peer outbound message object with ledger history response. + */ +p2p::peer_outbound_message send_ledger_history(const p2p::history_request &hr) +{ + p2p::peer_outbound_message msg(std::make_unique(1024)); + p2pmsg::create_msg_from_history_response(msg.builder(), retrieve_ledger_history(hr)); + + return msg; +} + +/** + * Handle recieved ledger history response. + * @param hr lcl history request information. + * @return peer outbound message object with ledger history response. + */ +void handle_ledger_history_response(const p2p::history_response &hr) +{ + //check response object contains + if (last_requested_lcl.empty()) + { + LOG_DBG << "Peer sent us a history response but we never asked for one!"; + return; + } + + //check whether recieved lcl history contains the current lcl node required. + bool have_requested_lcl = false; + for (auto &[seq_no, ledger] : hr.hist_ledgers) + { + if (last_requested_lcl == ledger.lcl) + { + have_requested_lcl = true; + break; + } + } + + if (!have_requested_lcl) + { + LOG_DBG << "Peer sent us a history response but not containing the lcl we asked for!"; + return; + } + + //Check integrity of recieved lcl list. + //By checking recieved lcl hashes matches lcl content by applying hashing for each raw content. + for (auto &[seq_no, ledger] : hr.hist_ledgers) + { + const size_t pos = ledger.lcl.find("-"); + std::string rec_lcl_hash = ledger.lcl.substr((pos + 1), (ledger.lcl.size() - 1)); + + //Get binary hash of the the serialized lcl. + const std::string lcl = crypto::get_hash(&ledger.raw_ledger[0], ledger.raw_ledger.size()); + + //Get hex from binary hash + std::string lcl_hash; + + util::bin2hex(lcl_hash, + reinterpret_cast(lcl.data()), + lcl.size()); + + //LOG_DBG << "passed lcl: " << ledger.lcl << " gen lcl: " << lcl_hash; + + //recieved lcl hash and hash generated from recieved lcl content doesn't match -> abandon applying it + if (lcl_hash != rec_lcl_hash) + { + LOG_WARN << "peer sent us a history response we asked for but the ledger data does not match the ledger hashes"; + //todo: we should penalize peer who send this? + return; + } + } + + //Execution to here means the history data sent checks out + //Save recieved lcl in file system and update lcl history cache + for (auto &[seq_no, ledger] : hr.hist_ledgers) + { + auto prev_dup_itr = cons::ctx.lcl_list.find(seq_no); + if (prev_dup_itr != cons::ctx.lcl_list.end()) + { + remove_ledger(prev_dup_itr->second); + cons::ctx.lcl_list.erase(prev_dup_itr); + } + write_ledger(ledger.lcl, reinterpret_cast(&ledger.raw_ledger[0]), ledger.raw_ledger.size()); + cons::ctx.lcl_list.emplace(seq_no, ledger.lcl); + } + + last_requested_lcl = ""; + const auto latest_lcl_itr = cons::ctx.lcl_list.rbegin(); + cons::ctx.lcl = latest_lcl_itr->second; + cons::ctx.led_seq_no = latest_lcl_itr->first; +} + } // namespace cons \ No newline at end of file diff --git a/src/cons/ledger_handler.hpp b/src/cons/ledger_handler.hpp index 65dcdb08..c92e5acd 100644 --- a/src/cons/ledger_handler.hpp +++ b/src/cons/ledger_handler.hpp @@ -1,20 +1,43 @@ #ifndef _HP_CONS_LEDGER_ #define _HP_CONS_LEDGER_ +#include "../pchheader.hpp" #include "../p2p/p2p.hpp" namespace cons { +//max ledger count +constexpr uint64_t MAX_LEDGER_SEQUENCE = 200; + struct ledger_history { std::string lcl; uint64_t led_seq_no; + std::map lcl_list; }; -std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no); +extern std::string last_requested_lcl; -ledger_history load_ledger(); +const std::tuple save_ledger(const p2p::proposal &proposal); + +void remove_old_ledgers(const uint64_t led_seq_no); + +void write_ledger(const std::string &file_name, const char *ledger_raw, size_t ledger_size); + +void remove_ledger(const std::string &file_name); + +const ledger_history load_ledger(); + +void send_ledger_history_request(const std::string &minimum_lcl, const std::string &required_lcl); + +bool check_required_lcl_availability(const p2p::history_request &hr); + +const p2p::history_response retrieve_ledger_history(const p2p::history_request &hr); + +p2p::peer_outbound_message send_ledger_history(const p2p::history_request &hr); + +void handle_ledger_history_response(const p2p::history_response &hr); } diff --git a/src/crypto.cpp b/src/crypto.cpp index 729f2ff3..3c9cb286 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -151,6 +151,27 @@ std::string get_hash(std::string_view data) return hash; } +/** + * Generate blake2b hash for a given message. + * @param data unsigned char array pointer to hash data. + * @param data_length hash data length. + * @return The blake2b hash of the pointed buffer. + */ +std::string get_hash(const unsigned char * data, size_t data_length) +{ + std::string hash; + hash.resize(crypto_generichash_blake2b_BYTES); + + crypto_generichash_blake2b( + reinterpret_cast(hash.data()), + hash.length(), + data, + data_length, + NULL, 0); + + return hash; +} + /** * Generates blake2b hash for the given set of strings using stream hashing. */ diff --git a/src/crypto.hpp b/src/crypto.hpp index 4f5fe6f0..fa8bf9ad 100644 --- a/src/crypto.hpp +++ b/src/crypto.hpp @@ -31,6 +31,8 @@ int verify_hex(std::string_view msg, std::string_view sighex, std::string_view p std::string get_hash(std::string_view data); +std::string get_hash(const unsigned char * data, size_t data_length); + std::string get_hash(std::string_view s1, std::string_view s2); } // namespace crypto diff --git a/src/fbschema/ledger_helpers.cpp b/src/fbschema/ledger_helpers.cpp index 797ec455..6d68b55c 100644 --- a/src/fbschema/ledger_helpers.cpp +++ b/src/fbschema/ledger_helpers.cpp @@ -12,16 +12,17 @@ namespace fbschema::ledger * Create ledger from the given proposal struct. * @param p The proposal struct to be placed in ledger. */ -std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p) +std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no) { flatbuffers::Offset ledger = ledger::CreateLedger( builder, + seq_no, p.time, sv_to_flatbuff_bytes(builder, p.lcl), - stringlist_to_flatbuf_bytearrayvector(builder, p.users), 0, 0 - //p2p::hashbuffermap_to_flatbuf_rawinputs(builder, p.raw_inputs), - //stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs) + stringlist_to_flatbuf_bytearrayvector(builder, p.users), + stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs), + stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs) ); builder.Finish(ledger); // Finished building message content to get serialised content. diff --git a/src/fbschema/ledger_helpers.hpp b/src/fbschema/ledger_helpers.hpp index 8cc3707b..4188fcef 100644 --- a/src/fbschema/ledger_helpers.hpp +++ b/src/fbschema/ledger_helpers.hpp @@ -9,7 +9,7 @@ namespace fbschema::ledger { -std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p); +std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no); } #endif \ No newline at end of file diff --git a/src/fbschema/ledger_schema.fbs b/src/fbschema/ledger_schema.fbs index df07179d..755ac7ea 100644 --- a/src/fbschema/ledger_schema.fbs +++ b/src/fbschema/ledger_schema.fbs @@ -3,10 +3,11 @@ include "common_schema.fbs"; namespace fbschema.ledger; table Ledger { + seq_no:uint64; time:uint64; lcl:[ubyte]; users: [ByteArray]; - inputs: [RawInputList]; + inputs: [ByteArray]; outputs: [ByteArray]; } diff --git a/src/fbschema/ledger_schema_generated.h b/src/fbschema/ledger_schema_generated.h index 4bbd253c..6332d8f6 100644 --- a/src/fbschema/ledger_schema_generated.h +++ b/src/fbschema/ledger_schema_generated.h @@ -1,8 +1,8 @@ // automatically generated by the FlatBuffers compiler, do not modify -#ifndef FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_ -#define FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_ +#ifndef FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_H_ +#define FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_H_ #include "flatbuffers/flatbuffers.h" @@ -17,12 +17,19 @@ struct RawInputList; struct Ledger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_TIME = 4, - VT_LCL = 6, - VT_USERS = 8, - VT_INPUTS = 10, - VT_OUTPUTS = 12 + VT_SEQ_NO = 4, + VT_TIME = 6, + VT_LCL = 8, + VT_USERS = 10, + VT_INPUTS = 12, + VT_OUTPUTS = 14 }; + uint64_t seq_no() const { + return GetField(VT_SEQ_NO, 0); + } + bool mutate_seq_no(uint64_t _seq_no) { + return SetField(VT_SEQ_NO, _seq_no, 0); + } uint64_t time() const { return GetField(VT_TIME, 0); } @@ -41,11 +48,11 @@ struct Ledger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { flatbuffers::Vector> *mutable_users() { return GetPointer> *>(VT_USERS); } - const flatbuffers::Vector> *inputs() const { - return GetPointer> *>(VT_INPUTS); + const flatbuffers::Vector> *inputs() const { + return GetPointer> *>(VT_INPUTS); } - flatbuffers::Vector> *mutable_inputs() { - return GetPointer> *>(VT_INPUTS); + flatbuffers::Vector> *mutable_inputs() { + return GetPointer> *>(VT_INPUTS); } const flatbuffers::Vector> *outputs() const { return GetPointer> *>(VT_OUTPUTS); @@ -55,6 +62,7 @@ struct Ledger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && + VerifyField(verifier, VT_SEQ_NO) && VerifyField(verifier, VT_TIME) && VerifyOffset(verifier, VT_LCL) && verifier.VerifyVector(lcl()) && @@ -74,6 +82,9 @@ struct Ledger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { struct LedgerBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; + void add_seq_no(uint64_t seq_no) { + fbb_.AddElement(Ledger::VT_SEQ_NO, seq_no, 0); + } void add_time(uint64_t time) { fbb_.AddElement(Ledger::VT_TIME, time, 0); } @@ -83,7 +94,7 @@ struct LedgerBuilder { void add_users(flatbuffers::Offset>> users) { fbb_.AddOffset(Ledger::VT_USERS, users); } - void add_inputs(flatbuffers::Offset>> inputs) { + void add_inputs(flatbuffers::Offset>> inputs) { fbb_.AddOffset(Ledger::VT_INPUTS, inputs); } void add_outputs(flatbuffers::Offset>> outputs) { @@ -103,13 +114,15 @@ struct LedgerBuilder { inline flatbuffers::Offset CreateLedger( flatbuffers::FlatBufferBuilder &_fbb, + uint64_t seq_no = 0, uint64_t time = 0, flatbuffers::Offset> lcl = 0, flatbuffers::Offset>> users = 0, - flatbuffers::Offset>> inputs = 0, + flatbuffers::Offset>> inputs = 0, flatbuffers::Offset>> outputs = 0) { LedgerBuilder builder_(_fbb); builder_.add_time(time); + builder_.add_seq_no(seq_no); builder_.add_outputs(outputs); builder_.add_inputs(inputs); builder_.add_users(users); @@ -119,17 +132,19 @@ inline flatbuffers::Offset CreateLedger( inline flatbuffers::Offset CreateLedgerDirect( flatbuffers::FlatBufferBuilder &_fbb, + uint64_t seq_no = 0, uint64_t time = 0, const std::vector *lcl = nullptr, const std::vector> *users = nullptr, - const std::vector> *inputs = nullptr, + const std::vector> *inputs = nullptr, const std::vector> *outputs = nullptr) { auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; auto users__ = users ? _fbb.CreateVector>(*users) : 0; - auto inputs__ = inputs ? _fbb.CreateVector>(*inputs) : 0; + auto inputs__ = inputs ? _fbb.CreateVector>(*inputs) : 0; auto outputs__ = outputs ? _fbb.CreateVector>(*outputs) : 0; return fbschema::ledger::CreateLedger( _fbb, + seq_no, time, lcl__, users__, @@ -245,4 +260,4 @@ inline void FinishSizePrefixedLedgerBuffer( } // namespace ledger } // namespace fbschema -#endif // FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_ +#endif // FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_H_ diff --git a/src/fbschema/p2pmsg_content.fbs b/src/fbschema/p2pmsg_content.fbs index a46158b1..9cc3d67c 100644 --- a/src/fbschema/p2pmsg_content.fbs +++ b/src/fbschema/p2pmsg_content.fbs @@ -12,7 +12,7 @@ table UserSubmittedMessageGroup { messages:[UserSubmittedMessage]; } -union Message { NonUnl_Proposal_Message, Proposal_Message, Npl_Message } //message content type +union Message { NonUnl_Proposal_Message, Proposal_Message, Npl_Message, History_Request_Message, History_Response_Message } //message content type table Content { message:Message; @@ -35,6 +35,25 @@ table Npl_Message { //NPL type message schema data:[ubyte]; } +table History_Request_Message { //Ledger History request type message schema + minimum_lcl:[ubyte]; + required_lcl:[ubyte]; +} + +table History_Response_Message { //Ledger History request type message schema + hist_ledgers:[HistoryLedgerPair]; +} + +table HistoryLedgerPair { //A key, value pair of byte[]. + seq_no:uint64; + ledger:HistoryLedger; +} + +table HistoryLedger { + lcl:[ubyte]; + raw_ledger:[ubyte]; +} + table StateDifference { //Represent state difference by tracking created,updated and deleted state files. created: [BytesKeyValuePair]; //list of { fn => hash } updated: [BytesKeyValuePair]; diff --git a/src/fbschema/p2pmsg_content_generated.h b/src/fbschema/p2pmsg_content_generated.h index cdc26094..6a0e1535 100644 --- a/src/fbschema/p2pmsg_content_generated.h +++ b/src/fbschema/p2pmsg_content_generated.h @@ -23,6 +23,14 @@ struct Proposal_Message; struct Npl_Message; +struct History_Request_Message; + +struct History_Response_Message; + +struct HistoryLedgerPair; + +struct HistoryLedger; + struct StateDifference; struct State; @@ -32,16 +40,20 @@ enum Message { Message_NonUnl_Proposal_Message = 1, Message_Proposal_Message = 2, Message_Npl_Message = 3, + Message_History_Request_Message = 4, + Message_History_Response_Message = 5, Message_MIN = Message_NONE, - Message_MAX = Message_Npl_Message + Message_MAX = Message_History_Response_Message }; -inline const Message (&EnumValuesMessage())[4] { +inline const Message (&EnumValuesMessage())[6] { static const Message values[] = { Message_NONE, Message_NonUnl_Proposal_Message, Message_Proposal_Message, - Message_Npl_Message + Message_Npl_Message, + Message_History_Request_Message, + Message_History_Response_Message }; return values; } @@ -52,13 +64,15 @@ inline const char * const *EnumNamesMessage() { "NonUnl_Proposal_Message", "Proposal_Message", "Npl_Message", + "History_Request_Message", + "History_Response_Message", nullptr }; return names; } inline const char *EnumNameMessage(Message e) { - if (e < Message_NONE || e > Message_Npl_Message) return ""; + if (e < Message_NONE || e > Message_History_Response_Message) return ""; const size_t index = static_cast(e); return EnumNamesMessage()[index]; } @@ -79,6 +93,14 @@ template<> struct MessageTraits { static const Message enum_value = Message_Npl_Message; }; +template<> struct MessageTraits { + static const Message enum_value = Message_History_Request_Message; +}; + +template<> struct MessageTraits { + static const Message enum_value = Message_History_Response_Message; +}; + bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type); bool VerifyMessageVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); @@ -247,6 +269,12 @@ struct Content FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { const Npl_Message *message_as_Npl_Message() const { return message_type() == Message_Npl_Message ? static_cast(message()) : nullptr; } + const History_Request_Message *message_as_History_Request_Message() const { + return message_type() == Message_History_Request_Message ? static_cast(message()) : nullptr; + } + const History_Response_Message *message_as_History_Response_Message() const { + return message_type() == Message_History_Response_Message ? static_cast(message()) : nullptr; + } void *mutable_message() { return GetPointer(VT_MESSAGE); } @@ -271,6 +299,14 @@ template<> inline const Npl_Message *Content::message_as() const { return message_as_Npl_Message(); } +template<> inline const History_Request_Message *Content::message_as() const { + return message_as_History_Request_Message(); +} + +template<> inline const History_Response_Message *Content::message_as() const { + return message_as_History_Response_Message(); +} + struct ContentBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; @@ -545,6 +581,257 @@ inline flatbuffers::Offset CreateNpl_MessageDirect( data__); } +struct History_Request_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_MINIMUM_LCL = 4, + VT_REQUIRED_LCL = 6 + }; + const flatbuffers::Vector *minimum_lcl() const { + return GetPointer *>(VT_MINIMUM_LCL); + } + flatbuffers::Vector *mutable_minimum_lcl() { + return GetPointer *>(VT_MINIMUM_LCL); + } + const flatbuffers::Vector *required_lcl() const { + return GetPointer *>(VT_REQUIRED_LCL); + } + flatbuffers::Vector *mutable_required_lcl() { + return GetPointer *>(VT_REQUIRED_LCL); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_MINIMUM_LCL) && + verifier.VerifyVector(minimum_lcl()) && + VerifyOffset(verifier, VT_REQUIRED_LCL) && + verifier.VerifyVector(required_lcl()) && + verifier.EndTable(); + } +}; + +struct History_Request_MessageBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_minimum_lcl(flatbuffers::Offset> minimum_lcl) { + fbb_.AddOffset(History_Request_Message::VT_MINIMUM_LCL, minimum_lcl); + } + void add_required_lcl(flatbuffers::Offset> required_lcl) { + fbb_.AddOffset(History_Request_Message::VT_REQUIRED_LCL, required_lcl); + } + explicit History_Request_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + History_Request_MessageBuilder &operator=(const History_Request_MessageBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateHistory_Request_Message( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> minimum_lcl = 0, + flatbuffers::Offset> required_lcl = 0) { + History_Request_MessageBuilder builder_(_fbb); + builder_.add_required_lcl(required_lcl); + builder_.add_minimum_lcl(minimum_lcl); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateHistory_Request_MessageDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *minimum_lcl = nullptr, + const std::vector *required_lcl = nullptr) { + auto minimum_lcl__ = minimum_lcl ? _fbb.CreateVector(*minimum_lcl) : 0; + auto required_lcl__ = required_lcl ? _fbb.CreateVector(*required_lcl) : 0; + return fbschema::p2pmsg::CreateHistory_Request_Message( + _fbb, + minimum_lcl__, + required_lcl__); +} + +struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_HIST_LEDGERS = 4 + }; + const flatbuffers::Vector> *hist_ledgers() const { + return GetPointer> *>(VT_HIST_LEDGERS); + } + flatbuffers::Vector> *mutable_hist_ledgers() { + return GetPointer> *>(VT_HIST_LEDGERS); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_HIST_LEDGERS) && + verifier.VerifyVector(hist_ledgers()) && + verifier.VerifyVectorOfTables(hist_ledgers()) && + verifier.EndTable(); + } +}; + +struct History_Response_MessageBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_hist_ledgers(flatbuffers::Offset>> hist_ledgers) { + fbb_.AddOffset(History_Response_Message::VT_HIST_LEDGERS, hist_ledgers); + } + explicit History_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + History_Response_MessageBuilder &operator=(const History_Response_MessageBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateHistory_Response_Message( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset>> hist_ledgers = 0) { + History_Response_MessageBuilder builder_(_fbb); + builder_.add_hist_ledgers(hist_ledgers); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateHistory_Response_MessageDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector> *hist_ledgers = nullptr) { + auto hist_ledgers__ = hist_ledgers ? _fbb.CreateVector>(*hist_ledgers) : 0; + return fbschema::p2pmsg::CreateHistory_Response_Message( + _fbb, + hist_ledgers__); +} + +struct HistoryLedgerPair FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_SEQ_NO = 4, + VT_LEDGER = 6 + }; + uint64_t seq_no() const { + return GetField(VT_SEQ_NO, 0); + } + bool mutate_seq_no(uint64_t _seq_no) { + return SetField(VT_SEQ_NO, _seq_no, 0); + } + const HistoryLedger *ledger() const { + return GetPointer(VT_LEDGER); + } + HistoryLedger *mutable_ledger() { + return GetPointer(VT_LEDGER); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyField(verifier, VT_SEQ_NO) && + VerifyOffset(verifier, VT_LEDGER) && + verifier.VerifyTable(ledger()) && + verifier.EndTable(); + } +}; + +struct HistoryLedgerPairBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_seq_no(uint64_t seq_no) { + fbb_.AddElement(HistoryLedgerPair::VT_SEQ_NO, seq_no, 0); + } + void add_ledger(flatbuffers::Offset ledger) { + fbb_.AddOffset(HistoryLedgerPair::VT_LEDGER, ledger); + } + explicit HistoryLedgerPairBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + HistoryLedgerPairBuilder &operator=(const HistoryLedgerPairBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateHistoryLedgerPair( + flatbuffers::FlatBufferBuilder &_fbb, + uint64_t seq_no = 0, + flatbuffers::Offset ledger = 0) { + HistoryLedgerPairBuilder builder_(_fbb); + builder_.add_seq_no(seq_no); + builder_.add_ledger(ledger); + return builder_.Finish(); +} + +struct HistoryLedger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_LCL = 4, + VT_RAW_LEDGER = 6 + }; + const flatbuffers::Vector *lcl() const { + return GetPointer *>(VT_LCL); + } + flatbuffers::Vector *mutable_lcl() { + return GetPointer *>(VT_LCL); + } + const flatbuffers::Vector *raw_ledger() const { + return GetPointer *>(VT_RAW_LEDGER); + } + flatbuffers::Vector *mutable_raw_ledger() { + return GetPointer *>(VT_RAW_LEDGER); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_LCL) && + verifier.VerifyVector(lcl()) && + VerifyOffset(verifier, VT_RAW_LEDGER) && + verifier.VerifyVector(raw_ledger()) && + verifier.EndTable(); + } +}; + +struct HistoryLedgerBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_lcl(flatbuffers::Offset> lcl) { + fbb_.AddOffset(HistoryLedger::VT_LCL, lcl); + } + void add_raw_ledger(flatbuffers::Offset> raw_ledger) { + fbb_.AddOffset(HistoryLedger::VT_RAW_LEDGER, raw_ledger); + } + explicit HistoryLedgerBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + HistoryLedgerBuilder &operator=(const HistoryLedgerBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateHistoryLedger( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> lcl = 0, + flatbuffers::Offset> raw_ledger = 0) { + HistoryLedgerBuilder builder_(_fbb); + builder_.add_raw_ledger(raw_ledger); + builder_.add_lcl(lcl); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateHistoryLedgerDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *lcl = nullptr, + const std::vector *raw_ledger = nullptr) { + auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; + auto raw_ledger__ = raw_ledger ? _fbb.CreateVector(*raw_ledger) : 0; + return fbschema::p2pmsg::CreateHistoryLedger( + _fbb, + lcl__, + raw_ledger__); +} + struct StateDifference FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_CREATED = 4, @@ -756,6 +1043,14 @@ inline bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Mess auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } + case Message_History_Request_Message: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } + case Message_History_Response_Message: { + auto ptr = reinterpret_cast(obj); + return verifier.VerifyTable(ptr); + } default: return false; } } diff --git a/src/fbschema/p2pmsg_helpers.cpp b/src/fbschema/p2pmsg_helpers.cpp index 9d077ede..1da6d07a 100644 --- a/src/fbschema/p2pmsg_helpers.cpp +++ b/src/fbschema/p2pmsg_helpers.cpp @@ -154,6 +154,21 @@ const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal return nup; } +/** + * Creates a history response stuct from the given histrory response message. + * @param msg Flatbuffer History response message received from the peer. + * @return A History response struct representing the message. + */ +const p2p::history_response create_history_response_from_msg(const History_Response_Message &msg) +{ + p2p::history_response hr; + + if (msg.hist_ledgers()) + hr.hist_ledgers = flatbuf_historyledgermap_to_historyledgermap(msg.hist_ledgers()); + + return hr; +} + /** * Creates a proposal stuct from the given proposal message. * @param The Flatbuffer poporal received from the peer. @@ -181,6 +196,24 @@ const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const return p; } +/** + * Creates a history request struct from the given history request message. + * @param msg Flatbuffer History request message received from the peer. + * @return A History request struct representing the message. + */ +const p2p::history_request create_history_request_from_msg(const History_Request_Message &msg) +{ + p2p::history_request hr; + + if (msg.minimum_lcl()) + hr.minimum_lcl = flatbuff_bytes_to_sv(msg.minimum_lcl()); + + if (msg.required_lcl()) + hr.required_lcl = flatbuff_bytes_to_sv(msg.required_lcl()); + + return hr; +} + //---Message creation helpers---// void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::nonunl_proposal &nup) @@ -201,7 +234,7 @@ void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_b } /** - * Ctreat proposal peer message from the given proposal struct. + * Create proposal peer message from the given proposal struct. * @param container_builder Flatbuffer builder for the container message. * @param p The proposal struct to be placed in the container message. */ @@ -250,6 +283,51 @@ void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builde create_containermsg_from_content(container_builder, builder, lcl, true); } +/** + * Create history request message from the given history request struct. + * @param container_builder Flatbuffer builder for the container message. + * @param hr The History request struct to be placed in the container message. + */ +void create_msg_from_history_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_request &hr) +{ + flatbuffers::FlatBufferBuilder builder(1024); + + flatbuffers::Offset hrmsg = + CreateHistory_Request_Message( + builder, + sv_to_flatbuff_bytes(builder, hr.minimum_lcl), + sv_to_flatbuff_bytes(builder, hr.required_lcl)); + + flatbuffers::Offset message = CreateContent(builder, Message_History_Request_Message, hrmsg.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + // we need to sign it and place it inside a container message. + create_containermsg_from_content(container_builder, builder, nullptr, true); +} + +/** + * Create history response message from the given history response struct. + * @param container_builder Flatbuffer builder for the container message. + * @param hr The History response struct to be placed in the container message. + */ +void create_msg_from_history_response(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_response &hr) +{ + flatbuffers::FlatBufferBuilder builder(1024); + + flatbuffers::Offset hrmsg = + CreateHistory_Response_Message( + builder, + historyledgermap_to_flatbuf_historyledgermap(builder, hr.hist_ledgers)); + + flatbuffers::Offset message = CreateContent(builder, Message_History_Response_Message, hrmsg.Union()); + builder.Finish(message); // Finished building message content to get serialised content. + + // Now that we have built the content message, + // we need to sign it and place it inside a container message. + create_containermsg_from_content(container_builder, builder, nullptr, true); +} + /** * Creates a Flatbuffer container message from the given Content message. * @param container_builder The Flatbuffer builder to which the final container message should be written to. @@ -346,4 +424,44 @@ usermsgsmap_to_flatbuf_usermsgsmap(flatbuffers::FlatBufferBuilder &builder, cons return builder.CreateVector(fbvec); } +const std::map +flatbuf_historyledgermap_to_historyledgermap(const flatbuffers::Vector> *fbvec) +{ + std::map map; + + for (const HistoryLedgerPair *pair : *fbvec) + { + std::list msglist; + + p2p::history_ledger ledger; + + ledger.lcl = flatbuff_bytes_to_sv(pair->ledger()->lcl()); + auto raw = pair->ledger()->raw_ledger(); + ledger.raw_ledger = std::vector(raw->begin(), raw->end()); + + map.emplace(pair->seq_no(), std::move(ledger)); + } + return map; +} + +const flatbuffers::Offset>> +historyledgermap_to_flatbuf_historyledgermap(flatbuffers::FlatBufferBuilder &builder, const std::map &map) +{ + std::vector> fbvec; + fbvec.reserve(map.size()); + for (auto const &[seq_no, ledger] : map) + { + flatbuffers::Offset history_ledger = CreateHistoryLedger( + builder, + sv_to_flatbuff_bytes(builder, ledger.lcl), + builder.CreateVector(ledger.raw_ledger)); + + fbvec.push_back(CreateHistoryLedgerPair( + builder, + seq_no, + history_ledger)); + } + return builder.CreateVector(fbvec); +} + } // namespace fbschema::p2pmsg \ No newline at end of file diff --git a/src/fbschema/p2pmsg_helpers.hpp b/src/fbschema/p2pmsg_helpers.hpp index 50ad2072..88735c92 100644 --- a/src/fbschema/p2pmsg_helpers.hpp +++ b/src/fbschema/p2pmsg_helpers.hpp @@ -25,12 +25,20 @@ const p2p::nonunl_proposal create_nonunl_proposal_from_msg(const NonUnl_Proposal const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, const uint64_t timestamp, const flatbuffers::Vector *lcl); +const p2p::history_request create_history_request_from_msg(const History_Request_Message &msg); + +const p2p::history_response create_history_response_from_msg(const History_Response_Message &msg); + //---Message creation helpers---// void create_msg_from_nonunl_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::nonunl_proposal &nup); void create_msg_from_proposal(flatbuffers::FlatBufferBuilder &container_builder, const p2p::proposal &p); +void create_msg_from_history_request(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_request &hr); + +void create_msg_from_history_response(flatbuffers::FlatBufferBuilder &container_builder, const p2p::history_response &hr); + void create_msg_from_npl_output(flatbuffers::FlatBufferBuilder &container_builder, const p2p::npl_message &npl, std::string_view lcl); void create_containermsg_from_content( @@ -46,6 +54,12 @@ flatbuf_usermsgsmap_to_usermsgsmap(const flatbuffers::Vector>> usermsgsmap_to_flatbuf_usermsgsmap(flatbuffers::FlatBufferBuilder &builder, const std::unordered_map> &map); +const std::map +flatbuf_historyledgermap_to_historyledgermap(const flatbuffers::Vector> *fbvec); + +const flatbuffers::Offset>> +historyledgermap_to_flatbuf_historyledgermap(flatbuffers::FlatBufferBuilder &builder, const std::map &map); + } // namespace fbschema::p2pmsg #endif \ No newline at end of file diff --git a/src/main.cpp b/src/main.cpp index 81757547..346c1707 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -113,6 +113,9 @@ void std_terminate() noexcept int main(int argc, char **argv) { + //seed rand + srand(time(0)); + // Register exception handler for std exceptions. std::set_terminate(&std_terminate); diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 96bcfc81..cc298516 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -71,7 +71,8 @@ void peer_connection_watchdog() } } - util::sleep(conf::cfg.roundtime * 4); + //util::sleep(conf::cfg.roundtime * 4); + util::sleep(200); } } @@ -97,4 +98,38 @@ void broadcast_message(const peer_outbound_message msg, bool send_to_self) } } +/** + * Send the given message to a random peer from currently connected outbound peers. + * @param msg peer outbound message to be sent to peer + */ +void send_message_to_random_peer(peer_outbound_message msg) +{ + size_t connected_peers = ctx.peer_connections.size(); + if (connected_peers == 0) + { + LOG_DBG << "No peers to send (not even self)."; + return; + } + else if (connected_peers == 1 && ctx.peer_connections.begin()->second->is_self) + { + LOG_DBG << "Only self is connected."; + return; + } + + //Send while locking the peer_connections. + std::lock_guard lock(p2p::ctx.peer_connections_mutex); + + // Initialize random number generator with current timestamp. + int random_peer_index = (rand() % connected_peers); // select a random peer index. + auto it = ctx.peer_connections.begin(); + std::advance(it, random_peer_index); //move iterator to point to random selected peer. + + //send message to selecte peer. + auto session = it->second; + if (!session->is_self) + { + session->send(msg); + } +} + } // namespace p2p \ No newline at end of file diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 5d9b9df3..bcbc2991 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -8,7 +8,7 @@ namespace p2p { - + struct proposal { std::string pubkey; @@ -26,6 +26,23 @@ struct nonunl_proposal std::unordered_map> user_messages; }; +struct history_request +{ + std::string minimum_lcl; + std::string required_lcl; +}; + +struct history_ledger +{ + std::string lcl; + std::vector raw_ledger; +}; + +struct history_response +{ + std::map hist_ledgers; +}; + struct npl_message { std::string data; @@ -56,6 +73,7 @@ struct connected_context // Peer connection watchdog runs on this thread. std::thread peer_watchdog_thread; }; + extern connected_context ctx; struct listener_context @@ -85,6 +103,10 @@ void peer_connection_watchdog(); void broadcast_message(const peer_outbound_message msg, bool send_to_self); +void send_message_to_random_peer(peer_outbound_message msg); + +void send_message_to_peer(std::string peer_session_id, peer_outbound_message msg); + } // namespace p2p #endif \ No newline at end of file diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 90d678aa..3927fe2d 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -11,6 +11,7 @@ #include "../sock/socket_session.hpp" #include "p2p.hpp" #include "peer_session_handler.hpp" +#include "../cons/ledger_handler.hpp" namespace p2pmsg = fbschema::p2pmsg; @@ -109,6 +110,26 @@ void peer_session_handler::on_message(sock::socket_session(container_buf_ptr), container_buf_size); ctx.collected_msgs.npl_messages.push_back(std::move(npl_message)); } + else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message + { + LOG_DBG << "Received history request message type from peer."; + + const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message()); + //first check node has the required lcl available. -> if so send lcl history accordingly. + bool req_lcl_avail = cons::check_required_lcl_availability(hr); + if (req_lcl_avail > 0) + { + p2p::peer_outbound_message hr_msg = cons::send_ledger_history(hr); + session->send(hr_msg); + } + } + else if (content_message_type == p2pmsg::Message_History_Response_Message) //message is a lcl history response message + { + LOG_DBG << "Received history response message type from peer."; + + cons::handle_ledger_history_response( + p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message())); + } else { session->increment_metric(sock::SESSION_THRESHOLDS::MAX_BADMSGS_PER_MINUTE, 1);