diff --git a/src/ledger.cpp b/src/ledger.cpp index 83ce28c5..5e92ea8b 100644 --- a/src/ledger.cpp +++ b/src/ledger.cpp @@ -410,9 +410,11 @@ namespace ledger flatbuffers::FlatBufferBuilder fbuf(1024); p2pmsg::create_msg_from_history_request(fbuf, hr); - p2p::send_message_to_random_peer(fbuf); - LOG_DEBUG << "Ledger history request sent. Required lcl:" << required_lcl.substr(0, 15); + std::string target_pubkey; + p2p::send_message_to_random_peer(fbuf, target_pubkey); + + LOG_DEBUG << "Ledger history requested from [" << target_pubkey.substr(0, 10) << "]. Required lcl:" << required_lcl.substr(0, 15); } /** diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 6236146f..d7f9287f 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -125,7 +125,7 @@ namespace p2p session.challenge_status = comm::CHALLENGE_VERIFIED; ex_session.mark_for_closure(); - p2p::ctx.peer_connections.erase(iter); // remove existing session. + p2p::ctx.peer_connections.erase(iter); // remove existing session. // We have to keep the weekly connected status of the removed session object. // If not, connected status received prior to connection dropping will be lost. session.is_weakly_connected = ex_session.is_weakly_connected; @@ -245,8 +245,9 @@ namespace p2p /** * Sends the given message to a random peer (except self). * @param fbuf Peer outbound message to be sent to peer. + * @param target_pubkey Randomly selected target peer pubkey. */ - void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf) + void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey) { //Send while locking the peer_connections. std::scoped_lock lock(p2p::ctx.peer_connections_mutex); @@ -278,6 +279,7 @@ namespace p2p reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); session->send(msg); + target_pubkey = session->uniqueid; break; } } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 04d0d5fb..f4e49c73 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -110,7 +110,8 @@ namespace p2p std::list> state_requests; std::mutex state_requests_mutex; // Mutex for state requests access race conditions. - std::list state_responses; + // List of pairs indicating the session pubkey hex and the state responses. + std::list> state_responses; std::mutex state_responses_mutex; // Mutex for state responses access race conditions. }; @@ -143,7 +144,7 @@ namespace p2p void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf); - void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf); + void send_message_to_random_peer(const flatbuffers::FlatBufferBuilder &fbuf, std::string &target_pubkey); bool validate_for_peer_msg_forwarding(const comm::comm_session &session, const msg::fbuf::p2pmsg::Container *container, const msg::fbuf::p2pmsg::Message &content_message_type); diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 79491d4b..caa8330c 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -173,7 +173,6 @@ namespace p2p } else if (content_message_type == p2pmsg::Message_State_Request_Message) { - // Insert request with lock. std::scoped_lock lock(ctx.collected_msgs.state_requests_mutex); std::string state_request_msg(reinterpret_cast(content_ptr), content_size); @@ -186,7 +185,7 @@ namespace p2p // Insert state_response with lock. std::scoped_lock lock(ctx.collected_msgs.state_responses_mutex); std::string response(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.state_responses.push_back(std::move(response)); + ctx.collected_msgs.state_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); } } else if (content_message_type == p2pmsg::Message_History_Request_Message) //message is a lcl history request message diff --git a/src/state/state_serve.cpp b/src/state/state_serve.cpp index 2dadc697..cce9e194 100644 --- a/src/state/state_serve.cpp +++ b/src/state/state_serve.cpp @@ -89,7 +89,12 @@ namespace state_serve // because the requester would have stopped waiting for us. const uint64_t time_now = util::get_epoch_milliseconds(); if ((time_now - time_start) > REQUEST_BATCH_TIMEOUT) + { + LOG_DEBUG << "State serve batch timeout. Abandonding state requests."; break; + } + + LOG_DEBUG << "Serving state request from [" << session_id.substr(0, 10) << "]"; const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data()); diff --git a/src/state/state_sync.cpp b/src/state/state_sync.cpp index bcaf4a81..eef7fe9a 100644 --- a/src/state/state_sync.cpp +++ b/src/state/state_sync.cpp @@ -119,7 +119,7 @@ namespace state_sync } } } - + hpfs::stop_fs_session(ctx.hpfs_mount_dir); } else @@ -162,7 +162,9 @@ namespace state_sync if (should_stop_request_loop(current_target)) return; - const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(response.data()); + LOG_DEBUG << "Processing state resposne from [" << response.first.substr(0, 10) << "]"; + + const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(response.second.data()); const msg::fbuf::p2pmsg::State_Response_Message *resp_msg = content->message_as_State_Response_Message(); // Check whether we are actually waiting for this response. If not, ignore it. @@ -173,7 +175,7 @@ namespace state_sync const auto pending_resp_itr = ctx.submitted_requests.find(key); if (pending_resp_itr == ctx.submitted_requests.end()) { - LOG_DEBUG << "Skipping state response due to hash mismatch."; + LOG_DEBUG << "State sync: Skipping state response due to hash mismatch."; continue; } @@ -260,9 +262,10 @@ namespace state_sync * @param is_file Whether the requested path if a file or dir. * @param block_id The requested block id. Only relevant if requesting a file block. Otherwise -1. * @param expected_hash The expected hash of the requested data. The peer will ignore the request if their hash is different. + * @param target_pubkey The peer pubkey the request was submitted to. */ void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, - const hpfs::h32 expected_hash, std::string_view lcl) + const hpfs::h32 expected_hash, std::string_view lcl, std::string &target_pubkey) { p2p::state_request sr; sr.parent_path = path; @@ -272,7 +275,7 @@ namespace state_sync flatbuffers::FlatBufferBuilder fbuf(1024); msg::fbuf::p2pmsg::create_msg_from_state_request(fbuf, sr, lcl); - p2p::send_message_to_random_peer(fbuf); //todo: send to a node that hold the majority state to improve reliability of retrieving state. + p2p::send_message_to_random_peer(fbuf, target_pubkey); //todo: send to a node that hold the majority state to improve reliability of retrieving state. } /** @@ -280,16 +283,17 @@ namespace state_sync */ void submit_request(const backlog_item &request, std::string_view lcl) { - LOG_DEBUG << "State sync: Submitting request. type:" << request.type - << " path:" << request.path << " block_id:" << request.block_id - << " hash:" << request.expected_hash; - const std::string key = std::string(request.path) .append(reinterpret_cast(&request.expected_hash), sizeof(hpfs::h32)); ctx.submitted_requests.try_emplace(key, request); const bool is_file = request.type != BACKLOG_ITEM_TYPE::DIR; - request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl); + std::string target_pubkey; + request_state_from_peer(request.path, is_file, request.block_id, request.expected_hash, lcl, target_pubkey); + + LOG_DEBUG << "State sync: Requesting from [" << target_pubkey.substr(0, 10) << "]. type:" << request.type + << " path:" << request.path << " block_id:" << request.block_id + << " hash:" << request.expected_hash; } /** diff --git a/src/state/state_sync.hpp b/src/state/state_sync.hpp index a8794e39..83ccab66 100644 --- a/src/state/state_sync.hpp +++ b/src/state/state_sync.hpp @@ -34,8 +34,8 @@ namespace state_sync // The current target state we are syncing towards. hpfs::h32 target_state; - // List of state responses flatbuffer messages to be processed. - std::list candidate_state_responses; + // List of sender pubkeys and state responses(flatbuffer messages) to be processed. + std::list> candidate_state_responses; // List of pending sync requests to be sent out. std::list pending_requests; @@ -69,7 +69,7 @@ namespace state_sync bool should_stop_request_loop(const hpfs::h32 current_target); void request_state_from_peer(const std::string &path, const bool is_file, const int32_t block_id, - const hpfs::h32 expected_hash, std::string_view lcl); + const hpfs::h32 expected_hash, std::string_view lcl, std::string &target_pubkey); void submit_request(const backlog_item &request, std::string_view lcl);