Achieving reliable consensus. (#60)

* Remove considering stage when voting and considering lcl.

* Stage sync , lcl sync  and candidate set changes.

* Implemented ledger close time resolution and fixed ledger retrieval issues.

* Code cleanup and added more comments on reliability changes.

* Added further comments and  clenup.
This commit is contained in:
Asanka Indrajith
2019-11-25 07:40:22 -05:00
committed by GitHub
parent 684a6bb18f
commit bebdace519
10 changed files with 267 additions and 137 deletions

View File

@@ -50,7 +50,7 @@ do
binargs: './bin/contract.js', \
peerport: ${peerport}, \
pubport: ${pubport}, \
roundtime: 10000,
roundtime: 1000,
loglevel: 'debug'
}, null, 2)" > hp.cfg
rm tmp.json

View File

@@ -40,7 +40,7 @@ int init()
ctx.led_seq_no = ldr_hist.led_seq_no;
ctx.lcl = ldr_hist.lcl;
ctx.lcl_list.swap(ldr_hist.lcl_list);
ctx.prev_close_time = util::get_epoch_milliseconds();
return 0;
}
@@ -52,15 +52,29 @@ void consensus()
// Get the latest current time.
ctx.time_now = util::get_epoch_milliseconds();
std::list<p2p::proposal> collected_proposals;
// Throughout consensus, we move over the incoming proposals collected via the network so far into
// the candidate proposal set (move and append). This is to have a private working set for the consensus
// and avoid threading conflicts with network incoming proposals.
{
std::lock_guard<std::mutex> lock(p2p::ctx.collected_msgs.proposals_mutex);
ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::ctx.collected_msgs.proposals);
collected_proposals.splice(collected_proposals.end(), p2p::ctx.collected_msgs.proposals);
}
//Copy collected propsals to candidate set of proposals.
//Add mpropsals of new nodes and Replace messages from old nodes to reflect current status of nodes.
for (const auto &proposal : collected_proposals)
{
auto prop_itr = ctx.candidate_proposals.find(proposal.pubkey);
if (prop_itr != ctx.candidate_proposals.end())
{
ctx.candidate_proposals.erase(prop_itr);
ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal));
}
else
ctx.candidate_proposals.emplace(proposal.pubkey, std::move(proposal));
}
// Throughout consensus, we move over the incoming npl messages collected via the network so far into
// the candidate npl message set (move and append). This is to have a private working set for the consensus
// and avoid threading conflicts with network incoming npl messages.
@@ -78,25 +92,8 @@ void consensus()
p2p::ctx.collected_msgs.npl_messages.clear();
}
if (ctx.stage == 0)
{
// Stage 0 means begining of a consensus round.
{
// Remove any useless candidate proposals so we'll have a cleaner proposal set to look at
// when we transition to stage 1.
auto itr = ctx.candidate_proposals.begin();
while (itr != ctx.candidate_proposals.end())
{
// Remove any proposal from previous round's stage 3.
// Remove any proposal from self (pubkey match).
// todo: check the state of these to ensure we're running consensus ledger
if (itr->stage == 3 || conf::cfg.pubkey == itr->pubkey)
ctx.candidate_proposals.erase(itr++);
else
++itr;
}
}
if (ctx.stage == 0) // Stage 0 means begining of a consensus round.
{
// Broadcast non-unl proposals (NUP) containing inputs from locally connected users.
broadcast_nonunl_proposal();
util::sleep(conf::cfg.roundtime / 10);
@@ -110,21 +107,19 @@ void consensus()
}
else // Stage 1, 2, 3
{
std::cout << "Started stage " << std::to_string(ctx.stage) << "\n";
for (auto p : ctx.candidate_proposals)
for (auto &[pubkey, proposal] : ctx.candidate_proposals)
{
bool self = p.pubkey == conf::cfg.pubkey;
LOG_DBG << "[stage" << std::to_string(p.stage)
<< "] users:" << p.users.size()
<< " hinp:" << p.hash_inputs.size()
<< " hout:" << p.hash_outputs.size()
<< " lcl:" << p.lcl
bool self = proposal.pubkey == conf::cfg.pubkey;
LOG_DBG << "[stage" << std::to_string(proposal.stage)
<< "] users:" << proposal.users.size()
<< " hinp:" << proposal.hash_inputs.size()
<< " hout:" << proposal.hash_outputs.size()
<< " lcl:" << proposal.lcl
<< " self:" << self
<< "\n";
}
LOG_DBG << "timenow:" << std::to_string(ctx.time_now) << "\n";
// Initialize vote counters
vote_counter votes;
@@ -134,14 +129,15 @@ void consensus()
check_majority_stage(is_stage_desync, reset_to_stage0, majority_stage, votes);
if (is_stage_desync)
{
timewait_stage(reset_to_stage0);
timewait_stage(reset_to_stage0, floor(conf::cfg.roundtime / 10));
return;
}
// check if we're ahead/behind of consensus lcl
bool is_lcl_desync, should_request_history;
std::string majority_lcl;
check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes);
uint64_t time_off = 0;
check_lcl_votes(is_lcl_desync, should_request_history, time_off, majority_lcl, votes);
if (should_request_history)
{
@@ -150,9 +146,11 @@ void consensus()
}
if (is_lcl_desync)
{
bool should_reset = (ctx.time_now - ctx.novel_proposal_time) > (floor(conf::cfg.roundtime) + floor(rand() % conf::cfg.roundtime));
//for now we are resetting to stage 0 to avoid possible deadlock situations
timewait_stage(should_reset);
//We are resetting to stage 0 to avoid possible deadlock situations.
//Also we try to converge consensus by trying to reset every node in same time(close time range)
//by resetting node to max close time of candidate list of unl list peers.
timewait_stage(true, (time_off - ctx.time_now));
//LOG_DBG << "time off: " << std::to_string(time_off);
return;
}
@@ -160,19 +158,9 @@ void consensus()
const p2p::proposal stg_prop = create_stage123_proposal(votes);
broadcast_proposal(stg_prop);
// Remove all candidate proposals that are behind our current stage.
auto itr = ctx.candidate_proposals.begin();
while (itr != ctx.candidate_proposals.end())
{
if (itr->stage < ctx.stage)
ctx.candidate_proposals.erase(itr++);
else
++itr;
}
//ctx.candidate_proposals.clear();
if (ctx.stage == 3)
{
ctx.prev_close_time = stg_prop.time;
apply_ledger(stg_prop);
// We have finished a consensus round (all 4 stages).
@@ -187,7 +175,7 @@ void consensus()
// after a stage 0 novel proposal we will just busy wait for proposals
if (ctx.stage == 0)
util::sleep(conf::cfg.roundtime / 100);
util::sleep(conf::cfg.roundtime / 10);
else
util::sleep(conf::cfg.roundtime / 4);
}
@@ -322,7 +310,7 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
stg_prop.lcl = ctx.lcl;
// Vote for rest of the proposal fields by looking at candidate proposals.
for (const p2p::proposal &cp : ctx.candidate_proposals)
for (const auto &[pubkey, cp] : ctx.candidate_proposals)
{
// Vote for times.
// Everyone votes on an arbitrary time, as long as its within the round time and not in the future.
@@ -380,6 +368,10 @@ p2p::proposal create_stage123_proposal(vote_counter &votes)
stg_prop.time = time;
}
}
//todo:apply a round time resolution to increase close time reliability(for stage 1,2)
if (ctx.stage == 3)
get_ledger_time_resolution(stg_prop.time);
return stg_prop;
}
@@ -410,7 +402,7 @@ void broadcast_proposal(const p2p::proposal &p)
void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes)
{
// Stage votes.
for (const p2p::proposal &cp : ctx.candidate_proposals)
for (const auto &[pubkey, cp] : ctx.candidate_proposals)
{
// Vote stages if only proposal lcl is match with node's last consensus lcl
if (cp.lcl == ctx.lcl)
@@ -438,32 +430,28 @@ void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority
LOG_DBG << "Stage desync (Reset:" << should_reset << "). Node stage:" << std::to_string(ctx.stage)
<< " is ahead of majority stage:" << std::to_string(majority_stage);
}
else if (majority_stage > ctx.stage - 1)
{
should_reset = true;
is_desync = true;
LOG_DBG << "Stage desync (Reset:" << should_reset << "). Node stage:" << std::to_string(ctx.stage)
<< " is behind majority stage:" << std::to_string(majority_stage);
}
}
/**
* Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes.
*/
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes)
void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &time_off, std::string &majority_lcl, vote_counter &votes)
{
// Stage votes.
int32_t total_lcl_votes = 0;
for (const p2p::proposal &cp : ctx.candidate_proposals)
for (const auto &[pubkey, cp] : ctx.candidate_proposals)
{
// only consider recent proposals and proposals from previous stage.
if ((ctx.time_now - cp.timestamp < conf::cfg.roundtime * 4) && (cp.stage == ctx.stage - 1))
// only consider recent proposals and proposals from previous stage and current stage.
if ((ctx.time_now - cp.timestamp < conf::cfg.roundtime * 4) && cp.stage >= (ctx.stage - 1))
{
increment(votes.lcl, cp.lcl);
total_lcl_votes++;
}
//keep track of max time of peers, so we can reset nodes in a close time range to increase reliability.
//This is very usefull especially boostrapping a node cluster.
if (cp.time > time_off)
time_off = cp.time;
}
is_desync = false;
@@ -486,26 +474,25 @@ void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string
}
}
double wining_votes_unl_ratio = winning_votes / conf::cfg.unl.size();
if (wining_votes_unl_ratio < 0.8)
{
// potential fork condition.
LOG_DBG << "No consensus on lcl. Possible fork condition.";
is_desync = true;
return;
}
//if winning lcl is not matched node lcl,
//that means vode is not on the consensus ledger.
//that means vote is not on the consensus ledger.
//Should request history from a peer.
if (ctx.lcl != majority_lcl)
{
LOG_DBG << "We are not on the consensus ledger, requesting history from a random peer";
is_desync = true;
should_request_history = true;
return;
}
if (winning_votes < 0.8 * ctx.candidate_proposals.size())
{
// potential fork condition.
// critical!!!
LOG_WARN << "No consensus on lcl. Possible fork condition. " << std::to_string(winning_votes) << std::to_string(ctx.candidate_proposals.size());
is_desync = true;
return;
}
}
/**
@@ -526,12 +513,38 @@ float_t get_stage_threshold(const uint8_t stage)
return -1;
}
void timewait_stage(const bool reset)
/**
* Awiat/Sleep consensus to time milliseconds and reset consensus.
* @param reset reset consensus stage to 0 or not.
* @param time milliseconds to sleep/await.
*/
void timewait_stage(const bool reset, const uint64_t time)
{
if (reset)
{
ctx.candidate_proposals.clear();
ctx.stage = 0;
}
util::sleep(conf::cfg.roundtime / 100);
util::sleep(time);
}
/**
* Calculate the effective ledger close time
* After adjusting the ledger close time based on the current resolution,
* also ensure it is sufficiently separated from the prior close time.
* @param close_time voted/agreed closed time
*/
const uint64_t get_ledger_time_resolution(uint64_t close_time)
{
uint64_t closeResolution = conf::cfg.roundtime / 4;
//todo: change time resolution dynamically.
//When nodes agree often reduce resolution time and increase if they don't.
close_time += (closeResolution / 2);
close_time -= (close_time % closeResolution);
return std::max(close_time, (ctx.prev_close_time + conf::cfg.roundtime));
}
/**

View File

@@ -44,8 +44,10 @@ struct candidate_user_output
*/
struct consensus_context
{
// The set of proposals that are being collected as consensus stages are progressing.
std::list<p2p::proposal> candidate_proposals;
// The map of proposals that are being collected as consensus stages are progressing.
// peer public key is the key.
// todo: having a queue of proposals against peer pubkey.
std::unordered_map<std::string, const p2p::proposal> candidate_proposals;
// The set of npl messages that are being collected as consensus stages are progressing.
std::list<std::string> candidate_npl_messages;
@@ -70,7 +72,13 @@ struct consensus_context
uint64_t time_now;
std::string lcl;
uint64_t led_seq_no;
//Map of closed ledgers(only lrdgername[sequnece_number-hash], state hash) with sequence number as map key.
//contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE.
//this is loaded when node started and updated throughout consensus - delete ledgers that falls behind MAX_LEDGER_SEQUENCE range.
//We will use this to track lcls related logic.- track state, lcl request, response.
std::map<uint64_t, std::string> lcl_list;
//ledger close time of previous hash
uint64_t prev_close_time;
consensus_context() : recent_userinput_hashes(200)
{
@@ -105,11 +113,13 @@ void broadcast_proposal(const p2p::proposal &p);
void check_majority_stage(bool &is_desync, bool &should_reset, uint8_t &majority_stage, vote_counter &votes);
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes);
void check_lcl_votes(bool &is_desync, bool &should_request_history, uint64_t &time_off, std::string &majority_lcl, vote_counter &votes);
float_t get_stage_threshold(const uint8_t stage);
void timewait_stage(const bool reset);
void timewait_stage(const bool reset, const uint64_t time);
const uint64_t get_ledger_time_resolution(uint64_t close_time);
void apply_ledger(const p2p::proposal &proposal);

View File

@@ -62,7 +62,7 @@ const std::tuple<const uint64_t, std::string> save_ledger(const p2p::proposal &p
cons::ctx.lcl_list.emplace(led_seq_no, file_name);
//Remove old ledgers that exceeds max sequence range.
if (led_seq_no > MAX_LEDGER_SEQUENCE)
if (led_seq_no > MAX_LEDGER_SEQUENCE)
{
remove_old_ledgers(led_seq_no - MAX_LEDGER_SEQUENCE);
}
@@ -98,7 +98,8 @@ void remove_old_ledgers(const uint64_t led_seq_no)
if (boost::filesystem::exists(file_path))
boost::filesystem::remove(file_path);
}
cons::ctx.lcl_list.erase(cons::ctx.lcl_list.begin(), cons::ctx.lcl_list.lower_bound(led_seq_no + 1));
if (!cons::ctx.lcl_list.empty())
cons::ctx.lcl_list.erase(cons::ctx.lcl_list.begin(), cons::ctx.lcl_list.lower_bound(led_seq_no + 1));
}
/**
@@ -244,7 +245,8 @@ bool check_required_lcl_availability(const p2p::history_request &hr)
if (itr == cons::ctx.lcl_list.end())
{
LOG_DBG << "Required lcl peer asked for is not in our lcl cache.";
//either this node is also not in consesnsus ledger or other node requesting a lcl that is older than maximum ledger range.
//either this node is also not in consesnsus ledger or other node requesting a lcl that is older than node's current
// minimum lcl sequence becuase of maximum ledger history range.
return false;
}
else if (itr->second != hr.required_lcl)
@@ -265,9 +267,9 @@ bool check_required_lcl_availability(const p2p::history_request &hr)
const p2p::history_response retrieve_ledger_history(const p2p::history_request &hr)
{
p2p::history_response history_response;
size_t pos = hr.minimum_lcl.find("-");
uint64_t min_seq_no = 0;
std::string min_lcl_hash;
//get sequence number of minimum lcl required
if (pos != std::string::npos)
@@ -278,22 +280,40 @@ const p2p::history_response retrieve_ledger_history(const p2p::history_request &
const auto itr = cons::ctx.lcl_list.find(min_seq_no);
if (itr != cons::ctx.lcl_list.end()) //requested minimum lcl is not in our lcl history cache
{
LOG_DBG << "Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl";
min_seq_no = itr->first;
//check whether minimum lcl node ask for is same as this node's.
//eventhough sequence number are same, lcl hash can be changed if one of node is in a fork condition.
if (hr.minimum_lcl != itr->second)
{
LOG_DBG << "Invalid minimum ledger. Recieved min hash: "<< min_lcl_hash << " Node hash: " << itr->second;
history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
return history_response;
}
}
else if (min_seq_no > cons::ctx.lcl_list.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence.
{
LOG_DBG << "Invalid minimum ledger. Recieved minimum sequence number is ahead of node current lcl sequence. hash: "<< min_lcl_hash;
history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
return history_response;
}
else
{
LOG_DBG << "Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl";
min_seq_no = cons::ctx.lcl_list.begin()->first;
}
//copy current history cache.
std::map<uint64_t, std::string> lcl_list = cons::ctx.lcl_list;
//LOG_DBG << "history request min seq: " << std::to_string(min_seq_no);
//filter out cache and get raw files here.
//copy current history cache.
std::map<uint64_t, std::string>
lcl_list = cons::ctx.lcl_list;
//filter out cache from finalized minimum sequence.
lcl_list.erase(
lcl_list.begin(),
lcl_list.lower_bound(min_seq_no));
//Get raw content of lcls that going to be send.
for (auto &[seq_no, lcl_hash] : lcl_list)
{
p2p::history_ledger ledger;
@@ -350,49 +370,59 @@ void handle_ledger_history_response(const p2p::history_response &hr)
return;
}
//check whether recieved lcl history contains the current lcl node required.
bool have_requested_lcl = false;
for (auto &[seq_no, ledger] : hr.hist_ledgers)
if (hr.error == p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER)
{
if (last_requested_lcl == ledger.lcl)
// This means we are in a fork ledger.Remove/rollback current ledger.
// Basically in the long run we'll rolback one by one untill we catch up to valid minimum ledger .
remove_ledger(ctx.lcl);
cons::ctx.lcl_list.erase(ctx.lcl_list.rbegin()->first);
}
else
{
//check whether recieved lcl history contains the current lcl node required.
bool have_requested_lcl = false;
for (auto &[seq_no, ledger] : hr.hist_ledgers)
{
have_requested_lcl = true;
break;
if (last_requested_lcl == ledger.lcl)
{
have_requested_lcl = true;
break;
}
}
}
if (!have_requested_lcl)
{
LOG_DBG << "Peer sent us a history response but not containing the lcl we asked for!";
return;
}
//Check integrity of recieved lcl list.
//By checking recieved lcl hashes matches lcl content by applying hashing for each raw content.
for (auto &[seq_no, ledger] : hr.hist_ledgers)
{
const size_t pos = ledger.lcl.find("-");
std::string rec_lcl_hash = ledger.lcl.substr((pos + 1), (ledger.lcl.size() - 1));
//Get binary hash of the the serialized lcl.
const std::string lcl = crypto::get_hash(&ledger.raw_ledger[0], ledger.raw_ledger.size());
//Get hex from binary hash
std::string lcl_hash;
util::bin2hex(lcl_hash,
reinterpret_cast<const unsigned char *>(lcl.data()),
lcl.size());
//LOG_DBG << "passed lcl: " << ledger.lcl << " gen lcl: " << lcl_hash;
//recieved lcl hash and hash generated from recieved lcl content doesn't match -> abandon applying it
if (lcl_hash != rec_lcl_hash)
if (!have_requested_lcl)
{
LOG_WARN << "peer sent us a history response we asked for but the ledger data does not match the ledger hashes";
//todo: we should penalize peer who send this?
LOG_DBG << "Peer sent us a history response but not containing the lcl we asked for! " << hr.hist_ledgers.size();
return;
}
//Check integrity of recieved lcl list.
//By checking recieved lcl hashes matches lcl content by applying hashing for each raw content.
for (auto &[seq_no, ledger] : hr.hist_ledgers)
{
const size_t pos = ledger.lcl.find("-");
std::string rec_lcl_hash = ledger.lcl.substr((pos + 1), (ledger.lcl.size() - 1));
//Get binary hash of the the serialized lcl.
const std::string lcl = crypto::get_hash(&ledger.raw_ledger[0], ledger.raw_ledger.size());
//Get hex from binary hash
std::string lcl_hash;
util::bin2hex(lcl_hash,
reinterpret_cast<const unsigned char *>(lcl.data()),
lcl.size());
//LOG_DBG << "passed lcl: " << ledger.lcl << " gen lcl: " << lcl_hash;
//recieved lcl hash and hash generated from recieved lcl content doesn't match -> abandon applying it
if (lcl_hash != rec_lcl_hash)
{
LOG_WARN << "peer sent us a history response we asked for but the ledger data does not match the ledger hashes";
//todo: we should penalize peer who send this?
return;
}
}
}
//Execution to here means the history data sent checks out
@@ -410,9 +440,18 @@ void handle_ledger_history_response(const p2p::history_response &hr)
}
last_requested_lcl = "";
const auto latest_lcl_itr = cons::ctx.lcl_list.rbegin();
cons::ctx.lcl = latest_lcl_itr->second;
cons::ctx.led_seq_no = latest_lcl_itr->first;
if (cons::ctx.lcl_list.empty())
{
cons::ctx.led_seq_no = 0;
cons::ctx.lcl = "0-genesis";
}
else
{
const auto latest_lcl_itr = cons::ctx.lcl_list.rbegin();
cons::ctx.lcl = latest_lcl_itr->second;
cons::ctx.led_seq_no = latest_lcl_itr->first;
}
}
} // namespace cons

View File

@@ -40,8 +40,16 @@ table History_Request_Message { //Ledger History request type message schema
required_lcl:[ubyte];
}
enum Ledger_Response_Error : ubyte
{
None = 0,
Invalid_Min_Ledger = 1,
Req_Ledger_Not_Found = 2
}
table History_Response_Message { //Ledger History request type message schema
hist_ledgers:[HistoryLedgerPair];
error: Ledger_Response_Error;
}
table HistoryLedgerPair { //A key, value pair of byte[].

View File

@@ -104,6 +104,39 @@ template<> struct MessageTraits<History_Response_Message> {
bool VerifyMessage(flatbuffers::Verifier &verifier, const void *obj, Message type);
bool VerifyMessageVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types);
enum Ledger_Response_Error {
Ledger_Response_Error_None = 0,
Ledger_Response_Error_Invalid_Min_Ledger = 1,
Ledger_Response_Error_Req_Ledger_Not_Found = 2,
Ledger_Response_Error_MIN = Ledger_Response_Error_None,
Ledger_Response_Error_MAX = Ledger_Response_Error_Req_Ledger_Not_Found
};
inline const Ledger_Response_Error (&EnumValuesLedger_Response_Error())[3] {
static const Ledger_Response_Error values[] = {
Ledger_Response_Error_None,
Ledger_Response_Error_Invalid_Min_Ledger,
Ledger_Response_Error_Req_Ledger_Not_Found
};
return values;
}
inline const char * const *EnumNamesLedger_Response_Error() {
static const char * const names[] = {
"None",
"Invalid_Min_Ledger",
"Req_Ledger_Not_Found",
nullptr
};
return names;
}
inline const char *EnumNameLedger_Response_Error(Ledger_Response_Error e) {
if (e < Ledger_Response_Error_None || e > Ledger_Response_Error_Req_Ledger_Not_Found) return "";
const size_t index = static_cast<size_t>(e);
return EnumNamesLedger_Response_Error()[index];
}
struct UserSubmittedMessage FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_CONTENT = 4,
@@ -653,7 +686,8 @@ inline flatbuffers::Offset<History_Request_Message> CreateHistory_Request_Messag
struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_HIST_LEDGERS = 4
VT_HIST_LEDGERS = 4,
VT_ERROR = 6
};
const flatbuffers::Vector<flatbuffers::Offset<HistoryLedgerPair>> *hist_ledgers() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<HistoryLedgerPair>> *>(VT_HIST_LEDGERS);
@@ -661,11 +695,18 @@ struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::T
flatbuffers::Vector<flatbuffers::Offset<HistoryLedgerPair>> *mutable_hist_ledgers() {
return GetPointer<flatbuffers::Vector<flatbuffers::Offset<HistoryLedgerPair>> *>(VT_HIST_LEDGERS);
}
Ledger_Response_Error error() const {
return static_cast<Ledger_Response_Error>(GetField<uint8_t>(VT_ERROR, 0));
}
bool mutate_error(Ledger_Response_Error _error) {
return SetField<uint8_t>(VT_ERROR, static_cast<uint8_t>(_error), 0);
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_HIST_LEDGERS) &&
verifier.VerifyVector(hist_ledgers()) &&
verifier.VerifyVectorOfTables(hist_ledgers()) &&
VerifyField<uint8_t>(verifier, VT_ERROR) &&
verifier.EndTable();
}
};
@@ -676,6 +717,9 @@ struct History_Response_MessageBuilder {
void add_hist_ledgers(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<HistoryLedgerPair>>> hist_ledgers) {
fbb_.AddOffset(History_Response_Message::VT_HIST_LEDGERS, hist_ledgers);
}
void add_error(Ledger_Response_Error error) {
fbb_.AddElement<uint8_t>(History_Response_Message::VT_ERROR, static_cast<uint8_t>(error), 0);
}
explicit History_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
@@ -690,19 +734,23 @@ struct History_Response_MessageBuilder {
inline flatbuffers::Offset<History_Response_Message> CreateHistory_Response_Message(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<HistoryLedgerPair>>> hist_ledgers = 0) {
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<HistoryLedgerPair>>> hist_ledgers = 0,
Ledger_Response_Error error = Ledger_Response_Error_None) {
History_Response_MessageBuilder builder_(_fbb);
builder_.add_hist_ledgers(hist_ledgers);
builder_.add_error(error);
return builder_.Finish();
}
inline flatbuffers::Offset<History_Response_Message> CreateHistory_Response_MessageDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const std::vector<flatbuffers::Offset<HistoryLedgerPair>> *hist_ledgers = nullptr) {
const std::vector<flatbuffers::Offset<HistoryLedgerPair>> *hist_ledgers = nullptr,
Ledger_Response_Error error = Ledger_Response_Error_None) {
auto hist_ledgers__ = hist_ledgers ? _fbb.CreateVector<flatbuffers::Offset<HistoryLedgerPair>>(*hist_ledgers) : 0;
return fbschema::p2pmsg::CreateHistory_Response_Message(
_fbb,
hist_ledgers__);
hist_ledgers__,
error);
}
struct HistoryLedgerPair FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {

View File

@@ -166,6 +166,9 @@ const p2p::history_response create_history_response_from_msg(const History_Respo
if (msg.hist_ledgers())
hr.hist_ledgers = flatbuf_historyledgermap_to_historyledgermap(msg.hist_ledgers());
if (msg.error())
hr.error = (p2p::LEDGER_RESPONSE_ERROR)msg.error();
return hr;
}
@@ -318,7 +321,8 @@ void create_msg_from_history_response(flatbuffers::FlatBufferBuilder &container_
flatbuffers::Offset<History_Response_Message> hrmsg =
CreateHistory_Response_Message(
builder,
historyledgermap_to_flatbuf_historyledgermap(builder, hr.hist_ledgers));
historyledgermap_to_flatbuf_historyledgermap(builder, hr.hist_ledgers),
(Ledger_Response_Error)hr.error);
flatbuffers::Offset<Content> message = CreateContent(builder, Message_History_Response_Message, hrmsg.Union());
builder.Finish(message); // Finished building message content to get serialised content.

View File

@@ -176,8 +176,7 @@ int main(int argc, char **argv)
signal(SIGINT, signal_handler);
//we are waiting for peer to estasblish peer connections.
//otherwise we'll run into not enough peers propsing/stage desync deadlock directly now.
sleep(3);
sleep(10); //todo: replace waiting with a check to peer check.
while (true)
{

View File

@@ -71,8 +71,7 @@ void peer_connection_watchdog()
}
}
//util::sleep(conf::cfg.roundtime * 4);
util::sleep(200);
util::sleep(conf::cfg.roundtime * 4);
}
}

View File

@@ -38,9 +38,19 @@ struct history_ledger
std::vector<uint8_t> raw_ledger;
};
enum LEDGER_RESPONSE_ERROR
{
NONE = 0,
INVALID_MIN_LEDGER = 1,
REQ_LEDGER_NOT_FOUND = 2
};
struct history_response
{
std::map<uint64_t,const history_ledger> hist_ledgers;
LEDGER_RESPONSE_ERROR error;
};
struct npl_message
@@ -50,7 +60,7 @@ struct npl_message
struct message_collection
{
std::list<proposal> proposals;
std::list<proposal> proposals;
std::mutex proposals_mutex; // Mutex for proposals access race conditions.
std::list<nonunl_proposal> nonunl_proposals;