hpfs binary update, hpfs log serving and passing to hpfs (#281)

* full history announcement in peer challenge and ability to send random messages only to full history nodes.

* log sync infrastructure codes.

* Resolving merge conflict  induced errors.

* Fork detection implementation.

* Cleanup.

* Resolving PR comments.

* hpfs binary update, hpfs log serving and persisting

* Fixed merge conflict and added missing log

Co-authored-by: Savinda Senevirathne <savindadilsara@gmail.com>
This commit is contained in:
Chalith Desaman
2021-04-01 17:13:58 +05:30
committed by GitHub
parent bba5266f5d
commit 5833431157
10 changed files with 247 additions and 104 deletions

View File

@@ -17,8 +17,8 @@ namespace hpfs
constexpr const char *HMAP_HASH = "::hpfs.hmap.hash";
constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children";
constexpr const char *INDEX_CONTROL = "/::hpfs.index";
constexpr const char *ROOT_PATH = "/";
constexpr const char *INDEX_UPDATE = "/::hpfs.index";
constexpr const char *LOG_INDEX_FILENAME = "/log.hpfs.idx";
constexpr ino_t ROOT_INO = 1;
@@ -26,6 +26,8 @@ namespace hpfs
constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000;
constexpr uint16_t INIT_CHECK_INTERVAL = 20;
constexpr uint64_t MAX_HPFS_LOG_READ_SIZE = 1 * 1024 * 1024;
/**
* This should be called to activate the hpfs mount process.
*/
@@ -396,9 +398,13 @@ namespace hpfs
}
}
/**
* This updates the hpfs log index file with latest log offset and the root hash.
* @return Returns 0 in success, otherwise -1.
*/
int hpfs_mount::update_hpfs_log_index()
{
const std::string index_file = mount_dir + INDEX_UPDATE;
const std::string index_file = mount_dir + INDEX_CONTROL;
const int fd = open(index_file.c_str(), O_RDWR);
if (fd == -1)
@@ -423,7 +429,7 @@ namespace hpfs
*/
int hpfs_mount::truncate_log_file(const uint64_t seq_no)
{
const std::string file_path = mount_dir + INDEX_UPDATE + "." + std::to_string(seq_no);
const std::string file_path = mount_dir + INDEX_CONTROL + "." + std::to_string(seq_no);
// File /hpfs::index.<seq_no> is truncated to invoke log file truncation in hpfs.
// This call waits until any running RW or RO sessions stop.
if (truncate(file_path.c_str(), 0) == -1)
@@ -434,6 +440,65 @@ namespace hpfs
return 0;
}
/**
* This reads the hpfs logs from given min to max ledger seq_no range.
* @param min_ledger_seq_no Mininmum ledger seq number.
* @param max_ledger_seq_no Maximum ledger seq number.
* @param buf Buffer to read logs.
* @return Returns 0 if success, otherwise -1.
*/
int hpfs_mount::read_hpfs_logs(const uint64_t min_ledger_seq_no, const uint64_t max_ledger_seq_no, std::vector<uint8_t> &buf)
{
const std::string index_file = mount_dir + INDEX_CONTROL + "." + std::to_string(min_ledger_seq_no) + "." + std::to_string(max_ledger_seq_no);
const int fd = open(index_file.c_str(), O_RDONLY);
if (fd == -1)
{
LOG_ERROR << errno << ": Error opening the hpfs logs file";
return -1;
}
// First resize the buffer to max size and then after reading resize it to the actual read size.
buf.resize(MAX_HPFS_LOG_READ_SIZE);
const int res = read(fd, buf.data(), MAX_HPFS_LOG_READ_SIZE);
if (res == -1)
{
LOG_ERROR << errno << ": Error reading the hpfs logs file";
close(fd);
return -1;
}
buf.resize(res);
close(fd);
return 0;
}
/**
* This appends new log records to the hpfs log file.
* @param buf Hpfs log record buffer to write.
* @return Returns 0 in success, otherwise -1.
*/
int hpfs_mount::append_hpfs_log_records(const std::vector<uint8_t> &buf)
{
const std::string index_file = mount_dir + INDEX_CONTROL;
const int fd = open(index_file.c_str(), O_RDWR);
if (fd == -1)
{
LOG_ERROR << errno << ": Error opening the hpfs logs file";
return -1;
}
if (write(fd, buf.data(), buf.size()) == -1)
{
LOG_ERROR << errno << ": Error writing to the hpfs logs file";
close(fd);
return -1;
}
close(fd);
return 0;
}
/**
* Get the last sequence number updated in the index file.
* @param seq_no The last sequence number.
@@ -478,10 +543,27 @@ namespace hpfs
LOG_ERROR << errno << ": Error opening hpfs index file " << path;
return -1;
}
struct stat st;
if (fstat(fd, &st) == -1)
{
LOG_ERROR << errno << ": Error stat hpfs index file " << path;
return -1;
}
const off_t offset = ((seq_no - 1) * (sizeof(uint64_t) + sizeof(util::h32))) + sizeof(uint64_t);
// If calculated offset is beyond our file size means,
// Requested seq_no is invalid or we do not have that seq_no in our hpfs log file.
if (offset >= st.st_size)
{
LOG_DEBUG << "Requested hash does not exist in hpfs log file: seq no " << seq_no;
close(fd);
return -1;
}
if (pread(fd, &hash, sizeof(util::h32), offset) < sizeof(util::h32))
{
LOG_ERROR << errno << ": Error reading hash from the given offset " << std::to_string(offset);
LOG_ERROR << errno << ": Error reading hash from the given offset " << offset;
close(fd);
return -1;
}

View File

@@ -75,6 +75,8 @@ namespace hpfs
int truncate_log_file(const uint64_t seq_no);
int get_last_seq_no_from_index(uint64_t &seq_no);
int get_hash_from_index_by_seq_no(util::h32 &hash, const uint64_t seq_no);
int read_hpfs_logs(const uint64_t min_ledger_seq_no, const uint64_t max_ledger_seq_no, std::vector<uint8_t> &buf);
int append_hpfs_log_records(const std::vector<uint8_t> &buf);
};
} // namespace hpfs

View File

@@ -15,8 +15,8 @@ union P2PMsgContent {
PeerCapacityAnnouncementMsg,
PeerListRequestMsg,
PeerListResponseMsg,
LogRecordRequest,
LogRecordResponse
HpfsLogRequest,
HpfsLogResponse
}
table P2PMsg {
@@ -124,16 +124,15 @@ table HpfsFSHashEntry{
hash: [ubyte];
}
table LogRecordRequest
table HpfsLogRequest
{
target_record_id:SequenceHash;
min_record_id:SequenceHash;
}
table LogRecordResponse
table HpfsLogResponse
{
min_record_id:SequenceHash;
max_record_id:SequenceHash;
log_record_bytes:[ubyte];
}

View File

@@ -211,7 +211,7 @@ namespace msg::fbuf::p2pmsg
const p2p::hpfs_log_request create_hpfs_log_request_from_msg(const p2p::peer_message_info &mi)
{
const auto &msg = *mi.p2p_msg->content_as_LogRecordRequest();
const auto &msg = *mi.p2p_msg->content_as_HpfsLogRequest();
p2p::hpfs_log_request log_record;
log_record.target_record_id = flatbuf_seqhash_to_seqhash(msg.target_record_id());
log_record.min_record_id = flatbuf_seqhash_to_seqhash(msg.min_record_id());
@@ -220,10 +220,9 @@ namespace msg::fbuf::p2pmsg
const p2p::hpfs_log_response create_hpfs_log_response_from_msg(const p2p::peer_message_info &mi)
{
const auto &msg = *mi.p2p_msg->content_as_LogRecordResponse();
const auto &msg = *mi.p2p_msg->content_as_HpfsLogResponse();
p2p::hpfs_log_response hpfs_log_response;
hpfs_log_response.min_record_id = flatbuf_seqhash_to_seqhash(msg.min_record_id());
hpfs_log_response.max_record_id = flatbuf_seqhash_to_seqhash(msg.max_record_id());
hpfs_log_response.log_record_bytes.reserve(msg.log_record_bytes()->size());
for (const auto byte: *msg.log_record_bytes())
hpfs_log_response.log_record_bytes.push_back(byte);
@@ -424,23 +423,22 @@ namespace msg::fbuf::p2pmsg
void create_msg_from_hpfs_log_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_request &hpfs_log_request)
{
const auto msg = CreateLogRecordRequest(
const auto msg = CreateHpfsLogRequest(
builder,
seqhash_to_flatbuf_seqhash(builder, hpfs_log_request.target_record_id),
seqhash_to_flatbuf_seqhash(builder, hpfs_log_request.min_record_id));
create_p2p_msg(builder, P2PMsgContent_LogRecordRequest, msg.Union());
create_p2p_msg(builder, P2PMsgContent_HpfsLogRequest, msg.Union());
}
void create_msg_from_hpfs_log_response(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_response &hpfs_log_response)
{
const auto msg = CreateLogRecordResponse(
const auto msg = CreateHpfsLogResponse(
builder,
seqhash_to_flatbuf_seqhash(builder, hpfs_log_response.min_record_id),
seqhash_to_flatbuf_seqhash(builder, hpfs_log_response.max_record_id),
builder.CreateVector<uint8_t>(hpfs_log_response.log_record_bytes));
create_p2p_msg(builder, P2PMsgContent_LogRecordResponse, msg.Union());
create_p2p_msg(builder, P2PMsgContent_HpfsLogResponse, msg.Union());
}
void create_msg_from_fsentry_response(

View File

@@ -52,11 +52,11 @@ struct HpfsBlockResponseBuilder;
struct HpfsFSHashEntry;
struct HpfsFSHashEntryBuilder;
struct LogRecordRequest;
struct LogRecordRequestBuilder;
struct HpfsLogRequest;
struct HpfsLogRequestBuilder;
struct LogRecordResponse;
struct LogRecordResponseBuilder;
struct HpfsLogResponse;
struct HpfsLogResponseBuilder;
struct PeerRequirementAnnouncementMsg;
struct PeerRequirementAnnouncementMsgBuilder;
@@ -79,7 +79,7 @@ struct SequenceHashBuilder;
struct ByteArray;
struct ByteArrayBuilder;
enum P2PMsgContent : uint8_t {
enum P2PMsgContent {
P2PMsgContent_NONE = 0,
P2PMsgContent_PeerChallengeMsg = 1,
P2PMsgContent_PeerChallengeResponseMsg = 2,
@@ -92,10 +92,10 @@ enum P2PMsgContent : uint8_t {
P2PMsgContent_PeerCapacityAnnouncementMsg = 9,
P2PMsgContent_PeerListRequestMsg = 10,
P2PMsgContent_PeerListResponseMsg = 11,
P2PMsgContent_LogRecordRequest = 12,
P2PMsgContent_LogRecordResponse = 13,
P2PMsgContent_HpfsLogRequest = 12,
P2PMsgContent_HpfsLogResponse = 13,
P2PMsgContent_MIN = P2PMsgContent_NONE,
P2PMsgContent_MAX = P2PMsgContent_LogRecordResponse
P2PMsgContent_MAX = P2PMsgContent_HpfsLogResponse
};
inline const P2PMsgContent (&EnumValuesP2PMsgContent())[14] {
@@ -112,8 +112,8 @@ inline const P2PMsgContent (&EnumValuesP2PMsgContent())[14] {
P2PMsgContent_PeerCapacityAnnouncementMsg,
P2PMsgContent_PeerListRequestMsg,
P2PMsgContent_PeerListResponseMsg,
P2PMsgContent_LogRecordRequest,
P2PMsgContent_LogRecordResponse
P2PMsgContent_HpfsLogRequest,
P2PMsgContent_HpfsLogResponse
};
return values;
}
@@ -132,15 +132,15 @@ inline const char * const *EnumNamesP2PMsgContent() {
"PeerCapacityAnnouncementMsg",
"PeerListRequestMsg",
"PeerListResponseMsg",
"LogRecordRequest",
"LogRecordResponse",
"HpfsLogRequest",
"HpfsLogResponse",
nullptr
};
return names;
}
inline const char *EnumNameP2PMsgContent(P2PMsgContent e) {
if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_LogRecordResponse)) return "";
if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_HpfsLogResponse)) return "";
const size_t index = static_cast<size_t>(e);
return EnumNamesP2PMsgContent()[index];
}
@@ -193,18 +193,18 @@ template<> struct P2PMsgContentTraits<msg::fbuf::p2pmsg::PeerListResponseMsg> {
static const P2PMsgContent enum_value = P2PMsgContent_PeerListResponseMsg;
};
template<> struct P2PMsgContentTraits<msg::fbuf::p2pmsg::LogRecordRequest> {
static const P2PMsgContent enum_value = P2PMsgContent_LogRecordRequest;
template<> struct P2PMsgContentTraits<msg::fbuf::p2pmsg::HpfsLogRequest> {
static const P2PMsgContent enum_value = P2PMsgContent_HpfsLogRequest;
};
template<> struct P2PMsgContentTraits<msg::fbuf::p2pmsg::LogRecordResponse> {
static const P2PMsgContent enum_value = P2PMsgContent_LogRecordResponse;
template<> struct P2PMsgContentTraits<msg::fbuf::p2pmsg::HpfsLogResponse> {
static const P2PMsgContent enum_value = P2PMsgContent_HpfsLogResponse;
};
bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj, P2PMsgContent type);
bool VerifyP2PMsgContentVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector<flatbuffers::Offset<void>> *values, const flatbuffers::Vector<uint8_t> *types);
enum HpfsResponse : uint8_t {
enum HpfsResponse {
HpfsResponse_NONE = 0,
HpfsResponse_HpfsFileHashMapResponse = 1,
HpfsResponse_HpfsBlockResponse = 2,
@@ -319,11 +319,11 @@ struct P2PMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
const msg::fbuf::p2pmsg::PeerListResponseMsg *content_as_PeerListResponseMsg() const {
return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_PeerListResponseMsg ? static_cast<const msg::fbuf::p2pmsg::PeerListResponseMsg *>(content()) : nullptr;
}
const msg::fbuf::p2pmsg::LogRecordRequest *content_as_LogRecordRequest() const {
return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_LogRecordRequest ? static_cast<const msg::fbuf::p2pmsg::LogRecordRequest *>(content()) : nullptr;
const msg::fbuf::p2pmsg::HpfsLogRequest *content_as_HpfsLogRequest() const {
return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_HpfsLogRequest ? static_cast<const msg::fbuf::p2pmsg::HpfsLogRequest *>(content()) : nullptr;
}
const msg::fbuf::p2pmsg::LogRecordResponse *content_as_LogRecordResponse() const {
return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_LogRecordResponse ? static_cast<const msg::fbuf::p2pmsg::LogRecordResponse *>(content()) : nullptr;
const msg::fbuf::p2pmsg::HpfsLogResponse *content_as_HpfsLogResponse() const {
return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_HpfsLogResponse ? static_cast<const msg::fbuf::p2pmsg::HpfsLogResponse *>(content()) : nullptr;
}
void *mutable_content() {
return GetPointer<void *>(VT_CONTENT);
@@ -384,12 +384,12 @@ template<> inline const msg::fbuf::p2pmsg::PeerListResponseMsg *P2PMsg::content_
return content_as_PeerListResponseMsg();
}
template<> inline const msg::fbuf::p2pmsg::LogRecordRequest *P2PMsg::content_as<msg::fbuf::p2pmsg::LogRecordRequest>() const {
return content_as_LogRecordRequest();
template<> inline const msg::fbuf::p2pmsg::HpfsLogRequest *P2PMsg::content_as<msg::fbuf::p2pmsg::HpfsLogRequest>() const {
return content_as_HpfsLogRequest();
}
template<> inline const msg::fbuf::p2pmsg::LogRecordResponse *P2PMsg::content_as<msg::fbuf::p2pmsg::LogRecordResponse>() const {
return content_as_LogRecordResponse();
template<> inline const msg::fbuf::p2pmsg::HpfsLogResponse *P2PMsg::content_as<msg::fbuf::p2pmsg::HpfsLogResponse>() const {
return content_as_HpfsLogResponse();
}
struct P2PMsgBuilder {
@@ -412,6 +412,7 @@ struct P2PMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
P2PMsgBuilder &operator=(const P2PMsgBuilder &);
flatbuffers::Offset<P2PMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<P2PMsg>(end);
@@ -512,6 +513,7 @@ struct PeerChallengeMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
PeerChallengeMsgBuilder &operator=(const PeerChallengeMsgBuilder &);
flatbuffers::Offset<PeerChallengeMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<PeerChallengeMsg>(end);
@@ -603,6 +605,7 @@ struct PeerChallengeResponseMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
PeerChallengeResponseMsgBuilder &operator=(const PeerChallengeResponseMsgBuilder &);
flatbuffers::Offset<PeerChallengeResponseMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<PeerChallengeResponseMsg>(end);
@@ -690,6 +693,7 @@ struct UserInputBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
UserInputBuilder &operator=(const UserInputBuilder &);
flatbuffers::Offset<UserInput> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<UserInput>(end);
@@ -766,6 +770,7 @@ struct UserInputGroupBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
UserInputGroupBuilder &operator=(const UserInputGroupBuilder &);
flatbuffers::Offset<UserInputGroup> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<UserInputGroup>(end);
@@ -826,6 +831,7 @@ struct NonUnlProposalMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
NonUnlProposalMsgBuilder &operator=(const NonUnlProposalMsgBuilder &);
flatbuffers::Offset<NonUnlProposalMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<NonUnlProposalMsg>(end);
@@ -1035,6 +1041,7 @@ struct ProposalMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
ProposalMsgBuilder &operator=(const ProposalMsgBuilder &);
flatbuffers::Offset<ProposalMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<ProposalMsg>(end);
@@ -1185,6 +1192,7 @@ struct NplMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
NplMsgBuilder &operator=(const NplMsgBuilder &);
flatbuffers::Offset<NplMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<NplMsg>(end);
@@ -1298,6 +1306,7 @@ struct HpfsRequestMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsRequestMsgBuilder &operator=(const HpfsRequestMsgBuilder &);
flatbuffers::Offset<HpfsRequestMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsRequestMsg>(end);
@@ -1434,6 +1443,7 @@ struct HpfsResponseMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsResponseMsgBuilder &operator=(const HpfsResponseMsgBuilder &);
flatbuffers::Offset<HpfsResponseMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsResponseMsg>(end);
@@ -1517,6 +1527,7 @@ struct HpfsFsEntryResponseBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsFsEntryResponseBuilder &operator=(const HpfsFsEntryResponseBuilder &);
flatbuffers::Offset<HpfsFsEntryResponse> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsFsEntryResponse>(end);
@@ -1597,6 +1608,7 @@ struct HpfsFileHashMapResponseBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsFileHashMapResponseBuilder &operator=(const HpfsFileHashMapResponseBuilder &);
flatbuffers::Offset<HpfsFileHashMapResponse> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsFileHashMapResponse>(end);
@@ -1670,6 +1682,7 @@ struct HpfsBlockResponseBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsBlockResponseBuilder &operator=(const HpfsBlockResponseBuilder &);
flatbuffers::Offset<HpfsBlockResponse> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsBlockResponse>(end);
@@ -1751,6 +1764,7 @@ struct HpfsFSHashEntryBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
HpfsFSHashEntryBuilder &operator=(const HpfsFSHashEntryBuilder &);
flatbuffers::Offset<HpfsFSHashEntry> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<HpfsFSHashEntry>(end);
@@ -1784,8 +1798,8 @@ inline flatbuffers::Offset<HpfsFSHashEntry> CreateHpfsFSHashEntryDirect(
hash__);
}
struct LogRecordRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef LogRecordRequestBuilder Builder;
struct HpfsLogRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HpfsLogRequestBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_TARGET_RECORD_ID = 4,
VT_MIN_RECORD_ID = 6
@@ -1812,43 +1826,43 @@ struct LogRecordRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
}
};
struct LogRecordRequestBuilder {
typedef LogRecordRequest Table;
struct HpfsLogRequestBuilder {
typedef HpfsLogRequest Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_target_record_id(flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> target_record_id) {
fbb_.AddOffset(LogRecordRequest::VT_TARGET_RECORD_ID, target_record_id);
fbb_.AddOffset(HpfsLogRequest::VT_TARGET_RECORD_ID, target_record_id);
}
void add_min_record_id(flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> min_record_id) {
fbb_.AddOffset(LogRecordRequest::VT_MIN_RECORD_ID, min_record_id);
fbb_.AddOffset(HpfsLogRequest::VT_MIN_RECORD_ID, min_record_id);
}
explicit LogRecordRequestBuilder(flatbuffers::FlatBufferBuilder &_fbb)
explicit HpfsLogRequestBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
flatbuffers::Offset<LogRecordRequest> Finish() {
HpfsLogRequestBuilder &operator=(const HpfsLogRequestBuilder &);
flatbuffers::Offset<HpfsLogRequest> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<LogRecordRequest>(end);
auto o = flatbuffers::Offset<HpfsLogRequest>(end);
return o;
}
};
inline flatbuffers::Offset<LogRecordRequest> CreateLogRecordRequest(
inline flatbuffers::Offset<HpfsLogRequest> CreateHpfsLogRequest(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> target_record_id = 0,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> min_record_id = 0) {
LogRecordRequestBuilder builder_(_fbb);
HpfsLogRequestBuilder builder_(_fbb);
builder_.add_min_record_id(min_record_id);
builder_.add_target_record_id(target_record_id);
return builder_.Finish();
}
struct LogRecordResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef LogRecordResponseBuilder Builder;
struct HpfsLogResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
typedef HpfsLogResponseBuilder Builder;
enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE {
VT_MIN_RECORD_ID = 4,
VT_MAX_RECORD_ID = 6,
VT_LOG_RECORD_BYTES = 8
VT_LOG_RECORD_BYTES = 6
};
const msg::fbuf::p2pmsg::SequenceHash *min_record_id() const {
return GetPointer<const msg::fbuf::p2pmsg::SequenceHash *>(VT_MIN_RECORD_ID);
@@ -1856,12 +1870,6 @@ struct LogRecordResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
msg::fbuf::p2pmsg::SequenceHash *mutable_min_record_id() {
return GetPointer<msg::fbuf::p2pmsg::SequenceHash *>(VT_MIN_RECORD_ID);
}
const msg::fbuf::p2pmsg::SequenceHash *max_record_id() const {
return GetPointer<const msg::fbuf::p2pmsg::SequenceHash *>(VT_MAX_RECORD_ID);
}
msg::fbuf::p2pmsg::SequenceHash *mutable_max_record_id() {
return GetPointer<msg::fbuf::p2pmsg::SequenceHash *>(VT_MAX_RECORD_ID);
}
const flatbuffers::Vector<uint8_t> *log_record_bytes() const {
return GetPointer<const flatbuffers::Vector<uint8_t> *>(VT_LOG_RECORD_BYTES);
}
@@ -1872,60 +1880,52 @@ struct LogRecordResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table {
return VerifyTableStart(verifier) &&
VerifyOffset(verifier, VT_MIN_RECORD_ID) &&
verifier.VerifyTable(min_record_id()) &&
VerifyOffset(verifier, VT_MAX_RECORD_ID) &&
verifier.VerifyTable(max_record_id()) &&
VerifyOffset(verifier, VT_LOG_RECORD_BYTES) &&
verifier.VerifyVector(log_record_bytes()) &&
verifier.EndTable();
}
};
struct LogRecordResponseBuilder {
typedef LogRecordResponse Table;
struct HpfsLogResponseBuilder {
typedef HpfsLogResponse Table;
flatbuffers::FlatBufferBuilder &fbb_;
flatbuffers::uoffset_t start_;
void add_min_record_id(flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> min_record_id) {
fbb_.AddOffset(LogRecordResponse::VT_MIN_RECORD_ID, min_record_id);
}
void add_max_record_id(flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> max_record_id) {
fbb_.AddOffset(LogRecordResponse::VT_MAX_RECORD_ID, max_record_id);
fbb_.AddOffset(HpfsLogResponse::VT_MIN_RECORD_ID, min_record_id);
}
void add_log_record_bytes(flatbuffers::Offset<flatbuffers::Vector<uint8_t>> log_record_bytes) {
fbb_.AddOffset(LogRecordResponse::VT_LOG_RECORD_BYTES, log_record_bytes);
fbb_.AddOffset(HpfsLogResponse::VT_LOG_RECORD_BYTES, log_record_bytes);
}
explicit LogRecordResponseBuilder(flatbuffers::FlatBufferBuilder &_fbb)
explicit HpfsLogResponseBuilder(flatbuffers::FlatBufferBuilder &_fbb)
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
flatbuffers::Offset<LogRecordResponse> Finish() {
HpfsLogResponseBuilder &operator=(const HpfsLogResponseBuilder &);
flatbuffers::Offset<HpfsLogResponse> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<LogRecordResponse>(end);
auto o = flatbuffers::Offset<HpfsLogResponse>(end);
return o;
}
};
inline flatbuffers::Offset<LogRecordResponse> CreateLogRecordResponse(
inline flatbuffers::Offset<HpfsLogResponse> CreateHpfsLogResponse(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> min_record_id = 0,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> max_record_id = 0,
flatbuffers::Offset<flatbuffers::Vector<uint8_t>> log_record_bytes = 0) {
LogRecordResponseBuilder builder_(_fbb);
HpfsLogResponseBuilder builder_(_fbb);
builder_.add_log_record_bytes(log_record_bytes);
builder_.add_max_record_id(max_record_id);
builder_.add_min_record_id(min_record_id);
return builder_.Finish();
}
inline flatbuffers::Offset<LogRecordResponse> CreateLogRecordResponseDirect(
inline flatbuffers::Offset<HpfsLogResponse> CreateHpfsLogResponseDirect(
flatbuffers::FlatBufferBuilder &_fbb,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> min_record_id = 0,
flatbuffers::Offset<msg::fbuf::p2pmsg::SequenceHash> max_record_id = 0,
const std::vector<uint8_t> *log_record_bytes = nullptr) {
auto log_record_bytes__ = log_record_bytes ? _fbb.CreateVector<uint8_t>(*log_record_bytes) : 0;
return msg::fbuf::p2pmsg::CreateLogRecordResponse(
return msg::fbuf::p2pmsg::CreateHpfsLogResponse(
_fbb,
min_record_id,
max_record_id,
log_record_bytes__);
}
@@ -1958,6 +1958,7 @@ struct PeerRequirementAnnouncementMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
PeerRequirementAnnouncementMsgBuilder &operator=(const PeerRequirementAnnouncementMsgBuilder &);
flatbuffers::Offset<PeerRequirementAnnouncementMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<PeerRequirementAnnouncementMsg>(end);
@@ -2013,6 +2014,7 @@ struct PeerCapacityAnnouncementMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
PeerCapacityAnnouncementMsgBuilder &operator=(const PeerCapacityAnnouncementMsgBuilder &);
flatbuffers::Offset<PeerCapacityAnnouncementMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<PeerCapacityAnnouncementMsg>(end);
@@ -2046,6 +2048,7 @@ struct PeerListRequestMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
PeerListRequestMsgBuilder &operator=(const PeerListRequestMsgBuilder &);
flatbuffers::Offset<PeerListRequestMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<PeerListRequestMsg>(end);
@@ -2090,6 +2093,7 @@ struct PeerListResponseMsgBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
PeerListResponseMsgBuilder &operator=(const PeerListResponseMsgBuilder &);
flatbuffers::Offset<PeerListResponseMsg> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<PeerListResponseMsg>(end);
@@ -2177,6 +2181,7 @@ struct PeerPropertiesBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
PeerPropertiesBuilder &operator=(const PeerPropertiesBuilder &);
flatbuffers::Offset<PeerProperties> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<PeerProperties>(end);
@@ -2254,6 +2259,7 @@ struct SequenceHashBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
SequenceHashBuilder &operator=(const SequenceHashBuilder &);
flatbuffers::Offset<SequenceHash> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<SequenceHash>(end);
@@ -2312,6 +2318,7 @@ struct ByteArrayBuilder {
: fbb_(_fbb) {
start_ = fbb_.StartTable();
}
ByteArrayBuilder &operator=(const ByteArrayBuilder &);
flatbuffers::Offset<ByteArray> Finish() {
const auto end = fbb_.EndTable(start_);
auto o = flatbuffers::Offset<ByteArray>(end);
@@ -2385,12 +2392,12 @@ inline bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::PeerListResponseMsg *>(obj);
return verifier.VerifyTable(ptr);
}
case P2PMsgContent_LogRecordRequest: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::LogRecordRequest *>(obj);
case P2PMsgContent_HpfsLogRequest: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::HpfsLogRequest *>(obj);
return verifier.VerifyTable(ptr);
}
case P2PMsgContent_LogRecordResponse: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::LogRecordResponse *>(obj);
case P2PMsgContent_HpfsLogResponse: {
auto ptr = reinterpret_cast<const msg::fbuf::p2pmsg::HpfsLogResponse *>(obj);
return verifier.VerifyTable(ptr);
}
default: return true;

View File

@@ -143,7 +143,6 @@ namespace p2p
struct hpfs_log_response
{
sequence_hash min_record_id;
sequence_hash max_record_id;
std::vector<uint8_t> log_record_bytes;
};

View File

@@ -232,7 +232,7 @@ namespace p2p
LOG_DEBUG << "Ledger hpfs response rejected. Maximum response count reached. " << session.display_name();
}
}
else if (mi.type == p2pmsg::P2PMsgContent_LogRecordRequest)
else if (mi.type == p2pmsg::P2PMsgContent_HpfsLogRequest)
{
if (conf::cfg.node.history == conf::HISTORY::FULL)
{
@@ -249,7 +249,7 @@ namespace p2p
LOG_DEBUG << "Hpfs log request rejected. Maximum request count reached. " << session.display_name();
}
}
else if (mi.type == p2pmsg::P2PMsgContent_LogRecordResponse)
else if (mi.type == p2pmsg::P2PMsgContent_HpfsLogResponse)
{
if (conf::cfg.node.history == conf::HISTORY::FULL && sc::hpfs_log_sync::sync_ctx.is_syncing)
{

View File

@@ -88,6 +88,15 @@ namespace sc::hpfs_log_sync
// Process any history responses from other nodes.
if (!sync_ctx.target_log_record.empty() && check_hpfs_log_sync_responses() == 1)
processed = true;
// Here we check for the updated log records to check whether target has archived.
if (sync_ctx.is_syncing && get_verified_min_record() == 1)
{
LOG_INFO << "Hpfs log sync: sync target archived " << sync_ctx.target_log_record;
sync_ctx.target_log_record = {};
sync_ctx.min_log_record = {};
sync_ctx.is_syncing = false;
}
}
// Serve any history requests from other nodes.
@@ -151,6 +160,9 @@ namespace sc::hpfs_log_sync
log_record_responses.splice(log_record_responses.end(), p2p::ctx.collected_msgs.log_record_responses);
}
for (const auto &[sess_id, log_response] : log_record_responses)
handle_hpfs_log_sync_response(log_response);
return log_record_responses.empty() ? 0 : 1;
}
@@ -173,11 +185,16 @@ namespace sc::hpfs_log_sync
for (const auto &[session_id, lr] : log_record_requests)
{
flatbuffers::FlatBufferBuilder fbuf(1024);
// Before serving the request check whether we have the requested min seq_no.
// And requested min hash matches with our corresponding hash.
if (!check_required_log_record_availability(lr))
continue;
p2p::hpfs_log_response resp;
resp.max_record_id = lr.target_record_id;
if (sc::contract_fs.read_hpfs_logs(lr.min_record_id.seq_no, lr.target_record_id.seq_no, resp.log_record_bytes) == -1)
continue;
resp.min_record_id = lr.min_record_id;
resp.log_record_bytes = std::vector<uint8_t>();
flatbuffers::FlatBufferBuilder fbuf(1024);
p2pmsg::create_msg_from_hpfs_log_response(fbuf, resp);
std::string_view msg = std::string_view(reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
@@ -197,21 +214,55 @@ namespace sc::hpfs_log_sync
/**
* Check requested sequence number is in node's log file.
* @param lr log record request information.
* @return true if requested sequence number is in node's log file.
* @param log_request log record request information.
* @return true if requested sequence number is in node's log file and requested hash mathces with ours.
*/
bool check_required_log_record_availability(const p2p::sequence_hash &min_log_record)
bool check_required_log_record_availability(const p2p::hpfs_log_request &log_request)
{
// If requested min is the genesis we serve without checking.
const p2p::sequence_hash genesis{ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)};
if (log_request.min_record_id == genesis)
return true;
util::h32 root_hash;
if (sc::contract_fs.get_hash_from_index_by_seq_no(root_hash, log_request.min_record_id.seq_no) == -1)
return false;
if (root_hash != log_request.min_record_id.hash)
{
LOG_DEBUG << "Requested root hash does not match with ours: seq no " << log_request.min_record_id.seq_no;
return false;
}
return true;
}
/**
* Handle recieved ledger history response.
* @param lr log record request information.
* @param log_response log record response information.
* @return 0 on successful log update. -1 on failure.
*/
int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &hr, std::string &new_log_record_seqno)
int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &log_response)
{
p2p::sequence_hash requested_min_seq_id;
{
std::scoped_lock<std::mutex> lock(sync_ctx.min_log_record_mutex);
requested_min_seq_id = sync_ctx.min_log_record;
}
// Append only if the response contains min_seq_no staring from requested min seq_no.
const p2p::sequence_hash genesis{ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)};
if (log_response.min_record_id != requested_min_seq_id)
{
LOG_DEBUG << "Invalid joining point in the received hpfs log response";
return -1;
}
if (sc::contract_fs.append_hpfs_log_records(log_response.log_record_bytes) == -1)
{
LOG_ERROR << errno << ": Error persisting hpfs log responses";
return -1;
}
return 0;
}
@@ -229,9 +280,8 @@ namespace sc::hpfs_log_sync
return -1;
}
const p2p::sequence_hash last_from_ledger = ledger::ctx.get_lcl_id();
if (last_from_index.seq_no == 0)
p2p::sequence_hash last_from_ledger = ledger::ctx.get_lcl_id();
if (last_from_index.seq_no == ledger::genesis.seq_no || last_from_ledger.seq_no == ledger::genesis.seq_no)
{
// Request full ledger.
std::scoped_lock<std::mutex> lock(sync_ctx.min_log_record_mutex);
@@ -239,6 +289,12 @@ namespace sc::hpfs_log_sync
return 0;
}
if (ledger::get_root_hash_from_ledger(last_from_ledger.hash, last_from_ledger.seq_no) == -1)
{
LOG_ERROR << "Error getting root hash from ledger for sequence number: " << last_from_index.seq_no;
return -1;
}
if (last_from_index == last_from_ledger)
{
// In sync. No need to sync.

View File

@@ -49,9 +49,9 @@ namespace sc::hpfs_log_sync
int check_hpfs_log_sync_requests();
bool check_required_log_record_availability(const p2p::sequence_hash &min_log_record);
bool check_required_log_record_availability(const p2p::hpfs_log_request &log_request);
int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &hr, std::string &new_lcl);
int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &log_response);
int get_verified_min_record();

Binary file not shown.