Loading and saving ledger and lcl consensus. (#51)

* Ledger saving, loading, consensus with ledger sequence number.
* std terminate exception handler.
This commit is contained in:
Asanka Indrajith
2019-11-01 05:51:25 -04:00
committed by Ravin Perera
parent b8ffb83869
commit b598025346
18 changed files with 636 additions and 42 deletions

View File

@@ -13,6 +13,7 @@ add_executable(hpcore
src/hplog.cpp
src/fbschema/common_helpers.cpp
src/fbschema/p2pmsg_helpers.cpp
src/fbschema/ledger_helpers.cpp
src/jsonschema/usrmsg_helpers.cpp
src/sock/socket_client.cpp
src/sock/socket_server.cpp
@@ -24,6 +25,7 @@ add_executable(hpcore
src/usr/usr.cpp
src/proc.cpp
src/cons/cons.cpp
src/cons/ledger_handler.cpp
src/main.cpp
)

View File

@@ -1,6 +1,8 @@
#include <math.h>
#include <thread>
#include <flatbuffers/flatbuffers.h>
#include <boost/filesystem.hpp>
#include <fstream>
#include "../conf.hpp"
#include "../usr/usr.hpp"
#include "../p2p/p2p.hpp"
@@ -9,6 +11,7 @@
#include "../hplog.hpp"
#include "../crypto.hpp"
#include "../proc.hpp"
#include "ledger_handler.hpp"
#include "cons.hpp"
namespace p2pmsg = fbschema::p2pmsg;
@@ -33,6 +36,19 @@ void increment(std::map<T, int32_t> &counter, const T &candidate)
counter.try_emplace(candidate, 1);
}
int init()
{
//set start stage
ctx.stage = 0;
//load lcl detals from lcl history.
const ledger_history ldr_hist = load_ledger();
ctx.led_seq_no = ldr_hist.led_seq_no;
ctx.lcl = ldr_hist.lcl;
return 0;
}
void consensus()
{
// A consensus round consists of 4 stages (0,1,2,3).
@@ -50,6 +66,22 @@ void consensus()
ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals);
}
std::cout << "Started stage " << std::to_string(ctx.stage) << "\n";
for (auto p : ctx.candidate_proposals)
{
bool self = p.pubkey == conf::cfg.pubkey;
std::cout << "[stage" << std::to_string(p.stage)
<< "] users:" << p.users.size()
<< " rinp:" << p.raw_inputs.size()
<< " hinp:" << p.hash_inputs.size()
<< " rout:" << p.raw_outputs.size()
<< " hout:" << p.hash_outputs.size()
<< " lcl:" << p.lcl
<< " self:" << self
<< "\n";
}
std::cout << "timenow:" << std::to_string(ctx.time_now) << "\n";
if (ctx.stage == 0)
{
// Stage 0 means begining of a consensus round.
@@ -87,16 +119,33 @@ void consensus()
// Initialize vote counters
vote_counter votes;
// check if we're ahead/behind of consensus
bool is_desync, reset_to_stage0;
int8_t majority_stage;
check_majority_stage(is_desync, reset_to_stage0, majority_stage, votes);
if (is_desync)
// check if we're ahead/behind of consensus stage
bool is_stage_desync, reset_to_stage0;
uint8_t majority_stage;
check_majority_stage(is_stage_desync, reset_to_stage0, majority_stage, votes);
if (is_stage_desync)
{
timewait_stage(reset_to_stage0);
return;
}
// check if we're ahead/behind of consensus lcl
bool is_lcl_desync, should_request_history;
std::string majority_lcl;
check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes);
if (should_request_history)
{
//todo:create history request message and request request history from a random peer.
}
if (is_lcl_desync)
{
bool should_reset = (ctx.time_now - ctx.novel_proposal_time) < floor(conf::cfg.roundtime / 4);
//for now we are resetting to stage 0 to avoid possible deadlock situations
timewait_stage(true);
return;
}
// In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds.
const p2p::proposal stg_prop = create_stage123_proposal(votes);
broadcast_proposal(stg_prop);
@@ -349,7 +398,7 @@ int broadcast_proposal(const p2p::proposal &p)
/**
* Check whether our current stage is ahead or behind of the majority stage.
*/
void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_stage, vote_counter &votes)
void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes)
{
// Stage votes.
for (const p2p::proposal &cp : ctx.candidate_proposals)
@@ -392,11 +441,70 @@ void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_
}
}
/**
* Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes.
*/
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes)
{
// Stage votes.
int32_t total_lcl_votes = 0;
for (const p2p::proposal &cp : ctx.candidate_proposals)
{
// only consider recent proposals and proposals from previous stage.
if ((ctx.time_now - cp.timestamp < conf::cfg.roundtime * 4) && (cp.stage == ctx.stage - 1))
{
increment(votes.lcl, cp.lcl);
total_lcl_votes++;
}
}
is_desync = false;
should_request_history = false;
if (total_lcl_votes < (0.8 * conf::cfg.unl.size()))
{
LOG_DBG << "Not enough peers proposing to perform consensus" << std::to_string(total_lcl_votes) << " needed " << std::to_string(0.8 * conf::cfg.unl.size());
is_desync = true;
return;
}
int32_t winning_votes = 0;
for (const auto [lcl, votes] : votes.lcl)
{
if (votes > winning_votes)
{
winning_votes = votes;
majority_lcl = lcl;
}
}
double wining_votes_unl_ratio = winning_votes / conf::cfg.unl.size();
if (wining_votes_unl_ratio < 0.8)
{
// potential fork condition.
LOG_DBG << "No consensus on lcl. Possible fork condition.";
is_desync = true;
return;
}
//if winning lcl is not matched node lcl,
//that means vode is not on the consensus ledger.
//Should request history from a peer.
if (ctx.lcl != majority_lcl)
{
LOG_DBG << "We are not on the consensus ledger, requesting history from a random peer";
is_desync = true;
//todo:create history request message and request request history from a random peer.
should_request_history = true;
return;
}
}
/**
* Returns the consensus percentage threshold for the specified stage.
* @param stage The consensus stage [1, 2, 3]
*/
float_t get_stage_threshold(int8_t stage)
float_t get_stage_threshold(uint8_t stage)
{
switch (stage)
{
@@ -425,6 +533,8 @@ void timewait_stage(bool reset)
void apply_ledger(const p2p::proposal &cons_prop)
{
// todo:write lcl.
ctx.led_seq_no++;
ctx.lcl = cons::save_ledger(cons_prop, ctx.led_seq_no);
// Send any output from the previous consensus round to users.
for (const std::string &hash : cons_prop.hash_outputs)

View File

@@ -26,10 +26,11 @@ struct consensus_context
std::list<p2p::proposal> candidate_proposals;
std::unordered_map<std::string, std::list<util::hash_buffer>> candidate_users;
int8_t stage;
int64_t novel_proposal_time;
int64_t time_now;
uint8_t stage;
uint64_t novel_proposal_time;
uint64_t time_now;
std::string lcl;
uint64_t led_seq_no;
std::string novel_proposal;
std::map<std::string, std::pair<const std::string, std::string>> possible_inputs;
@@ -42,8 +43,8 @@ struct consensus_context
struct vote_counter
{
std::map<int8_t, int32_t> stage;
std::map<int64_t, int32_t> time;
std::map<uint8_t, int32_t> stage;
std::map<uint64_t, int32_t> time;
std::map<std::string, int32_t> lcl;
std::map<std::string, int32_t> users;
std::map<std::string, int32_t> inputs;
@@ -52,11 +53,13 @@ struct vote_counter
extern consensus_context ctx;
int init();
void consensus();
void apply_ledger(const p2p::proposal &proposal);
float_t get_stage_threshold(int8_t stage);
float_t get_stage_threshold(uint8_t stage);
void timewait_stage(bool reset);
@@ -68,7 +71,9 @@ p2p::proposal create_stage123_proposal(vote_counter &votes);
int broadcast_proposal(const p2p::proposal &p);
void check_majority_stage(bool &is_desync, bool &should_reset, int8_t &majority_stage, vote_counter &votes);
void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes);
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes);
void run_contract_binary(int64_t time);

108
src/cons/ledger_handler.cpp Normal file
View File

@@ -0,0 +1,108 @@
#include <flatbuffers/flatbuffers.h>
#include <iostream>
#include <fstream>
#include <boost/filesystem.hpp>
#include "../conf.hpp"
#include "../crypto.hpp"
#include "../p2p/p2p.hpp"
#include "../fbschema/ledger_helpers.hpp"
#include "ledger_handler.hpp"
namespace cons
{
const std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no)
{
//Serialize lcl using flatbuffer ledger schema.
flatbuffers::FlatBufferBuilder builder(1024);
const std::string_view ledger_str = fbschema::ledger::create_ledger_from_proposal(builder, proposal);
//Get binary hash of the the serialized lcl.
const std::string lcl = crypto::get_hash(ledger_str);
//Get hex from binary hash
std::string lcl_hash;
util::bin2hex(lcl_hash,
reinterpret_cast<const unsigned char *>(lcl.data()),
lcl.size());
//create file path to save lcl.
//file name -> [ledger sequnce numer]-[lcl hex]
std::string path;
std::string seq_no = std::to_string(led_seq_no);
path.reserve(conf::ctx.histDir.size() + lcl_hash.size() + seq_no.size() + 6);
path.append(conf::ctx.histDir);
path.append("/");
path.append(seq_no);
path.append("-");
path.append(lcl_hash);
path.append(".lcl");
//write lcl to file system
std::ofstream ofs(std::move(path));
ofs.write(ledger_str.data(), ledger_str.size());
ofs.close();
return (lcl_hash);
}
const ledger_history load_ledger()
{
ledger_history ldg_hist;
ldg_hist.led_seq_no = 0;
// might need to load history in order to request response lcl history
//std::unordered_map<std::string, std::string_view> lcl_history_files;
//Get all records at lcl history direcory and find the last closed ledger.
std::string latest_file_name;
std::string::size_type latest_pos = 0;
for (auto &entry : boost::filesystem::directory_iterator(conf::ctx.histDir))
{
const boost::filesystem::path file_path = entry.path();
const std::string file_name = entry.path().filename().string();
if (boost::filesystem::is_directory(file_path))
{
LOG_ERR << "Found directory " << file_name << " in " << conf::ctx.histDir << ". There should be no folders in this directory";
}
else if (file_path.extension() != ".lcl")
{
LOG_ERR << "Found invalid file extension: " << file_path.extension() << " for lcl file " << file_name << " in " << conf::ctx.histDir;
}
else
{
std::string::size_type pos = file_name.find("-");
uint64_t seq_no;
if (pos != std::string::npos)
{
seq_no = std::stoull(file_name.substr(0, pos));
}
else
{
//lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format.
LOG_ERR << "Invalid file name: " << file_name;
}
if (seq_no > ldg_hist.led_seq_no)
{
ldg_hist.led_seq_no = seq_no;
latest_pos = pos;
latest_file_name = file_name;
}
}
}
//check if there is a saved lcl file -> if no send genesis lcl.
if (latest_file_name.empty())
ldg_hist.lcl = "genesis";
else if ((latest_file_name.size() - 6) > latest_pos) //validation to check position is not the end of the file name.
ldg_hist.lcl = latest_file_name.substr(latest_pos + 1, (latest_file_name.size() - 6));
else
LOG_ERR << "Invalid latest file name: " << latest_file_name;
return ldg_hist;
}
} // namespace cons

View File

@@ -0,0 +1,21 @@
#ifndef _HP_CONS_LEDGER_H_
#define _HP_CONS_LEDGER_H_
#include "../p2p/p2p.hpp"
namespace cons
{
struct ledger_history
{
std::string lcl;
uint64_t led_seq_no;
};
const std::string save_ledger(const p2p::proposal &proposal, const uint64_t led_seq_no);
const ledger_history load_ledger();
}
#endif

View File

@@ -142,7 +142,7 @@ std::string get_hash(std::string_view data)
{
unsigned char hashchars[crypto_generichash_BYTES];
crypto_generichash(hashchars, sizeof hashchars, (unsigned char *)data.data(), data.length(), NULL, 0);
return std::string(reinterpret_cast<char *>(hashchars), crypto_hash_sha512_BYTES);
return std::string(reinterpret_cast<char *>(hashchars), crypto_generichash_BYTES);
}
} // namespace crypto

View File

@@ -0,0 +1,30 @@
#include <flatbuffers/flatbuffers.h>
#include "ledger_schema_generated.h"
#include "../p2p/p2p.hpp"
#include "common_helpers.hpp"
#include "ledger_helpers.hpp"
namespace fbschema::ledger
{
/**
* Create ledger from the given proposal struct.
* @param p The proposal struct to be placed in ledger.
*/
std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p)
{
flatbuffers::Offset<ledger::Ledger> ledger =
ledger::CreateLedger(
builder,
p.time,
sv_to_flatbuff_bytes(builder, p.lcl),
stringlist_to_flatbuf_bytearrayvector(builder, p.users), 0, 0
//p2p::hashbuffermap_to_flatbuf_rawinputs(builder, p.raw_inputs),
//stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs)
);
builder.Finish(ledger); // Finished building message content to get serialised content.
return flatbuff_bytes_to_sv(builder.GetBufferPointer(), builder.GetSize());
}
} // namespace fbschema

View File

@@ -0,0 +1,16 @@
#ifndef _HP_FBSCHEMA_LEDGER_HELPERS_H_
#define _HP_FBSCHEMA_LEDGER_HELPERS_H_
#include <string>
#include <unordered_set>
#include <flatbuffers/flatbuffers.h>
#include "ledger_schema_generated.h"
#include "../p2p/p2p.hpp"
namespace fbschema::ledger
{
std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p);
}
#endif

View File

@@ -0,0 +1,18 @@
include "common_schema.fbs";
namespace fbschema.ledger;
table Ledger {
time:uint64;
lcl:[ubyte];
users: [ByteArray];
inputs: [RawInputList];
outputs: [ByteArray];
}
table RawInputList {
hash:[ubyte];
inputs:[ByteArray];
}
root_type Ledger;

View File

@@ -0,0 +1,248 @@
// automatically generated by the FlatBuffers compiler, do not modify
#ifndef FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_H_
#define FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_H_
#include "flatbuffers/flatbuffers.h"
#include "common_schema_generated.h"
namespace fbschema {
namespace ledger {
struct Ledger;
struct RawInputList;
struct Ledger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_TIME = 4,
VT_LCL = 6,
VT_USERS = 8,
VT_INPUTS = 10,
VT_OUTPUTS = 12
};
uint64_t time() const {
return GetField<uint64_t>(VT_TIME, 0);
}
bool mutate_time(uint64_t _time) {
return SetField<uint64_t>(VT_TIME, _time, 0);
}
const flatbuffers::Vector<uint8_t> *lcl() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_LCL);
}
flatbuffers::Vector<uint8_t> *mutable_lcl() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_LCL);
}
const flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *users() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *>(VT_USERS);
}
flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *mutable_users() {
return GetPointer<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *>(VT_USERS);
}
const flatbuffers::Vector<flatbuffers::Offset<RawInputList>> *inputs() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<RawInputList>> *>(VT_INPUTS);
}
flatbuffers::Vector<flatbuffers::Offset<RawInputList>> *mutable_inputs() {
return GetPointer<flatbuffers::Vector<flatbuffers::Offset<RawInputList>> *>(VT_INPUTS);
}
const flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *outputs() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *>(VT_OUTPUTS);
}
flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *mutable_outputs() {
return GetPointer<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *>(VT_OUTPUTS);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<uint64_t>(verifier, VT_TIME) &&
VerifyOffset(verifier, VT_LCL) &&
verifier.VerifyVector(lcl()) &&
VerifyOffset(verifier, VT_USERS) &&
verifier.VerifyVector(users()) &&
verifier.VerifyVectorOfTables(users()) &&
VerifyOffset(verifier, VT_INPUTS) &&
verifier.VerifyVector(inputs()) &&
verifier.VerifyVectorOfTables(inputs()) &&
VerifyOffset(verifier, VT_OUTPUTS) &&
verifier.VerifyVector(outputs()) &&
verifier.VerifyVectorOfTables(outputs()) &&
verifier.EndTable();
}
};
struct LedgerBuilder {
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_time(uint64_t time) {
fbb_.AddElement<uint64_t>(Ledger::VT_TIME, time, 0);
}
void add_lcl(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> lcl) {
fbb_.AddOffset(Ledger::VT_LCL, lcl);
}
void add_users(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>>> users) {
fbb_.AddOffset(Ledger::VT_USERS, users);
}
void add_inputs(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<RawInputList>>> inputs) {
fbb_.AddOffset(Ledger::VT_INPUTS, inputs);
}
void add_outputs(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>>> outputs) {
fbb_.AddOffset(Ledger::VT_OUTPUTS, outputs);
}
explicit LedgerBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
LedgerBuilder &operator=(const LedgerBuilder &);
flatbuffers::Offset<Ledger> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<Ledger>(end);
return o;
}
};
inline flatbuffers::Offset<Ledger> CreateLedger(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t time = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> lcl = 0,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>>> users = 0,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<RawInputList>>> inputs = 0,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>>> outputs = 0) {
LedgerBuilder builder_(_fbb);
builder_.add_time(time);
builder_.add_outputs(outputs);
builder_.add_inputs(inputs);
builder_.add_users(users);
builder_.add_lcl(lcl);
return builder_.Finish();
}
inline flatbuffers::Offset<Ledger> CreateLedgerDirect(
flatbuffers::FlatBufferBuilder &_fbb,
uint64_t time = 0,
const std::vector<uint8_t> *lcl = nullptr,
const std::vector<flatbuffers::Offset<fbschema::ByteArray>> *users = nullptr,
const std::vector<flatbuffers::Offset<RawInputList>> *inputs = nullptr,
const std::vector<flatbuffers::Offset<fbschema::ByteArray>> *outputs = nullptr) {
auto lcl__ = lcl ? _fbb.CreateVector<uint8_t>(*lcl) : 0;
auto users__ = users ? _fbb.CreateVector<flatbuffers::Offset<fbschema::ByteArray>>(*users) : 0;
auto inputs__ = inputs ? _fbb.CreateVector<flatbuffers::Offset<RawInputList>>(*inputs) : 0;
auto outputs__ = outputs ? _fbb.CreateVector<flatbuffers::Offset<fbschema::ByteArray>>(*outputs) : 0;
return fbschema::ledger::CreateLedger(
_fbb,
time,
lcl__,
users__,
inputs__,
outputs__);
}
struct RawInputList FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_HASH = 4,
VT_INPUTS = 6
};
const flatbuffers::Vector<uint8_t> *hash() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_HASH);
}
flatbuffers::Vector<uint8_t> *mutable_hash() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_HASH);
}
const flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *inputs() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *>(VT_INPUTS);
}
flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *mutable_inputs() {
return GetPointer<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>> *>(VT_INPUTS);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_HASH) &&
verifier.VerifyVector(hash()) &&
VerifyOffset(verifier, VT_INPUTS) &&
verifier.VerifyVector(inputs()) &&
verifier.VerifyVectorOfTables(inputs()) &&
verifier.EndTable();
}
};
struct RawInputListBuilder {
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_hash(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash) {
fbb_.AddOffset(RawInputList::VT_HASH, hash);
}
void add_inputs(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>>> inputs) {
fbb_.AddOffset(RawInputList::VT_INPUTS, inputs);
}
explicit RawInputListBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
RawInputListBuilder &operator=(const RawInputListBuilder &);
flatbuffers::Offset<RawInputList> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<RawInputList>(end);
return o;
}
};
inline flatbuffers::Offset<RawInputList> CreateRawInputList(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> hash = 0,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>>> inputs = 0) {
RawInputListBuilder builder_(_fbb);
builder_.add_inputs(inputs);
builder_.add_hash(hash);
return builder_.Finish();
}
inline flatbuffers::Offset<RawInputList> CreateRawInputListDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const std::vector<uint8_t> *hash = nullptr,
const std::vector<flatbuffers::Offset<fbschema::ByteArray>> *inputs = nullptr) {
auto hash__ = hash ? _fbb.CreateVector<uint8_t>(*hash) : 0;
auto inputs__ = inputs ? _fbb.CreateVector<flatbuffers::Offset<fbschema::ByteArray>>(*inputs) : 0;
return fbschema::ledger::CreateRawInputList(
_fbb,
hash__,
inputs__);
}
inline const fbschema::ledger::Ledger *GetLedger(const void *buf) {
return flatbuffers::GetRoot<fbschema::ledger::Ledger>(buf);
}
inline const fbschema::ledger::Ledger *GetSizePrefixedLedger(const void *buf) {
return flatbuffers::GetSizePrefixedRoot<fbschema::ledger::Ledger>(buf);
}
inline Ledger *GetMutableLedger(void *buf) {
return flatbuffers::GetMutableRoot<Ledger>(buf);
}
inline bool VerifyLedgerBuffer(
flatbuffers::Verifier &verifier) {
return verifier.VerifyBuffer<fbschema::ledger::Ledger>(nullptr);
}
inline bool VerifySizePrefixedLedgerBuffer(
flatbuffers::Verifier &verifier) {
return verifier.VerifySizePrefixedBuffer<fbschema::ledger::Ledger>(nullptr);
}
inline void FinishLedgerBuffer(
flatbuffers::FlatBufferBuilder &fbb,
flatbuffers::Offset<fbschema::ledger::Ledger> root) {
fbb.Finish(root);
}
inline void FinishSizePrefixedLedgerBuffer(
flatbuffers::FlatBufferBuilder &fbb,
flatbuffers::Offset<fbschema::ledger::Ledger> root) {
fbb.FinishSizePrefixed(root);
}
} // namespace ledger
} // namespace fbschema
#endif // FLATBUFFERS_GENERATED_LEDGERSCHEMA_FBSCHEMA_LEDGER_H_

View File

@@ -19,7 +19,7 @@ table Content {
}
table Proposal_Message { //Proposal type message schema
stage: int8;
stage:uint8;
time:uint64;
lcl:[ubyte];
users: [ByteArray];

View File

@@ -297,11 +297,11 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
VT_HASH_OUTPUTS = 18,
VT_STATE = 20
};
int8_t stage() const {
return GetField<int8_t>(VT_STAGE, 0);
uint8_t stage() const {
return GetField<uint8_t>(VT_STAGE, 0);
}
bool mutate_stage(int8_t _stage) {
return SetField<int8_t>(VT_STAGE, _stage, 0);
bool mutate_stage(uint8_t _stage) {
return SetField<uint8_t>(VT_STAGE, _stage, 0);
}
uint64_t time() const {
return GetField<uint64_t>(VT_TIME, 0);
@@ -353,7 +353,7 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyField<int8_t>(verifier, VT_STAGE) &&
VerifyField<uint8_t>(verifier, VT_STAGE) &&
VerifyField<uint64_t>(verifier, VT_TIME) &&
VerifyOffset(verifier, VT_LCL) &&
verifier.VerifyVector(lcl()) &&
@@ -381,8 +381,8 @@ struct Proposal_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
struct Proposal_MessageBuilder {
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_stage(int8_t stage) {
fbb_.AddElement<int8_t>(Proposal_Message::VT_STAGE, stage, 0);
void add_stage(uint8_t stage) {
fbb_.AddElement<uint8_t>(Proposal_Message::VT_STAGE, stage, 0);
}
void add_time(uint64_t time) {
fbb_.AddElement<uint64_t>(Proposal_Message::VT_TIME, time, 0);
@@ -422,7 +422,7 @@ struct Proposal_MessageBuilder {
inline flatbuffers::Offset<Proposal_Message> CreateProposal_Message(
flatbuffers::FlatBufferBuilder &_fbb,
int8_t stage = 0,
uint8_t stage = 0,
uint64_t time = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> lcl = 0,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<fbschema::ByteArray>>> users = 0,
@@ -446,7 +446,7 @@ inline flatbuffers::Offset<Proposal_Message> CreateProposal_Message(
inline flatbuffers::Offset<Proposal_Message> CreateProposal_MessageDirect(
flatbuffers::FlatBufferBuilder &_fbb,
int8_t stage = 0,
uint8_t stage = 0,
uint64_t time = 0,
const std::vector<uint8_t> *lcl = nullptr,
const std::vector<flatbuffers::Offset<fbschema::ByteArray>> *users = nullptr,

View File

@@ -144,11 +144,12 @@ int validate_and_extract_content(const Content **content_ref, const uint8_t *con
* @param The Flatbuffer poporal received from the peer.
* @return A proposal struct representing the message.
*/
const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector<uint8_t> *pubkey)
const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector<uint8_t> *pubkey, uint64_t timestamp)
{
p2p::proposal p;
p.pubkey = flatbuff_bytes_to_sv(pubkey);
p.timestamp = timestamp;
p.time = msg.time();
p.stage = msg.stage();

View File

@@ -21,7 +21,7 @@ int validate_container_trust(const Container *container);
int validate_and_extract_content(const Content **content_ref, const uint8_t *content_ptr, flatbuffers::uoffset_t content_size);
const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector<uint8_t> *pubkey);
const p2p::proposal create_proposal_from_msg(const Proposal_Message &msg, const flatbuffers::Vector<uint8_t> *pubkey, uint64_t timestamp);
//---Message creation helpers---//

View File

@@ -78,8 +78,47 @@ void signal_handler(int signum)
exit(signum);
}
/**
* Global exception handler for boost exceptions.
*/
void boost::throw_exception(std::exception const &e)
{
LOG_ERR << "Boost error:" << e.what();
exit(1);
}
/**
* Global exception handler for std exceptions.
*/
void std_terminate() noexcept
{
std::exception_ptr exptr = std::current_exception();
if (exptr != 0)
{
try
{
std::rethrow_exception(exptr);
}
catch (std::exception &ex)
{
LOG_ERR << "std error: " << ex.what();
}
catch (...)
{
LOG_ERR << "std error: Terminated due to unknown exception";
}
}
else
{
LOG_ERR << "std error: Terminated due to unknown reason";
}
exit(1);
}
int main(int argc, char **argv)
{
std::set_terminate(&std_terminate);
// Extract the CLI args
// This call will populate conf::ctx
if (parse_cmd(argc, argv) != 0)
@@ -129,11 +168,15 @@ int main(int argc, char **argv)
if (usr::init() != 0)
return -1;
if (cons::init() != 0)
return 1;
// After initializing primary subsystems, register the SIGINT handler.
signal(SIGINT, signal_handler);
cons::ctx.stage = 0;
cons::ctx.lcl = "static_lcl";
//we are waiting for peer to estasblish peer connections.
//otherwise we'll run into not enough peers propsing/stage desync deadlock directly now.
sleep(3);
while (true)
{
@@ -150,11 +193,3 @@ int main(int argc, char **argv)
return 0;
}
/**
* Global exception handler for boost exceptions.
*/
void boost::throw_exception(std::exception const &e)
{
LOG_ERR << "Boost error:" << e.what();
exit(-1);
}

View File

@@ -116,7 +116,7 @@ void peer_connection_watchdog()
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(conf::cfg.roundtime * 4));
std::this_thread::sleep_for(std::chrono::milliseconds(200));
}
}

View File

@@ -14,9 +14,9 @@ namespace p2p
struct proposal
{
std::string pubkey;
int64_t timestamp;
int64_t time;
int8_t stage;
uint64_t timestamp;
uint64_t time;
uint8_t stage;
std::string lcl;
std::unordered_set<std::string> users;
std::unordered_map<std::string, const std::vector<util::hash_buffer>> raw_inputs;

View File

@@ -88,7 +88,7 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
std::lock_guard<std::mutex> lock(collected_msgs.proposals_mutex); // Insert proposal with lock.
collected_msgs.proposals.push_back(
p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey()));
p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp()));
}
else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message
{