Ledger maintenance refactor. (#130)

* Added ledger namespace.
* Thread-safe lcl access and update.
* Refactored history sync and serving into a thread.
* Restructured ledger cache item.
This commit is contained in:
Ravin Perera
2020-10-08 22:25:47 +05:30
committed by GitHub
parent 31048f55b8
commit cb4d0c4f59
30 changed files with 945 additions and 754 deletions

View File

@@ -46,8 +46,8 @@ add_executable(hpcore
src/usr/user_session_handler.cpp
src/usr/usr.cpp
src/usr/read_req.cpp
src/cons/cons.cpp
src/cons/ledger_handler.cpp
src/ledger.cpp
src/consensus.cpp
src/state/state_sync.cpp
src/state/state_serve.cpp
src/main.cpp

View File

@@ -48,7 +48,9 @@ Code is divided into subsystems via namespaces.
**p2p::** Handles peer-to-peer connections and message exchange between nodes. Makes use of **crypto** and **comm**.
**cons::** Handles consensus and proposal rounds. Makes use of **usr**, **p2p** and **sc**
**consensus::** Handles consensus and proposal rounds. Makes use of **usr**, **p2p** and **sc**
**ledger::** Maintains the ledger and handles ledger syncing activites.
**comm::** Handles generic web sockets communication functionality. Mainly acts as a wrapper for websocketd/websocat.

View File

@@ -1,476 +0,0 @@
#include "../pchheader.hpp"
#include "../conf.hpp"
#include "../crypto.hpp"
#include "../p2p/p2p.hpp"
#include "../msg/fbuf/common_helpers.hpp"
#include "../msg/fbuf/ledger_helpers.hpp"
#include "../msg/fbuf/p2pmsg_helpers.hpp"
#include "../hplog.hpp"
#include "ledger_handler.hpp"
#include "cons.hpp"
namespace cons
{
namespace p2pmsg = msg::fbuf::p2pmsg;
/**
* Create and save ledger from the given proposal message.
* @param proposal consensus reached Satge 3 proposal.
* @return tuple of current lcl sequence number and file name of the saved lcl.
*/
const std::tuple<const uint64_t, std::string> save_ledger(const p2p::proposal &proposal)
{
const size_t pos = proposal.lcl.find("-");
uint64_t led_seq_no = 0;
if (pos != std::string::npos)
{
led_seq_no = std::stoull(proposal.lcl.substr(0, pos)); //get lcl sequence number.
led_seq_no++; //current lcl sequence number.
}
else
{
//lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format.
LOG_ERROR << "Invalid lcl name: " << proposal.lcl << " when saving ledger.";
}
//Serialize lcl using flatbuffer ledger schema.
flatbuffers::FlatBufferBuilder builder(1024);
const std::string_view ledger_str = msg::fbuf::ledger::create_ledger_from_proposal(builder, proposal, led_seq_no);
//Get binary hash of the the serialized lcl.
const std::string lcl = crypto::get_hash(ledger_str);
//Get hex from binary hash.
std::string lcl_hash;
util::bin2hex(lcl_hash,
reinterpret_cast<const unsigned char *>(lcl.data()),
lcl.size());
//construct lcl file name.
//lcl file name should follow [ledger sequnce numer]-lcl[lcl hex] format.
const std::string seq_no_str = std::to_string(led_seq_no);
std::string file_name;
file_name.reserve(lcl_hash.size() + seq_no_str.size() + 1);
file_name.append(seq_no_str)
.append("-")
.append(lcl_hash);
write_ledger(file_name, ledger_str.data(), ledger_str.size());
ledger_cache_entry c;
c.lcl = file_name;
c.state = proposal.state.to_string_view();
cons::ctx.ledger_cache.emplace(led_seq_no, std::move(c));
//Remove old ledgers that exceeds max sequence range.
if (led_seq_no > MAX_LEDGER_SEQUENCE)
{
remove_old_ledgers(led_seq_no - MAX_LEDGER_SEQUENCE);
}
return std::make_tuple(led_seq_no, std::move(file_name));
}
/**
* Remove old ledgers that exceeds max sequence range from file system and ledger history cache.
* @param led_seq_no minimum sequence number to be in history.
*/
void remove_old_ledgers(const uint64_t led_seq_no)
{
std::map<uint64_t, ledger_cache_entry>::iterator itr;
std::string dir_path;
dir_path.reserve(conf::ctx.hist_dir.size() + 1);
dir_path.append(conf::ctx.hist_dir)
.append("/");
for (itr = cons::ctx.ledger_cache.begin();
itr != cons::ctx.ledger_cache.lower_bound(led_seq_no + 1);
itr++)
{
const std::string file_name = itr->second.lcl;
std::string file_path;
file_path.reserve(dir_path.size() + itr->second.lcl.size() + 4);
file_path.append(dir_path)
.append(file_name)
.append(".lcl");
if (util::is_file_exists(file_path))
util::remove_file(file_path);
}
if (!cons::ctx.ledger_cache.empty())
cons::ctx.ledger_cache.erase(cons::ctx.ledger_cache.begin(), cons::ctx.ledger_cache.lower_bound(led_seq_no + 1));
}
/**
* Write ledger to file system.
* @param file_name current ledger sequence number.
* @param ledger_raw raw lcl data.
* @param ledger_size size of the raw lcl data.
*/
void write_ledger(const std::string &file_name, const char *ledger_raw, size_t ledger_size)
{
//create file path to save ledger.
//file name -> [ledger sequnce numer]-[lcl hex]
std::string path;
path.reserve(file_name.size() + conf::ctx.hist_dir.size() + 5);
path.append(conf::ctx.hist_dir)
.append("/")
.append(file_name)
.append(".lcl");
//write ledger to file system
std::ofstream ofs(std::move(path));
ofs.write(ledger_raw, ledger_size);
ofs.close();
}
/**
* Delete ledger from file system.
* @param file_name name of ledger to be deleted.
*/
void remove_ledger(const std::string &file_name)
{
std::string file_path;
file_path.reserve(conf::ctx.hist_dir.size() + file_name.size() + 5);
file_path.append(conf::ctx.hist_dir)
.append("/")
.append(file_name)
.append(".lcl");
util::remove_file(file_path);
}
/**
* Retrieve lcl(last closed ledger) information from ledger history.
* @return A ledger_history struct representing the lcl.
*/
const ledger_history load_ledger()
{
ledger_history ldg_hist;
//Get all records at lcl history direcory and find the last closed ledger.
size_t latest_pos = 0;
for (const auto &entry : util::fetch_dir_entries(conf::ctx.hist_dir))
{
std::string file_path(conf::ctx.hist_dir);
file_path.append("/").append(entry.d_name);
if (util::is_dir_exists(file_path))
{
LOG_ERROR << "Found directory " << entry.d_name << " in " << conf::ctx.hist_dir << ". There should be no folders in this directory";
}
else
{
const std::string_view extension = util::fetch_file_extension(file_path);
const std::string file_name(util::remove_file_extension(entry.d_name));
if (extension != ".lcl")
{
LOG_ERROR << "Found invalid file extension: " << extension << " for lcl file " << entry.d_name << " in " << conf::ctx.hist_dir;
}
const size_t pos = file_name.find("-");
uint64_t seq_no = 0;
if (pos != std::string::npos)
{
seq_no = std::stoull(file_name.substr(0, pos));
std::ifstream file(file_path, std::ios::binary | std::ios::ate);
std::streamsize size = file.tellg();
file.seekg(0, std::ios::beg);
std::vector<char> buffer(size);
if (file.read(buffer.data(), size))
{
const uint8_t *ledger_buf_ptr = reinterpret_cast<const uint8_t *>(buffer.data());
const msg::fbuf::ledger::Ledger *ledger = msg::fbuf::ledger::GetLedger(ledger_buf_ptr);
ledger_cache_entry c;
c.lcl = file_name;
c.state = msg::fbuf::flatbuff_bytes_to_sv(ledger->state());
ldg_hist.cache.emplace(seq_no, std::move(c)); //lcl_cache -> [seq_no-hash]
}
}
else
{
//lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format.
LOG_ERROR << "Invalid lcl file name: " << file_name << " in " << conf::ctx.hist_dir;
}
}
}
//check if there is a saved lcl file -> if no send genesis lcl.
if (ldg_hist.cache.empty())
{
ldg_hist.led_seq_no = 0;
ldg_hist.lcl = GENESIS_LEDGER;
}
else
{
ldg_hist.led_seq_no = ldg_hist.cache.rbegin()->first;
ldg_hist.lcl = ldg_hist.cache.rbegin()->second.lcl;
//Remove old ledgers that exceeds max sequence range.
if (ldg_hist.led_seq_no > MAX_LEDGER_SEQUENCE)
{
remove_old_ledgers(ldg_hist.led_seq_no - MAX_LEDGER_SEQUENCE);
}
}
return ldg_hist;
}
/**
* Create and send ledger history request to random node from unl list.
* @param minimum_lcl hash of the minimum lcl from which node need lcl history.
* @param required_lcl hash of the required lcl.
*/
void send_ledger_history_request(const std::string &minimum_lcl, const std::string &required_lcl)
{
p2p::history_request hr;
hr.required_lcl = required_lcl;
hr.minimum_lcl = minimum_lcl;
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_history_request(fbuf, hr);
p2p::send_message_to_random_peer(fbuf);
ctx.last_requested_lcl = required_lcl;
LOG_DEBUG << "Ledger history request sent. Required lcl:" << required_lcl.substr(0, 15);
}
/**
* Check requested lcl is in node's lcl history cache.
* @param hr lcl history request information.
* @return true if requested lcl is in lcl history cache.
*/
bool check_required_lcl_availability(const p2p::history_request &hr)
{
size_t pos = hr.required_lcl.find("-");
uint64_t req_seq_no = 0;
//get sequence number of required lcl
if (pos != std::string::npos)
{
req_seq_no = std::stoull(hr.required_lcl.substr(0, pos)); //get required lcl sequence number
}
if (req_seq_no > 0)
{
const auto itr = cons::ctx.ledger_cache.find(req_seq_no);
if (itr == cons::ctx.ledger_cache.end())
{
LOG_DEBUG << "Required lcl peer asked for is not in our lcl cache.";
//either this node is also not in consesnsus ledger or other node requesting a lcl that is older than node's current
// minimum lcl sequence becuase of maximum ledger history range.
return false;
}
else if (itr->second.lcl != hr.required_lcl)
{
LOG_DEBUG << "Required lcl peer asked for is not in our lcl cache.";
//either this node or requesting node is in a fork condition.
return false;
}
}
else
{
return false; //Very rare case: node asking for the genisis lcl.
}
return true;
}
/**
* Retrieve lcl(last closed ledger) information from ledger history.
* @param hr lcl history request information.
* @return A ledger history response containing requested ledger details.
*/
const p2p::history_response retrieve_ledger_history(const p2p::history_request &hr)
{
p2p::history_response history_response;
size_t pos = hr.minimum_lcl.find("-");
uint64_t min_seq_no = 0;
//get sequence number of minimum lcl required
if (pos != std::string::npos)
{
min_seq_no = std::stoull(hr.minimum_lcl.substr(0, pos)); //get required lcl sequence number
}
const auto itr = cons::ctx.ledger_cache.find(min_seq_no);
if (itr != cons::ctx.ledger_cache.end()) //requested minimum lcl is not in our lcl history cache
{
min_seq_no = itr->first;
//check whether minimum lcl node ask for is same as this node's.
//eventhough sequence number are same, lcl hash can be changed if one of node is in a fork condition.
if (hr.minimum_lcl != itr->second.lcl)
{
LOG_DEBUG << "Invalid minimum ledger. Recieved min hash: " << hr.minimum_lcl << " Node hash: " << itr->second.lcl;
history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
return history_response;
}
}
else if (min_seq_no > cons::ctx.ledger_cache.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence.
{
LOG_DEBUG << "Invalid minimum ledger. Recieved minimum sequence number is ahead of node current lcl sequence. Recvd hash: " << hr.minimum_lcl;
history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
return history_response;
}
else
{
LOG_DEBUG << "Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl";
min_seq_no = cons::ctx.ledger_cache.begin()->first;
}
//LOG_DBG << "history request min seq: " << std::to_string(min_seq_no);
//copy current history cache.
std::map<uint64_t, ledger_cache_entry> led_cache = cons::ctx.ledger_cache;
//filter out cache and get raw files here.
led_cache.erase(
led_cache.begin(),
led_cache.lower_bound(min_seq_no));
//Get raw content of lcls that going to be send.
for (auto &[seq_no, cache] : led_cache)
{
p2p::history_ledger ledger;
ledger.lcl = cache.lcl;
ledger.state = cache.state;
std::string path;
path.reserve(conf::ctx.hist_dir.size() + cache.lcl.size() + 5);
path.append(conf::ctx.hist_dir)
.append("/")
.append(cache.lcl)
.append(".lcl");
//read lcl file
std::ifstream file(path, std::ios::binary | std::ios::ate);
std::streamsize size = file.tellg();
file.seekg(0, std::ios::beg);
std::vector<char> buffer(size);
if (file.read(buffer.data(), size))
{
ledger.raw_ledger = reinterpret_cast<std::vector<uint8_t> &>(buffer);
history_response.hist_ledgers.emplace(seq_no, ledger);
}
}
return history_response;
}
/**
* Handle recieved ledger history response.
* @param hr lcl history request information.
* @return peer outbound message object with ledger history response.
*/
void handle_ledger_history_response(const p2p::history_response &hr)
{
//check response object contains
if (ctx.last_requested_lcl.empty())
{
LOG_DEBUG << "Peer sent us a history response but we never asked for one!";
return;
}
if (hr.error == p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER)
{
// This means we are in a fork ledger.Remove/rollback current ledger.
// Basically in the long run we'll rolback one by one untill we catch up to valid minimum ledger .
remove_ledger(ctx.lcl);
cons::ctx.ledger_cache.erase(ctx.ledger_cache.rbegin()->first);
LOG_DEBUG << "Invalid min ledger. Removed last ledger.";
}
else
{
//check whether recieved lcl history contains the current lcl node required.
bool have_requested_lcl = false;
for (auto &[seq_no, ledger] : hr.hist_ledgers)
{
if (ctx.last_requested_lcl == ledger.lcl)
{
have_requested_lcl = true;
break;
}
}
if (!have_requested_lcl)
{
LOG_DEBUG << "Peer sent us a history response but not containing the lcl we asked for! " << hr.hist_ledgers.size();
return;
}
//Check integrity of recieved lcl list.
//By checking recieved lcl hashes matches lcl content by applying hashing for each raw content.
for (auto &[seq_no, ledger] : hr.hist_ledgers)
{
const size_t pos = ledger.lcl.find("-");
std::string rec_lcl_hash = ledger.lcl.substr((pos + 1), (ledger.lcl.size() - 1));
//Get binary hash of the the serialized lcl.
const std::string lcl = crypto::get_hash(&ledger.raw_ledger[0], ledger.raw_ledger.size());
//Get hex from binary hash
std::string lcl_hash;
util::bin2hex(lcl_hash,
reinterpret_cast<const unsigned char *>(lcl.data()),
lcl.size());
//LOG_DBG << "passed lcl: " << ledger.lcl << " gen lcl: " << lcl_hash;
//recieved lcl hash and hash generated from recieved lcl content doesn't match -> abandon applying it
if (lcl_hash != rec_lcl_hash)
{
LOG_WARNING << "peer sent us a history response we asked for but the ledger data does not match the ledger hashes";
//todo: we should penalize peer who send this?
return;
}
}
}
//Execution to here means the history data sent checks out
//Save recieved lcl in file system and update lcl history cache
for (auto &[seq_no, ledger] : hr.hist_ledgers)
{
auto prev_dup_itr = cons::ctx.ledger_cache.find(seq_no);
if (prev_dup_itr != cons::ctx.ledger_cache.end())
{
remove_ledger(prev_dup_itr->second.lcl);
cons::ctx.ledger_cache.erase(prev_dup_itr);
}
write_ledger(ledger.lcl, reinterpret_cast<const char *>(&ledger.raw_ledger[0]), ledger.raw_ledger.size());
ledger_cache_entry l;
l.lcl = ledger.lcl;
l.state = ledger.state;
cons::ctx.ledger_cache.emplace(seq_no, std::move(l));
}
ctx.last_requested_lcl = "";
if (cons::ctx.ledger_cache.empty())
{
cons::ctx.led_seq_no = 0;
cons::ctx.lcl = GENESIS_LEDGER;
}
else
{
const auto latest_lcl_itr = cons::ctx.ledger_cache.rbegin();
cons::ctx.lcl = latest_lcl_itr->second.lcl;
cons::ctx.led_seq_no = latest_lcl_itr->first;
}
LOG_INFO << "lcl sync complete. New lcl:" << cons::ctx.lcl.substr(0, 15);
}
} // namespace cons

View File

@@ -1,46 +0,0 @@
#ifndef _HP_CONS_LEDGER_
#define _HP_CONS_LEDGER_
#include "../pchheader.hpp"
#include "../p2p/p2p.hpp"
namespace cons
{
//max ledger count
constexpr uint64_t MAX_LEDGER_SEQUENCE = 200;
constexpr const char* GENESIS_LEDGER = "0-genesis";
struct ledger_cache_entry
{
std::string lcl;
std::string state;
};
struct ledger_history
{
std::string lcl;
uint64_t led_seq_no = 0;
std::map<uint64_t, ledger_cache_entry> cache;
};
const std::tuple<const uint64_t, std::string> save_ledger(const p2p::proposal &proposal);
void remove_old_ledgers(const uint64_t led_seq_no);
void write_ledger(const std::string &file_name, const char *ledger_raw, size_t ledger_size);
void remove_ledger(const std::string &file_name);
const ledger_history load_ledger();
void send_ledger_history_request(const std::string &minimum_lcl, const std::string &required_lcl);
bool check_required_lcl_availability(const p2p::history_request &hr);
const p2p::history_response retrieve_ledger_history(const p2p::history_request &hr);
void handle_ledger_history_response(const p2p::history_response &hr);
} // namespace cons
#endif

View File

@@ -1,24 +1,24 @@
#include "../pchheader.hpp"
#include "../conf.hpp"
#include "../usr/usr.hpp"
#include "../usr/user_input.hpp"
#include "../p2p/p2p.hpp"
#include "../msg/fbuf/p2pmsg_helpers.hpp"
#include "../msg/usrmsg_parser.hpp"
#include "../msg/usrmsg_common.hpp"
#include "../p2p/peer_session_handler.hpp"
#include "../hplog.hpp"
#include "../crypto.hpp"
#include "../sc.hpp"
#include "../hpfs/h32.hpp"
#include "../hpfs/hpfs.hpp"
#include "../state/state_sync.hpp"
#include "ledger_handler.hpp"
#include "cons.hpp"
#include "pchheader.hpp"
#include "conf.hpp"
#include "usr/usr.hpp"
#include "usr/user_input.hpp"
#include "p2p/p2p.hpp"
#include "msg/fbuf/p2pmsg_helpers.hpp"
#include "msg/usrmsg_parser.hpp"
#include "msg/usrmsg_common.hpp"
#include "p2p/peer_session_handler.hpp"
#include "hplog.hpp"
#include "crypto.hpp"
#include "sc.hpp"
#include "hpfs/h32.hpp"
#include "hpfs/hpfs.hpp"
#include "state/state_sync.hpp"
#include "ledger.hpp"
#include "consensus.hpp"
namespace p2pmsg = msg::fbuf::p2pmsg;
namespace cons
namespace consensus
{
/**
@@ -30,22 +30,10 @@ namespace cons
constexpr float MAJORITY_THRESHOLD = 0.8;
consensus_context ctx;
bool init_success = false;
bool is_shutting_down = false;
// Consensus processing thread.
std::thread consensus_thread;
int init()
{
//load lcl details from lcl history.
ledger_history ldr_hist = load_ledger();
ctx.led_seq_no = ldr_hist.led_seq_no;
ctx.lcl = ldr_hist.lcl;
ctx.ledger_cache.swap(ldr_hist.cache);
if (get_initial_state_hash(ctx.state) == -1)
{
LOG_ERROR << "Failed to get initial state hash.";
@@ -64,7 +52,7 @@ namespace cons
ctx.contract_ctx.args.readonly = false;
// Starting consensus processing thread.
consensus_thread = std::thread(cons::run_consensus);
ctx.consensus_thread = std::thread(run_consensus);
init_success = true;
return 0;
@@ -78,14 +66,14 @@ namespace cons
if (init_success)
{
// Making the consensus while loop stop.
is_shutting_down = true;
ctx.is_shutting_down = true;
// Stop the contract if running.
sc::stop(ctx.contract_ctx);
// Joining consensus processing thread.
if (consensus_thread.joinable())
consensus_thread.join();
if (ctx.consensus_thread.joinable())
ctx.consensus_thread.join();
}
}
@@ -94,7 +82,7 @@ namespace cons
*/
void wait()
{
consensus_thread.join();
ctx.consensus_thread.join();
}
void run_consensus()
@@ -103,7 +91,7 @@ namespace cons
LOG_INFO << "Consensus processor started.";
while (!is_shutting_down)
while (!ctx.is_shutting_down)
{
if (consensus() == -1)
{
@@ -128,6 +116,10 @@ namespace cons
ctx.time_now = stage_start;
std::list<p2p::proposal> collected_proposals;
// Get current lcl and sequence no.
const std::string lcl = ledger::ctx.get_lcl();
const uint64_t lcl_seq_no = ledger::ctx.get_seq_no();
// Throughout consensus, we move over the incoming proposals collected via the network so far into
// the candidate proposal set (move and append). This is to have a private working set for the consensus
// and avoid threading conflicts with network incoming proposals.
@@ -165,7 +157,7 @@ namespace cons
auto itr = ctx.candidate_npl_messages.begin();
while (itr != ctx.candidate_npl_messages.end())
{
if (itr->lcl == ctx.lcl)
if (itr->lcl == lcl)
++itr;
else
ctx.candidate_npl_messages.erase(itr++);
@@ -177,13 +169,12 @@ namespace cons
{
// Broadcast non-unl proposals (NUP) containing inputs from locally connected users.
broadcast_nonunl_proposal();
//util::sleep(conf::cfg.roundtime / 10);
// Verify and transfer user inputs from incoming NUPs onto consensus candidate data.
verify_and_populate_candidate_user_inputs();
verify_and_populate_candidate_user_inputs(lcl_seq_no);
// In stage 0 we create a novel proposal and broadcast it.
const p2p::proposal stg_prop = create_stage0_proposal();
const p2p::proposal stg_prop = create_stage0_proposal(lcl);
broadcast_proposal(stg_prop);
}
else // Stage 1, 2, 3
@@ -196,31 +187,15 @@ namespace cons
// check if we're ahead/behind of consensus lcl
bool is_lcl_desync = false, should_request_history = false;
std::string majority_lcl;
check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes);
check_lcl_votes(is_lcl_desync, should_request_history, majority_lcl, votes, lcl);
if (is_lcl_desync)
{
if (should_request_history)
{
LOG_INFO << "Syncing lcl. Curr lcl:" << cons::ctx.lcl.substr(0, 15) << " majority:" << majority_lcl.substr(0, 15);
// TODO: If we are in a lcl fork condition try to rollback state with the help of
// state_restore to rollback state checkpoints before requesting new state.
// Handle minority going forward when boostrapping cluster.
// Here we are mimicking invalid min ledger scenario.
if (majority_lcl == GENESIS_LEDGER)
{
ctx.last_requested_lcl = majority_lcl;
p2p::history_response res;
res.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
handle_ledger_history_response(std::move(res));
}
else
{
//create history request message and request history from a random peer.
send_ledger_history_request(ctx.lcl, majority_lcl);
}
//Node is not in sync with majority lcl. Switch to observer mode.
conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER);
ledger::set_sync_target(majority_lcl);
}
}
else
@@ -239,16 +214,16 @@ namespace cons
conf::change_operating_mode(conf::OPERATING_MODE::PROPOSER);
// In stage 1, 2, 3 we vote for incoming proposals and promote winning votes based on thresholds.
const p2p::proposal stg_prop = create_stage123_proposal(votes);
const p2p::proposal stg_prop = create_stage123_proposal(votes, lcl);
broadcast_proposal(stg_prop);
if (ctx.stage == 3)
{
if (apply_ledger(stg_prop) != -1)
if (apply_ledger(stg_prop, lcl_seq_no, lcl) != -1)
{
// node has finished a consensus round (all 4 stages).
LOG_INFO << "****Stage 3 consensus reached**** (lcl:" << ctx.lcl.substr(0, 15)
LOG_INFO << "****Stage 3 consensus reached**** (lcl:" << lcl.substr(0, 15)
<< " state:" << ctx.state << ")";
}
else
@@ -384,7 +359,7 @@ namespace cons
* Verifies the user signatures and populate non-expired user inputs from collected
* non-unl proposals (if any) into consensus candidate data.
*/
void verify_and_populate_candidate_user_inputs()
void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no)
{
// Lock the user sessions and the list so any network activity is blocked.
std::scoped_lock<std::mutex, std::mutex> lock(usr::ctx.users_mutex, p2p::ctx.collected_msgs.nonunl_proposals_mutex);
@@ -422,7 +397,7 @@ namespace cons
parser.extract_input_container(input, nonce, max_lcl_seqno, umsg.input_container);
// Ignore the input if our ledger has passed the input TTL.
if (max_lcl_seqno > ctx.led_seq_no)
if (max_lcl_seqno > lcl_seq_no)
{
if (!appbill_balance_exceeded)
{
@@ -548,13 +523,13 @@ namespace cons
}
}
p2p::proposal create_stage0_proposal()
p2p::proposal create_stage0_proposal(std::string_view lcl)
{
// The proposal we are going to emit in stage 0.
p2p::proposal stg_prop;
stg_prop.time = ctx.time_now;
stg_prop.stage = 0;
stg_prop.lcl = ctx.lcl;
stg_prop.lcl = lcl;
stg_prop.state = ctx.state;
// Populate the proposal with set of candidate user pubkeys.
@@ -577,7 +552,7 @@ namespace cons
return stg_prop;
}
p2p::proposal create_stage123_proposal(vote_counter &votes)
p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl)
{
// The proposal to be emited at the end of this stage.
p2p::proposal stg_prop;
@@ -586,7 +561,7 @@ namespace cons
// we always vote for our current lcl and state regardless of what other peers are saying
// if there's a fork condition we will either request history and state from
// our peers or we will halt depending on level of consensus on the sides of the fork
stg_prop.lcl = ctx.lcl;
stg_prop.lcl = lcl;
stg_prop.state = ctx.state;
// Vote for rest of the proposal fields by looking at candidate proposals.
@@ -677,7 +652,7 @@ namespace cons
/**
* Check our LCL is consistent with the proposals being made by our UNL peers lcl_votes.
*/
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes)
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes, std::string_view lcl)
{
int32_t total_lcl_votes = 0;
@@ -710,14 +685,10 @@ namespace cons
//if winning lcl is not matched node lcl,
//that means vote is not on the consensus ledger.
//Should request history from a peer.
if (ctx.lcl != majority_lcl)
if (lcl != majority_lcl)
{
LOG_DEBUG << "We are not on the consensus ledger, requesting history from a random peer";
is_desync = true;
//Node is not in sync with current lcl ->switch to observer mode.
conf::change_operating_mode(conf::OPERATING_MODE::OBSERVER);
should_request_history = true;
return;
}
@@ -767,11 +738,11 @@ namespace cons
switch (stage)
{
case 1:
return cons::STAGE1_THRESHOLD * conf::cfg.unl.size();
return STAGE1_THRESHOLD * conf::cfg.unl.size();
case 2:
return cons::STAGE2_THRESHOLD * conf::cfg.unl.size();
return STAGE2_THRESHOLD * conf::cfg.unl.size();
case 3:
return cons::STAGE3_THRESHOLD * conf::cfg.unl.size();
return STAGE3_THRESHOLD * conf::cfg.unl.size();
}
return -1;
}
@@ -780,18 +751,17 @@ namespace cons
* Finalize the ledger after consensus.
* @param cons_prop The proposal that reached consensus.
*/
int apply_ledger(const p2p::proposal &cons_prop)
int apply_ledger(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl)
{
const std::tuple<const uint64_t, std::string> new_lcl = save_ledger(cons_prop);
ctx.led_seq_no = std::get<0>(new_lcl);
ctx.lcl = std::get<1>(new_lcl);
if (ledger::save_ledger(cons_prop) == -1)
return -1;
// After the current ledger seq no is updated, we remove any newly expired inputs from candidate set.
{
auto itr = ctx.candidate_user_inputs.begin();
while (itr != ctx.candidate_user_inputs.end())
{
if (itr->second.maxledgerseqno <= ctx.led_seq_no)
if (itr->second.maxledgerseqno <= lcl_seq_no)
ctx.candidate_user_inputs.erase(itr++);
else
++itr;
@@ -799,13 +769,13 @@ namespace cons
}
// Send any output from the previous consensus round to locally connected users.
dispatch_user_outputs(cons_prop);
dispatch_user_outputs(cons_prop, lcl_seq_no, lcl);
// Execute the contract
{
sc::contract_execution_args &args = ctx.contract_ctx.args;
args.time = cons_prop.time;
args.lcl = ctx.lcl;
args.lcl = lcl;
// Feed NPL messages.
args.npl_messages.splice(args.npl_messages.end(), ctx.candidate_npl_messages);
@@ -822,7 +792,7 @@ namespace cons
ctx.state = args.post_execution_state_hash;
extract_user_outputs_from_contract_bufmap(args.userbufs);
broadcast_npl_output(args.npl_output);
broadcast_npl_output(args.npl_output, lcl);
sc::clear_args(args);
}
@@ -833,7 +803,7 @@ namespace cons
* Dispatch any consensus-reached outputs to matching users if they are connected to us locally.
* @param cons_prop The proposal that achieved consensus.
*/
void dispatch_user_outputs(const p2p::proposal &cons_prop)
void dispatch_user_outputs(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl)
{
std::scoped_lock<std::mutex> lock(usr::ctx.users_mutex);
@@ -866,7 +836,7 @@ namespace cons
msg::usrmsg::usrmsg_parser parser(user.protocol);
std::vector<uint8_t> msg;
parser.create_contract_output_container(msg, outputtosend);
parser.create_contract_output_container(msg, outputtosend, lcl_seq_no, lcl);
user.session.send(msg);
}
@@ -941,12 +911,12 @@ namespace cons
}
}
void broadcast_npl_output(std::string &output)
void broadcast_npl_output(std::string &output, std::string_view lcl)
{
if (!output.empty())
{
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_npl_output(fbuf, output, ctx.lcl);
p2pmsg::create_msg_from_npl_output(fbuf, output, lcl);
p2p::broadcast_message(fbuf, true);
}
}
@@ -986,4 +956,4 @@ namespace cons
ctx.state = new_state;
}
} // namespace cons
} // namespace consensus

View File

@@ -1,16 +1,15 @@
#ifndef _HP_CONS_
#define _HP_CONS_
#include "../pchheader.hpp"
#include "../util.hpp"
#include "../sc.hpp"
#include "../p2p/p2p.hpp"
#include "../usr/user_input.hpp"
#include "../hpfs/h32.hpp"
#include "../sc.hpp"
#include "ledger_handler.hpp"
#include "pchheader.hpp"
#include "util.hpp"
#include "sc.hpp"
#include "p2p/p2p.hpp"
#include "usr/user_input.hpp"
#include "hpfs/h32.hpp"
#include "sc.hpp"
namespace cons
namespace consensus
{
/**
* Represents a contract input that takes part in consensus.
@@ -71,18 +70,8 @@ namespace cons
uint8_t stage = 0;
uint64_t time_now = 0;
std::string lcl;
uint64_t led_seq_no = 0;
hpfs::h32 state = hpfs::h32_empty;
//Map of closed ledgers(only lrdgername[sequnece_number-hash], state hash) with sequence number as map key.
//contains closed ledgers from latest to latest - MAX_LEDGER_SEQUENCE.
//this is loaded when node started and updated throughout consensus - delete ledgers that falls behind MAX_LEDGER_SEQUENCE range.
//We will use this to track lcls related logic.- track state, lcl request, response.
std::map<uint64_t, ledger_cache_entry> ledger_cache;
std::string last_requested_lcl;
//ledger close time of previous hash
uint16_t stage_time = 0; // Time allocated to a consensus stage.
uint16_t stage_reset_wait_threshold = 0; // Minimum stage wait time to reset the stage.
@@ -90,6 +79,8 @@ namespace cons
sc::execution_context contract_ctx;
bool is_shutting_down = false;
std::thread consensus_thread;
consensus_context()
: recent_userinput_hashes(200)
{
@@ -106,8 +97,6 @@ namespace cons
std::map<hpfs::h32, int32_t> state;
};
extern consensus_context ctx;
int init();
void deinit();
@@ -124,17 +113,17 @@ namespace cons
void broadcast_nonunl_proposal();
void verify_and_populate_candidate_user_inputs();
void verify_and_populate_candidate_user_inputs(const uint64_t lcl_seq_no);
bool verify_appbill_check(std::string_view pubkey, const size_t input_len);
p2p::proposal create_stage0_proposal();
p2p::proposal create_stage0_proposal(std::string_view lcl);
p2p::proposal create_stage123_proposal(vote_counter &votes);
p2p::proposal create_stage123_proposal(vote_counter &votes, std::string_view lcl);
void broadcast_proposal(const p2p::proposal &p);
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes);
void check_lcl_votes(bool &is_desync, bool &should_request_history, std::string &majority_lcl, vote_counter &votes, std::string_view lcl);
void check_state_votes(bool &is_desync, hpfs::h32 &majority_state, vote_counter &votes);
@@ -146,15 +135,15 @@ namespace cons
uint64_t get_stage_time_resolution(const uint64_t time);
int apply_ledger(const p2p::proposal &proposal);
int apply_ledger(const p2p::proposal &proposal, const uint64_t lcl_seq_no, std::string_view lcl);
void dispatch_user_outputs(const p2p::proposal &cons_prop);
void dispatch_user_outputs(const p2p::proposal &cons_prop, const uint64_t lcl_seq_no, std::string_view lcl);
void feed_user_inputs_to_contract_bufmap(sc::contract_bufmap_t &bufmap, const p2p::proposal &cons_prop);
void extract_user_outputs_from_contract_bufmap(sc::contract_bufmap_t &bufmap);
void broadcast_npl_output(std::string &output);
void broadcast_npl_output(std::string &output, std::string_view lcl);
template <typename T>
void increment(std::map<T, int32_t> &counter, const T &candidate);
@@ -163,6 +152,6 @@ namespace cons
void on_state_sync_completion(const hpfs::h32 new_state);
} // namespace cons
} // namespace consensus
#endif

619
src/ledger.cpp Normal file
View File

@@ -0,0 +1,619 @@
#include "pchheader.hpp"
#include "conf.hpp"
#include "crypto.hpp"
#include "p2p/p2p.hpp"
#include "msg/fbuf/common_helpers.hpp"
#include "msg/fbuf/ledger_helpers.hpp"
#include "msg/fbuf/p2pmsg_helpers.hpp"
#include "hplog.hpp"
#include "ledger.hpp"
namespace p2pmsg = msg::fbuf::p2pmsg;
namespace ledger
{
constexpr int FILE_PERMS = 0644;
constexpr uint64_t MAX_LEDGER_SEQUENCE = 200; // Max ledger count.
constexpr uint16_t SYNCER_IDLE_WAIT = 20; // lcl syncer loop sleep time (milliseconds).
ledger_context ctx;
sync_context sync_ctx;
bool init_success = false;
/**
* Retrieve ledger history information from persisted ledgers.
*/
int init()
{
// Get all records at lcl history direcory and find the last closed ledger.
for (const auto &entry : util::fetch_dir_entries(conf::ctx.hist_dir))
{
const std::string file_path = conf::ctx.hist_dir + "/" + entry.d_name;
if (util::is_dir_exists(file_path))
{
LOG_ERROR << "Found directory " << entry.d_name << " in " << conf::ctx.hist_dir << ". There should be no folders in this directory.";
return -1;
}
else
{
const std::string_view extension = util::fetch_file_extension(file_path);
const std::string file_name(util::remove_file_extension(entry.d_name));
if (extension != ".lcl")
{
LOG_ERROR << "Found invalid file extension: " << extension << " for lcl file " << entry.d_name << " in " << conf::ctx.hist_dir;
return -1;
}
const size_t pos = file_name.find("-");
if (pos != std::string::npos)
{
std::vector<uint8_t> buffer;
if (read_ledger(file_path, buffer) == -1)
return -1;
if (!msg::fbuf::ledger::verify_ledger_buffer(buffer.data(), buffer.size()))
{
LOG_ERROR << "Ledger data verification failed. " << file_name;
return -1;
}
const uint64_t seq_no = std::stoull(file_name.substr(0, pos));
ctx.cache.emplace(seq_no, std::move(file_name)); // cache -> [seq_no - hash]
}
else
{
// lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format.
LOG_ERROR << "Invalid lcl file name: " << file_name;
return -1;
}
}
}
// Check if there is a saved lcl file -> if no send genesis lcl.
if (ctx.cache.empty())
{
ctx.set_lcl(0, GENESIS_LEDGER);
}
else
{
const auto last_ledger = ctx.cache.rbegin();
ctx.set_lcl(last_ledger->first, last_ledger->second);
const uint64_t seq_no = ctx.get_seq_no();
// Remove old ledgers that exceeds max sequence range.
if (seq_no > MAX_LEDGER_SEQUENCE)
remove_old_ledgers(seq_no - MAX_LEDGER_SEQUENCE);
}
sync_ctx.lcl_sync_thread = std::thread(lcl_syncer_loop);
init_success = true;
return 0;
}
void deinit()
{
if (init_success)
{
sync_ctx.is_shutting_down = true;
sync_ctx.lcl_sync_thread.join();
}
}
void set_sync_target(std::string_view target_lcl)
{
if (sync_ctx.is_shutting_down)
return;
{
std::scoped_lock<std::mutex> lock(sync_ctx.target_lcl_mutex);
sync_ctx.target_lcl = target_lcl;
}
const std::string lcl = ctx.get_lcl();
LOG_INFO << "lcl sync: Syncing for target:" << sync_ctx.target_lcl.substr(0, 15) << " (current:" << lcl.substr(0, 15) << ")";
// Request history from a random peer if needed.
// If target is genesis ledger, we simply clear our ledger history without sending a
// history request.
if (target_lcl != GENESIS_LEDGER)
send_ledger_history_request(lcl, target_lcl);
}
/**
* Runs the lcl sync worker loop.
*/
void lcl_syncer_loop()
{
util::mask_signal();
LOG_INFO << "lcl sync: Worker started.";
std::list<std::pair<std::string, p2p::history_request>> history_requests;
std::list<p2p::history_response> history_responses;
while (!sync_ctx.is_shutting_down)
{
util::sleep(SYNCER_IDLE_WAIT);
const std::string lcl = ctx.get_lcl();
// Move over the collected sync items to the local lists.
{
std::scoped_lock<std::mutex>(sync_ctx.list_mutex);
history_requests.splice(history_requests.end(), sync_ctx.collected_history_requests);
history_responses.splice(history_responses.end(), sync_ctx.collected_history_responses);
}
// Process any target lcl sync activities.
{
std::scoped_lock<std::mutex> lock(sync_ctx.target_lcl_mutex);
if (!sync_ctx.target_lcl.empty())
{
if (sync_ctx.target_lcl == GENESIS_LEDGER)
{
clear_ledger();
sync_ctx.target_lcl.clear();
}
else
{
// Only process the first successful item which matches with our current lcl.
for (const p2p::history_response &hr : history_responses)
{
if (hr.requester_lcl == lcl && handle_ledger_history_response(hr) != -1)
{
sync_ctx.target_lcl.clear();
break;
}
}
}
}
history_responses.clear();
}
// Serve any history requests from other nodes.
{
// Acquire lock so consensus does not update the ledger while we are reading the ledger.
std::scoped_lock<std::mutex> ledger_lock(ctx.ledger_mutex);
for (const auto &[session_id, hr] : history_requests)
{
// First check whether we have the required lcl available.
if (!check_required_lcl_availability(hr.required_lcl))
continue;
p2p::history_response resp;
if (ledger::retrieve_ledger_history(hr, resp) != -1)
{
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_history_response(fbuf, resp);
std::string_view msg = msg::fbuf::flatbuff_bytes_to_sv(fbuf.GetBufferPointer(), fbuf.GetSize());
// Find the peer that we should send the state response to.
std::scoped_lock<std::mutex> lock(p2p::ctx.peer_connections_mutex);
const auto peer_itr = p2p::ctx.peer_connections.find(session_id);
if (peer_itr != p2p::ctx.peer_connections.end())
{
comm::comm_session *session = peer_itr->second;
session->send(msg);
}
}
}
history_requests.clear();
}
}
LOG_INFO << "lcl sync: Worker stopped.";
}
/**
* Returns the current top ledger seq no and lcl.
*/
const std::pair<uint64_t, std::string> get_ledger_cache_top()
{
const auto latest_lcl_itr = ctx.cache.rbegin();
if (latest_lcl_itr == ctx.cache.rend())
return std::make_pair(0, GENESIS_LEDGER);
else
return std::make_pair(latest_lcl_itr->first, latest_lcl_itr->second);
}
/**
* Create and save ledger from the given proposal message. Called by consensus.
* @param proposal Consensus-reached Stage 3 proposal.
*/
int save_ledger(const p2p::proposal &proposal)
{
const size_t pos = proposal.lcl.find("-");
uint64_t seq_no = 0;
if (pos != std::string::npos)
{
seq_no = std::stoull(proposal.lcl.substr(0, pos)); // Get lcl sequence number.
seq_no++; // New lcl sequence number.
}
else
{
// lcl records should follow [ledger sequnce numer]-lcl[lcl hex] format.
LOG_ERROR << "Invalid lcl name: " << proposal.lcl << " when saving ledger.";
return -1;
}
// Serialize lcl using flatbuffer ledger schema.
flatbuffers::FlatBufferBuilder builder(1024);
msg::fbuf::ledger::create_ledger_from_proposal(builder, proposal, seq_no);
// Get binary hash of the the serialized lcl.
std::string_view ledger_str_buf = msg::fbuf::flatbuff_bytes_to_sv(builder.GetBufferPointer(), builder.GetSize());
const std::string lcl = crypto::get_hash(ledger_str_buf);
// Get hex from binary hash.
std::string lcl_hash;
util::bin2hex(lcl_hash,
reinterpret_cast<const unsigned char *>(lcl.data()),
lcl.size());
// Acquire lock so history request serving does not access the ledger while consensus is updating the ledger.
std::scoped_lock<std::mutex> ledger_lock(ctx.ledger_mutex);
// Construct lcl file name.
// lcl file name should follow [ledger sequnce numer]-lcl[lcl hex] format.
const std::string file_name = std::to_string(seq_no) + "-" + lcl_hash;
if (write_ledger(file_name, builder.GetBufferPointer(), builder.GetSize()) == -1)
return -1;
ctx.set_lcl(seq_no, file_name);
ctx.cache.emplace(seq_no, std::move(file_name));
//Remove old ledgers that exceeds max sequence range.
if (seq_no > MAX_LEDGER_SEQUENCE)
remove_old_ledgers(seq_no - MAX_LEDGER_SEQUENCE);
return 0;
}
/**
* Remove old ledgers that exceeds max sequence range from file system and ledger history cache.
* @param led_seq_no minimum sequence number to be in history.
*/
void remove_old_ledgers(const uint64_t led_seq_no)
{
std::map<uint64_t, const std::string>::iterator itr;
for (itr = ctx.cache.begin();
itr != ctx.cache.lower_bound(led_seq_no + 1);
itr++)
{
const std::string file_path = conf::ctx.hist_dir + "/" + itr->second + ".lcl";
if (util::is_file_exists(file_path))
util::remove_file(file_path);
}
if (!ctx.cache.empty())
ctx.cache.erase(ctx.cache.begin(), ctx.cache.lower_bound(led_seq_no + 1));
}
/**
* Clears out entire ledger history.
*/
void clear_ledger()
{
util::clear_directory(conf::ctx.hist_dir);
ctx.cache.clear();
ctx.set_lcl(0, GENESIS_LEDGER);
}
/**
* Reads the specified ledger entry.
* @param file_path File path to read.
* @param buffer Buffer to populate with file contents.
* @return 0 on success. -1 on failure.
*/
int read_ledger(std::string_view file_path, std::vector<uint8_t> &buffer)
{
const int fd = open(file_path.data(), O_RDONLY);
if (fd == -1)
{
LOG_ERROR << errno << ": Error opening ledger file for read. " << file_path;
return -1;
}
struct stat st;
if (fstat(fd, &st) == -1)
{
LOG_ERROR << errno << ": Error in ledger file stat. " << file_path;
return -1;
}
buffer.resize(st.st_size);
if (read(fd, buffer.data(), buffer.size()) == -1)
{
LOG_ERROR << errno << ": Error reading ledger file. " << file_path;
return -1;
}
return 0;
}
/**
* Write ledger to file system.
* @param file_name current ledger sequence number.
* @param ledger_raw raw lcl data.
* @param ledger_size size of the raw lcl data.
*/
int write_ledger(const std::string &file_name, const uint8_t *ledger_raw, const size_t ledger_size)
{
// Create file path to save ledger.
// file name -> [ledger sequnce numer]-[lcl hex]
const std::string file_path = conf::ctx.hist_dir + "/" + file_name + ".lcl";
// Write ledger to file system
const int fd = open(file_path.data(), O_CREAT | O_RDWR, FILE_PERMS);
if (fd == -1)
{
LOG_ERROR << errno << ": Error creating ledger file. " << file_path;
return -1;
}
if (write(fd, ledger_raw, ledger_size) == -1)
{
LOG_ERROR << errno << ": Error writing to new ledger file. " << file_path;
close(fd);
return -1;
}
close(fd);
return 0;
}
/**
* Delete ledger from file system.
* @param file_name name of ledger to be deleted.
*/
void remove_ledger(const std::string &file_name)
{
std::string file_path;
file_path.reserve(conf::ctx.hist_dir.size() + file_name.size() + 5);
file_path.append(conf::ctx.hist_dir)
.append("/")
.append(file_name)
.append(".lcl");
util::remove_file(file_path);
}
/**
* Create and send ledger history request to random node from unl list.
* @param minimum_lcl hash of the minimum lcl from which node need lcl history.
* @param required_lcl hash of the required lcl.
*/
void send_ledger_history_request(std::string_view minimum_lcl, std::string_view required_lcl)
{
p2p::history_request hr;
hr.required_lcl = required_lcl;
hr.minimum_lcl = minimum_lcl;
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_history_request(fbuf, hr);
p2p::send_message_to_random_peer(fbuf);
LOG_DEBUG << "Ledger history request sent. Required lcl:" << required_lcl.substr(0, 15);
}
/**
* Check requested lcl is in node's lcl history cache.
* @param hr lcl history request information.
* @return true if requested lcl is in lcl history cache.
*/
bool check_required_lcl_availability(const std::string &required_lcl)
{
size_t pos = required_lcl.find("-");
uint64_t req_seq_no = 0;
// Get sequence number of required lcl
if (pos != std::string::npos)
{
req_seq_no = std::stoull(required_lcl.substr(0, pos)); // Get required lcl sequence number
}
if (req_seq_no > 0)
{
const auto itr = ctx.cache.find(req_seq_no);
if (itr == ctx.cache.end())
{
LOG_DEBUG << "Required lcl peer asked for is not in our lcl cache.";
// Either this node is also not in consesnsus ledger or other node requesting a lcl that is older than node's current
// minimum lcl sequence becuase of maximum ledger history range.
return false;
}
else if (itr->second != required_lcl)
{
LOG_DEBUG << "Required lcl peer asked for is not in our lcl cache.";
// Either this node or requesting node is in a fork condition.
return false;
}
}
else
{
return false; // Very rare case: Peer asking for the genisis lcl.
}
return true;
}
/**
* Retrieve lcl(last closed ledger) information from ledger history.
* @param hr lcl history request information.
* @param history_response Ledger history response to populate requested ledger details
* @return 0 on success. -1 on failure.
*/
int retrieve_ledger_history(const p2p::history_request &hr, p2p::history_response &history_response)
{
// Get sequence number of minimum lcl required
const size_t pos = hr.minimum_lcl.find("-");
if (pos == std::string::npos)
{
LOG_DEBUG << "lcl serve: Invalid lcl history request. Requested:" << hr.minimum_lcl;
return -1;
}
// We put the requester's own lcl back in the response so they can validate the liveliness of the response.
history_response.requester_lcl = hr.minimum_lcl;
uint64_t min_seq_no = std::stoull(hr.minimum_lcl.substr(0, pos)); // Get required lcl sequence number
const auto itr = ctx.cache.find(min_seq_no);
if (itr != ctx.cache.end()) // Requested minimum lcl is not in our lcl history cache
{
min_seq_no = itr->first;
// Check whether minimum lcl requested is same as this node's.
// Evenhough sequence number are same, lcl hash can be changed if one of node is in a fork condition.
if (hr.minimum_lcl != itr->second)
{
LOG_DEBUG << "lcl serve: Invalid minimum ledger. Requested min lcl:" << hr.minimum_lcl << " Node lcl:" << itr->second;
history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
return 0;
}
}
else if (min_seq_no > ctx.cache.rbegin()->first) //Recieved minimum lcl sequence is ahead of node's lcl sequence.
{
LOG_DEBUG << "lcl serve: Invalid minimum ledger. Recieved minimum seq no is ahead of node current seq no. Requested lcl:" << hr.minimum_lcl;
history_response.error = p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER;
return 0;
}
else
{
LOG_DEBUG << "lcl serve: Minimum lcl peer asked for is not in our lcl cache. Therefore sending from node minimum lcl.";
min_seq_no = ctx.cache.begin()->first;
}
//copy current history cache.
std::map<uint64_t, const std::string> led_cache = ctx.cache;
//filter out cache and get raw files here.
led_cache.erase(
led_cache.begin(),
led_cache.lower_bound(min_seq_no));
//Get raw content of lcls that going to be send.
for (const auto &[seq_no, lcl] : led_cache)
{
p2p::history_ledger ledger;
ledger.lcl = lcl;
// Read lcl file.
const std::string file_path = conf::ctx.hist_dir + "/" + lcl + ".lcl";
if (read_ledger(file_path, ledger.raw_ledger) == -1)
{
LOG_DEBUG << "lcl serve: Error when reading ledger file.";
return -1;
}
history_response.hist_ledgers.emplace(seq_no, std::move(ledger));
}
return 0;
}
/**
* Handle recieved ledger history response.
* @param hr lcl history request information.
* @return 0 on successful lcl update. -1 on failure.
*/
int handle_ledger_history_response(const p2p::history_response &hr)
{
if (hr.error == p2p::LEDGER_RESPONSE_ERROR::INVALID_MIN_LEDGER)
{
// This means we are in a fork ledger. Remove/rollback current top ledger.
// Basically in the long run we'll rolback one by one untill we catch up to valid minimum ledger.
remove_ledger(ctx.get_lcl());
ctx.cache.erase(ctx.cache.rbegin()->first);
const auto [seq_no, lcl] = get_ledger_cache_top();
ctx.set_lcl(seq_no, lcl);
LOG_INFO << "lcl sync: Fork detected. Removed last ledger. New lcl:" << lcl.substr(0, 15);
return 0;
}
else
{
// Check whether recieved lcl history contains the current lcl node required.
bool contains_requested_lcl = false;
for (auto &[seq_no, ledger] : hr.hist_ledgers)
{
if (sync_ctx.target_lcl == ledger.lcl)
{
contains_requested_lcl = true;
break;
}
}
if (!contains_requested_lcl)
{
LOG_DEBUG << "lcl sync: Peer sent us a history response but not containing the lcl we asked for.";
return -1;
}
// Check integrity of recieved lcl list.
// By checking recieved lcl hashes matches lcl content by applying hashing for each raw content.
for (auto &[seq_no, ledger] : hr.hist_ledgers)
{
const size_t pos = ledger.lcl.find("-");
const std::string rec_lcl_hash = ledger.lcl.substr((pos + 1), (ledger.lcl.size() - 1));
// Get binary hash of the the serialized lcl.
const std::string lcl = crypto::get_hash(ledger.raw_ledger.data(), ledger.raw_ledger.size());
// Get hex from binary hash
std::string lcl_hash;
util::bin2hex(lcl_hash,
reinterpret_cast<const unsigned char *>(lcl.data()),
lcl.size());
// recieved lcl hash and hash generated from recieved lcl content doesn't match -> abandon applying it
if (lcl_hash != rec_lcl_hash)
{
LOG_DEBUG << "lcl sync: Peer sent us a history response but the ledger data does not match the hashes.";
// todo: we should penalize peer who sent this.
return -1;
}
}
}
// Execution to here means the history data sent checks out.
// Save recieved lcl in file system and update lcl history cache.
for (auto &[seq_no, ledger] : hr.hist_ledgers)
{
auto prev_dup_itr = ctx.cache.find(seq_no);
if (prev_dup_itr != ctx.cache.end())
{
remove_ledger(prev_dup_itr->second);
ctx.cache.erase(prev_dup_itr);
}
write_ledger(ledger.lcl, ledger.raw_ledger.data(), ledger.raw_ledger.size());
ctx.cache.emplace(seq_no, ledger.lcl);
}
const auto [seq_no, lcl] = get_ledger_cache_top();
ctx.set_lcl(seq_no, lcl);
LOG_INFO << "lcl sync: Sync complete. New lcl:" << lcl.substr(0, 15);
return 0;
}
} // namespace ledger

97
src/ledger.hpp Normal file
View File

@@ -0,0 +1,97 @@
#ifndef _HP_LEDGER_
#define _HP_LEDGER_
#include "pchheader.hpp"
#include "p2p/p2p.hpp"
namespace ledger
{
constexpr const char *GENESIS_LEDGER = "0-genesis";
struct sync_context
{
// The current target lcl that we are syncing towards.
std::string target_lcl;
std::mutex target_lcl_mutex;
// Lists holding history requests and responses collected from incoming p2p messages.
std::list<std::pair<std::string, p2p::history_request>> collected_history_requests;
std::list<p2p::history_response> collected_history_responses;
std::mutex list_mutex;
std::thread lcl_sync_thread;
bool is_shutting_down = false;
};
struct ledger_context
{
private:
std::string lcl;
uint64_t seq_no = 0;
std::shared_mutex lcl_mutex;
public:
// Map of closed ledgers (lcl string) with sequence number as map key.
// Contains closed ledgers from oldest to latest - MAX_LEDGER_SEQUENCE.
// This is loaded when node started and updated throughout consensus.
// Deletes ledgers that falls behind MAX_LEDGER_SEQUENCE range.
std::map<uint64_t, const std::string> cache;
std::mutex ledger_mutex;
const std::string get_lcl()
{
std::shared_lock lock(lcl_mutex);
return lcl;
}
uint64_t get_seq_no()
{
std::shared_lock lock(lcl_mutex);
return seq_no;
}
void set_lcl(const uint64_t new_seq_no, std::string_view new_lcl)
{
std::unique_lock lock(lcl_mutex);
lcl = new_lcl;
seq_no = new_seq_no;
}
};
extern sync_context sync_ctx;
extern ledger_context ctx;
int init();
void deinit();
void lcl_syncer_loop();
void set_sync_target(std::string_view target_lcl);
const std::pair<uint64_t, std::string> get_ledger_cache_top();
int save_ledger(const p2p::proposal &proposal);
void remove_old_ledgers(const uint64_t led_seq_no);
void clear_ledger();
int read_ledger(std::string_view file_path, std::vector<uint8_t> &buffer);
int write_ledger(const std::string &file_name, const uint8_t *ledger_raw, const size_t ledger_size);
void remove_ledger(const std::string &file_name);
void send_ledger_history_request(std::string_view minimum_lcl, std::string_view required_lcl);
bool check_required_lcl_availability(const std::string &required_lcl);
int retrieve_ledger_history(const p2p::history_request &hr, p2p::history_response &history_response);
int handle_ledger_history_response(const p2p::history_response &hr);
} // namespace ledger
#endif

View File

@@ -11,7 +11,8 @@
#include "usr/usr.hpp"
#include "usr/read_req.hpp"
#include "p2p/p2p.hpp"
#include "cons/cons.hpp"
#include "consensus.hpp"
#include "ledger.hpp"
#include "hpfs/hpfs.hpp"
#include "state/state_sync.hpp"
#include "state/state_serve.hpp"
@@ -67,7 +68,8 @@ int parse_cmd(int argc, char **argv)
*/
void deinit()
{
cons::deinit();
consensus::deinit();
ledger::deinit();
state_sync::deinit();
state_serve::deinit();
read_req::deinit();
@@ -191,7 +193,7 @@ int main(int argc, char **argv)
LOG_INFO << "Public key: " << conf::cfg.pubkeyhex.substr(2); // Public key without 'ed' prefix.
if (hpfs::init() != 0 || p2p::init() != 0 || usr::init() != 0 || read_req::init() != 0 ||
state_serve::init() != 0 || state_sync::init() != 0 || cons::init() != 0)
state_serve::init() != 0 || state_sync::init() != 0 || ledger::init() || consensus::init() != 0)
{
deinit();
return -1;
@@ -201,7 +203,7 @@ int main(int argc, char **argv)
signal(SIGINT, &sigint_handler);
// Wait until consensus thread finishes.
cons::wait();
consensus::wait();
// deinit() here only gets called when there is an error in consensus.
// If not deinit in the sigint handler is called when a SIGINT is received.

View File

@@ -1,6 +1,5 @@
#include "../../pchheader.hpp"
#include "../../util.hpp"
#include "../../cons/cons.hpp"
#include "../../hplog.hpp"
#include "../usrmsg_common.hpp"
#include "usrmsg_bson.hpp"
@@ -17,16 +16,16 @@ namespace msg::usrmsg::bson
* "lcl_seqno": <integer>
* }
*/
void create_status_response(std::vector<uint8_t> &msg)
void create_status_response(std::vector<uint8_t> &msg, const uint64_t lcl_seq_no, std::string_view lcl)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_TYPE);
encoder.string_value(msg::usrmsg::MSGTYPE_STAT_RESPONSE);
encoder.key(msg::usrmsg::FLD_LCL);
encoder.string_value(cons::ctx.lcl);
encoder.string_value(lcl);
encoder.key(msg::usrmsg::FLD_LCL_SEQ);
encoder.int64_value(cons::ctx.led_seq_no);
encoder.int64_value(lcl_seq_no);
encoder.end_object();
encoder.flush();
}
@@ -95,16 +94,16 @@ namespace msg::usrmsg::bson
* }
* @param content The contract binary output content to be put in the message.
*/
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content)
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl)
{
jsoncons::bson::bson_bytes_encoder encoder(msg);
encoder.begin_object();
encoder.key(msg::usrmsg::FLD_TYPE);
encoder.string_value(msg::usrmsg::MSGTYPE_CONTRACT_OUTPUT);
encoder.key(msg::usrmsg::FLD_LCL);
encoder.string_value(cons::ctx.lcl);
encoder.string_value(lcl);
encoder.key(msg::usrmsg::FLD_LCL_SEQ);
encoder.int64_value(cons::ctx.led_seq_no);
encoder.int64_value(lcl_seq_no);
encoder.key(msg::usrmsg::FLD_CONTENT);
encoder.byte_string_value(content);
encoder.end_object();

View File

@@ -8,14 +8,14 @@ namespace msg::usrmsg::bson
void create_user_challenge(std::vector<uint8_t> &msg, std::string &challengehex);
void create_status_response(std::vector<uint8_t> &msg);
void create_status_response(std::vector<uint8_t> &msg, const uint64_t lcl_seq_no, std::string_view lcl);
void create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status, std::string_view reason,
std::string_view input_sig);
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view content);
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content);
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl);
int verify_user_handshake_response(std::string &extracted_pubkeyhex, std::string &extracted_protocol,
std::string_view response, std::string_view original_challenge);

View File

@@ -7,25 +7,30 @@
namespace msg::fbuf::ledger
{
/**
/**
* Create ledger from the given proposal struct.
* @param p The proposal struct to be placed in ledger.
*/
const std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no)
{
flatbuffers::Offset<ledger::Ledger> ledger =
ledger::CreateLedger(
builder,
seq_no,
p.time,
sv_to_flatbuff_bytes(builder, p.lcl),
sv_to_flatbuff_bytes(builder, p.state.to_string_view()),
stringlist_to_flatbuf_bytearrayvector(builder, p.users),
stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs),
stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs));
void create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no)
{
flatbuffers::Offset<ledger::Ledger> ledger =
ledger::CreateLedger(
builder,
seq_no,
p.time,
sv_to_flatbuff_bytes(builder, p.lcl),
sv_to_flatbuff_bytes(builder, p.state.to_string_view()),
stringlist_to_flatbuf_bytearrayvector(builder, p.users),
stringlist_to_flatbuf_bytearrayvector(builder, p.hash_inputs),
stringlist_to_flatbuf_bytearrayvector(builder, p.hash_outputs));
builder.Finish(ledger); // Finished building message content to get serialised content.
builder.Finish(ledger); // Finished building message content to get serialised content.
}
bool verify_ledger_buffer(const uint8_t *ledger_buf_ptr, const size_t buf_len)
{
flatbuffers::Verifier ledger_verifier(ledger_buf_ptr, buf_len);
return VerifyLedgerBuffer(ledger_verifier);
}
return flatbuff_bytes_to_sv(builder.GetBufferPointer(), builder.GetSize());
}
} // namespace msg::fbuf::ledger

View File

@@ -8,7 +8,10 @@
namespace msg::fbuf::ledger
{
const std::string_view create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no);
}
void create_ledger_from_proposal(flatbuffers::FlatBufferBuilder &builder, const p2p::proposal &p, const uint64_t seq_no);
bool verify_ledger_buffer(const uint8_t *ledger_buf_ptr, const size_t buf_len);
} // namespace msg::fbuf::ledger
#endif

View File

@@ -60,6 +60,7 @@ enum Ledger_Response_Error : ubyte
}
table History_Response_Message { //Ledger History request type message schema
requester_lcl:[ubyte];
hist_ledgers:[HistoryLedgerPair];
error: Ledger_Response_Error;
}
@@ -70,7 +71,6 @@ table HistoryLedgerPair { //A key, value pair of byte[].
}
table HistoryLedger {
state:[ubyte];
lcl:[ubyte];
raw_ledger:[ubyte];
}

View File

@@ -974,9 +974,16 @@ inline flatbuffers::Offset<History_Request_Message> CreateHistory_Request_Messag
struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef History_Response_MessageBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_HIST_LEDGERS = 4,
VT_ERROR = 6
VT_REQUESTER_LCL = 4,
VT_HIST_LEDGERS = 6,
VT_ERROR = 8
};
const flatbuffers::Vector<uint8_t> *requester_lcl() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_REQUESTER_LCL);
}
flatbuffers::Vector<uint8_t> *mutable_requester_lcl() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_REQUESTER_LCL);
}
const flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HistoryLedgerPair>> *hist_ledgers() const {
return GetPointer<const flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HistoryLedgerPair>> *>(VT_HIST_LEDGERS);
}
@@ -991,6 +998,8 @@ struct History_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::T
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_REQUESTER_LCL) &&
verifier.VerifyVector(requester_lcl()) &&
VerifyOffset(verifier, VT_HIST_LEDGERS) &&
verifier.VerifyVector(hist_ledgers()) &&
verifier.VerifyVectorOfTables(hist_ledgers()) &&
@@ -1003,6 +1012,9 @@ struct History_Response_MessageBuilder {
typedef History_Response_Message Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_requester_lcl(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> requester_lcl) {
fbb_.AddOffset(History_Response_Message::VT_REQUESTER_LCL, requester_lcl);
}
void add_hist_ledgers(flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HistoryLedgerPair>>> hist_ledgers) {
fbb_.AddOffset(History_Response_Message::VT_HIST_LEDGERS, hist_ledgers);
}
@@ -1022,21 +1034,26 @@ struct History_Response_MessageBuilder {
inline flatbuffers::Offset<History_Response_Message> CreateHistory_Response_Message(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> requester_lcl = 0,
flatbuffers::Offset<flatbuffers::Vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HistoryLedgerPair>>> hist_ledgers = 0,
msg::fbuf::p2pmsg::Ledger_Response_Error error = msg::fbuf::p2pmsg::Ledger_Response_Error_None) {
History_Response_MessageBuilder builder_(_fbb);
builder_.add_hist_ledgers(hist_ledgers);
builder_.add_requester_lcl(requester_lcl);
builder_.add_error(error);
return builder_.Finish();
}
inline flatbuffers::Offset<History_Response_Message> CreateHistory_Response_MessageDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const std::vector<uint8_t> *requester_lcl = nullptr,
const std::vector<flatbuffers::Offset<msg::fbuf::p2pmsg::HistoryLedgerPair>> *hist_ledgers = nullptr,
msg::fbuf::p2pmsg::Ledger_Response_Error error = msg::fbuf::p2pmsg::Ledger_Response_Error_None) {
auto requester_lcl__ = requester_lcl ? _fbb.CreateVector<uint8_t>(*requester_lcl) : 0;
auto hist_ledgers__ = hist_ledgers ? _fbb.CreateVector<flatbuffers::Offset<msg::fbuf::p2pmsg::HistoryLedgerPair>>(*hist_ledgers) : 0;
return msg::fbuf::p2pmsg::CreateHistory_Response_Message(
_fbb,
requester_lcl__,
hist_ledgers__,
error);
}
@@ -1102,16 +1119,9 @@ inline flatbuffers::Offset<HistoryLedgerPair> CreateHistoryLedgerPair(
struct HistoryLedger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HistoryLedgerBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_STATE = 4,
VT_LCL = 6,
VT_RAW_LEDGER = 8
VT_LCL = 4,
VT_RAW_LEDGER = 6
};
const flatbuffers::Vector<uint8_t> *state() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_STATE);
}
flatbuffers::Vector<uint8_t> *mutable_state() {
return GetPointer<flatbuffers::Vector<uint8_t> *>(VT_STATE);
}
const flatbuffers::Vector<uint8_t> *lcl() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_LCL);
}
@@ -1126,8 +1136,6 @@ struct HistoryLedger FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
}
bool Verify(flatbuffers::Verifier &verifier) const {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_STATE) &&
verifier.VerifyVector(state()) &&
VerifyOffset(verifier, VT_LCL) &&
verifier.VerifyVector(lcl()) &&
VerifyOffset(verifier, VT_RAW_LEDGER) &&
@@ -1140,9 +1148,6 @@ struct HistoryLedgerBuilder {
typedef HistoryLedger Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_state(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> state) {
fbb_.AddOffset(HistoryLedger::VT_STATE, state);
}
void add_lcl(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> lcl) {
fbb_.AddOffset(HistoryLedger::VT_LCL, lcl);
}
@@ -1162,27 +1167,22 @@ struct HistoryLedgerBuilder {
inline flatbuffers::Offset<HistoryLedger> CreateHistoryLedger(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> state = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> lcl = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> raw_ledger = 0) {
HistoryLedgerBuilder builder_(_fbb);
builder_.add_raw_ledger(raw_ledger);
builder_.add_lcl(lcl);
builder_.add_state(state);
return builder_.Finish();
}
inline flatbuffers::Offset<HistoryLedger> CreateHistoryLedgerDirect(
flatbuffers::FlatBufferBuilder &_fbb,
const std::vector<uint8_t> *state = nullptr,
const std::vector<uint8_t> *lcl = nullptr,
const std::vector<uint8_t> *raw_ledger = nullptr) {
auto state__ = state ? _fbb.CreateVector<uint8_t>(*state) : 0;
auto lcl__ = lcl ? _fbb.CreateVector<uint8_t>(*lcl) : 0;
auto raw_ledger__ = raw_ledger ? _fbb.CreateVector<uint8_t>(*raw_ledger) : 0;
return msg::fbuf::p2pmsg::CreateHistoryLedger(
_fbb,
state__,
lcl__,
raw_ledger__);
}

View File

@@ -201,6 +201,9 @@ namespace msg::fbuf::p2pmsg
{
p2p::history_response hr;
if (msg.requester_lcl())
hr.requester_lcl = flatbuff_bytes_to_sv(msg.requester_lcl());
if (msg.hist_ledgers())
hr.hist_ledgers = flatbuf_historyledgermap_to_historyledgermap(msg.hist_ledgers());
@@ -427,6 +430,7 @@ namespace msg::fbuf::p2pmsg
flatbuffers::Offset<History_Response_Message> hrmsg =
CreateHistory_Response_Message(
builder,
sv_to_flatbuff_bytes(builder, hr.requester_lcl),
historyledgermap_to_flatbuf_historyledgermap(builder, hr.hist_ledgers),
(Ledger_Response_Error)hr.error);
@@ -692,7 +696,6 @@ namespace msg::fbuf::p2pmsg
{
flatbuffers::Offset<HistoryLedger> history_ledger = CreateHistoryLedger(
builder,
sv_to_flatbuff_bytes(builder, ledger.state),
sv_to_flatbuff_bytes(builder, ledger.lcl),
builder.CreateVector(ledger.raw_ledger));

View File

@@ -1,7 +1,6 @@
#include "../../pchheader.hpp"
#include "../../util.hpp"
#include "../../crypto.hpp"
#include "../../cons/cons.hpp"
#include "../../hplog.hpp"
#include "../usrmsg_common.hpp"
#include "usrmsg_json.hpp"
@@ -73,7 +72,7 @@ namespace msg::usrmsg::json
* "lcl_seqno": <integer>
* }
*/
void create_status_response(std::vector<uint8_t> &msg)
void create_status_response(std::vector<uint8_t> &msg, const uint64_t lcl_seq_no, std::string_view lcl)
{
msg.reserve(128);
msg += "{\"";
@@ -83,11 +82,11 @@ namespace msg::usrmsg::json
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_LCL;
msg += SEP_COLON;
msg += cons::ctx.lcl;
msg += lcl;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_LCL_SEQ;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(cons::ctx.led_seq_no);
msg += std::to_string(lcl_seq_no);
msg += "}";
}
@@ -172,7 +171,7 @@ namespace msg::usrmsg::json
* }
* @param content The contract binary output content to be put in the message.
*/
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content)
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl)
{
std::string contenthex;
util::bin2hex(
@@ -188,11 +187,11 @@ namespace msg::usrmsg::json
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_LCL;
msg += SEP_COLON;
msg += cons::ctx.lcl;
msg += lcl;
msg += SEP_COMMA;
msg += msg::usrmsg::FLD_LCL_SEQ;
msg += SEP_COLON_NOQUOTE;
msg += std::to_string(cons::ctx.led_seq_no);
msg += std::to_string(lcl_seq_no);
msg += SEP_COMMA_NOQUOTE;
msg += msg::usrmsg::FLD_CONTENT;
msg += SEP_COLON;

View File

@@ -8,14 +8,14 @@ namespace msg::usrmsg::json
void create_user_challenge(std::vector<uint8_t> &msg, std::string &challengehex);
void create_status_response(std::vector<uint8_t> &msg);
void create_status_response(std::vector<uint8_t> &msg, const uint64_t lcl_seq_no, std::string_view lcl);
void create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status, std::string_view reason,
std::string_view input_sig);
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view content);
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content);
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl);
int verify_user_handshake_response(std::string &extracted_pubkeyhex, std::string &extracted_protocol,
std::string_view response, std::string_view original_challenge);

View File

@@ -13,12 +13,12 @@ namespace msg::usrmsg
{
}
void usrmsg_parser::create_status_response(std::vector<uint8_t> &msg) const
void usrmsg_parser::create_status_response(std::vector<uint8_t> &msg, const uint64_t lcl_seq_no, std::string_view lcl) const
{
if (protocol == util::PROTOCOL::JSON)
jusrmsg::create_status_response(msg);
jusrmsg::create_status_response(msg, lcl_seq_no, lcl);
else
busrmsg::create_status_response(msg);
busrmsg::create_status_response(msg, lcl_seq_no, lcl);
}
void usrmsg_parser::create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status,
@@ -38,12 +38,12 @@ namespace msg::usrmsg
busrmsg::create_contract_read_response_container(msg, content);
}
void usrmsg_parser::create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content) const
void usrmsg_parser::create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl) const
{
if (protocol == util::PROTOCOL::JSON)
jusrmsg::create_contract_output_container(msg, content);
jusrmsg::create_contract_output_container(msg, content, lcl_seq_no, lcl);
else
busrmsg::create_contract_output_container(msg, content);
busrmsg::create_contract_output_container(msg, content, lcl_seq_no, lcl);
}
int usrmsg_parser::parse(std::string_view message)

View File

@@ -18,14 +18,14 @@ namespace msg::usrmsg
public:
usrmsg_parser(const util::PROTOCOL protocol);
void create_status_response(std::vector<uint8_t> &msg) const;
void create_status_response(std::vector<uint8_t> &msg, const uint64_t lcl_seq_no, std::string_view lcl) const;
void create_contract_input_status(std::vector<uint8_t> &msg, std::string_view status,
std::string_view reason, std::string_view input_sig) const;
void create_contract_read_response_container(std::vector<uint8_t> &msg, std::string_view content) const;
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content) const;
void create_contract_output_container(std::vector<uint8_t> &msg, std::string_view content, const uint64_t lcl_seq_no, std::string_view lcl) const;
int parse(std::string_view message);

View File

@@ -39,7 +39,6 @@ namespace p2p
struct history_ledger
{
std::string state;
std::string lcl;
std::vector<uint8_t> raw_ledger;
};
@@ -60,6 +59,7 @@ namespace p2p
struct history_response
{
std::string requester_lcl;
std::map<uint64_t, const history_ledger> hist_ledgers;
LEDGER_RESPONSE_ERROR error = LEDGER_RESPONSE_ERROR::NONE;
};

View File

@@ -11,9 +11,8 @@
#include "../comm/comm_client.hpp"
#include "p2p.hpp"
#include "peer_session_handler.hpp"
#include "../cons/ledger_handler.hpp"
#include "../state/state_sync.hpp"
#include "../cons/cons.hpp"
#include "../ledger.hpp"
namespace p2pmsg = msg::fbuf::p2pmsg;
@@ -129,6 +128,7 @@ namespace p2p
{
if (p2pmsg::validate_container_trust(container) != 0)
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
LOG_DEBUG << "NPL message rejected due to trust failure. " << session.uniqueid.substr(0, 10);
return 0;
}
@@ -146,6 +146,7 @@ namespace p2p
{
if (p2pmsg::validate_container_trust(container) != 0)
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
LOG_DEBUG << "State request message rejected due to trust failure. " << session.uniqueid.substr(0, 10);
return 0;
}
@@ -159,6 +160,7 @@ namespace p2p
{
if (p2pmsg::validate_container_trust(container) != 0)
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
LOG_DEBUG << "State response message rejected due to trust failure. " << session.uniqueid.substr(0, 10);
return 0;
}
@@ -175,33 +177,27 @@ namespace p2p
{
if (p2pmsg::validate_container_trust(container) != 0)
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
LOG_DEBUG << "History request message rejected due to trust failure. " << session.uniqueid.substr(0, 10);
return 0;
}
const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message());
//first check node has the required lcl available. -> if so send lcl history accordingly.
const bool req_lcl_avail = cons::check_required_lcl_availability(hr);
if (req_lcl_avail)
{
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_history_response(fbuf, cons::retrieve_ledger_history(hr));
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
session.send(msg);
}
std::scoped_lock<std::mutex> lock(ledger::sync_ctx.list_mutex);
ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr)));
}
else if (content_message_type == p2pmsg::Message_History_Response_Message) //message is a lcl history response message
{
if (p2pmsg::validate_container_trust(container) != 0)
{
session.increment_metric(comm::SESSION_THRESHOLDS::MAX_BADSIGMSGS_PER_MINUTE, 1);
LOG_DEBUG << "History response message rejected due to trust failure. " << session.uniqueid.substr(0, 10);
return 0;
}
cons::handle_ledger_history_response(
p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message()));
const p2p::history_response hr = p2pmsg::create_history_response_from_msg(*content->message_as_History_Response_Message());
std::scoped_lock<std::mutex> lock(ledger::sync_ctx.list_mutex);
ledger::sync_ctx.collected_history_responses.push_back(std::move(hr));
}
else
{

View File

@@ -11,6 +11,7 @@
#include <fcntl.h>
#include <flatbuffers/flatbuffers.h>
#include <fstream>
#include <ftw.h>
#include <iomanip>
#include <iostream>
#include <jsoncons/json.hpp>
@@ -22,6 +23,7 @@
#include <poll.h>
#include <readerwriterqueue/readerwriterqueue.h>
#include <set>
#include <shared_mutex>
#include <sodium.h>
#include <sstream>
#include <stdlib.h>
@@ -38,6 +40,7 @@
#include <unistd.h>
#include <unordered_map>
#include <unordered_set>
#include <variant>
#include <vector>
#include <blake3.h>
#include <concurrentqueue.h>

View File

@@ -6,7 +6,7 @@
#include "../msg/fbuf/p2pmsg_content_generated.h"
#include "../msg/fbuf/p2pmsg_helpers.hpp"
#include "../msg/fbuf/common_helpers.hpp"
#include "../cons/cons.hpp"
#include "../ledger.hpp"
#include "../hplog.hpp"
#include "state_serve.hpp"
#include "state_common.hpp"
@@ -64,6 +64,7 @@ namespace state_serve
}
const uint64_t time_start = util::get_epoch_milliseconds();
const std::string lcl = ledger::ctx.get_lcl();
for (auto &[session_id, request] : state_requests)
{
@@ -81,7 +82,7 @@ namespace state_serve
const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message());
flatbuffers::FlatBufferBuilder fbuf(1024);
if (state_serve::create_state_response(fbuf, sr) == 1)
if (state_serve::create_state_response(fbuf, sr, lcl) == 1)
{
// Find the peer that we should send the state response to.
std::scoped_lock<std::mutex> lock(p2p::ctx.peer_connections_mutex);
@@ -111,7 +112,7 @@ namespace state_serve
* @return 1 if successful state response was generated. 0 if request is invalid
* and no response was generated. -1 on error.
*/
int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr)
int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr, std::string_view lcl)
{
LOG_DEBUG << "Serving state req. path:" << sr.parent_path << " block_id:" << sr.block_id;
@@ -136,7 +137,7 @@ namespace state_serve
resp.hash = sr.expected_hash;
resp.data = std::string_view(reinterpret_cast<const char *>(block.data()), block.size());
msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, cons::ctx.lcl);
msg::fbuf::p2pmsg::create_msg_from_block_response(fbuf, resp, lcl);
return 1; // Success.
}
}
@@ -158,7 +159,7 @@ namespace state_serve
{
msg::fbuf::p2pmsg::create_msg_from_filehashmap_response(
fbuf, sr.parent_path, block_hashes,
file_length, sr.expected_hash, cons::ctx.lcl);
file_length, sr.expected_hash, lcl);
return 1; // Success.
}
}
@@ -177,7 +178,7 @@ namespace state_serve
else if (result == 1)
{
msg::fbuf::p2pmsg::create_msg_from_fsentry_response(
fbuf, sr.parent_path, child_hash_nodes, sr.expected_hash, cons::ctx.lcl);
fbuf, sr.parent_path, child_hash_nodes, sr.expected_hash, lcl);
return 1; // Success.
}
}

View File

@@ -14,7 +14,7 @@ namespace state_serve
void state_serve_loop();
int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr);
int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr, std::string_view lcl);
int get_data_block(std::vector<uint8_t> &vec, const std::string_view vpath,
const uint32_t block_id, const hpfs::h32 expected_hash);

View File

@@ -3,7 +3,7 @@
#include "../msg/fbuf/common_helpers.hpp"
#include "../p2p/p2p.hpp"
#include "../pchheader.hpp"
#include "../cons/cons.hpp"
#include "../ledger.hpp"
#include "../hplog.hpp"
#include "../util.hpp"
#include "../hpfs/hpfs.hpp"
@@ -129,6 +129,7 @@ namespace state_sync
LOG_ERROR << "State sync: Failed to start hpfs rw session";
}
std::scoped_lock<std::mutex> lock(ctx.target_state_update_lock);
ctx.target_state = hpfs::h32_empty;
ctx.is_syncing = false;
}
@@ -138,13 +139,18 @@ namespace state_sync
void request_loop(const hpfs::h32 current_target, hpfs::h32 &updated_state)
{
std::string lcl = ledger::ctx.get_lcl();
// Send the initial root state request.
submit_request(backlog_item{BACKLOG_ITEM_TYPE::DIR, "/", -1, current_target});
submit_request(backlog_item{BACKLOG_ITEM_TYPE::DIR, "/", -1, current_target}, lcl);
while (!should_stop_request_loop(current_target))
{
util::sleep(REQUEST_LOOP_WAIT);
// Get current lcl.
std::string lcl = ledger::ctx.get_lcl();
{
std::scoped_lock<std::mutex> lock(p2p::ctx.collected_msgs.state_responses_mutex);
@@ -216,7 +222,7 @@ namespace state_sync
// Reset the counter and re-submit request.
request.waiting_time = 0;
LOG_DEBUG << "State sync: Resubmitting request...";
submit_request(request);
submit_request(request, lcl);
}
}
@@ -230,7 +236,7 @@ namespace state_sync
return;
const backlog_item &request = ctx.pending_requests.front();
submit_request(request);
submit_request(request, lcl);
ctx.pending_requests.pop_front();
}
}
@@ -257,7 +263,8 @@ namespace state_sync
* @param block_id The requested block id. Only relevant if requesting a file block. Otherwise -1.
* @param expected_hash The expected hash of the requested data. The peer will ignore the request if their hash is different.
*/
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const hpfs::h32 expected_hash)
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
const hpfs::h32 expected_hash, std::string_view lcl)
{
p2p::state_request sr;
sr.parent_path = path;
@@ -266,25 +273,25 @@ namespace state_sync
sr.expected_hash = expected_hash;
flatbuffers::FlatBufferBuilder fbuf(1024);
msg::fbuf::p2pmsg::create_msg_from_state_request(fbuf, sr, cons::ctx.lcl);
msg::fbuf::p2pmsg::create_msg_from_state_request(fbuf, sr, lcl);
p2p::send_message_to_random_peer(fbuf); //todo: send to a node that hold the majority state to improve reliability of retrieving state.
}
/**
* Submits a pending state request to the peer.
*/
void submit_request(const backlog_item &request)
void submit_request(const backlog_item &request, std::string_view lcl)
{
LOG_DEBUG << "State sync: Submitting request. type:" << request.type
<< " path:" << request.path << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
<< " path:" << request.path << " block_id:" << request.block_id
<< " hash:" << request.expected_hash;
const std::string key = std::string(request.path)
.append(reinterpret_cast<const char *>(&request.expected_hash), sizeof(hpfs::h32));
ctx.submitted_requests.try_emplace(key, request);
const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR;
request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash);
request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl);
}
/**
@@ -412,8 +419,8 @@ namespace state_sync
std::string_view buf = msg::fbuf::flatbuff_bytes_to_sv(block_msg->data());
LOG_DEBUG << "State sync: Writing block_id " << block_id
<< " (len:" << buf.length()
<< ") of " << file_vpath;
<< " (len:" << buf.length()
<< ") of " << file_vpath;
std::string file_physical_path = std::string(ctx.hpfs_mount_dir).append(file_vpath);
const int fd = open(file_physical_path.c_str(), O_WRONLY | O_CREAT | O_CLOEXEC, FILE_PERMS);

View File

@@ -68,9 +68,10 @@ namespace state_sync
bool should_stop_request_loop(const hpfs::h32 current_target);
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, const hpfs::h32 expected_hash);
void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id,
const hpfs::h32 expected_hash, std::string_view lcl);
void submit_request(const backlog_item &request);
void submit_request(const backlog_item &request, std::string_view lcl);
int handle_fs_entry_response(std::string_view parent_vpath, const msg::fbuf::p2pmsg::Fs_Entry_Response *fs_entry_resp);

View File

@@ -8,6 +8,7 @@
#include "../conf.hpp"
#include "../crypto.hpp"
#include "../hplog.hpp"
#include "../ledger.hpp"
#include "usr.hpp"
#include "user_session_handler.hpp"
#include "user_input.hpp"
@@ -180,7 +181,7 @@ namespace usr
else if (msg_type == msg::usrmsg::MSGTYPE_STAT)
{
std::vector<uint8_t> msg;
parser.create_status_response(msg);
parser.create_status_response(msg, ledger::ctx.get_seq_no(), ledger::ctx.get_lcl());
user.session.send(msg);
return 0;
}

View File

@@ -375,6 +375,20 @@ namespace util
return remove(path.data());
}
/**
* Clears all files from a directory (not recursive).
*/
int clear_directory(std::string_view dir_path)
{
return nftw(
dir_path.data(), [](const char *fpath, const struct stat *sb, int typeflag, struct FTW *ftwbuf) {
if (typeflag == FTW_F) // Is file.
return remove(fpath);
return 0;
},
1, FTW_PHYS);
}
void split_string(std::vector<std::string> &collection, std::string_view str, std::string_view delimeter)
{
if (str.empty())

View File

@@ -108,6 +108,8 @@ namespace util
int remove_file(std::string_view path);
int clear_directory(std::string_view dir_path);
void split_string(std::vector<std::string> &collection, std::string_view str, std::string_view delimeter);
} // namespace util