From b598025346c2dbdd17ff874712e8df953c099773 Mon Sep 17 00:00:00 2001 From: Asanka Indrajith Date: Fri, 1 Nov 2019 05:51:25 -0400 Subject: [PATCH] Loading and saving ledger and lcl consensus. (#51) * Ledger saving, loading, consensus with ledger sequence number. * std terminate exception handler. --- CMakeLists.txt | 2 + src/cons/cons.cpp | 124 +++++++++++- src/cons/cons.hpp | 19 +- src/cons/ledger_handler.cpp | 108 +++++++++++ src/cons/ledger_handler.hpp | 21 ++ src/crypto.cpp | 2 +- src/fbschema/ledger_helpers.cpp | 30 +++ src/fbschema/ledger_helpers.hpp | 16 ++ src/fbschema/ledger_schema.fbs | 18 ++ src/fbschema/ledger_schema_generated.h | 248 ++++++++++++++++++++++++ src/fbschema/p2pmsg_content.fbs | 2 +- src/fbschema/p2pmsg_content_generated.h | 18 +- src/fbschema/p2pmsg_helpers.cpp | 3 +- src/fbschema/p2pmsg_helpers.hpp | 2 +- src/main.cpp | 55 +++++- src/p2p/p2p.cpp | 2 +- src/p2p/p2p.hpp | 6 +- src/p2p/peer_session_handler.cpp | 2 +- 18 files changed, 636 insertions(+), 42 deletions(-) create mode 100644 src/cons/ledger_handler.cpp create mode 100644 src/cons/ledger_handler.hpp create mode 100644 src/fbschema/ledger_helpers.cpp create mode 100644 src/fbschema/ledger_helpers.hpp create mode 100644 src/fbschema/ledger_schema.fbs create mode 100644 src/fbschema/ledger_schema_generated.h diff --git a/CMakeLists.txt b/CMakeLists.txt index b55cfc5c..e8d38c49 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -13,6 +13,7 @@ add_executable(hpcore src/hplog.cpp src/fbschema/common_helpers.cpp src/fbschema/p2pmsg_helpers.cpp + src/fbschema/ledger_helpers.cpp src/jsonschema/usrmsg_helpers.cpp src/sock/socket_client.cpp src/sock/socket_server.cpp @@ -24,6 +25,7 @@ add_executable(hpcore src/usr/usr.cpp src/proc.cpp src/cons/cons.cpp + src/cons/ledger_handler.cpp src/main.cpp ) diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 1daae7cb..8920bfb4 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -1,6 +1,8 @@ #include #include #include +#include +#include #include "../conf.hpp" #include "../usr/usr.hpp" #include "../p2p/p2p.hpp" @@ -9,6 +11,7 @@ #include "../hplog.hpp" #include "../crypto.hpp" #include "../proc.hpp" +#include "ledger_handler.hpp" #include "cons.hpp" namespace p2pmsg = fbschema::p2pmsg; @@ -33,6 +36,19 @@ void increment(std::map &counter, const T &candidate) counter.try_emplace(candidate, 1); } +int init() +{ + //set start stage + ctx.stage = 0; + + //load lcl detals from lcl history. + const ledger_history ldr_hist = load_ledger(); + ctx.led_seq_no = ldr_hist.led_seq_no; + ctx.lcl = ldr_hist.lcl; + + return 0; +} + void consensus() { // A consensus round consists of 4 stages (0,1,2,3). @@ -50,6 +66,22 @@ void consensus() ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals); } + std::cout << "Started stage " << std::to_string(ctx.stage) << "\n"; + for (auto p : ctx.candidate_proposals) + { + bool self = p.pubkey == conf::cfg.pubkey; + std::cout << "[stage" << std::to_string(p.stage) + << "] users:" << p.users.size() + << " rinp:" << p.raw_inputs.size() + << " hinp:" << p.hash_inputs.size() + << " rout:" << p.raw_outputs.size() + << " hout:" << p.hash_outputs.size() + << " lcl:" << p.lcl + << " self:" << self + << "\n"; + } + std::cout << "timenow:" << std::to_string(ctx.time_now) << "\n"; + if (ctx.stage == 0) { // Stage 0 means begining of a consensus round. @@ -87,16 +119,33 @@ void consensus() // Initialize vote counters vote_counter votes; - // check if we're ahead/behind of consensus - bool is_desync, reset_to_stage0; - int8_t majority_stage; - check_majority_stage(is_desync, reset_to_stage0, majority_stage, votes); - if (is_desync) + // check if we're ahead/behind of consensus stage + bool is_stage_desync, reset_to_stage0; + uint8_t majority_stage; + check_majority_stage(is_stage_desync, reset_to_stage0, majority_stage, votes); + if (is_stage_desync) { timewait_stage(reset_to_stage0); return; } + // check if we're ahead/behind of consensus lcl + bool is_lcl_desync, should_request_history; + std::string majority_lcl; + check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes); + + if (should_request_history) + { + //todo:create history request message and request request history from a random peer. + } + if (is_lcl_desync) + { + bool should_reset = (ctx.time_now - ctx.novel_proposal_time) < floor(conf::cfg.roundtime / 4); + //for now we are resetting to stage 0 to avoid possible deadlock situations + timewait_stage(true); + return; + } + // In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds. const p2p::proposal stg_prop = create_stage123_proposal(votes); broadcast_proposal(stg_prop); @@ -349,7 +398,7 @@ int broadcast_proposal(const p2p::proposal &p) /** * Check whether our current stage is ahead or behind of the majority stage. */ -void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_stage, vote_counter &votes) +void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes) { // Stage votes. for (const p2p::proposal &cp : ctx.candidate_proposals) @@ -392,11 +441,70 @@ void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_ } } +/** + * Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes. + */ +void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes) +{ + // Stage votes. + int32_t total_lcl_votes = 0; + + for (const p2p::proposal &cp : ctx.candidate_proposals) + { + // only consider recent proposals and proposals from previous stage. + if ((ctx.time_now - cp.timestamp < conf::cfg.roundtime * 4) && (cp.stage == ctx.stage - 1)) + { + increment(votes.lcl, cp.lcl); + total_lcl_votes++; + } + } + + is_desync = false; + should_request_history = false; + + if (total_lcl_votes < (0.8 * conf::cfg.unl.size())) + { + LOG_DBG << "Not enough peers proposing to perform consensus" << std::to_string(total_lcl_votes) << " needed " << std::to_string(0.8 * conf::cfg.unl.size()); + is_desync = true; + return; + } + + int32_t winning_votes = 0; + for (const auto [lcl, votes] : votes.lcl) + { + if (votes > winning_votes) + { + winning_votes = votes; + majority_lcl = lcl; + } + } + + double wining_votes_unl_ratio = winning_votes / conf::cfg.unl.size(); + if (wining_votes_unl_ratio < 0.8) + { + // potential fork condition. + LOG_DBG << "No consensus on lcl. Possible fork condition."; + is_desync = true; + return; + } + + //if winning lcl is not matched node lcl, + //that means vode is not on the consensus ledger. + //Should request history from a peer. + if (ctx.lcl != majority_lcl) + { + LOG_DBG << "We are not on the consensus ledger, requesting history from a random peer"; + is_desync = true; + //todo:create history request message and request request history from a random peer. + should_request_history = true; + return; + } +} /** * Returns the consensus percentage threshold for the specified stage. * @param stage The consensus stage [1, 2, 3] */ -float_t get_stage_threshold(int8_t stage) +float_t get_stage_threshold(uint8_t stage) { switch (stage) { @@ -425,6 +533,8 @@ void timewait_stage(bool reset) void apply_ledger(const p2p::proposal &cons_prop) { // todo:write lcl. + ctx.led_seq_no++; + ctx.lcl = cons::save_ledger(cons_prop, ctx.led_seq_no); // Send any output from the previous consensus round to users. for (const std::string &hash : cons_prop.hash_outputs) diff --git a/src/cons/cons.hpp b/src/cons/cons.hpp index b6a15517..0ced1e7f 100644 --- a/src/cons/cons.hpp +++ b/src/cons/cons.hpp @@ -26,10 +26,11 @@ struct consensus_context std::list candidate_proposals; std::unordered_map> candidate_users; - int8_t stage; - int64_t novel_proposal_time; - int64_t time_now; + uint8_t stage; + uint64_t novel_proposal_time; + uint64_t time_now; std::string lcl; + uint64_t led_seq_no; std::string novel_proposal; std::map> possible_inputs; @@ -42,8 +43,8 @@ struct consensus_context struct vote_counter { - std::map stage; - std::map time; + std::map stage; + std::map time; std::map lcl; std::map users; std::map inputs; @@ -52,11 +53,13 @@ struct vote_counter extern consensus_context ctx; +int init(); + void consensus(); void apply_ledger(const p2p::proposal &proposal); -float_t get_stage_threshold(int8_t stage); +float_t get_stage_threshold(uint8_t stage); void timewait_stage(bool reset); @@ -68,7 +71,9 @@ p2p::proposal create_stage123_proposal(vote_counter &votes); int broadcast_proposal(const p2p::proposal &p); -void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_stage, vote_counter &votes); +void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes); + +void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes); void run_contract_binary(int64_t time); diff --git a/src/cons/ledger_handler.cpp b/src/cons/ledger_handler.cpp new file mode 100644 index 00000000..74a1b8f2 --- /dev/null +++ b/src/cons/ledger_handler.cpp @@ -0,0 +1,108 @@ + +#include +#include +#include +#include +#include "../conf.hpp" +#include "../crypto.hpp" +#include "../p2p/p2p.hpp" +#include "../fbschema/ledger_helpers.hpp" +#include "ledger_handler.hpp" + +namespace cons +{ + +const std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no) +{ + //Serialize lcl using flatbuffer ledger schema. + flatbuffers::FlatBufferBuilder builder(1024); + const std::string_view ledger_str = fbschema::ledger::create_ledger_from_proposal(builder, proposal); + + //Get binary hash of the the serialized lcl. + const std::string lcl = crypto::get_hash(ledger_str); + + //Get hex from binary hash + std::string lcl_hash; + util::bin2hex(lcl_hash, + reinterpret_cast(lcl.data()), + lcl.size()); + + //create file path to save lcl. + //file name -> [ledger sequnce numer]-[lcl hex] + std::string path; + std::string seq_no = std::to_string(led_seq_no); + path.reserve(conf::ctx.histDir.size() + lcl_hash.size() + seq_no.size() + 6); + path.append(conf::ctx.histDir); + path.append("/"); + path.append(seq_no); + path.append("-"); + path.append(lcl_hash); + path.append(".lcl"); + + //write lcl to file system + std::ofstream ofs(std::move(path)); + ofs.write(ledger_str.data(), ledger_str.size()); + ofs.close(); + + return (lcl_hash); +} + +const ledger_history load_ledger() +{ + ledger_history ldg_hist; + ldg_hist.led_seq_no = 0; + // might need to load history in order to request response lcl history + //std::unordered_map lcl_history_files; + + //Get all records at lcl history direcory and find the last closed ledger. + std::string latest_file_name; + std::string::size_type latest_pos = 0; + for (auto &entry : boost::filesystem::directory_iterator(conf::ctx.histDir)) + { + const boost::filesystem::path file_path = entry.path(); + const std::string file_name = entry.path().filename().string(); + + if (boost::filesystem::is_directory(file_path)) + { + LOG_ERR << "Found directory " << file_name << " in " << conf::ctx.histDir << ". There should be no folders in this directory"; + } + else if (file_path.extension() != ".lcl") + { + LOG_ERR << "Found invalid file extension: " << file_path.extension() << " for lcl file " << file_name << " in " << conf::ctx.histDir; + } + else + { + std::string::size_type pos = file_name.find("-"); + uint64_t seq_no; + + if (pos != std::string::npos) + { + seq_no = std::stoull(file_name.substr(0, pos)); + } + else + { + //lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format. + LOG_ERR << "Invalid file name: " << file_name; + } + + if (seq_no > ldg_hist.led_seq_no) + { + ldg_hist.led_seq_no = seq_no; + latest_pos = pos; + latest_file_name = file_name; + } + } + } + + //check if there is a saved lcl file -> if no send genesis lcl. + if (latest_file_name.empty()) + ldg_hist.lcl = "genesis"; + else if ((latest_file_name.size() - 6) > latest_pos) //validation to check position is not the end of the file name. + ldg_hist.lcl = latest_file_name.substr(latest_pos + 1, (latest_file_name.size() - 6)); + else + LOG_ERR << "Invalid latest file name: " << latest_file_name; + + return ldg_hist; +} + +} // namespace cons \ No newline at end of file diff --git a/src/cons/ledger_handler.hpp b/src/cons/ledger_handler.hpp new file mode 100644 index 00000000..671ad6e9 --- /dev/null +++ b/src/cons/ledger_handler.hpp @@ -0,0 +1,21 @@ +#ifndef _HP_CONS_LEDGER_H_ +#define _HP_CONS_LEDGER_H_ + +#include "../p2p/p2p.hpp" + +namespace cons +{ + +struct ledger_history +{ + std::string lcl; + uint64_t led_seq_no; +}; + +const std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no); + +const ledger_history load_ledger(); + +} + +#endif \ No newline at end of file diff --git a/src/crypto.cpp b/src/crypto.cpp index 4aae3965..74f92de0 100644 --- a/src/crypto.cpp +++ b/src/crypto.cpp @@ -142,7 +142,7 @@ std::string get_hash(std::string_view data) { unsigned char hashchars[crypto_generichash_BYTES]; crypto_generichash(hashchars, sizeof hashchars, (unsigned char *)data.data(), data.length(), NULL, 0); - return std::string(reinterpret_cast(hashchars), crypto_hash_sha512_BYTES); + return std::string(reinterpret_cast(hashchars), crypto_generichash_BYTES); } } // namespace crypto \ No newline at end of file diff --git a/src/fbschema/ledger_helpers.cpp b/src/fbschema/ledger_helpers.cpp new file mode 100644 index 00000000..69bd5d79 --- /dev/null +++ b/src/fbschema/ledger_helpers.cpp @@ -0,0 +1,30 @@ +#include +#include "ledger_schema_generated.h" +#include "../p2p/p2p.hpp" +#include "common_helpers.hpp" +#include "ledger_helpers.hpp" + +namespace fbschema::ledger +{ + +/** + * Create ledger from the given proposal struct. + * @param p The proposal struct to be placed in ledger. + */ +std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p) +{ + flatbuffers::Offset ledger = + ledger::CreateLedger( + builder, + p.time, + sv_to_flatbuff_bytes(builder, p.lcl), + stringlist_to_flatbuf_bytearrayvector(builder, p.users), 0, 0 + //p2p::hashbuffermap_to_flatbuf_rawinputs(builder, p.raw_inputs), + //stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs) + ); + + builder.Finish(ledger); // Finished building message content to get serialised content. + + return flatbuff_bytes_to_sv(builder.GetBufferPointer(), builder.GetSize()); +} +} // namespace fbschema diff --git a/src/fbschema/ledger_helpers.hpp b/src/fbschema/ledger_helpers.hpp new file mode 100644 index 00000000..1d9ae192 --- /dev/null +++ b/src/fbschema/ledger_helpers.hpp @@ -0,0 +1,16 @@ +#ifndef _HP_FBSCHEMA_LEDGER_HELPERS_H_ +#define _HP_FBSCHEMA_LEDGER_HELPERS_H_ + +#include +#include +#include +#include "ledger_schema_generated.h" +#include "../p2p/p2p.hpp" + +namespace fbschema::ledger +{ + +std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p); +} + +#endif \ No newline at end of file diff --git a/src/fbschema/ledger_schema.fbs b/src/fbschema/ledger_schema.fbs new file mode 100644 index 00000000..df07179d --- /dev/null +++ b/src/fbschema/ledger_schema.fbs @@ -0,0 +1,18 @@ +include "common_schema.fbs"; + +namespace fbschema.ledger; + +table Ledger { + time:uint64; + lcl:[ubyte]; + users: [ByteArray]; + inputs: [RawInputList]; + outputs: [ByteArray]; +} + +table RawInputList { + hash:[ubyte]; + inputs:[ByteArray]; +} + +root_type Ledger; \ No newline at end of file diff --git a/src/fbschema/ledger_schema_generated.h b/src/fbschema/ledger_schema_generated.h new file mode 100644 index 00000000..0d049856 --- /dev/null +++ b/src/fbschema/ledger_schema_generated.h @@ -0,0 +1,248 @@ +// automatically generated by the FlatBuffers compiler, do not modify + + +#ifndef FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_H_ +#define FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_H_ + +#include "flatbuffers/flatbuffers.h" + +#include "common_schema_generated.h" + +namespace fbschema { +namespace ledger { + +struct Ledger; + +struct RawInputList; + +struct Ledger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_TIME = 4, + VT_LCL = 6, + VT_USERS = 8, + VT_INPUTS = 10, + VT_OUTPUTS = 12 + }; + uint64_t time() const { + return GetField(VT_TIME, 0); + } + bool mutate_time(uint64_t _time) { + return SetField(VT_TIME, _time, 0); + } + const flatbuffers::Vector *lcl() const { + return GetPointer *>(VT_LCL); + } + flatbuffers::Vector *mutable_lcl() { + return GetPointer *>(VT_LCL); + } + const flatbuffers::Vector> *users() const { + return GetPointer> *>(VT_USERS); + } + flatbuffers::Vector> *mutable_users() { + return GetPointer> *>(VT_USERS); + } + const flatbuffers::Vector> *inputs() const { + return GetPointer> *>(VT_INPUTS); + } + flatbuffers::Vector> *mutable_inputs() { + return GetPointer> *>(VT_INPUTS); + } + const flatbuffers::Vector> *outputs() const { + return GetPointer> *>(VT_OUTPUTS); + } + flatbuffers::Vector> *mutable_outputs() { + return GetPointer> *>(VT_OUTPUTS); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyField(verifier, VT_TIME) && + VerifyOffset(verifier, VT_LCL) && + verifier.VerifyVector(lcl()) && + VerifyOffset(verifier, VT_USERS) && + verifier.VerifyVector(users()) && + verifier.VerifyVectorOfTables(users()) && + VerifyOffset(verifier, VT_INPUTS) && + verifier.VerifyVector(inputs()) && + verifier.VerifyVectorOfTables(inputs()) && + VerifyOffset(verifier, VT_OUTPUTS) && + verifier.VerifyVector(outputs()) && + verifier.VerifyVectorOfTables(outputs()) && + verifier.EndTable(); + } +}; + +struct LedgerBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_time(uint64_t time) { + fbb_.AddElement(Ledger::VT_TIME, time, 0); + } + void add_lcl(flatbuffers::Offset> lcl) { + fbb_.AddOffset(Ledger::VT_LCL, lcl); + } + void add_users(flatbuffers::Offset>> users) { + fbb_.AddOffset(Ledger::VT_USERS, users); + } + void add_inputs(flatbuffers::Offset>> inputs) { + fbb_.AddOffset(Ledger::VT_INPUTS, inputs); + } + void add_outputs(flatbuffers::Offset>> outputs) { + fbb_.AddOffset(Ledger::VT_OUTPUTS, outputs); + } + explicit LedgerBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + LedgerBuilder &operator=(const LedgerBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateLedger( + flatbuffers::FlatBufferBuilder &_fbb, + uint64_t time = 0, + flatbuffers::Offset> lcl = 0, + flatbuffers::Offset>> users = 0, + flatbuffers::Offset>> inputs = 0, + flatbuffers::Offset>> outputs = 0) { + LedgerBuilder builder_(_fbb); + builder_.add_time(time); + builder_.add_outputs(outputs); + builder_.add_inputs(inputs); + builder_.add_users(users); + builder_.add_lcl(lcl); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateLedgerDirect( + flatbuffers::FlatBufferBuilder &_fbb, + uint64_t time = 0, + const std::vector *lcl = nullptr, + const std::vector> *users = nullptr, + const std::vector> *inputs = nullptr, + const std::vector> *outputs = nullptr) { + auto lcl__ = lcl ? _fbb.CreateVector(*lcl) : 0; + auto users__ = users ? _fbb.CreateVector>(*users) : 0; + auto inputs__ = inputs ? _fbb.CreateVector>(*inputs) : 0; + auto outputs__ = outputs ? _fbb.CreateVector>(*outputs) : 0; + return fbschema::ledger::CreateLedger( + _fbb, + time, + lcl__, + users__, + inputs__, + outputs__); +} + +struct RawInputList FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { + VT_HASH = 4, + VT_INPUTS = 6 + }; + const flatbuffers::Vector *hash() const { + return GetPointer *>(VT_HASH); + } + flatbuffers::Vector *mutable_hash() { + return GetPointer *>(VT_HASH); + } + const flatbuffers::Vector> *inputs() const { + return GetPointer> *>(VT_INPUTS); + } + flatbuffers::Vector> *mutable_inputs() { + return GetPointer> *>(VT_INPUTS); + } + bool Verify(flatbuffers::Verifier &verifier) const { + return VerifyTableStart(verifier) && + VerifyOffset(verifier, VT_HASH) && + verifier.VerifyVector(hash()) && + VerifyOffset(verifier, VT_INPUTS) && + verifier.VerifyVector(inputs()) && + verifier.VerifyVectorOfTables(inputs()) && + verifier.EndTable(); + } +}; + +struct RawInputListBuilder { + flatbuffers::FlatBufferBuilder &fbb_; + flatbuffers::uoffset_t start_; + void add_hash(flatbuffers::Offset> hash) { + fbb_.AddOffset(RawInputList::VT_HASH, hash); + } + void add_inputs(flatbuffers::Offset>> inputs) { + fbb_.AddOffset(RawInputList::VT_INPUTS, inputs); + } + explicit RawInputListBuilder(flatbuffers::FlatBufferBuilder &_fbb) + : fbb_(_fbb) { + start_ = fbb_.StartTable(); + } + RawInputListBuilder &operator=(const RawInputListBuilder &); + flatbuffers::Offset Finish() { + const auto end = fbb_.EndTable(start_); + auto o = flatbuffers::Offset(end); + return o; + } +}; + +inline flatbuffers::Offset CreateRawInputList( + flatbuffers::FlatBufferBuilder &_fbb, + flatbuffers::Offset> hash = 0, + flatbuffers::Offset>> inputs = 0) { + RawInputListBuilder builder_(_fbb); + builder_.add_inputs(inputs); + builder_.add_hash(hash); + return builder_.Finish(); +} + +inline flatbuffers::Offset CreateRawInputListDirect( + flatbuffers::FlatBufferBuilder &_fbb, + const std::vector *hash = nullptr, + const std::vector> *inputs = nullptr) { + auto hash__ = hash ? _fbb.CreateVector(*hash) : 0; + auto inputs__ = inputs ? _fbb.CreateVector>(*inputs) : 0; + return fbschema::ledger::CreateRawInputList( + _fbb, + hash__, + inputs__); +} + +inline const fbschema::ledger::Ledger *GetLedger(const void *buf) { + return flatbuffers::GetRoot(buf); +} + +inline const fbschema::ledger::Ledger *GetSizePrefixedLedger(const void *buf) { + return flatbuffers::GetSizePrefixedRoot(buf); +} + +inline Ledger *GetMutableLedger(void *buf) { + return flatbuffers::GetMutableRoot(buf); +} + +inline bool VerifyLedgerBuffer( + flatbuffers::Verifier &verifier) { + return verifier.VerifyBuffer(nullptr); +} + +inline bool VerifySizePrefixedLedgerBuffer( + flatbuffers::Verifier &verifier) { + return verifier.VerifySizePrefixedBuffer(nullptr); +} + +inline void FinishLedgerBuffer( + flatbuffers::FlatBufferBuilder &fbb, + flatbuffers::Offset root) { + fbb.Finish(root); +} + +inline void FinishSizePrefixedLedgerBuffer( + flatbuffers::FlatBufferBuilder &fbb, + flatbuffers::Offset root) { + fbb.FinishSizePrefixed(root); +} + +} // namespace ledger +} // namespace fbschema + +#endif // FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_H_ diff --git a/src/fbschema/p2pmsg_content.fbs b/src/fbschema/p2pmsg_content.fbs index d89848e4..c44ba664 100644 --- a/src/fbschema/p2pmsg_content.fbs +++ b/src/fbschema/p2pmsg_content.fbs @@ -19,7 +19,7 @@ table Content { } table Proposal_Message { //Proposal type message schema - stage: int8; + stage:uint8; time:uint64; lcl:[ubyte]; users: [ByteArray]; diff --git a/src/fbschema/p2pmsg_content_generated.h b/src/fbschema/p2pmsg_content_generated.h index a72f97fa..b09e8a99 100644 --- a/src/fbschema/p2pmsg_content_generated.h +++ b/src/fbschema/p2pmsg_content_generated.h @@ -297,11 +297,11 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { VT_HASH_OUTPUTS = 18, VT_STATE = 20 }; - int8_t stage() const { - return GetField(VT_STAGE, 0); + uint8_t stage() const { + return GetField(VT_STAGE, 0); } - bool mutate_stage(int8_t _stage) { - return SetField(VT_STAGE, _stage, 0); + bool mutate_stage(uint8_t _stage) { + return SetField(VT_STAGE, _stage, 0); } uint64_t time() const { return GetField(VT_TIME, 0); @@ -353,7 +353,7 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyField(verifier, VT_STAGE) && + VerifyField(verifier, VT_STAGE) && VerifyField(verifier, VT_TIME) && VerifyOffset(verifier, VT_LCL) && verifier.VerifyVector(lcl()) && @@ -381,8 +381,8 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { struct Proposal_MessageBuilder { flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_stage(int8_t stage) { - fbb_.AddElement(Proposal_Message::VT_STAGE, stage, 0); + void add_stage(uint8_t stage) { + fbb_.AddElement(Proposal_Message::VT_STAGE, stage, 0); } void add_time(uint64_t time) { fbb_.AddElement(Proposal_Message::VT_TIME, time, 0); @@ -422,7 +422,7 @@ struct Proposal_MessageBuilder { inline flatbuffers::Offset CreateProposal_Message( flatbuffers::FlatBufferBuilder &_fbb, - int8_t stage = 0, + uint8_t stage = 0, uint64_t time = 0, flatbuffers::Offset> lcl = 0, flatbuffers::Offset>> users = 0, @@ -446,7 +446,7 @@ inline flatbuffers::Offset CreateProposal_Message( inline flatbuffers::Offset CreateProposal_MessageDirect( flatbuffers::FlatBufferBuilder &_fbb, - int8_t stage = 0, + uint8_t stage = 0, uint64_t time = 0, const std::vector *lcl = nullptr, const std::vector> *users = nullptr, diff --git a/src/fbschema/p2pmsg_helpers.cpp b/src/fbschema/p2pmsg_helpers.cpp index 30d8ce77..2b83af4a 100644 --- a/src/fbschema/p2pmsg_helpers.cpp +++ b/src/fbschema/p2pmsg_helpers.cpp @@ -144,11 +144,12 @@ int validate_and_extract_content(const Content **content_ref, const uint8_t *con * @param The Flatbuffer poporal received from the peer. * @return A proposal struct representing the message. */ -const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey) +const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, uint64_t timestamp) { p2p::proposal p; p.pubkey = flatbuff_bytes_to_sv(pubkey); + p.timestamp = timestamp; p.time = msg.time(); p.stage = msg.stage(); diff --git a/src/fbschema/p2pmsg_helpers.hpp b/src/fbschema/p2pmsg_helpers.hpp index 75484506..9a21c1e6 100644 --- a/src/fbschema/p2pmsg_helpers.hpp +++ b/src/fbschema/p2pmsg_helpers.hpp @@ -21,7 +21,7 @@ int validate_container_trust(const Container *container); int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, flatbuffers::uoffset_t content_size); -const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey); +const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector *pubkey, uint64_t timestamp); //---Message creation helpers---// diff --git a/src/main.cpp b/src/main.cpp index 678d1800..10f80f86 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -78,8 +78,47 @@ void signal_handler(int signum) exit(signum); } +/** + * Global exception handler for boost exceptions. + */ +void boost::throw_exception(std::exception const &e) +{ + LOG_ERR << "Boost error:" << e.what(); + exit(1); +} + +/** + * Global exception handler for std exceptions. + */ +void std_terminate() noexcept +{ + std::exception_ptr exptr = std::current_exception(); + if (exptr != 0) + { + try + { + std::rethrow_exception(exptr); + } + catch (std::exception &ex) + { + LOG_ERR << "std error: " << ex.what(); + } + catch (...) + { + LOG_ERR << "std error: Terminated due to unknown exception"; + } + } + else + { + LOG_ERR << "std error: Terminated due to unknown reason"; + } + + exit(1); +} + int main(int argc, char **argv) { + std::set_terminate(&std_terminate); // Extract the CLI args // This call will populate conf::ctx if (parse_cmd(argc, argv) != 0) @@ -129,11 +168,15 @@ int main(int argc, char **argv) if (usr::init() != 0) return -1; + if (cons::init() != 0) + return 1; + // After initializing primary subsystems, register the SIGINT handler. signal(SIGINT, signal_handler); - cons::ctx.stage = 0; - cons::ctx.lcl = "static_lcl"; + //we are waiting for peer to estasblish peer connections. + //otherwise we'll run into not enough peers propsing/stage desync deadlock directly now. + sleep(3); while (true) { @@ -150,11 +193,3 @@ int main(int argc, char **argv) return 0; } -/** - * Global exception handler for boost exceptions. - */ -void boost::throw_exception(std::exception const &e) -{ - LOG_ERR << "Boost error:" << e.what(); - exit(-1); -} \ No newline at end of file diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 87dc007d..b22f25df 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -116,7 +116,7 @@ void peer_connection_watchdog() } } - std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime * 4)); + std::this_thread::sleep_for(std::chrono::milliseconds(200)); } } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index f728dd4e..7a290fa5 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -14,9 +14,9 @@ namespace p2p struct proposal { std::string pubkey; - int64_t timestamp; - int64_t time; - int8_t stage; + uint64_t timestamp; + uint64_t time; + uint8_t stage; std::string lcl; std::unordered_set users; std::unordered_map> raw_inputs; diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 0b87184a..cf1457ff 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -88,7 +88,7 @@ void peer_session_handler::on_message(sock::socket_session lock(collected_msgs.proposals_mutex); // Insert proposal with lock. collected_msgs.proposals.push_back( - p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey())); + p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp())); } else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message {