Introduce pubkey/unl tracking to Peer sessions. (#181)

* Adding pubkey in binary to comm sessions and is_unl flag to peer sessions.
* Removing display_name overloads and populating pubkey in user session.
* Changing peer session lookup key from uniqueid to pubkey (binary).
This commit is contained in:
Savinda Senevirathne
2020-12-02 13:05:06 +05:30
committed by GitHub
parent 596fd2b43c
commit f3055822ed
14 changed files with 81 additions and 85 deletions

View File

@@ -235,6 +235,12 @@ namespace comm
*/
const std::string comm_session::display_name()
{
if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED)
{
// Sessions use pubkey hex as unique id (skipping first 2 bytes key type prefix).
return uniqueid.substr(2, 10) + (is_inbound ? ":in" : ":out");
}
return uniqueid + (is_inbound ? ":in" : ":out");
}

View File

@@ -47,7 +47,8 @@ namespace comm
virtual void handle_on_verified();
public:
std::string uniqueid;
std::string uniqueid; // Verified session: Pubkey in hex format, Unverified session: IP address.
std::string pubkey; // Pubkey in binary format.
const bool is_inbound;
const std::string host_address; // Connection host address of the remote party.
std::string issued_challenge;

View File

@@ -74,7 +74,6 @@ namespace p2p
}
// Converting the binary pub key into hexadecimal string.
// This will be used as the lookup key in storing peer sessions.
std::string pubkeyhex;
util::bin2hex(pubkeyhex, reinterpret_cast<const unsigned char *>(challenge_resp.pubkey.data()), challenge_resp.pubkey.length());
@@ -94,14 +93,17 @@ namespace p2p
std::scoped_lock<std::mutex> lock(ctx.peer_connections_mutex);
const auto iter = ctx.peer_connections.find(pubkeyhex);
const auto iter = ctx.peer_connections.find(challenge_resp.pubkey);
if (iter == ctx.peer_connections.end())
{
// Add the new connection straight away, if we haven't seen it before.
session.uniqueid.swap(pubkeyhex);
session.pubkey = challenge_resp.pubkey;
session.is_unl = unl::exists(session.pubkey);
// Mark the connection as a verified connection.
session.mark_as_verified();
ctx.peer_connections.try_emplace(session.uniqueid, &session);
// Public key in binary format will be used as the lookup key in storing peer sessions.
ctx.peer_connections.try_emplace(session.pubkey, &session);
LOG_DEBUG << "Accepted verified connection [" << session.display_name() << "]";
return 0;
@@ -120,6 +122,8 @@ namespace p2p
if (!session.known_ipport.has_value())
session.known_ipport.swap(ex_session.known_ipport);
session.uniqueid.swap(pubkeyhex);
session.pubkey = challenge_resp.pubkey;
session.is_unl = unl::exists(session.pubkey);
// Mark the connection as a verified connection.
session.mark_as_verified();
@@ -128,7 +132,8 @@ namespace p2p
// We have to keep the peer requirements of the removed session object.
// If not, requirements received prior to connection dropping will be lost.
session.need_consensus_msg_forwarding = ex_session.need_consensus_msg_forwarding;
ctx.peer_connections.try_emplace(session.uniqueid, &session); // add new session.
// Public key in binary format will be used as the lookup key in storing peer sessions.
ctx.peer_connections.try_emplace(session.pubkey, &session); // add new session.
LOG_DEBUG << "Replacing existing connection [" << ex_session.display_name() << "] with [" << session.display_name() << "]";
return 0;
@@ -152,24 +157,24 @@ namespace p2p
* @param fbuf Peer outbound message to be broadcasted.
* @param send_to_self Whether to also send the message to self (this node).
* @param is_msg_forwarding Whether this broadcast is for message forwarding.
* @param only_to_trusted_peers Whether this broadcast is only for the trusted nodes.
* @param unl_only Whether this broadcast is only for the trusted nodes.
*/
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding, const bool only_to_trusted_peers)
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only)
{
std::string_view msg = std::string_view(
reinterpret_cast<const char *>(fbuf.GetBufferPointer()), fbuf.GetSize());
broadcast_message(msg, send_to_self, is_msg_forwarding, only_to_trusted_peers);
broadcast_message(msg, send_to_self, is_msg_forwarding, unl_only);
}
/**
* Broadcast the given message to all connected outbound peers.
* @param message Message to be forwarded.
* @param is_msg_forwarding Whether this broadcast is for message forwarding.
* @param only_to_trusted_peers Whether this broadcast is only for the trusted nodes.
* @param unl_only Whether this broadcast is only for the trusted nodes.
* @param skipping_session Session to be skipped in message forwarding(optional).
*/
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const bool only_to_trusted_peers, const peer_comm_session *skipping_session)
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding, const bool unl_only, const peer_comm_session *skipping_session)
{
if (send_to_self)
self::send(message);
@@ -183,7 +188,7 @@ namespace p2p
// Messages are forwarded only to the requested nodes only in the message forwarding mode.
if ((skipping_session && skipping_session == session) ||
(is_msg_forwarding && !session->need_consensus_msg_forwarding) ||
(only_to_trusted_peers && !unl::exists(session->uniqueid, true)))
(unl_only && !session->is_unl))
continue;
session->send(message);
@@ -429,4 +434,17 @@ namespace p2p
return -1;
}
/**
* Update the peer trusted status on unl list updates.
*/
void update_unl_connections()
{
std::scoped_lock<std::mutex> lock(ctx.peer_connections_mutex);
for (const auto &[k, session] : ctx.peer_connections)
{
session->is_unl = unl::exists(session->pubkey);
}
}
} // namespace p2p

View File

@@ -133,7 +133,7 @@ namespace p2p
// Holds all the messages until they are processed by consensus.
message_collection collected_msgs;
// Set of currently connected peer connections mapped by the uniqueid of socket session.
// Set of currently connected peer connections mapped by the pubkey of socket session.
std::unordered_map<std::string, peer_comm_session *> peer_connections;
std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions.
@@ -151,9 +151,9 @@ namespace p2p
int resolve_peer_challenge(peer_comm_session &session, const peer_challenge_response &challenge_resp);
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false, const bool only_to_trusted_peers = false);
void broadcast_message(const flatbuffers::FlatBufferBuilder &fbuf, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false);
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const bool only_to_trusted_peers = false, const peer_comm_session *skipping_session = NULL);
void broadcast_message(std::string_view message, const bool send_to_self, const bool is_msg_forwarding = false, const bool unl_only = false, const peer_comm_session *skipping_session = NULL);
void send_message_to_self(const flatbuffers::FlatBufferBuilder &fbuf);
@@ -178,6 +178,8 @@ namespace p2p
void sort_known_remotes();
int16_t get_available_capacity();
void update_unl_connections();
} // namespace p2p
#endif

View File

@@ -24,18 +24,4 @@ namespace p2p
p2p::handle_peer_on_verified(*this);
}
/**
* Returns printable name for the session based on uniqueid (used for logging).
*/
const std::string peer_comm_session::display_name()
{
if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED)
{
// Peer sessions use pubkey hex as unique id (skipping first 2 bytes key type prefix).
return uniqueid.substr(2, 10) + (is_inbound ? ":in" : ":out");
}
return comm_session::display_name();
}
} // namespace p2p

View File

@@ -21,11 +21,11 @@ namespace p2p
void handle_on_verified();
public:
std::optional<conf::ip_port_prop> known_ipport; // A known ip/port information that matches with our peer list configuration.
bool need_consensus_msg_forwarding = false; // Holds whether this node requires consensus message forwarding.
const std::string display_name();
std::optional<conf::ip_port_prop> known_ipport; // A known ip/port information that matches with our peer list configuration.
bool need_consensus_msg_forwarding = false; // Holds whether this node requires consensus message forwarding.
bool is_unl = false; // Whether this session's pubkey is in unl list.
};
} // namespace comm
} // namespace p2p
#endif

View File

@@ -82,16 +82,16 @@ namespace p2p
if (p2p::validate_for_peer_msg_forwarding(session, container, content_message_type))
{
// Npl messages are forwarded only to trusted peers.
const bool only_to_trusted_peers = content_message_type == p2pmsg::Message_Npl_Message;
const bool unl_only = content_message_type == p2pmsg::Message_Npl_Message;
if (session.need_consensus_msg_forwarding)
{
// Forward messages received by weakly connected nodes to other peers.
p2p::broadcast_message(message, false, false, only_to_trusted_peers, &session);
p2p::broadcast_message(message, false, false, unl_only, &session);
}
else
{
// Forward message received from other nodes to weakly connected peers.
p2p::broadcast_message(message, false, true, only_to_trusted_peers, &session);
p2p::broadcast_message(message, false, true, unl_only, &session);
}
}
@@ -193,7 +193,7 @@ namespace p2p
if (ctx.collected_msgs.state_requests.size() < p2p::STATE_REQ_LIST_CAP)
{
std::string state_request_msg(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_requests.push_back(std::make_pair(session.uniqueid, std::move(state_request_msg)));
ctx.collected_msgs.state_requests.push_back(std::make_pair(session.pubkey, std::move(state_request_msg)));
}
else
{
@@ -211,7 +211,7 @@ namespace p2p
if (ctx.collected_msgs.state_responses.size() < p2p::STATE_RES_LIST_CAP)
{
std::string response(reinterpret_cast<const char *>(content_ptr), content_size);
ctx.collected_msgs.state_responses.push_back(std::make_pair(session.uniqueid, std::move(response)));
ctx.collected_msgs.state_responses.push_back(std::make_pair(session.pubkey, std::move(response)));
}
else
{
@@ -228,7 +228,7 @@ namespace p2p
if (ledger::sync_ctx.collected_history_requests.size() < ledger::HISTORY_REQ_LIST_CAP)
{
const p2p::history_request hr = p2pmsg::create_history_request_from_msg(*content->message_as_History_Request_Message(), container->lcl());
ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.uniqueid, std::move(hr)));
ledger::sync_ctx.collected_history_requests.push_back(std::make_pair(session.pubkey, std::move(hr)));
}
else
{
@@ -360,9 +360,9 @@ namespace p2p
int handle_peer_close(const p2p::peer_comm_session &session)
{
{
// Erase the corresponding uniqueid peer connection if it's this session.
// Erase the corresponding pubkey peer connection if it's this session.
std::scoped_lock<std::mutex> lock(ctx.peer_connections_mutex);
const auto itr = ctx.peer_connections.find(session.uniqueid);
const auto itr = ctx.peer_connections.find(session.pubkey);
if (itr != ctx.peer_connections.end() && itr->second == &session)
{
ctx.peer_connections.erase(itr);

View File

@@ -100,7 +100,14 @@ namespace state_serve
break;
}
LOG_DEBUG << "Serving state request from [" << session_id.substr(0, 10) << "]";
// Session id is in binary format. Converting to hex before printing.
std::string session_id_hex;
util::bin2hex(
session_id_hex,
reinterpret_cast<const unsigned char *>(session_id.data()),
session_id.length());
LOG_DEBUG << "Serving state request from [" << session_id_hex.substr(2, 10) << "]";
const msg::fbuf::p2pmsg::Content *content = msg::fbuf::p2pmsg::GetContent(request.data());

View File

@@ -3,6 +3,7 @@
#include "conf.hpp"
#include "unl.hpp"
#include "crypto.hpp"
#include "p2p/p2p.hpp"
/**
* Manages the UNL public keys of this node.
@@ -46,28 +47,11 @@ namespace unl
/**
* Check whether the given pubkey is in the unl list.
* @param pubkey Pubkey to check for existence.
* @param is_in_hex Whether the given pubkey is in hex format.
* @param bin_pubkey Pubkey to check for existence.
* @return Return true if the given pubkey is in the unl list.
*/
bool exists(const std::string &pubkey, const bool is_in_hex)
bool exists(const std::string &bin_pubkey)
{
std::string bin_pubkey = pubkey;
if (is_in_hex)
{
// If the given pubkey is in hex format, convert the public key to binary.
std::string temp_bin_pubkey;
temp_bin_pubkey.resize(crypto::PFXD_PUBKEY_BYTES);
if (util::hex2bin(
reinterpret_cast<unsigned char *>(temp_bin_pubkey.data()),
temp_bin_pubkey.length(),
pubkey) != 0)
{
LOG_ERROR << "Error decoding hex pubkey.\n";
return false;
}
bin_pubkey.swap(temp_bin_pubkey);
}
std::shared_lock lock(unl_mutex);
return list.find(bin_pubkey) != list.end();
}
@@ -81,6 +65,7 @@ namespace unl
return;
std::unique_lock lock(unl_mutex);
const size_t initial_count = list.size();
for (const std::string &pubkey : additions)
list.emplace(pubkey);
@@ -91,7 +76,17 @@ namespace unl
update_json_list();
conf::persist_unl_update(list);
LOG_INFO << "UNL updated. Count:" << list.size();
const size_t updated_count = list.size();
// Unlock unique lock. A shared lock is applied to the list inside the update unl connection function
// because it use unl::exists function call.
lock.unlock();
// Update the is_unl flag of peer sessions.
if (initial_count != updated_count)
p2p::update_unl_connections();
LOG_INFO << "UNL updated. Count:" << updated_count;
}
void update_json_list()

View File

@@ -11,7 +11,7 @@ namespace unl
size_t count();
std::set<std::string> get();
std::string get_json();
bool exists(const std::string &pubkey, const bool is_in_hex = false);
bool exists(const std::string &bin_pubkey);
void init(const std::set<std::string> &init_list);
void update(const std::vector<std::string> &additions, const std::vector<std::string> &removals);
void update_json_list();

View File

@@ -20,22 +20,5 @@ namespace usr
usr::handle_user_close(*this);
}
/**
* Returns printable name for the session based on uniqueid (used for logging).
*/
const std::string user_comm_session::display_name()
{
if (challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED)
{
// User sessions use binary pubkey as unique id. So we need to convert to hex.
std::string hex;
util::bin2hex(hex,
reinterpret_cast<const unsigned char *>(uniqueid.data()),
uniqueid.length());
return hex.substr(2, 10) + (is_inbound ? ":in" : ":out"); // Skipping first 2 bytes key type prefix.
}
return comm_session::display_name();
}
} // namespace usr

View File

@@ -17,9 +17,6 @@ namespace usr
int handle_connect();
int handle_message(std::string_view msg);
void handle_close();
public:
const std::string display_name();
};
} // namespace usr

View File

@@ -61,7 +61,7 @@ namespace usr
// Check whether this user is among authenticated users
// and perform authenticated msg processing.
const auto itr = ctx.users.find(session.uniqueid);
const auto itr = ctx.users.find(session.pubkey);
if (itr != ctx.users.end())
{
// This is an authed user.
@@ -95,7 +95,7 @@ namespace usr
{
// Session belongs to an authed user.
if (session.challenge_status == comm::CHALLENGE_VERIFIED)
remove_user(session.uniqueid);
remove_user(session.pubkey);
return 0;
}

View File

@@ -253,7 +253,8 @@ namespace usr
session.mark_as_verified(); // Mark connection as a verified connection.
session.issued_challenge.clear(); // Remove the stored challenge
session.uniqueid = pubkey;
session.uniqueid = pubkey_hex;
session.pubkey = pubkey;
// Add the user to the global authed user list
ctx.users.emplace(pubkey, usr::connected_user(session, pubkey, protocol));