diff --git a/src/comm/comm_session.cpp b/src/comm/comm_session.cpp index 7090515f..8112864a 100644 --- a/src/comm/comm_session.cpp +++ b/src/comm/comm_session.cpp @@ -235,6 +235,12 @@ namespace comm */ const std::string comm_session::display_name() { + if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) + { + // Sessions use pubkey hex as unique id (skipping first 2 bytes key type prefix). + return uniqueid.substr(2, 10) + (is_inbound ? ":in" : ":out"); + } + return uniqueid + (is_inbound ? ":in" : ":out"); } diff --git a/src/comm/comm_session.hpp b/src/comm/comm_session.hpp index 1525b198..2b64bb70 100644 --- a/src/comm/comm_session.hpp +++ b/src/comm/comm_session.hpp @@ -47,7 +47,8 @@ namespace comm virtual void handle_on_verified(); public: - std::string uniqueid; + std::string uniqueid; // Verified session: Pubkey in hex format, Unverified session: IP address. + std::string pubkey; // Pubkey in binary format. const bool is_inbound; const std::string host_address; // Connection host address of the remote party. std::string issued_challenge; diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 60fadc56..b605c49d 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -74,7 +74,6 @@ namespace p2p } // Converting the binary pub key into hexadecimal string. - // This will be used as the lookup key in storing peer sessions. std::string pubkeyhex; util::bin2hex(pubkeyhex, reinterpret_cast(challenge_resp.pubkey.data()), challenge_resp.pubkey.length()); @@ -94,14 +93,17 @@ namespace p2p std::scoped_lock lock(ctx.peer_connections_mutex); - const auto iter = ctx.peer_connections.find(pubkeyhex); + const auto iter = ctx.peer_connections.find(challenge_resp.pubkey); if (iter == ctx.peer_connections.end()) { // Add the new connection straight away, if we haven't seen it before. session.uniqueid.swap(pubkeyhex); + session.pubkey = challenge_resp.pubkey; + session.is_unl = unl::exists(session.pubkey); // Mark the connection as a verified connection. session.mark_as_verified(); - ctx.peer_connections.try_emplace(session.uniqueid, &session); + // Public key in binary format will be used as the lookup key in storing peer sessions. + ctx.peer_connections.try_emplace(session.pubkey, &session); LOG_DEBUG << "Accepted verified connection [" << session.display_name() << "]"; return 0; @@ -120,6 +122,8 @@ namespace p2p if (!session.known_ipport.has_value()) session.known_ipport.swap(ex_session.known_ipport); session.uniqueid.swap(pubkeyhex); + session.pubkey = challenge_resp.pubkey; + session.is_unl = unl::exists(session.pubkey); // Mark the connection as a verified connection. session.mark_as_verified(); @@ -128,7 +132,8 @@ namespace p2p // We have to keep the peer requirements of the removed session object. // If not, requirements received prior to connection dropping will be lost. session.need_consensus_msg_forwarding = ex_session.need_consensus_msg_forwarding; - ctx.peer_connections.try_emplace(session.uniqueid, &session); // add new session. + // Public key in binary format will be used as the lookup key in storing peer sessions. + ctx.peer_connections.try_emplace(session.pubkey, &session); // add new session. LOG_DEBUG << "Replacing existing connection [" << ex_session.display_name() << "] with [" << session.display_name() << "]"; return 0; @@ -152,24 +157,24 @@ namespace p2p * @param fbuf Peer outbound message to be broadcasted. * @param send_to_self Whether to also send the message to self (this node). * @param is_msg_forwarding Whether this broadcast is for message forwarding. - * @param only_to_trusted_peers Whether this broadcast is only for the trusted nodes. + * @param unl_only Whether this broadcast is only for the trusted nodes. */ - void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding, const bool only_to_trusted_peers) + void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only) { std::string_view msg = std::string_view( reinterpret_cast(fbuf.GetBufferPointer()), fbuf.GetSize()); - broadcast_message(msg, send_to_self, is_msg_forwarding, only_to_trusted_peers); + broadcast_message(msg, send_to_self, is_msg_forwarding, unl_only); } /** * Broadcast the given message to all connected outbound peers. * @param message Message to be forwarded. * @param is_msg_forwarding Whether this broadcast is for message forwarding. - * @param only_to_trusted_peers Whether this broadcast is only for the trusted nodes. + * @param unl_only Whether this broadcast is only for the trusted nodes. * @param skipping_session Session to be skipped in message forwarding(optional). */ - void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const bool only_to_trusted_peers, const peer_comm_session *skipping_session) + void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only, const peer_comm_session *skipping_session) { if (send_to_self) self::send(message); @@ -183,7 +188,7 @@ namespace p2p // Messages are forwarded only to the requested nodes only in the message forwarding mode. if ((skipping_session && skipping_session == session) || (is_msg_forwarding && !session->need_consensus_msg_forwarding) || - (only_to_trusted_peers && !unl::exists(session->uniqueid, true))) + (unl_only && !session->is_unl)) continue; session->send(message); @@ -429,4 +434,17 @@ namespace p2p return -1; } + /** + * Update the peer trusted status on unl list updates. + */ + void update_unl_connections() + { + std::scoped_lock lock(ctx.peer_connections_mutex); + + for (const auto &[k, session] : ctx.peer_connections) + { + session->is_unl = unl::exists(session->pubkey); + } + } + } // namespace p2p \ No newline at end of file diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 2a89da21..757b234e 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -133,7 +133,7 @@ namespace p2p // Holds all the messages until they are processed by consensus. message_collection collected_msgs; - // Set of currently connected peer connections mapped by the uniqueid of socket session. + // Set of currently connected peer connections mapped by the pubkey of socket session. std::unordered_map peer_connections; std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions. @@ -151,9 +151,9 @@ namespace p2p int resolve_peer_challenge(peer_comm_session &session, const peer_challenge_response &challenge_resp); - void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false, const bool only_to_trusted_peers = false); + void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false); - void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const bool only_to_trusted_peers = false, const peer_comm_session *skipping_session = NULL); + void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false, const peer_comm_session *skipping_session = NULL); void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf); @@ -178,6 +178,8 @@ namespace p2p void sort_known_remotes(); int16_t get_available_capacity(); + + void update_unl_connections(); } // namespace p2p #endif \ No newline at end of file diff --git a/src/p2p/peer_comm_session.cpp b/src/p2p/peer_comm_session.cpp index 7b56e327..359b97a3 100644 --- a/src/p2p/peer_comm_session.cpp +++ b/src/p2p/peer_comm_session.cpp @@ -24,18 +24,4 @@ namespace p2p p2p::handle_peer_on_verified(*this); } - /** - * Returns printable name for the session based on uniqueid (used for logging). - */ - const std::string peer_comm_session::display_name() - { - if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) - { - // Peer sessions use pubkey hex as unique id (skipping first 2 bytes key type prefix). - return uniqueid.substr(2, 10) + (is_inbound ? ":in" : ":out"); - } - - return comm_session::display_name(); - } - } // namespace p2p \ No newline at end of file diff --git a/src/p2p/peer_comm_session.hpp b/src/p2p/peer_comm_session.hpp index 07865840..79613498 100644 --- a/src/p2p/peer_comm_session.hpp +++ b/src/p2p/peer_comm_session.hpp @@ -21,11 +21,11 @@ namespace p2p void handle_on_verified(); public: - std::optional known_ipport; // A known ip/port information that matches with our peer list configuration. - bool need_consensus_msg_forwarding = false; // Holds whether this node requires consensus message forwarding. - const std::string display_name(); + std::optional known_ipport; // A known ip/port information that matches with our peer list configuration. + bool need_consensus_msg_forwarding = false; // Holds whether this node requires consensus message forwarding. + bool is_unl = false; // Whether this session's pubkey is in unl list. }; -} // namespace comm +} // namespace p2p #endif diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index e9091901..9c7bc72e 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -82,16 +82,16 @@ namespace p2p if (p2p::validate_for_peer_msg_forwarding(session, container, content_message_type)) { // Npl messages are forwarded only to trusted peers. - const bool only_to_trusted_peers = content_message_type == p2pmsg::Message_Npl_Message; + const bool unl_only = content_message_type == p2pmsg::Message_Npl_Message; if (session.need_consensus_msg_forwarding) { // Forward messages received by weakly connected nodes to other peers. - p2p::broadcast_message(message, false, false, only_to_trusted_peers, &session); + p2p::broadcast_message(message, false, false, unl_only, &session); } else { // Forward message received from other nodes to weakly connected peers. - p2p::broadcast_message(message, false, true, only_to_trusted_peers, &session); + p2p::broadcast_message(message, false, true, unl_only, &session); } } @@ -193,7 +193,7 @@ namespace p2p if (ctx.collected_msgs.state_requests.size() < p2p::STATE_REQ_LIST_CAP) { std::string state_request_msg(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.state_requests.push_back(std::make_pair(session.uniqueid, std::move(state_request_msg))); + ctx.collected_msgs.state_requests.push_back(std::make_pair(session.pubkey, std::move(state_request_msg))); } else { @@ -211,7 +211,7 @@ namespace p2p if (ctx.collected_msgs.state_responses.size() < p2p::STATE_RES_LIST_CAP) { std::string response(reinterpret_cast(content_ptr), content_size); - ctx.collected_msgs.state_responses.push_back(std::make_pair(session.uniqueid, std::move(response))); + ctx.collected_msgs.state_responses.push_back(std::make_pair(session.pubkey, std::move(response))); } else { @@ -228,7 +228,7 @@ namespace p2p if (ledger::sync_ctx.collected_history_requests.size() < ledger::HISTORY_REQ_LIST_CAP) { const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message(), container->lcl()); - ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr))); + ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.pubkey, std::move(hr))); } else { @@ -360,9 +360,9 @@ namespace p2p int handle_peer_close(const p2p::peer_comm_session &session) { { - // Erase the corresponding uniqueid peer connection if it's this session. + // Erase the corresponding pubkey peer connection if it's this session. std::scoped_lock lock(ctx.peer_connections_mutex); - const auto itr = ctx.peer_connections.find(session.uniqueid); + const auto itr = ctx.peer_connections.find(session.pubkey); if (itr != ctx.peer_connections.end() && itr->second == &session) { ctx.peer_connections.erase(itr); diff --git a/src/state/state_serve.cpp b/src/state/state_serve.cpp index fb23944f..40499b11 100644 --- a/src/state/state_serve.cpp +++ b/src/state/state_serve.cpp @@ -100,7 +100,14 @@ namespace state_serve break; } - LOG_DEBUG << "Serving state request from [" << session_id.substr(0, 10) << "]"; + // Session id is in binary format. Converting to hex before printing. + std::string session_id_hex; + util::bin2hex( + session_id_hex, + reinterpret_cast(session_id.data()), + session_id.length()); + + LOG_DEBUG << "Serving state request from [" << session_id_hex.substr(2, 10) << "]"; const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data()); diff --git a/src/unl.cpp b/src/unl.cpp index 2f322955..adf69bc0 100644 --- a/src/unl.cpp +++ b/src/unl.cpp @@ -3,6 +3,7 @@ #include "conf.hpp" #include "unl.hpp" #include "crypto.hpp" +#include "p2p/p2p.hpp" /** * Manages the UNL public keys of this node. @@ -46,28 +47,11 @@ namespace unl /** * Check whether the given pubkey is in the unl list. - * @param pubkey Pubkey to check for existence. - * @param is_in_hex Whether the given pubkey is in hex format. + * @param bin_pubkey Pubkey to check for existence. * @return Return true if the given pubkey is in the unl list. */ - bool exists(const std::string &pubkey, const bool is_in_hex) + bool exists(const std::string &bin_pubkey) { - std::string bin_pubkey = pubkey; - if (is_in_hex) - { - // If the given pubkey is in hex format, convert the public key to binary. - std::string temp_bin_pubkey; - temp_bin_pubkey.resize(crypto::PFXD_PUBKEY_BYTES); - if (util::hex2bin( - reinterpret_cast(temp_bin_pubkey.data()), - temp_bin_pubkey.length(), - pubkey) != 0) - { - LOG_ERROR << "Error decoding hex pubkey.\n"; - return false; - } - bin_pubkey.swap(temp_bin_pubkey); - } std::shared_lock lock(unl_mutex); return list.find(bin_pubkey) != list.end(); } @@ -81,6 +65,7 @@ namespace unl return; std::unique_lock lock(unl_mutex); + const size_t initial_count = list.size(); for (const std::string &pubkey : additions) list.emplace(pubkey); @@ -91,7 +76,17 @@ namespace unl update_json_list(); conf::persist_unl_update(list); - LOG_INFO << "UNL updated. Count:" << list.size(); + const size_t updated_count = list.size(); + + // Unlock unique lock. A shared lock is applied to the list inside the update unl connection function + // because it use unl::exists function call. + lock.unlock(); + + // Update the is_unl flag of peer sessions. + if (initial_count != updated_count) + p2p::update_unl_connections(); + + LOG_INFO << "UNL updated. Count:" << updated_count; } void update_json_list() diff --git a/src/unl.hpp b/src/unl.hpp index 1da2d5c9..f06a4b90 100644 --- a/src/unl.hpp +++ b/src/unl.hpp @@ -11,7 +11,7 @@ namespace unl size_t count(); std::set get(); std::string get_json(); - bool exists(const std::string &pubkey, const bool is_in_hex = false); + bool exists(const std::string &bin_pubkey); void init(const std::set &init_list); void update(const std::vector &additions, const std::vector &removals); void update_json_list(); diff --git a/src/usr/user_comm_session.cpp b/src/usr/user_comm_session.cpp index 3cefa820..c4d19fdc 100644 --- a/src/usr/user_comm_session.cpp +++ b/src/usr/user_comm_session.cpp @@ -20,22 +20,5 @@ namespace usr usr::handle_user_close(*this); } - /** - * Returns printable name for the session based on uniqueid (used for logging). - */ - const std::string user_comm_session::display_name() - { - if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED) - { - // User sessions use binary pubkey as unique id. So we need to convert to hex. - std::string hex; - util::bin2hex(hex, - reinterpret_cast(uniqueid.data()), - uniqueid.length()); - return hex.substr(2, 10) + (is_inbound ? ":in" : ":out"); // Skipping first 2 bytes key type prefix. - } - - return comm_session::display_name(); - } } // namespace usr \ No newline at end of file diff --git a/src/usr/user_comm_session.hpp b/src/usr/user_comm_session.hpp index baeb1e74..f78cd649 100644 --- a/src/usr/user_comm_session.hpp +++ b/src/usr/user_comm_session.hpp @@ -17,9 +17,6 @@ namespace usr int handle_connect(); int handle_message(std::string_view msg); void handle_close(); - - public: - const std::string display_name(); }; } // namespace usr diff --git a/src/usr/user_session_handler.cpp b/src/usr/user_session_handler.cpp index 72853efe..e80ccc33 100644 --- a/src/usr/user_session_handler.cpp +++ b/src/usr/user_session_handler.cpp @@ -61,7 +61,7 @@ namespace usr // Check whether this user is among authenticated users // and perform authenticated msg processing. - const auto itr = ctx.users.find(session.uniqueid); + const auto itr = ctx.users.find(session.pubkey); if (itr != ctx.users.end()) { // This is an authed user. @@ -95,7 +95,7 @@ namespace usr { // Session belongs to an authed user. if (session.challenge_status == comm::CHALLENGE_VERIFIED) - remove_user(session.uniqueid); + remove_user(session.pubkey); return 0; } diff --git a/src/usr/usr.cpp b/src/usr/usr.cpp index e77f780f..969b9c43 100644 --- a/src/usr/usr.cpp +++ b/src/usr/usr.cpp @@ -253,7 +253,8 @@ namespace usr session.mark_as_verified(); // Mark connection as a verified connection. session.issued_challenge.clear(); // Remove the stored challenge - session.uniqueid = pubkey; + session.uniqueid = pubkey_hex; + session.pubkey = pubkey; // Add the user to the global authed user list ctx.users.emplace(pubkey, usr::connected_user(session, pubkey, protocol));