diff --git a/src/hpfs/hpfs_mount.cpp b/src/hpfs/hpfs_mount.cpp index 1bd41a9a..98add6a3 100644 --- a/src/hpfs/hpfs_mount.cpp +++ b/src/hpfs/hpfs_mount.cpp @@ -17,8 +17,8 @@ namespace hpfs constexpr const char *HMAP_HASH = "::hpfs.hmap.hash"; constexpr const char *HMAP_CHILDREN = "::hpfs.hmap.children"; + constexpr const char *INDEX_CONTROL = "/::hpfs.index"; constexpr const char *ROOT_PATH = "/"; - constexpr const char *INDEX_UPDATE = "/::hpfs.index"; constexpr const char *LOG_INDEX_FILENAME = "/log.hpfs.idx"; constexpr ino_t ROOT_INO = 1; @@ -26,6 +26,8 @@ namespace hpfs constexpr uint16_t PROCESS_INIT_TIMEOUT = 2000; constexpr uint16_t INIT_CHECK_INTERVAL = 20; + constexpr uint64_t MAX_HPFS_LOG_READ_SIZE = 1 * 1024 * 1024; + /** * This should be called to activate the hpfs mount process. */ @@ -396,9 +398,13 @@ namespace hpfs } } + /** + * This updates the hpfs log index file with latest log offset and the root hash. + * @return Returns 0 in success, otherwise -1. + */ int hpfs_mount::update_hpfs_log_index() { - const std::string index_file = mount_dir + INDEX_UPDATE; + const std::string index_file = mount_dir + INDEX_CONTROL; const int fd = open(index_file.c_str(), O_RDWR); if (fd == -1) @@ -423,7 +429,7 @@ namespace hpfs */ int hpfs_mount::truncate_log_file(const uint64_t seq_no) { - const std::string file_path = mount_dir + INDEX_UPDATE + "." + std::to_string(seq_no); + const std::string file_path = mount_dir + INDEX_CONTROL + "." + std::to_string(seq_no); // File /hpfs::index. is truncated to invoke log file truncation in hpfs. // This call waits until any running RW or RO sessions stop. if (truncate(file_path.c_str(), 0) == -1) @@ -434,6 +440,65 @@ namespace hpfs return 0; } + /** + * This reads the hpfs logs from given min to max ledger seq_no range. + * @param min_ledger_seq_no Mininmum ledger seq number. + * @param max_ledger_seq_no Maximum ledger seq number. + * @param buf Buffer to read logs. + * @return Returns 0 if success, otherwise -1. + */ + int hpfs_mount::read_hpfs_logs(const uint64_t min_ledger_seq_no, const uint64_t max_ledger_seq_no, std::vector &buf) + { + const std::string index_file = mount_dir + INDEX_CONTROL + "." + std::to_string(min_ledger_seq_no) + "." + std::to_string(max_ledger_seq_no); + + const int fd = open(index_file.c_str(), O_RDONLY); + if (fd == -1) + { + LOG_ERROR << errno << ": Error opening the hpfs logs file"; + return -1; + } + + // First resize the buffer to max size and then after reading resize it to the actual read size. + buf.resize(MAX_HPFS_LOG_READ_SIZE); + const int res = read(fd, buf.data(), MAX_HPFS_LOG_READ_SIZE); + if (res == -1) + { + LOG_ERROR << errno << ": Error reading the hpfs logs file"; + close(fd); + return -1; + } + buf.resize(res); + close(fd); + return 0; + } + + /** + * This appends new log records to the hpfs log file. + * @param buf Hpfs log record buffer to write. + * @return Returns 0 in success, otherwise -1. + */ + int hpfs_mount::append_hpfs_log_records(const std::vector &buf) + { + const std::string index_file = mount_dir + INDEX_CONTROL; + + const int fd = open(index_file.c_str(), O_RDWR); + if (fd == -1) + { + LOG_ERROR << errno << ": Error opening the hpfs logs file"; + return -1; + } + + if (write(fd, buf.data(), buf.size()) == -1) + { + LOG_ERROR << errno << ": Error writing to the hpfs logs file"; + close(fd); + return -1; + } + + close(fd); + return 0; + } + /** * Get the last sequence number updated in the index file. * @param seq_no The last sequence number. @@ -478,10 +543,27 @@ namespace hpfs LOG_ERROR << errno << ": Error opening hpfs index file " << path; return -1; } + + struct stat st; + if (fstat(fd, &st) == -1) + { + LOG_ERROR << errno << ": Error stat hpfs index file " << path; + return -1; + } + const off_t offset = ((seq_no - 1) * (sizeof(uint64_t) + sizeof(util::h32))) + sizeof(uint64_t); + // If calculated offset is beyond our file size means, + // Requested seq_no is invalid or we do not have that seq_no in our hpfs log file. + if (offset >= st.st_size) + { + LOG_DEBUG << "Requested hash does not exist in hpfs log file: seq no " << seq_no; + close(fd); + return -1; + } + if (pread(fd, &hash, sizeof(util::h32), offset) < sizeof(util::h32)) { - LOG_ERROR << errno << ": Error reading hash from the given offset " << std::to_string(offset); + LOG_ERROR << errno << ": Error reading hash from the given offset " << offset; close(fd); return -1; } diff --git a/src/hpfs/hpfs_mount.hpp b/src/hpfs/hpfs_mount.hpp index e9fa9660..91df99f9 100644 --- a/src/hpfs/hpfs_mount.hpp +++ b/src/hpfs/hpfs_mount.hpp @@ -75,6 +75,8 @@ namespace hpfs int truncate_log_file(const uint64_t seq_no); int get_last_seq_no_from_index(uint64_t &seq_no); int get_hash_from_index_by_seq_no(util::h32 &hash, const uint64_t seq_no); + int read_hpfs_logs(const uint64_t min_ledger_seq_no, const uint64_t max_ledger_seq_no, std::vector &buf); + int append_hpfs_log_records(const std::vector &buf); }; } // namespace hpfs diff --git a/src/msg/fbuf/p2pmsg.fbs b/src/msg/fbuf/p2pmsg.fbs index 8e4fa514..27f29051 100644 --- a/src/msg/fbuf/p2pmsg.fbs +++ b/src/msg/fbuf/p2pmsg.fbs @@ -15,8 +15,8 @@ union P2PMsgContent { PeerCapacityAnnouncementMsg, PeerListRequestMsg, PeerListResponseMsg, - LogRecordRequest, - LogRecordResponse + HpfsLogRequest, + HpfsLogResponse } table P2PMsg { @@ -124,16 +124,15 @@ table HpfsFSHashEntry{ hash: [ubyte]; } -table LogRecordRequest +table HpfsLogRequest { target_record_id:SequenceHash; min_record_id:SequenceHash; } -table LogRecordResponse +table HpfsLogResponse { min_record_id:SequenceHash; - max_record_id:SequenceHash; log_record_bytes:[ubyte]; } diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index 9a006bf4..1a03faee 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -211,7 +211,7 @@ namespace msg::fbuf::p2pmsg const p2p::hpfs_log_request create_hpfs_log_request_from_msg(const p2p::peer_message_info &mi) { - const auto &msg = *mi.p2p_msg->content_as_LogRecordRequest(); + const auto &msg = *mi.p2p_msg->content_as_HpfsLogRequest(); p2p::hpfs_log_request log_record; log_record.target_record_id = flatbuf_seqhash_to_seqhash(msg.target_record_id()); log_record.min_record_id = flatbuf_seqhash_to_seqhash(msg.min_record_id()); @@ -220,10 +220,9 @@ namespace msg::fbuf::p2pmsg const p2p::hpfs_log_response create_hpfs_log_response_from_msg(const p2p::peer_message_info &mi) { - const auto &msg = *mi.p2p_msg->content_as_LogRecordResponse(); + const auto &msg = *mi.p2p_msg->content_as_HpfsLogResponse(); p2p::hpfs_log_response hpfs_log_response; hpfs_log_response.min_record_id = flatbuf_seqhash_to_seqhash(msg.min_record_id()); - hpfs_log_response.max_record_id = flatbuf_seqhash_to_seqhash(msg.max_record_id()); hpfs_log_response.log_record_bytes.reserve(msg.log_record_bytes()->size()); for (const auto byte: *msg.log_record_bytes()) hpfs_log_response.log_record_bytes.push_back(byte); @@ -424,23 +423,22 @@ namespace msg::fbuf::p2pmsg void create_msg_from_hpfs_log_request(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_request &hpfs_log_request) { - const auto msg = CreateLogRecordRequest( + const auto msg = CreateHpfsLogRequest( builder, seqhash_to_flatbuf_seqhash(builder, hpfs_log_request.target_record_id), seqhash_to_flatbuf_seqhash(builder, hpfs_log_request.min_record_id)); - create_p2p_msg(builder, P2PMsgContent_LogRecordRequest, msg.Union()); + create_p2p_msg(builder, P2PMsgContent_HpfsLogRequest, msg.Union()); } void create_msg_from_hpfs_log_response(flatbuffers::FlatBufferBuilder &builder, const p2p::hpfs_log_response &hpfs_log_response) { - const auto msg = CreateLogRecordResponse( + const auto msg = CreateHpfsLogResponse( builder, seqhash_to_flatbuf_seqhash(builder, hpfs_log_response.min_record_id), - seqhash_to_flatbuf_seqhash(builder, hpfs_log_response.max_record_id), builder.CreateVector(hpfs_log_response.log_record_bytes)); - create_p2p_msg(builder, P2PMsgContent_LogRecordResponse, msg.Union()); + create_p2p_msg(builder, P2PMsgContent_HpfsLogResponse, msg.Union()); } void create_msg_from_fsentry_response( diff --git a/src/msg/fbuf/p2pmsg_generated.h b/src/msg/fbuf/p2pmsg_generated.h index 9844b5ed..64c2c897 100644 --- a/src/msg/fbuf/p2pmsg_generated.h +++ b/src/msg/fbuf/p2pmsg_generated.h @@ -52,11 +52,11 @@ struct HpfsBlockResponseBuilder; struct HpfsFSHashEntry; struct HpfsFSHashEntryBuilder; -struct LogRecordRequest; -struct LogRecordRequestBuilder; +struct HpfsLogRequest; +struct HpfsLogRequestBuilder; -struct LogRecordResponse; -struct LogRecordResponseBuilder; +struct HpfsLogResponse; +struct HpfsLogResponseBuilder; struct PeerRequirementAnnouncementMsg; struct PeerRequirementAnnouncementMsgBuilder; @@ -79,7 +79,7 @@ struct SequenceHashBuilder; struct ByteArray; struct ByteArrayBuilder; -enum P2PMsgContent : uint8_t { +enum P2PMsgContent { P2PMsgContent_NONE = 0, P2PMsgContent_PeerChallengeMsg = 1, P2PMsgContent_PeerChallengeResponseMsg = 2, @@ -92,10 +92,10 @@ enum P2PMsgContent : uint8_t { P2PMsgContent_PeerCapacityAnnouncementMsg = 9, P2PMsgContent_PeerListRequestMsg = 10, P2PMsgContent_PeerListResponseMsg = 11, - P2PMsgContent_LogRecordRequest = 12, - P2PMsgContent_LogRecordResponse = 13, + P2PMsgContent_HpfsLogRequest = 12, + P2PMsgContent_HpfsLogResponse = 13, P2PMsgContent_MIN = P2PMsgContent_NONE, - P2PMsgContent_MAX = P2PMsgContent_LogRecordResponse + P2PMsgContent_MAX = P2PMsgContent_HpfsLogResponse }; inline const P2PMsgContent (&EnumValuesP2PMsgContent())[14] { @@ -112,8 +112,8 @@ inline const P2PMsgContent (&EnumValuesP2PMsgContent())[14] { P2PMsgContent_PeerCapacityAnnouncementMsg, P2PMsgContent_PeerListRequestMsg, P2PMsgContent_PeerListResponseMsg, - P2PMsgContent_LogRecordRequest, - P2PMsgContent_LogRecordResponse + P2PMsgContent_HpfsLogRequest, + P2PMsgContent_HpfsLogResponse }; return values; } @@ -132,15 +132,15 @@ inline const char * const *EnumNamesP2PMsgContent() { "PeerCapacityAnnouncementMsg", "PeerListRequestMsg", "PeerListResponseMsg", - "LogRecordRequest", - "LogRecordResponse", + "HpfsLogRequest", + "HpfsLogResponse", nullptr }; return names; } inline const char *EnumNameP2PMsgContent(P2PMsgContent e) { - if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_LogRecordResponse)) return ""; + if (flatbuffers::IsOutRange(e, P2PMsgContent_NONE, P2PMsgContent_HpfsLogResponse)) return ""; const size_t index = static_cast(e); return EnumNamesP2PMsgContent()[index]; } @@ -193,18 +193,18 @@ template<> struct P2PMsgContentTraits { static const P2PMsgContent enum_value = P2PMsgContent_PeerListResponseMsg; }; -template<> struct P2PMsgContentTraits { - static const P2PMsgContent enum_value = P2PMsgContent_LogRecordRequest; +template<> struct P2PMsgContentTraits { + static const P2PMsgContent enum_value = P2PMsgContent_HpfsLogRequest; }; -template<> struct P2PMsgContentTraits { - static const P2PMsgContent enum_value = P2PMsgContent_LogRecordResponse; +template<> struct P2PMsgContentTraits { + static const P2PMsgContent enum_value = P2PMsgContent_HpfsLogResponse; }; bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj, P2PMsgContent type); bool VerifyP2PMsgContentVector(flatbuffers::Verifier &verifier, const flatbuffers::Vector> *values, const flatbuffers::Vector *types); -enum HpfsResponse : uint8_t { +enum HpfsResponse { HpfsResponse_NONE = 0, HpfsResponse_HpfsFileHashMapResponse = 1, HpfsResponse_HpfsBlockResponse = 2, @@ -319,11 +319,11 @@ struct P2PMsg FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { const msg::fbuf::p2pmsg::PeerListResponseMsg *content_as_PeerListResponseMsg() const { return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_PeerListResponseMsg ? static_cast(content()) : nullptr; } - const msg::fbuf::p2pmsg::LogRecordRequest *content_as_LogRecordRequest() const { - return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_LogRecordRequest ? static_cast(content()) : nullptr; + const msg::fbuf::p2pmsg::HpfsLogRequest *content_as_HpfsLogRequest() const { + return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_HpfsLogRequest ? static_cast(content()) : nullptr; } - const msg::fbuf::p2pmsg::LogRecordResponse *content_as_LogRecordResponse() const { - return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_LogRecordResponse ? static_cast(content()) : nullptr; + const msg::fbuf::p2pmsg::HpfsLogResponse *content_as_HpfsLogResponse() const { + return content_type() == msg::fbuf::p2pmsg::P2PMsgContent_HpfsLogResponse ? static_cast(content()) : nullptr; } void *mutable_content() { return GetPointer(VT_CONTENT); @@ -384,12 +384,12 @@ template<> inline const msg::fbuf::p2pmsg::PeerListResponseMsg *P2PMsg::content_ return content_as_PeerListResponseMsg(); } -template<> inline const msg::fbuf::p2pmsg::LogRecordRequest *P2PMsg::content_as() const { - return content_as_LogRecordRequest(); +template<> inline const msg::fbuf::p2pmsg::HpfsLogRequest *P2PMsg::content_as() const { + return content_as_HpfsLogRequest(); } -template<> inline const msg::fbuf::p2pmsg::LogRecordResponse *P2PMsg::content_as() const { - return content_as_LogRecordResponse(); +template<> inline const msg::fbuf::p2pmsg::HpfsLogResponse *P2PMsg::content_as() const { + return content_as_HpfsLogResponse(); } struct P2PMsgBuilder { @@ -412,6 +412,7 @@ struct P2PMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + P2PMsgBuilder &operator=(const P2PMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -512,6 +513,7 @@ struct PeerChallengeMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + PeerChallengeMsgBuilder &operator=(const PeerChallengeMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -603,6 +605,7 @@ struct PeerChallengeResponseMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + PeerChallengeResponseMsgBuilder &operator=(const PeerChallengeResponseMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -690,6 +693,7 @@ struct UserInputBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + UserInputBuilder &operator=(const UserInputBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -766,6 +770,7 @@ struct UserInputGroupBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + UserInputGroupBuilder &operator=(const UserInputGroupBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -826,6 +831,7 @@ struct NonUnlProposalMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + NonUnlProposalMsgBuilder &operator=(const NonUnlProposalMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1035,6 +1041,7 @@ struct ProposalMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + ProposalMsgBuilder &operator=(const ProposalMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1185,6 +1192,7 @@ struct NplMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + NplMsgBuilder &operator=(const NplMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1298,6 +1306,7 @@ struct HpfsRequestMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + HpfsRequestMsgBuilder &operator=(const HpfsRequestMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1434,6 +1443,7 @@ struct HpfsResponseMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + HpfsResponseMsgBuilder &operator=(const HpfsResponseMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1517,6 +1527,7 @@ struct HpfsFsEntryResponseBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + HpfsFsEntryResponseBuilder &operator=(const HpfsFsEntryResponseBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1597,6 +1608,7 @@ struct HpfsFileHashMapResponseBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + HpfsFileHashMapResponseBuilder &operator=(const HpfsFileHashMapResponseBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1670,6 +1682,7 @@ struct HpfsBlockResponseBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + HpfsBlockResponseBuilder &operator=(const HpfsBlockResponseBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1751,6 +1764,7 @@ struct HpfsFSHashEntryBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + HpfsFSHashEntryBuilder &operator=(const HpfsFSHashEntryBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -1784,8 +1798,8 @@ inline flatbuffers::Offset CreateHpfsFSHashEntryDirect( hash__); } -struct LogRecordRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - typedef LogRecordRequestBuilder Builder; +struct HpfsLogRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef HpfsLogRequestBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_TARGET_RECORD_ID = 4, VT_MIN_RECORD_ID = 6 @@ -1812,43 +1826,43 @@ struct LogRecordRequest FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } }; -struct LogRecordRequestBuilder { - typedef LogRecordRequest Table; +struct HpfsLogRequestBuilder { + typedef HpfsLogRequest Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; void add_target_record_id(flatbuffers::Offset target_record_id) { - fbb_.AddOffset(LogRecordRequest::VT_TARGET_RECORD_ID, target_record_id); + fbb_.AddOffset(HpfsLogRequest::VT_TARGET_RECORD_ID, target_record_id); } void add_min_record_id(flatbuffers::Offset min_record_id) { - fbb_.AddOffset(LogRecordRequest::VT_MIN_RECORD_ID, min_record_id); + fbb_.AddOffset(HpfsLogRequest::VT_MIN_RECORD_ID, min_record_id); } - explicit LogRecordRequestBuilder(flatbuffers::FlatBufferBuilder &_fbb) + explicit HpfsLogRequestBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); } - flatbuffers::Offset Finish() { + HpfsLogRequestBuilder &operator=(const HpfsLogRequestBuilder &); + flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); + auto o = flatbuffers::Offset(end); return o; } }; -inline flatbuffers::Offset CreateLogRecordRequest( +inline flatbuffers::Offset CreateHpfsLogRequest( flatbuffers::FlatBufferBuilder &_fbb, flatbuffers::Offset target_record_id = 0, flatbuffers::Offset min_record_id = 0) { - LogRecordRequestBuilder builder_(_fbb); + HpfsLogRequestBuilder builder_(_fbb); builder_.add_min_record_id(min_record_id); builder_.add_target_record_id(target_record_id); return builder_.Finish(); } -struct LogRecordResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { - typedef LogRecordResponseBuilder Builder; +struct HpfsLogResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { + typedef HpfsLogResponseBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_MIN_RECORD_ID = 4, - VT_MAX_RECORD_ID = 6, - VT_LOG_RECORD_BYTES = 8 + VT_LOG_RECORD_BYTES = 6 }; const msg::fbuf::p2pmsg::SequenceHash *min_record_id() const { return GetPointer(VT_MIN_RECORD_ID); @@ -1856,12 +1870,6 @@ struct LogRecordResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { msg::fbuf::p2pmsg::SequenceHash *mutable_min_record_id() { return GetPointer(VT_MIN_RECORD_ID); } - const msg::fbuf::p2pmsg::SequenceHash *max_record_id() const { - return GetPointer(VT_MAX_RECORD_ID); - } - msg::fbuf::p2pmsg::SequenceHash *mutable_max_record_id() { - return GetPointer(VT_MAX_RECORD_ID); - } const flatbuffers::Vector *log_record_bytes() const { return GetPointer *>(VT_LOG_RECORD_BYTES); } @@ -1872,60 +1880,52 @@ struct LogRecordResponse FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { return VerifyTableStart(verifier) && VerifyOffset(verifier, VT_MIN_RECORD_ID) && verifier.VerifyTable(min_record_id()) && - VerifyOffset(verifier, VT_MAX_RECORD_ID) && - verifier.VerifyTable(max_record_id()) && VerifyOffset(verifier, VT_LOG_RECORD_BYTES) && verifier.VerifyVector(log_record_bytes()) && verifier.EndTable(); } }; -struct LogRecordResponseBuilder { - typedef LogRecordResponse Table; +struct HpfsLogResponseBuilder { + typedef HpfsLogResponse Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; void add_min_record_id(flatbuffers::Offset min_record_id) { - fbb_.AddOffset(LogRecordResponse::VT_MIN_RECORD_ID, min_record_id); - } - void add_max_record_id(flatbuffers::Offset max_record_id) { - fbb_.AddOffset(LogRecordResponse::VT_MAX_RECORD_ID, max_record_id); + fbb_.AddOffset(HpfsLogResponse::VT_MIN_RECORD_ID, min_record_id); } void add_log_record_bytes(flatbuffers::Offset> log_record_bytes) { - fbb_.AddOffset(LogRecordResponse::VT_LOG_RECORD_BYTES, log_record_bytes); + fbb_.AddOffset(HpfsLogResponse::VT_LOG_RECORD_BYTES, log_record_bytes); } - explicit LogRecordResponseBuilder(flatbuffers::FlatBufferBuilder &_fbb) + explicit HpfsLogResponseBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); } - flatbuffers::Offset Finish() { + HpfsLogResponseBuilder &operator=(const HpfsLogResponseBuilder &); + flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); - auto o = flatbuffers::Offset(end); + auto o = flatbuffers::Offset(end); return o; } }; -inline flatbuffers::Offset CreateLogRecordResponse( +inline flatbuffers::Offset CreateHpfsLogResponse( flatbuffers::FlatBufferBuilder &_fbb, flatbuffers::Offset min_record_id = 0, - flatbuffers::Offset max_record_id = 0, flatbuffers::Offset> log_record_bytes = 0) { - LogRecordResponseBuilder builder_(_fbb); + HpfsLogResponseBuilder builder_(_fbb); builder_.add_log_record_bytes(log_record_bytes); - builder_.add_max_record_id(max_record_id); builder_.add_min_record_id(min_record_id); return builder_.Finish(); } -inline flatbuffers::Offset CreateLogRecordResponseDirect( +inline flatbuffers::Offset CreateHpfsLogResponseDirect( flatbuffers::FlatBufferBuilder &_fbb, flatbuffers::Offset min_record_id = 0, - flatbuffers::Offset max_record_id = 0, const std::vector *log_record_bytes = nullptr) { auto log_record_bytes__ = log_record_bytes ? _fbb.CreateVector(*log_record_bytes) : 0; - return msg::fbuf::p2pmsg::CreateLogRecordResponse( + return msg::fbuf::p2pmsg::CreateHpfsLogResponse( _fbb, min_record_id, - max_record_id, log_record_bytes__); } @@ -1958,6 +1958,7 @@ struct PeerRequirementAnnouncementMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + PeerRequirementAnnouncementMsgBuilder &operator=(const PeerRequirementAnnouncementMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2013,6 +2014,7 @@ struct PeerCapacityAnnouncementMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + PeerCapacityAnnouncementMsgBuilder &operator=(const PeerCapacityAnnouncementMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2046,6 +2048,7 @@ struct PeerListRequestMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + PeerListRequestMsgBuilder &operator=(const PeerListRequestMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2090,6 +2093,7 @@ struct PeerListResponseMsgBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + PeerListResponseMsgBuilder &operator=(const PeerListResponseMsgBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2177,6 +2181,7 @@ struct PeerPropertiesBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + PeerPropertiesBuilder &operator=(const PeerPropertiesBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2254,6 +2259,7 @@ struct SequenceHashBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + SequenceHashBuilder &operator=(const SequenceHashBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2312,6 +2318,7 @@ struct ByteArrayBuilder { : fbb_(_fbb) { start_ = fbb_.StartTable(); } + ByteArrayBuilder &operator=(const ByteArrayBuilder &); flatbuffers::Offset Finish() { const auto end = fbb_.EndTable(start_); auto o = flatbuffers::Offset(end); @@ -2385,12 +2392,12 @@ inline bool VerifyP2PMsgContent(flatbuffers::Verifier &verifier, const void *obj auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } - case P2PMsgContent_LogRecordRequest: { - auto ptr = reinterpret_cast(obj); + case P2PMsgContent_HpfsLogRequest: { + auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } - case P2PMsgContent_LogRecordResponse: { - auto ptr = reinterpret_cast(obj); + case P2PMsgContent_HpfsLogResponse: { + auto ptr = reinterpret_cast(obj); return verifier.VerifyTable(ptr); } default: return true; diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 23e43153..6c861633 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -143,7 +143,6 @@ namespace p2p struct hpfs_log_response { sequence_hash min_record_id; - sequence_hash max_record_id; std::vector log_record_bytes; }; diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index fbd056f8..be028027 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -232,7 +232,7 @@ namespace p2p LOG_DEBUG << "Ledger hpfs response rejected. Maximum response count reached. " << session.display_name(); } } - else if (mi.type == p2pmsg::P2PMsgContent_LogRecordRequest) + else if (mi.type == p2pmsg::P2PMsgContent_HpfsLogRequest) { if (conf::cfg.node.history == conf::HISTORY::FULL) { @@ -249,7 +249,7 @@ namespace p2p LOG_DEBUG << "Hpfs log request rejected. Maximum request count reached. " << session.display_name(); } } - else if (mi.type == p2pmsg::P2PMsgContent_LogRecordResponse) + else if (mi.type == p2pmsg::P2PMsgContent_HpfsLogResponse) { if (conf::cfg.node.history == conf::HISTORY::FULL && sc::hpfs_log_sync::sync_ctx.is_syncing) { diff --git a/src/sc/hpfs_log_sync.cpp b/src/sc/hpfs_log_sync.cpp index a534b327..8ef973ed 100644 --- a/src/sc/hpfs_log_sync.cpp +++ b/src/sc/hpfs_log_sync.cpp @@ -88,6 +88,15 @@ namespace sc::hpfs_log_sync // Process any history responses from other nodes. if (!sync_ctx.target_log_record.empty() && check_hpfs_log_sync_responses() == 1) processed = true; + + // Here we check for the updated log records to check whether target has archived. + if (sync_ctx.is_syncing && get_verified_min_record() == 1) + { + LOG_INFO << "Hpfs log sync: sync target archived " << sync_ctx.target_log_record; + sync_ctx.target_log_record = {}; + sync_ctx.min_log_record = {}; + sync_ctx.is_syncing = false; + } } // Serve any history requests from other nodes. @@ -151,6 +160,9 @@ namespace sc::hpfs_log_sync log_record_responses.splice(log_record_responses.end(), p2p::ctx.collected_msgs.log_record_responses); } + for (const auto &[sess_id, log_response] : log_record_responses) + handle_hpfs_log_sync_response(log_response); + return log_record_responses.empty() ? 0 : 1; } @@ -173,11 +185,16 @@ namespace sc::hpfs_log_sync for (const auto &[session_id, lr] : log_record_requests) { - flatbuffers::FlatBufferBuilder fbuf(1024); + // Before serving the request check whether we have the requested min seq_no. + // And requested min hash matches with our corresponding hash. + if (!check_required_log_record_availability(lr)) + continue; + p2p::hpfs_log_response resp; - resp.max_record_id = lr.target_record_id; + if (sc::contract_fs.read_hpfs_logs(lr.min_record_id.seq_no, lr.target_record_id.seq_no, resp.log_record_bytes) == -1) + continue; resp.min_record_id = lr.min_record_id; - resp.log_record_bytes = std::vector(); + flatbuffers::FlatBufferBuilder fbuf(1024); p2pmsg::create_msg_from_hpfs_log_response(fbuf, resp); std::string_view msg = std::string_view(reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); @@ -197,21 +214,55 @@ namespace sc::hpfs_log_sync /** * Check requested sequence number is in node's log file. - * @param lr log record request information. - * @return true if requested sequence number is in node's log file. + * @param log_request log record request information. + * @return true if requested sequence number is in node's log file and requested hash mathces with ours. */ - bool check_required_log_record_availability(const p2p::sequence_hash &min_log_record) + bool check_required_log_record_availability(const p2p::hpfs_log_request &log_request) { + // If requested min is the genesis we serve without checking. + const p2p::sequence_hash genesis{ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; + if (log_request.min_record_id == genesis) + return true; + + util::h32 root_hash; + if (sc::contract_fs.get_hash_from_index_by_seq_no(root_hash, log_request.min_record_id.seq_no) == -1) + return false; + + if (root_hash != log_request.min_record_id.hash) + { + LOG_DEBUG << "Requested root hash does not match with ours: seq no " << log_request.min_record_id.seq_no; + return false; + } + return true; } /** * Handle recieved ledger history response. - * @param lr log record request information. + * @param log_response log record response information. * @return 0 on successful log update. -1 on failure. */ - int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &hr, std::string &new_log_record_seqno) + int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &log_response) { + p2p::sequence_hash requested_min_seq_id; + { + std::scoped_lock lock(sync_ctx.min_log_record_mutex); + requested_min_seq_id = sync_ctx.min_log_record; + } + + // Append only if the response contains min_seq_no staring from requested min seq_no. + const p2p::sequence_hash genesis{ledger::genesis.seq_no, hpfs::get_root_hash(ledger::genesis.config_hash, ledger::genesis.state_hash)}; + if (log_response.min_record_id != requested_min_seq_id) + { + LOG_DEBUG << "Invalid joining point in the received hpfs log response"; + return -1; + } + + if (sc::contract_fs.append_hpfs_log_records(log_response.log_record_bytes) == -1) + { + LOG_ERROR << errno << ": Error persisting hpfs log responses"; + return -1; + } return 0; } @@ -229,9 +280,8 @@ namespace sc::hpfs_log_sync return -1; } - const p2p::sequence_hash last_from_ledger = ledger::ctx.get_lcl_id(); - - if (last_from_index.seq_no == 0) + p2p::sequence_hash last_from_ledger = ledger::ctx.get_lcl_id(); + if (last_from_index.seq_no == ledger::genesis.seq_no || last_from_ledger.seq_no == ledger::genesis.seq_no) { // Request full ledger. std::scoped_lock lock(sync_ctx.min_log_record_mutex); @@ -239,6 +289,12 @@ namespace sc::hpfs_log_sync return 0; } + if (ledger::get_root_hash_from_ledger(last_from_ledger.hash, last_from_ledger.seq_no) == -1) + { + LOG_ERROR << "Error getting root hash from ledger for sequence number: " << last_from_index.seq_no; + return -1; + } + if (last_from_index == last_from_ledger) { // In sync. No need to sync. diff --git a/src/sc/hpfs_log_sync.hpp b/src/sc/hpfs_log_sync.hpp index 3150e4c6..4508e9e9 100644 --- a/src/sc/hpfs_log_sync.hpp +++ b/src/sc/hpfs_log_sync.hpp @@ -49,9 +49,9 @@ namespace sc::hpfs_log_sync int check_hpfs_log_sync_requests(); - bool check_required_log_record_availability(const p2p::sequence_hash &min_log_record); + bool check_required_log_record_availability(const p2p::hpfs_log_request &log_request); - int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &hr, std::string &new_lcl); + int handle_hpfs_log_sync_response(const p2p::hpfs_log_response &log_response); int get_verified_min_record(); diff --git a/test/bin/hpfs b/test/bin/hpfs index 3a37b7a0..d2396e16 100755 Binary files a/test/bin/hpfs and b/test/bin/hpfs differ