diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index be643f8a..0628e9a5 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -10,259 +10,259 @@ namespace comm { -constexpr uint32_t INTERVALMS = 60000; -constexpr uint8_t SIZE_HEADER_LEN = 8; + constexpr uint32_t INTERVALMS = 60000; + constexpr uint8_t SIZE_HEADER_LEN = 8; -// Global instances of user and peer session handlers. -usr::user_session_handler user_sess_handler; -p2p::peer_session_handler peer_sess_handler; + // Global instances of user and peer session handlers. + usr::user_session_handler user_sess_handler; + p2p::peer_session_handler peer_sess_handler; -comm_session::comm_session( - std::string_view ip, const int read_fd, const int write_fd, const SESSION_TYPE session_type, - const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4]) + comm_session::comm_session( + std::string_view ip, const int read_fd, const int write_fd, const SESSION_TYPE session_type, + const bool is_binary, const bool is_inbound, const uint64_t (&metric_thresholds)[4]) - : read_fd(read_fd), - write_fd(write_fd), - session_type(session_type), - uniqueid(std::to_string(read_fd).append(":").append(ip)), - is_binary(is_binary), - is_inbound(is_inbound) -{ - // Create new session_thresholds and insert it to thresholds vector. - // Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector - // since enum's value is used as index in the vector to update vector values. - thresholds.reserve(sizeof metric_thresholds); - for (size_t i = 0; i < sizeof metric_thresholds; i++) - thresholds.push_back(session_threshold(metric_thresholds[i], INTERVALMS)); -} + : read_fd(read_fd), + write_fd(write_fd), + session_type(session_type), + uniqueid(std::to_string(read_fd).append(":").append(ip)), + is_binary(is_binary), + is_inbound(is_inbound) + { + // Create new session_thresholds and insert it to thresholds vector. + // Have to maintain the SESSION_THRESHOLDS enum order in inserting new thresholds to thresholds vector + // since enum's value is used as index in the vector to update vector values. + thresholds.reserve(4); + for (size_t i = 0; i < 4; i++) + thresholds.push_back(session_threshold(metric_thresholds[i], INTERVALMS)); + } -int comm_session::on_connect() -{ - state = SESSION_STATE::ACTIVE; + int comm_session::on_connect() + { + state = SESSION_STATE::ACTIVE; - if (session_type == SESSION_TYPE::USER) - return user_sess_handler.on_connect(*this); - else - return peer_sess_handler.on_connect(*this); -} + if (session_type == SESSION_TYPE::USER) + return user_sess_handler.on_connect(*this); + else + return peer_sess_handler.on_connect(*this); + } -/** + /** * Attempts to read message data from the given socket fd and passes the message on to the session. * @param should_disconnect Whether the client fd must be disconnected. * @param max_msg_size The allowed max byte length of a message to be read. */ -int comm_session::attempt_read(const uint64_t max_msg_size) -{ - size_t available_bytes = 0; - if (ioctl(read_fd, FIONREAD, &available_bytes) == -1 || - (max_msg_size > 0 && - available_bytes > (max_msg_size + (is_binary ? SIZE_HEADER_LEN : 0)))) - return -1; - - // Try to read a complete message using available bytes. - // If complete message is not available silently return. - if (available_bytes > 0) + int comm_session::attempt_read(const uint64_t max_msg_size) { - const uint32_t read_len = is_binary ? get_binary_msg_read_len(available_bytes) : available_bytes; + size_t available_bytes = 0; + if (ioctl(read_fd, FIONREAD, &available_bytes) == -1 || + (max_msg_size > 0 && + available_bytes > (max_msg_size + (is_binary ? SIZE_HEADER_LEN : 0)))) + return -1; - if (read_len == -1) + // Try to read a complete message using available bytes. + // If complete message is not available silently return. + if (available_bytes > 0) { + const uint32_t read_len = is_binary ? get_binary_msg_read_len(available_bytes) : available_bytes; + + if (read_len == -1) + { + return -1; + } + else if (read_len > 0) + { + int res = on_message(std::string_view(read_buffer.data(), read_len)); + read_buffer.clear(); // Clear the buffer after read operation. + read_buffer_filled_size = 0; + return res; + } + } + + return 0; + } + + int comm_session::on_message(std::string_view message) + { + increment_metric(SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.length()); + + if (session_type == SESSION_TYPE::USER) + return user_sess_handler.on_message(*this, message); + else + return peer_sess_handler.on_message(*this, message); + } + + int comm_session::send(std::string_view message) const + { + if (state == SESSION_STATE::CLOSED) + return -1; + + // Prepare the memory segments to map with writev(). + iovec memsegs[2]; + + if (is_binary) + { + // In binary mode, we need to prefix every message with the message size header. + uint8_t header_buf[SIZE_HEADER_LEN] = {0, 0, 0, 0, 0, 0, 0, 0}; + uint32_t len = message.length(); + // Reserve the first 4 bytes for future (TODO). + header_buf[4] = len >> 24; + header_buf[5] = (len >> 16) & 0xff; + header_buf[6] = (len >> 8) & 0xff; + header_buf[7] = len & 0xff; + + memsegs[0].iov_base = header_buf; + memsegs[0].iov_len = SIZE_HEADER_LEN; + memsegs[1].iov_base = (char *)message.data(); + memsegs[1].iov_len = message.length(); + } + else + { + // In text mode, we need to append every message with '\n' + memsegs[0].iov_base = (char *)message.data(); + memsegs[0].iov_len = message.length(); + memsegs[1].iov_base = (char *)"\n"; + memsegs[1].iov_len = 1; + } + + if (writev(write_fd, memsegs, 2) == -1) + { + LOG_ERR << errno << ": Session " << uniqueid << " send writev failed."; return -1; } - else if (read_len > 0) + return 0; + } + + void comm_session::close(const bool invoke_handler) + { + if (state == SESSION_STATE::CLOSED) + return; + + if (invoke_handler) { - int res = on_message(std::string_view(read_buffer.data(), read_len)); - read_buffer.clear(); // Clear the buffer after read operation. - read_buffer_filled_size = 0; - return res; + if (session_type == SESSION_TYPE::USER) + user_sess_handler.on_close(*this); + else + peer_sess_handler.on_close(*this); } + + ::close(read_fd); + state = SESSION_STATE::CLOSED; + + LOG_DBG << (session_type == SESSION_TYPE::PEER ? "Peer" : "User") << " session closed: " + << uniqueid << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : ""); } - return 0; -} - -int comm_session::on_message(std::string_view message) -{ - increment_metric(SESSION_THRESHOLDS::MAX_RAWBYTES_PER_MINUTE, message.length()); - - if (session_type == SESSION_TYPE::USER) - return user_sess_handler.on_message(*this, message); - else - return peer_sess_handler.on_message(*this, message); -} - -int comm_session::send(std::string_view message) const -{ - if (state == SESSION_STATE::CLOSED) - return -1; - - // Prepare the memory segments to map with writev(). - iovec memsegs[2]; - - if (is_binary) - { - // In binary mode, we need to prefix every message with the message size header. - uint8_t header_buf[SIZE_HEADER_LEN] = {0, 0, 0, 0, 0, 0, 0, 0}; - uint32_t len = message.length(); - // Reserve the first 4 bytes for future (TODO). - header_buf[4] = len >> 24; - header_buf[5] = (len >> 16) & 0xff; - header_buf[6] = (len >> 8) & 0xff; - header_buf[7] = len & 0xff; - - memsegs[0].iov_base = header_buf; - memsegs[0].iov_len = SIZE_HEADER_LEN; - memsegs[1].iov_base = (char *)message.data(); - memsegs[1].iov_len = message.length(); - } - else - { - // In text mode, we need to append every message with '\n' - memsegs[0].iov_base = (char *)message.data(); - memsegs[0].iov_len = message.length(); - memsegs[1].iov_base = (char *)"\n"; - memsegs[1].iov_len = 1; - } - - if (writev(write_fd, memsegs, 2) == -1) - { - LOG_ERR << errno << ": Session " << uniqueid << " send writev failed."; - return -1; - } - return 0; -} - -void comm_session::close(const bool invoke_handler) -{ - if (state == SESSION_STATE::CLOSED) - return; - - if (invoke_handler) - { - if (session_type == SESSION_TYPE::USER) - user_sess_handler.on_close(*this); - else - peer_sess_handler.on_close(*this); - } - - ::close(read_fd); - state = SESSION_STATE::CLOSED; - - LOG_DBG << (session_type == SESSION_TYPE::PEER ? "Peer" : "User") << " session closed: " - << uniqueid << (is_inbound ? "[in]" : "[out]") << (is_self ? "[self]" : ""); -} - -/** + /** * Retrieves the length of the binary message pending to be read. Only relevant for Binary mode. * @param available_bytes Count of bytes that is available to read from the client socket. * @return Length of the message if the complete message available to be read. 0 if reading must be skipped. -1 if client must be disconnected. */ -uint32_t comm_session::get_binary_msg_read_len(const size_t available_bytes) -{ - // If we have previously encountered a size header and we are waiting until all message - // bytes are received, we must have the expected message size > 0. - - size_t data_bytes = available_bytes; - - // If we are not tracking a previous size header, then we must check for a size header. - if (expected_msg_size == 0 && available_bytes >= SIZE_HEADER_LEN) + uint32_t comm_session::get_binary_msg_read_len(const size_t available_bytes) { - // Read the size header. - uint8_t header_buf[SIZE_HEADER_LEN]; - if (read(read_fd, header_buf, SIZE_HEADER_LEN) == -1) - return -1; // Indicates that we should disconnect the client. + // If we have previously encountered a size header and we are waiting until all message + // bytes are received, we must have the expected message size > 0. - data_bytes -= SIZE_HEADER_LEN; + size_t data_bytes = available_bytes; - // We are using last 4 bytes (big endian) in the header for the message size. - uint32_t upcoming_msg_size = (header_buf[4] << 24) + (header_buf[5] << 16) + (header_buf[6] << 8) + header_buf[7]; - - // Remember the expected msg size until sufficient bytes are available. - expected_msg_size = upcoming_msg_size; - read_buffer.resize(expected_msg_size); - } - - if (expected_msg_size > 0 && data_bytes > 0) - { - // Claculate bytes remaining to form complete message. - const size_t remaining_len = expected_msg_size - read_buffer_filled_size; - - // We know expected message size, and enough bytes are available to read complete expected message. - if (data_bytes >= remaining_len) + // If we are not tracking a previous size header, then we must check for a size header. + if (expected_msg_size == 0 && available_bytes >= SIZE_HEADER_LEN) { - // Complete the buffer by reading remaining bytes. - if (read(read_fd, read_buffer.data() + read_buffer_filled_size, remaining_len) == -1) + // Read the size header. + uint8_t header_buf[SIZE_HEADER_LEN]; + if (read(read_fd, header_buf, SIZE_HEADER_LEN) == -1) return -1; // Indicates that we should disconnect the client. - read_buffer_filled_size += remaining_len; - const size_t read_len = expected_msg_size; - expected_msg_size = 0; // reset the expected msg size. - return read_len; + data_bytes -= SIZE_HEADER_LEN; + + // We are using last 4 bytes (big endian) in the header for the message size. + uint32_t upcoming_msg_size = (header_buf[4] << 24) + (header_buf[5] << 16) + (header_buf[6] << 8) + header_buf[7]; + + // Remember the expected msg size until sufficient bytes are available. + expected_msg_size = upcoming_msg_size; + read_buffer.resize(expected_msg_size); } - else + + if (expected_msg_size > 0 && data_bytes > 0) { - // Collect any available bytes to the buffer. - if (read(read_fd, read_buffer.data() + read_buffer_filled_size, data_bytes) == -1) - return -1; // Indicates that we should disconnect the client. - read_buffer_filled_size += data_bytes; + // Claculate bytes remaining to form complete message. + const size_t remaining_len = expected_msg_size - read_buffer_filled_size; + + // We know expected message size, and enough bytes are available to read complete expected message. + if (data_bytes >= remaining_len) + { + // Complete the buffer by reading remaining bytes. + if (read(read_fd, read_buffer.data() + read_buffer_filled_size, remaining_len) == -1) + return -1; // Indicates that we should disconnect the client. + read_buffer_filled_size += remaining_len; + + const size_t read_len = expected_msg_size; + expected_msg_size = 0; // reset the expected msg size. + return read_len; + } + else + { + // Collect any available bytes to the buffer. + if (read(read_fd, read_buffer.data() + read_buffer_filled_size, data_bytes) == -1) + return -1; // Indicates that we should disconnect the client. + read_buffer_filled_size += data_bytes; + } } + + // Skip reading + return 0; } - // Skip reading - return 0; -} - -/** + /** * Set thresholds to the socket session */ -void comm_session::set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms) -{ - session_threshold &t = thresholds[threshold_type]; - t.counter_value = 0; - t.intervalms = intervalms; - t.threshold_limit = threshold_limit; -} + void comm_session::set_threshold(const SESSION_THRESHOLDS threshold_type, const uint64_t threshold_limit, const uint32_t intervalms) + { + session_threshold &t = thresholds[threshold_type]; + t.counter_value = 0; + t.intervalms = intervalms; + t.threshold_limit = threshold_limit; + } -/* + /* * Increment the provided thresholds counter value with the provided amount and validate it against the * configured threshold limit. */ -void comm_session::increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount) -{ - session_threshold &t = thresholds[threshold_type]; - - // Ignore the counter if limit is set as 0. - if (t.threshold_limit == 0) - return; - - const uint64_t time_now = util::get_epoch_milliseconds(); - - t.counter_value += amount; - if (t.timestamp == 0) + void comm_session::increment_metric(const SESSION_THRESHOLDS threshold_type, const uint64_t amount) { - // Reset counter timestamp. - t.timestamp = time_now; - } - else - { - // Check whether we have exceeded the threshold within the monitering interval. - const uint64_t elapsed_time = time_now - t.timestamp; - if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit) - { - this->close(); + session_threshold &t = thresholds[threshold_type]; - t.timestamp = 0; - t.counter_value = 0; + // Ignore the counter if limit is set as 0. + if (t.threshold_limit == 0) + return; - LOG_INFO << "Session " << this->uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; - corebill::report_violation(this->address); - } - else if (elapsed_time > t.intervalms) + const uint64_t time_now = util::get_epoch_milliseconds(); + + t.counter_value += amount; + if (t.timestamp == 0) { + // Reset counter timestamp. t.timestamp = time_now; - t.counter_value = amount; + } + else + { + // Check whether we have exceeded the threshold within the monitering interval. + const uint64_t elapsed_time = time_now - t.timestamp; + if (elapsed_time <= t.intervalms && t.counter_value > t.threshold_limit) + { + this->close(); + + t.timestamp = 0; + t.counter_value = 0; + + LOG_INFO << "Session " << this->uniqueid << " threshold exceeded. (type:" << threshold_type << " limit:" << t.threshold_limit << ")"; + corebill::report_violation(this->address); + } + else if (elapsed_time > t.intervalms) + { + t.timestamp = time_now; + t.counter_value = amount; + } } } -} } // namespace comm \ No newline at end of file diff --git a/src/fbschema/p2pmsg_content.fbs b/src/fbschema/p2pmsg_content.fbs index bebe1142..72cce89b 100644 --- a/src/fbschema/p2pmsg_content.fbs +++ b/src/fbschema/p2pmsg_content.fbs @@ -86,21 +86,19 @@ union State_Response{ File_HashMap_Response, Block_Response, Fs_Entry_Response } table State_Response_Message{ state_response:State_Response; hash:[ubyte]; + path: string; } table Fs_Entry_Response{ - path: string; entries: [State_FS_Hash_Entry]; } table File_HashMap_Response{ - path: string; file_length:uint64; hash_map:[ubyte]; } table Block_Response{ - path: string; block_id:uint32; data: [ubyte]; } diff --git a/src/fbschema/p2pmsg_content_generated.h b/src/fbschema/p2pmsg_content_generated.h index ba4e6f3d..94bda478 100644 --- a/src/fbschema/p2pmsg_content_generated.h +++ b/src/fbschema/p2pmsg_content_generated.h @@ -1277,7 +1277,8 @@ struct State_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tab enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { VT_STATE_RESPONSE_TYPE = 4, VT_STATE_RESPONSE = 6, - VT_HASH = 8 + VT_HASH = 8, + VT_PATH = 10 }; fbschema::p2pmsg::State_Response state_response_type() const { return static_cast(GetField(VT_STATE_RESPONSE_TYPE, 0)); @@ -1304,6 +1305,12 @@ struct State_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tab flatbuffers::Vector *mutable_hash() { return GetPointer *>(VT_HASH); } + const flatbuffers::String *path() const { + return GetPointer(VT_PATH); + } + flatbuffers::String *mutable_path() { + return GetPointer(VT_PATH); + } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && VerifyField(verifier, VT_STATE_RESPONSE_TYPE) && @@ -1311,6 +1318,8 @@ struct State_Response_Message FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tab VerifyState_Response(verifier, state_response(), state_response_type()) && VerifyOffset(verifier, VT_HASH) && verifier.VerifyVector(hash()) && + VerifyOffset(verifier, VT_PATH) && + verifier.VerifyString(path()) && verifier.EndTable(); } }; @@ -1340,6 +1349,9 @@ struct State_Response_MessageBuilder { void add_hash(flatbuffers::Offset> hash) { fbb_.AddOffset(State_Response_Message::VT_HASH, hash); } + void add_path(flatbuffers::Offset path) { + fbb_.AddOffset(State_Response_Message::VT_PATH, path); + } explicit State_Response_MessageBuilder(flatbuffers::FlatBufferBuilder &_fbb) : fbb_(_fbb) { start_ = fbb_.StartTable(); @@ -1355,8 +1367,10 @@ inline flatbuffers::Offset CreateState_Response_Message( flatbuffers::FlatBufferBuilder &_fbb, fbschema::p2pmsg::State_Response state_response_type = fbschema::p2pmsg::State_Response_NONE, flatbuffers::Offset state_response = 0, - flatbuffers::Offset> hash = 0) { + flatbuffers::Offset> hash = 0, + flatbuffers::Offset path = 0) { State_Response_MessageBuilder builder_(_fbb); + builder_.add_path(path); builder_.add_hash(hash); builder_.add_state_response(state_response); builder_.add_state_response_type(state_response_type); @@ -1367,27 +1381,23 @@ inline flatbuffers::Offset CreateState_Response_MessageD flatbuffers::FlatBufferBuilder &_fbb, fbschema::p2pmsg::State_Response state_response_type = fbschema::p2pmsg::State_Response_NONE, flatbuffers::Offset state_response = 0, - const std::vector *hash = nullptr) { + const std::vector *hash = nullptr, + const char *path = nullptr) { auto hash__ = hash ? _fbb.CreateVector(*hash) : 0; + auto path__ = path ? _fbb.CreateString(path) : 0; return fbschema::p2pmsg::CreateState_Response_Message( _fbb, state_response_type, state_response, - hash__); + hash__, + path__); } struct Fs_Entry_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef Fs_Entry_ResponseBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_PATH = 4, - VT_ENTRIES = 6 + VT_ENTRIES = 4 }; - const flatbuffers::String *path() const { - return GetPointer(VT_PATH); - } - flatbuffers::String *mutable_path() { - return GetPointer(VT_PATH); - } const flatbuffers::Vector> *entries() const { return GetPointer> *>(VT_ENTRIES); } @@ -1396,8 +1406,6 @@ struct Fs_Entry_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_PATH) && - verifier.VerifyString(path()) && VerifyOffset(verifier, VT_ENTRIES) && verifier.VerifyVector(entries()) && verifier.VerifyVectorOfTables(entries()) && @@ -1409,9 +1417,6 @@ struct Fs_Entry_ResponseBuilder { typedef Fs_Entry_Response Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_path(flatbuffers::Offset path) { - fbb_.AddOffset(Fs_Entry_Response::VT_PATH, path); - } void add_entries(flatbuffers::Offset>> entries) { fbb_.AddOffset(Fs_Entry_Response::VT_ENTRIES, entries); } @@ -1428,39 +1433,27 @@ struct Fs_Entry_ResponseBuilder { inline flatbuffers::Offset CreateFs_Entry_Response( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset path = 0, flatbuffers::Offset>> entries = 0) { Fs_Entry_ResponseBuilder builder_(_fbb); builder_.add_entries(entries); - builder_.add_path(path); return builder_.Finish(); } inline flatbuffers::Offset CreateFs_Entry_ResponseDirect( flatbuffers::FlatBufferBuilder &_fbb, - const char *path = nullptr, const std::vector> *entries = nullptr) { - auto path__ = path ? _fbb.CreateString(path) : 0; auto entries__ = entries ? _fbb.CreateVector>(*entries) : 0; return fbschema::p2pmsg::CreateFs_Entry_Response( _fbb, - path__, entries__); } struct File_HashMap_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef File_HashMap_ResponseBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_PATH = 4, - VT_FILE_LENGTH = 6, - VT_HASH_MAP = 8 + VT_FILE_LENGTH = 4, + VT_HASH_MAP = 6 }; - const flatbuffers::String *path() const { - return GetPointer(VT_PATH); - } - flatbuffers::String *mutable_path() { - return GetPointer(VT_PATH); - } uint64_t file_length() const { return GetField(VT_FILE_LENGTH, 0); } @@ -1475,8 +1468,6 @@ struct File_HashMap_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Tabl } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_PATH) && - verifier.VerifyString(path()) && VerifyField(verifier, VT_FILE_LENGTH) && VerifyOffset(verifier, VT_HASH_MAP) && verifier.VerifyVector(hash_map()) && @@ -1488,9 +1479,6 @@ struct File_HashMap_ResponseBuilder { typedef File_HashMap_Response Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_path(flatbuffers::Offset path) { - fbb_.AddOffset(File_HashMap_Response::VT_PATH, path); - } void add_file_length(uint64_t file_length) { fbb_.AddElement(File_HashMap_Response::VT_FILE_LENGTH, file_length, 0); } @@ -1510,26 +1498,21 @@ struct File_HashMap_ResponseBuilder { inline flatbuffers::Offset CreateFile_HashMap_Response( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset path = 0, uint64_t file_length = 0, flatbuffers::Offset> hash_map = 0) { File_HashMap_ResponseBuilder builder_(_fbb); builder_.add_file_length(file_length); builder_.add_hash_map(hash_map); - builder_.add_path(path); return builder_.Finish(); } inline flatbuffers::Offset CreateFile_HashMap_ResponseDirect( flatbuffers::FlatBufferBuilder &_fbb, - const char *path = nullptr, uint64_t file_length = 0, const std::vector *hash_map = nullptr) { - auto path__ = path ? _fbb.CreateString(path) : 0; auto hash_map__ = hash_map ? _fbb.CreateVector(*hash_map) : 0; return fbschema::p2pmsg::CreateFile_HashMap_Response( _fbb, - path__, file_length, hash_map__); } @@ -1537,16 +1520,9 @@ inline flatbuffers::Offset CreateFile_HashMap_ResponseDir struct Block_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { typedef Block_ResponseBuilder Builder; enum FlatBuffersVTableOffset FLATBUFFERS_VTABLE_UNDERLYING_TYPE { - VT_PATH = 4, - VT_BLOCK_ID = 6, - VT_DATA = 8 + VT_BLOCK_ID = 4, + VT_DATA = 6 }; - const flatbuffers::String *path() const { - return GetPointer(VT_PATH); - } - flatbuffers::String *mutable_path() { - return GetPointer(VT_PATH); - } uint32_t block_id() const { return GetField(VT_BLOCK_ID, 0); } @@ -1561,8 +1537,6 @@ struct Block_Response FLATBUFFERS_FINAL_CLASS : private flatbuffers::Table { } bool Verify(flatbuffers::Verifier &verifier) const { return VerifyTableStart(verifier) && - VerifyOffset(verifier, VT_PATH) && - verifier.VerifyString(path()) && VerifyField(verifier, VT_BLOCK_ID) && VerifyOffset(verifier, VT_DATA) && verifier.VerifyVector(data()) && @@ -1574,9 +1548,6 @@ struct Block_ResponseBuilder { typedef Block_Response Table; flatbuffers::FlatBufferBuilder &fbb_; flatbuffers::uoffset_t start_; - void add_path(flatbuffers::Offset path) { - fbb_.AddOffset(Block_Response::VT_PATH, path); - } void add_block_id(uint32_t block_id) { fbb_.AddElement(Block_Response::VT_BLOCK_ID, block_id, 0); } @@ -1596,26 +1567,21 @@ struct Block_ResponseBuilder { inline flatbuffers::Offset CreateBlock_Response( flatbuffers::FlatBufferBuilder &_fbb, - flatbuffers::Offset path = 0, uint32_t block_id = 0, flatbuffers::Offset> data = 0) { Block_ResponseBuilder builder_(_fbb); builder_.add_data(data); builder_.add_block_id(block_id); - builder_.add_path(path); return builder_.Finish(); } inline flatbuffers::Offset CreateBlock_ResponseDirect( flatbuffers::FlatBufferBuilder &_fbb, - const char *path = nullptr, uint32_t block_id = 0, const std::vector *data = nullptr) { - auto path__ = path ? _fbb.CreateString(path) : 0; auto data__ = data ? _fbb.CreateVector(*data) : 0; return fbschema::p2pmsg::CreateBlock_Response( _fbb, - path__, block_id, data__); } diff --git a/src/fbschema/p2pmsg_helpers.cpp b/src/fbschema/p2pmsg_helpers.cpp index a5fc3004..bc98cc93 100644 --- a/src/fbschema/p2pmsg_helpers.cpp +++ b/src/fbschema/p2pmsg_helpers.cpp @@ -18,7 +18,7 @@ namespace fbschema::p2pmsg constexpr size_t PEERCHALLENGE_LEN = 16; // Max size of messages which are subjected to time (too old) check. - constexpr size_t MAX_SIZE_FOR_TIME_CHECK = 10 * 1024 * 1024; // 10 MB + constexpr size_t MAX_SIZE_FOR_TIME_CHECK = 1 * 1024 * 1024; // 1 MB /** * This section contains Flatbuffer message reading/writing helpers. @@ -480,13 +480,13 @@ namespace fbschema::p2pmsg const flatbuffers::Offset resp = CreateFs_Entry_Response( builder, - sv_to_flatbuff_str(builder, path), statefshashentry_to_flatbuff_statefshashentry(builder, hash_nodes)); const flatbuffers::Offset st_resp = CreateState_Response_Message( builder, State_Response_Fs_Entry_Response, resp.Union(), - hash_to_flatbuff_bytes(builder, expected_hash)); + hash_to_flatbuff_bytes(builder, expected_hash), + sv_to_flatbuff_str(builder, path)); flatbuffers::Offset message = CreateContent(builder, Message_State_Response_Message, st_resp.Union()); builder.Finish(message); // Finished building message content to get serialised content. @@ -515,7 +515,6 @@ namespace fbschema::p2pmsg const flatbuffers::Offset resp = CreateFile_HashMap_Response( builder, - sv_to_flatbuff_str(builder, path), file_length, sv_to_flatbuff_bytes(builder, hashmap_sv)); @@ -523,7 +522,8 @@ namespace fbschema::p2pmsg builder, State_Response_File_HashMap_Response, resp.Union(), - hash_to_flatbuff_bytes(builder, expected_hash)); + hash_to_flatbuff_bytes(builder, expected_hash), + sv_to_flatbuff_str(builder, path)); flatbuffers::Offset message = CreateContent(builder, Message_State_Response_Message, st_resp.Union()); builder.Finish(message); // Finished building message content to get serialised content. @@ -547,7 +547,6 @@ namespace fbschema::p2pmsg const flatbuffers::Offset resp = CreateBlock_Response( builder, - sv_to_flatbuff_str(builder, block_resp.path), block_resp.block_id, sv_to_flatbuff_bytes(builder, block_resp.data)); @@ -555,7 +554,8 @@ namespace fbschema::p2pmsg builder, State_Response_Block_Response, resp.Union(), - hash_to_flatbuff_bytes(builder, block_resp.hash)); + hash_to_flatbuff_bytes(builder, block_resp.hash), + sv_to_flatbuff_str(builder, block_resp.path)); flatbuffers::Offset message = CreateContent(builder, Message_State_Response_Message, st_resp.Union()); builder.Finish(message); // Finished building message content to get serialised content. @@ -710,7 +710,7 @@ namespace fbschema::p2pmsg entry.name = flatbuff_str_to_sv(f_hash->name()); entry.is_file = f_hash->is_file(); entry.hash = flatbuff_bytes_to_hash(f_hash->hash()); - + fs_entries.emplace(entry.name, std::move(entry)); } } diff --git a/src/jsonschema/usrmsg_helpers.cpp b/src/jsonschema/usrmsg_helpers.cpp index affc7c6d..ba26952c 100644 --- a/src/jsonschema/usrmsg_helpers.cpp +++ b/src/jsonschema/usrmsg_helpers.cpp @@ -309,7 +309,8 @@ int extract_signed_input_container( return -1; } - // Verify the signature of the content. + // We do not verify the signature of the content here since we need to let each node + // (including self) to verify that individually after we broadcast the NUP proposal. const std::string content(d[FLD_CONTENT].GetString(), d[FLD_CONTENT].GetStringLength()); diff --git a/src/main.cpp b/src/main.cpp index 396c82c1..e5138c7c 100644 --- a/src/main.cpp +++ b/src/main.cpp @@ -13,6 +13,7 @@ #include "cons/cons.hpp" #include "hpfs/hpfs.hpp" #include "state/state_sync.hpp" +#include "state/state_serve.hpp" /** * Parses CLI args and extracts hot pocket command and parameters given. @@ -68,6 +69,7 @@ void deinit() cons::deinit(); sc::deinit(); state_sync::deinit(); + state_serve::deinit(); usr::deinit(); p2p::deinit(); hpfs::deinit(); @@ -191,7 +193,7 @@ int main(int argc, char **argv) << (conf::cfg.startup_mode == conf::OPERATING_MODE::OBSERVER ? "Observer" : "Proposer"); if (hpfs::init() != 0 || p2p::init() != 0 || usr::init() != 0 || - state_sync::init() != 0 || cons::init() != 0) + state_serve::init() != 0 || state_sync::init() != 0 || cons::init() != 0) { deinit(); return -1; diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 3787ff31..82ad2d8e 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -14,6 +14,7 @@ namespace p2p // Holds global connected-peers and related objects. connected_context ctx; + uint64_t metric_thresholds[4]; bool init_success = false; /** @@ -22,6 +23,11 @@ namespace p2p */ int init() { + metric_thresholds[0] = conf::cfg.peermaxcpm; + metric_thresholds[1] = conf::cfg.peermaxdupmpm; + metric_thresholds[2] = conf::cfg.peermaxbadsigpm; + metric_thresholds[3] = conf::cfg.peermaxbadmpm; + //Entry point for p2p which will start peer connections to other nodes if (start_peer_connections() == -1) return -1; @@ -41,7 +47,6 @@ namespace p2p int start_peer_connections() { - const uint64_t metric_thresholds[] = {conf::cfg.peermaxcpm, conf::cfg.peermaxdupmpm, conf::cfg.peermaxbadsigpm, conf::cfg.peermaxbadmpm}; if (ctx.listener.start( conf::cfg.peerport, ".sock-peer", comm::SESSION_TYPE::PEER, true, false, metric_thresholds, conf::cfg.peers, conf::cfg.peermaxsize) == -1) return -1; diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 48b31bef..4fe09558 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -13,132 +13,136 @@ namespace p2p { -struct proposal -{ - std::string pubkey; - uint64_t timestamp; - uint64_t time; - uint8_t stage; - std::string lcl; - hpfs::h32 state; - std::set users; - std::set hash_inputs; - std::set hash_outputs; -}; + struct proposal + { + std::string pubkey; + uint64_t timestamp; + uint64_t time; + uint8_t stage; + std::string lcl; + hpfs::h32 state; + std::set users; + std::set hash_inputs; + std::set hash_outputs; + }; -struct nonunl_proposal -{ - std::unordered_map> user_messages; -}; + struct nonunl_proposal + { + std::unordered_map> user_messages; + }; -struct history_request -{ - std::string minimum_lcl; - std::string required_lcl; -}; + struct history_request + { + std::string minimum_lcl; + std::string required_lcl; + }; -struct history_ledger -{ - std::string state; - std::string lcl; - std::vector raw_ledger; -}; + struct history_ledger + { + std::string state; + std::string lcl; + std::vector raw_ledger; + }; -struct peer_challenge_response -{ - std::string challenge; - std::string signature; - std::string pubkey; -}; + struct peer_challenge_response + { + std::string challenge; + std::string signature; + std::string pubkey; + }; -enum LEDGER_RESPONSE_ERROR -{ - NONE = 0, - INVALID_MIN_LEDGER = 1, - REQ_LEDGER_NOT_FOUND = 2 -}; + enum LEDGER_RESPONSE_ERROR + { + NONE = 0, + INVALID_MIN_LEDGER = 1, + REQ_LEDGER_NOT_FOUND = 2 + }; -struct history_response -{ - std::map hist_ledgers; - LEDGER_RESPONSE_ERROR error; -}; + struct history_response + { + std::map hist_ledgers; + LEDGER_RESPONSE_ERROR error; + }; -struct npl_message -{ - std::string data; -}; + struct npl_message + { + std::string data; + }; -// Represents a state request sent to a peer. -struct state_request -{ - std::string parent_path; // The requested file or dir path. - bool is_file; // Whether the path is a file or dir. - int32_t block_id; // Block id of the file if we are requesting for file block. Otherwise -1. - hpfs::h32 expected_hash; // The expected hash of the requested result. -}; + // Represents a state request sent to a peer. + struct state_request + { + std::string parent_path; // The requested file or dir path. + bool is_file; // Whether the path is a file or dir. + int32_t block_id; // Block id of the file if we are requesting for file block. Otherwise -1. + hpfs::h32 expected_hash; // The expected hash of the requested result. + }; -// Represents state file system entry. -struct state_fs_hash_entry -{ - std::string name; // Name of the file/dir. - bool is_file; // Whether this is a file or dir. - hpfs::h32 hash; // Hash of the file or dir. -}; + // Represents state file system entry. + struct state_fs_hash_entry + { + std::string name; // Name of the file/dir. + bool is_file; // Whether this is a file or dir. + hpfs::h32 hash; // Hash of the file or dir. + }; -// Represents a file block data resposne. -struct block_response -{ - std::string path; // Path of the file. - uint32_t block_id; // Id of the block where the data belongs to. - std::string_view data; // The block data. - hpfs::h32 hash; // Hash of the bloc data. -}; + // Represents a file block data resposne. + struct block_response + { + std::string path; // Path of the file. + uint32_t block_id; // Id of the block where the data belongs to. + std::string_view data; // The block data. + hpfs::h32 hash; // Hash of the bloc data. + }; -struct message_collection -{ - std::list proposals; - std::mutex proposals_mutex; // Mutex for proposals access race conditions. + struct message_collection + { + std::list proposals; + std::mutex proposals_mutex; // Mutex for proposals access race conditions. - std::list nonunl_proposals; - std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions. + std::list nonunl_proposals; + std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions. - // NPL messages are stored as string list because we are feeding the npl messages as it is (byte array) to the contract. - std::list npl_messages; - std::mutex npl_messages_mutex; // Mutex for npl_messages access race conditions. + // NPL messages are stored as string list because we are feeding the npl messages as it is (byte array) to the contract. + std::list npl_messages; + std::mutex npl_messages_mutex; // Mutex for npl_messages access race conditions. - std::list state_response; - std::mutex state_response_mutex; // Mutex for state response access race conditions. -}; + // List of pairs indicating the session pubkey hex and the state requests. + std::list> state_requests; + std::mutex state_requests_mutex; // Mutex for state requests access race conditions. -struct connected_context -{ - // Holds all the messages until they are processed by consensus. - message_collection collected_msgs; + std::list state_responses; + std::mutex state_responses_mutex; // Mutex for state responses access race conditions. + }; - // Set of currently connected peer connections mapped by the uniqueid of socket session. - std::unordered_map peer_connections; + struct connected_context + { + // Holds all the messages until they are processed by consensus. + message_collection collected_msgs; - std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions. + // Set of currently connected peer connections mapped by the uniqueid of socket session. + std::unordered_map peer_connections; - comm::comm_server listener; -}; + std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions. -extern connected_context ctx; + comm::comm_server listener; + }; -int init(); + extern connected_context ctx; -void deinit(); + int init(); -int start_peer_connections(); + void deinit(); -int resolve_peer_challenge(comm::comm_session &session, const peer_challenge_response &challenge_resp); + int start_peer_connections(); -void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self); + int resolve_peer_challenge(comm::comm_session &session, const peer_challenge_response &challenge_resp); -void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf); + void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self); -void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf); + void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf); + + void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf); } // namespace p2p diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index c3def4d2..e6d95666 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -13,7 +13,6 @@ #include "peer_session_handler.hpp" #include "../cons/ledger_handler.hpp" #include "../state/state_sync.hpp" -#include "../state/state_serve.hpp" #include "../cons/cons.hpp" namespace p2pmsg = fbschema::p2pmsg; @@ -145,15 +144,16 @@ namespace p2p } else if (content_message_type == p2pmsg::Message_State_Request_Message) { - const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message()); - flatbuffers::FlatBufferBuilder fbuf(1024); - - if (state_serve::create_state_response(fbuf, sr) == 0) + if (p2pmsg::validate_container_trust(container) != 0) { - std::string_view msg = std::string_view( - reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - session.send(msg); + LOG_DBG << "State request message rejected due to trust failure. " << session.uniqueid; + return 0; } + + // Insert request with lock. + std::lock_guard lock(ctx.collected_msgs.state_requests_mutex); + std::string state_request_msg(reinterpret_cast(content_ptr), content_size); + ctx.collected_msgs.state_requests.push_back(std::make_pair(session.uniqueid, std::move(state_request_msg))); } else if (content_message_type == p2pmsg::Message_State_Response_Message) { @@ -166,13 +166,19 @@ namespace p2p if (state_sync::ctx.is_syncing) // Only accept state responses if state is syncing. { // Insert state_response with lock. - std::lock_guard lock(ctx.collected_msgs.state_response_mutex); + std::lock_guard lock(ctx.collected_msgs.state_responses_mutex); std::string response(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.state_response.push_back(std::move(response)); + ctx.collected_msgs.state_responses.push_back(std::move(response)); } } else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message { + if (p2pmsg::validate_container_trust(container) != 0) + { + LOG_DBG << "History request message rejected due to trust failure. " << session.uniqueid; + return 0; + } + const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message()); //first check node has the required lcl available. -> if so send lcl history accordingly. const bool req_lcl_avail = cons::check_required_lcl_availability(hr); diff --git a/src/state/state_common.hpp b/src/state/state_common.hpp new file mode 100644 index 00000000..e0338d21 --- /dev/null +++ b/src/state/state_common.hpp @@ -0,0 +1,17 @@ +#ifndef _HP_CONS_STATE_COMMON_ +#define _HP_CONS_STATE_COMMON_ + +#include "../pchheader.hpp" +#include "../conf.hpp" + +namespace state_common +{ + constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; + + inline uint16_t get_request_resubmit_timeout() + { + return conf::cfg.roundtime; + } +} + +#endif \ No newline at end of file diff --git a/src/state/state_serve.cpp b/src/state/state_serve.cpp index bcad23c6..cda33ae2 100644 --- a/src/state/state_serve.cpp +++ b/src/state/state_serve.cpp @@ -9,13 +9,101 @@ #include "../cons/cons.hpp" #include "../hplog.hpp" #include "state_serve.hpp" +#include "state_common.hpp" + +namespace p2pmsg = fbschema::p2pmsg; /** * Helper functions for serving state requests from other peers. */ namespace state_serve { - constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; + constexpr uint16_t LOOP_WAIT = 100; // Milliseconds + + uint16_t REQUEST_BATCH_TIMEOUT; + + bool is_shutting_down = false; + bool init_success = false; + std::thread state_serve_thread; + + int init() + { + REQUEST_BATCH_TIMEOUT = state_common::get_request_resubmit_timeout() * 0.9; + state_serve_thread = std::thread(state_serve_loop); + init_success = true; + return 0; + } + + void deinit() + { + if (init_success) + { + is_shutting_down = true; + state_serve_thread.join(); + } + } + + void state_serve_loop() + { + util::mask_signal(); + + LOG_INFO << "State server started."; + + std::list> state_requests; + + while (!is_shutting_down) + { + util::sleep(LOOP_WAIT); + + { + std::lock_guard lock(p2p::ctx.collected_msgs.state_requests_mutex); + + // Move collected state requests over to local requests list. + if (!p2p::ctx.collected_msgs.state_requests.empty()) + state_requests.splice(state_requests.end(), p2p::ctx.collected_msgs.state_requests); + } + + uint64_t time_start = util::get_epoch_milliseconds(); + + for (auto &[session_id, request] : state_requests) + { + if (is_shutting_down) + break; + + const fbschema::p2pmsg::Content *content = fbschema::p2pmsg::GetContent(request.data()); + + const p2p::state_request sr = p2pmsg::create_state_request_from_msg(*content->message_as_State_Request_Message()); + flatbuffers::FlatBufferBuilder fbuf(1024); + + uint64_t time_now = util::get_epoch_milliseconds(); + + // If we have spent too much time handling state requests, abandon the entire batch + // because the requester would have stopped waiting for us. + if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT) + break; + + if (state_serve::create_state_response(fbuf, sr) == 0) + { + // Find the peer that we should send the state response to. + std::lock_guard lock(p2p::ctx.peer_connections_mutex); + const auto peer_itr = p2p::ctx.peer_connections.find(session_id); + + if (peer_itr != p2p::ctx.peer_connections.end()) + { + std::string_view msg = std::string_view( + reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); + + const comm::comm_session *session = peer_itr->second; + session->send(msg); + } + } + } + + state_requests.clear(); + } + + LOG_INFO << "State server stopped."; + } /** * Creates the reply message for a given state request. @@ -34,7 +122,7 @@ namespace state_serve std::vector block; if (get_file_block(block, sr.parent_path, sr.block_id, sr.expected_hash) == -1) { - LOG_ERR << "Error in getting file block."; + LOG_ERR << "Error in getting file block: " << sr.parent_path; return -1; } @@ -55,7 +143,7 @@ namespace state_serve std::size_t file_length = 0; if (get_file_block_hashes(block_hashes, file_length, sr.parent_path, sr.expected_hash) == -1) { - LOG_ERR << "Error in getting block hashes."; + LOG_ERR << "Error in getting block hashes: " << sr.parent_path; return -1; } @@ -70,7 +158,7 @@ namespace state_serve std::vector child_hash_nodes; if (get_fs_entry_hashes(child_hash_nodes, sr.parent_path, sr.expected_hash) == -1) { - LOG_ERR << "Error in getting fs entries."; + LOG_ERR << "Error in getting fs entries: " << sr.parent_path; return -1; } @@ -115,7 +203,7 @@ namespace state_serve // Get actual block data. { const std::string file_path = std::string(mount_dir).append(vpath); - const off_t block_offset = block_id * BLOCK_SIZE; + const off_t block_offset = block_id * state_common::BLOCK_SIZE; fd = open(file_path.c_str(), O_RDONLY); if (fd == -1) { @@ -142,7 +230,7 @@ namespace state_serve goto failure; } - const size_t read_len = MIN(BLOCK_SIZE, (st.st_size - block_offset)); + const size_t read_len = MIN(state_common::BLOCK_SIZE, (st.st_size - block_offset)); block.resize(read_len); lseek(fd, block_offset, SEEK_SET); diff --git a/src/state/state_serve.hpp b/src/state/state_serve.hpp index e3e1efc5..d4aa7d32 100644 --- a/src/state/state_serve.hpp +++ b/src/state/state_serve.hpp @@ -8,6 +8,12 @@ namespace state_serve { + int init(); + + void deinit(); + + void state_serve_loop(); + int create_state_response(flatbuffers::FlatBufferBuilder &fbuf, const p2p::state_request &sr); int get_file_block(std::vector &vec, const std::string_view vpath, diff --git a/src/state/state_sync.cpp b/src/state/state_sync.cpp index d1dbb6b4..f0cf3037 100644 --- a/src/state/state_sync.cpp +++ b/src/state/state_sync.cpp @@ -1,4 +1,3 @@ -#include "../state/state_sync.hpp" #include "../fbschema/p2pmsg_helpers.hpp" #include "../fbschema/p2pmsg_content_generated.h" #include "../fbschema/common_helpers.hpp" @@ -9,6 +8,8 @@ #include "../util.hpp" #include "../hpfs/hpfs.hpp" #include "../hpfs/h32.hpp" +#include "state_sync.hpp" +#include "state_common.hpp" namespace state_sync { @@ -16,33 +17,35 @@ namespace state_sync constexpr uint16_t IDLE_WAIT = 50; // Max number of requests that can be awaiting response at any given time. - constexpr uint16_t MAX_AWAITING_REQUESTS = 1; + constexpr uint16_t MAX_AWAITING_REQUESTS = 4; // Request loop sleep time (milliseconds). constexpr uint16_t REQUEST_LOOP_WAIT = 20; - constexpr size_t BLOCK_SIZE = 4 * 1024 * 1024; // 4MB; - constexpr int FILE_PERMS = 0644; // No. of milliseconds to wait before resubmitting a request. uint16_t REQUEST_RESUBMIT_TIMEOUT; - sync_context ctx; + bool init_success = false; int init() { - REQUEST_RESUBMIT_TIMEOUT = conf::cfg.roundtime / 2; + REQUEST_RESUBMIT_TIMEOUT = state_common::get_request_resubmit_timeout(); ctx.target_state = hpfs::h32_empty; ctx.state_sync_thread = std::thread(state_syncer_loop); + init_success = true; return 0; } void deinit() { - ctx.is_syncing = false; - ctx.is_shutting_down = true; - ctx.state_sync_thread.join(); + if (init_success) + { + ctx.is_syncing = false; + ctx.is_shutting_down = true; + ctx.state_sync_thread.join(); + } } /** @@ -144,11 +147,11 @@ namespace state_sync util::sleep(REQUEST_LOOP_WAIT); { - std::lock_guard lock(p2p::ctx.collected_msgs.state_response_mutex); + std::lock_guard lock(p2p::ctx.collected_msgs.state_responses_mutex); // Move collected state responses over to local candidate responses list. - if (!p2p::ctx.collected_msgs.state_response.empty()) - ctx.candidate_state_responses.splice(ctx.candidate_state_responses.end(), p2p::ctx.collected_msgs.state_response); + if (!p2p::ctx.collected_msgs.state_responses.empty()) + ctx.candidate_state_responses.splice(ctx.candidate_state_responses.end(), p2p::ctx.collected_msgs.state_responses); } for (auto &response : ctx.candidate_state_responses) @@ -159,12 +162,15 @@ namespace state_sync const fbschema::p2pmsg::Content *content = fbschema::p2pmsg::GetContent(response.data()); const fbschema::p2pmsg::State_Response_Message *resp_msg = content->message_as_State_Response_Message(); - // Check whether we are actually waiting for this response's hash. If not, ignore it. - const hpfs::h32 response_hash = fbschema::flatbuff_bytes_to_hash(resp_msg->hash()); - const auto pending_resp_itr = ctx.submitted_requests.find(response_hash); + // Check whether we are actually waiting for this response. If not, ignore it. + std::string_view hash = fbschema::flatbuff_bytes_to_sv(resp_msg->hash()); + std::string_view vpath = fbschema::flatbuff_str_to_sv(resp_msg->path()); + + const std::string key = std::string(vpath).append(hash); + const auto pending_resp_itr = ctx.submitted_requests.find(key); if (pending_resp_itr == ctx.submitted_requests.end()) { - LOG_DBG << "Skipping state response due to hash mismatch. Received:" << response_hash; + LOG_DBG << "Skipping state response due to hash mismatch."; continue; } @@ -175,11 +181,11 @@ namespace state_sync const fbschema::p2pmsg::State_Response msg_type = resp_msg->state_response_type(); if (msg_type == fbschema::p2pmsg::State_Response_Fs_Entry_Response) - handle_fs_entry_response(resp_msg->state_response_as_Fs_Entry_Response()); + handle_fs_entry_response(vpath, resp_msg->state_response_as_Fs_Entry_Response()); else if (msg_type == fbschema::p2pmsg::State_Response_File_HashMap_Response) - handle_file_hashmap_response(resp_msg->state_response_as_File_HashMap_Response()); + handle_file_hashmap_response(vpath, resp_msg->state_response_as_File_HashMap_Response()); else if (msg_type == fbschema::p2pmsg::State_Response_Block_Response) - handle_file_block_response(resp_msg->state_response_as_Block_Response()); + handle_file_block_response(vpath, resp_msg->state_response_as_Block_Response()); // After handling each response, check whether we have reached target state. hpfs::get_hash(updated_state, ctx.hpfs_mount_dir, "/"); @@ -269,7 +275,9 @@ namespace state_sync << " path:" << request.path << " block_id:" << request.block_id << " hash:" << request.expected_hash; - ctx.submitted_requests.try_emplace(request.expected_hash, request); + const std::string key = std::string(request.path) + .append(reinterpret_cast(&request.expected_hash), sizeof(hpfs::h32)); + ctx.submitted_requests.try_emplace(key, request); const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR; request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash); @@ -278,10 +286,9 @@ namespace state_sync /** * Process dir children response. */ - int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp) + int handle_fs_entry_response(std::string_view parent_vpath, const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp) { // Get the parent path of the fs entries we have received. - std::string_view parent_vpath = fbschema::flatbuff_str_to_sv(fs_entry_resp->path()); LOG_DBG << "State sync: Processing fs entries response for " << parent_vpath; // Get fs entries we have received. @@ -355,10 +362,9 @@ namespace state_sync /** * Process file block hash map response. */ - int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *file_resp) + int handle_file_hashmap_response(std::string_view file_vpath, const fbschema::p2pmsg::File_HashMap_Response *file_resp) { // Get the file path of the block hashes we have received. - std::string file_vpath = std::string(fbschema::flatbuff_str_to_sv(file_resp->path())); LOG_DBG << "State sync: Processing file block hashes response for " << file_vpath; // File block hashes on our side (file might not exist on our side). @@ -378,7 +384,7 @@ namespace state_sync { // Insert at front to give priority to block requests while preserving block order. if (block_id >= existing_hash_count || existing_hashes[block_id] != peer_hashes[block_id]) - ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, file_vpath, block_id, peer_hashes[block_id]}); + ctx.pending_requests.insert(insert_itr, backlog_item{BACKLOG_ITEM_TYPE::BLOCK, std::string(file_vpath), block_id, peer_hashes[block_id]}); } if (existing_hashes.size() >= peer_hash_count) @@ -395,10 +401,9 @@ namespace state_sync /** * Process file block response. */ - int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg) + int handle_file_block_response(std::string_view file_vpath, const fbschema::p2pmsg::Block_Response *block_msg) { // Get the file path of the block data we have received. - std::string_view file_vpath = fbschema::flatbuff_str_to_sv(block_msg->path()); const uint32_t block_id = block_msg->block_id(); std::string_view buf = fbschema::flatbuff_bytes_to_sv(block_msg->data()); @@ -414,7 +419,7 @@ namespace state_sync return -1; } - const off_t offset = block_id * BLOCK_SIZE; + const off_t offset = block_id * state_common::BLOCK_SIZE; const int res = pwrite(fd, buf.data(), buf.length(), offset); close(fd); if (res < buf.length()) diff --git a/src/state/state_sync.hpp b/src/state/state_sync.hpp index 599b7323..6cd68f0e 100644 --- a/src/state/state_sync.hpp +++ b/src/state/state_sync.hpp @@ -40,8 +40,8 @@ namespace state_sync // List of pending sync requests to be sent out. std::list pending_requests; - // List of submitted requests we are awaiting responses for, keyed by expected response hash. - std::unordered_map submitted_requests; + // List of submitted requests we are awaiting responses for, keyed by expected response path+hash. + std::unordered_map submitted_requests; std::thread state_sync_thread; std::mutex target_state_update_lock; @@ -72,11 +72,11 @@ namespace state_sync void submit_request(const backlog_item &request); - int handle_fs_entry_response(const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp); + int handle_fs_entry_response(std::string_view parent_vpath, const fbschema::p2pmsg::Fs_Entry_Response *fs_entry_resp); - int handle_file_hashmap_response(const fbschema::p2pmsg::File_HashMap_Response *file_resp); + int handle_file_hashmap_response(std::string_view file_vpath, const fbschema::p2pmsg::File_HashMap_Response *file_resp); - int handle_file_block_response(const fbschema::p2pmsg::Block_Response *block_msg); + int handle_file_block_response(std::string_view file_vpath, const fbschema::p2pmsg::Block_Response *block_msg); } // namespace state_sync diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index 5ee3dec8..183aeecf 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -18,6 +18,7 @@ namespace usr // Holds global connected-users and related objects. connected_context ctx; + uint64_t metric_thresholds[4]; bool init_success = false; /** @@ -26,6 +27,11 @@ namespace usr */ int init() { + metric_thresholds[0] = conf::cfg.pubmaxcpm; + metric_thresholds[1] = 0; + metric_thresholds[2] = 0; + metric_thresholds[3] = conf::cfg.pubmaxbadmpm; + // Start listening for incoming user connections. if (start_listening() == -1) return -1; @@ -48,7 +54,6 @@ namespace usr */ int start_listening() { - const uint64_t metric_thresholds[] = {conf::cfg.pubmaxcpm, 0, 0, conf::cfg.pubmaxbadmpm}; if (ctx.listener.start( conf::cfg.pubport, ".sock-user", comm::SESSION_TYPE::USER, true, true, metric_thresholds, std::set(), conf::cfg.pubmaxsize) == -1) return -1;