From a89e7eb6c8bb02f796d9e9743e13c59d706db9f0 Mon Sep 17 00:00:00 2001 From: ravinsp <33562092+ravinsp@users.noreply.github.com> Date: Sun, 10 Nov 2019 08:45:20 +0530 Subject: [PATCH] Reorganized p2p context objects. --- src/cons/cons.cpp | 12 ++--- src/p2p/p2p.cpp | 79 +++++++++----------------------- src/p2p/p2p.hpp | 39 ++++++++++++---- src/p2p/peer_session_handler.cpp | 16 +++---- 4 files changed, 65 insertions(+), 81 deletions(-) diff --git a/src/cons/cons.cpp b/src/cons/cons.cpp index 44c3b478..b327c4c0 100644 --- a/src/cons/cons.cpp +++ b/src/cons/cons.cpp @@ -48,8 +48,8 @@ void consensus() // the candidate proposal set (move and append). This is to have a private working set for the consensus // and avoid threading conflicts with network incoming proposals. { - std::lock_guard lock(p2p::collected_msgs.proposals_mutex); - ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals); + std::lock_guard lock(p2p::ctx.collected_msgs.proposals_mutex); + ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::ctx.collected_msgs.proposals); } LOG_DBG << "Started stage " << std::to_string(ctx.stage); @@ -172,7 +172,7 @@ void broadcast_nonunl_proposal() // Construct NUP. p2p::nonunl_proposal nup; - std::lock_guard lock(p2p::collected_msgs.nonunl_proposals_mutex); + std::lock_guard lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex); for (auto &[sid, user] : usr::ctx.users) { std::list usermsgs; @@ -198,8 +198,8 @@ void broadcast_nonunl_proposal() void verify_and_populate_candidate_user_inputs() { // Lock the list so any network activity is blocked. - std::lock_guard lock(p2p::collected_msgs.nonunl_proposals_mutex); - for (const p2p::nonunl_proposal &p : p2p::collected_msgs.nonunl_proposals) + std::lock_guard lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex); + for (const p2p::nonunl_proposal &p : p2p::ctx.collected_msgs.nonunl_proposals) { for (const auto &[pubkey, umsgs] : p.user_messages) { @@ -244,7 +244,7 @@ void verify_and_populate_candidate_user_inputs() } } } - p2p::collected_msgs.nonunl_proposals.clear(); + p2p::ctx.collected_msgs.nonunl_proposals.clear(); } p2p::proposal create_stage0_proposal() diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index 5a9d9ed4..3e77c862 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -13,47 +13,11 @@ namespace ssl = boost::asio::ssl; namespace p2p { -/** - * Holds all the messages until they are processed by consensus. - */ -message_collection collected_msgs; +// Holds global connected-peers and related objects. +connected_context ctx; -/** - * Peer connections exposing to the application - */ -std::unordered_map *> peer_connections; -std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions. - -/** - * Peer session handler instance. This instance's methods will be fired for any peer socket activity. - */ -p2p::peer_session_handler global_peer_session_handler; - -/** - * IO context used by the boost library in creating sockets - */ -net::io_context ioc; - -/** - * SSL context used by the boost library in providing tls support - */ -ssl::context ctx{ssl::context::tlsv13}; - -/** - * The thread the peer server and client is running on. (not exposed out of this namespace) - * Peer connection watchdog runs on this thread. - */ -std::thread peer_watchdog_thread; - -/** - * The thread the peer listener is running on. (not exposed out of this namespace) - */ -std::thread peer_thread; - -/** - * Used to pass down the default settings to the socket session - */ -sock::session_options default_sess_opts; +// Holds objects used by socket listener. +listener_context listener_ctx; int init() { @@ -68,42 +32,41 @@ void start_peer_connections() boost::asio::ip::address address = net::ip::make_address(conf::cfg.listenip); // Setting up the message max size. Retrieve it from config - default_sess_opts.max_socket_read_len = conf::cfg.peermaxsize; - default_sess_opts.max_rawbytes_per_minute = conf::cfg.peermaxcpm; - default_sess_opts.max_dupmsgs_per_minute = conf::cfg.peermaxdupmpm; - default_sess_opts.max_badmsgs_per_minute = conf::cfg.peermaxbadmpm; - default_sess_opts.max_badsigmsgs_per_minute = conf::cfg.peermaxbadsigpm; + listener_ctx.default_sess_opts.max_socket_read_len = conf::cfg.peermaxsize; + listener_ctx.default_sess_opts.max_rawbytes_per_minute = conf::cfg.peermaxcpm; + listener_ctx.default_sess_opts.max_dupmsgs_per_minute = conf::cfg.peermaxdupmpm; + listener_ctx.default_sess_opts.max_badmsgs_per_minute = conf::cfg.peermaxbadmpm; + listener_ctx.default_sess_opts.max_badsigmsgs_per_minute = conf::cfg.peermaxbadsigpm; // Start listening to peers std::make_shared>( - ioc, - ctx, + listener_ctx.ioc, + listener_ctx.ssl_ctx, tcp::endpoint{address, conf::cfg.peerport}, - global_peer_session_handler, - default_sess_opts) + listener_ctx.global_peer_session_handler, + listener_ctx.default_sess_opts) ->run(); LOG_INFO << "Started listening for incoming peer connections on " << conf::cfg.listenip << ":" << conf::cfg.peerport; // Scan peers and trying to keep up the connections if drop. This action is run on a seperate thread. - peer_watchdog_thread = std::thread([&] { peer_connection_watchdog(); }); + ctx.peer_watchdog_thread = std::thread([&] { peer_connection_watchdog(); }); // Peer listener thread. - peer_thread = std::thread([&] { ioc.run(); }); + listener_ctx.listener_thread = std::thread([&] { listener_ctx.ioc.run(); }); } // Scan peer connections continually and attempt to maintain the connection if they drop void peer_connection_watchdog() { - //todo: implement exit gracefully. while (true) { for (const auto &[peerid, ipport] : conf::cfg.peers) { - if (peer_connections.find(peerid) == peer_connections.end()) + if (ctx.peer_connections.find(peerid) == ctx.peer_connections.end()) { LOG_DBG << "Trying to connect : " << peerid; - std::make_shared>(ioc, ctx, global_peer_session_handler, default_sess_opts) + std::make_shared>(listener_ctx.ioc, listener_ctx.ssl_ctx, listener_ctx.global_peer_session_handler, listener_ctx.default_sess_opts) ->run(ipport.first, ipport.second); } } @@ -117,16 +80,16 @@ void peer_connection_watchdog() */ void broadcast_message(const peer_outbound_message msg) { - if (p2p::peer_connections.size() == 0) + if (ctx.peer_connections.size() == 0) { LOG_DBG << "No peers to broadcast (not even self). Waiting until at least one peer connects."; - while (p2p::peer_connections.size() == 0) + while (ctx.peer_connections.size() == 0) util::sleep(100); } //Broadcast while locking the peer_connections. - std::lock_guard lock(p2p::peer_connections_mutex); - for (const auto &[k, session] : p2p::peer_connections) + std::lock_guard lock(ctx.peer_connections_mutex); + for (const auto &[k, session] : ctx.peer_connections) session->send(msg); } diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index 94abe853..f4471043 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -35,16 +35,37 @@ struct message_collection std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions. }; -/** - * Holds all the messages until they are processed by consensus. - */ -extern message_collection collected_msgs; +struct connected_context +{ + // Holds all the messages until they are processed by consensus. + message_collection collected_msgs; -/** - * This is used to store active peer connections mapped by the unique key of socket session - */ -extern std::unordered_map *> peer_connections; -extern std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions. + // Set of currently connected outbound peer connections mapped by the uniqueid of socket session. + std::unordered_map *> peer_connections; + std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions. + + // Peer connection watchdog runs on this thread. + std::thread peer_watchdog_thread; +}; +extern connected_context ctx; + +struct listener_context +{ + // Peer session handler instance. This instance's methods will be fired for any peer socket activity. + p2p::peer_session_handler global_peer_session_handler; + + // IO context used by the boost library in creating sockets + net::io_context ioc; + + // SSL context used by the boost library in providing tls support + ssl::context ssl_ctx{ssl::context::tlsv13}; + + // The thread the peer listener is running on. + std::thread listener_thread; + + // Used to pass down the default settings to the socket session + sock::session_options default_sess_opts; +}; int init(); diff --git a/src/p2p/peer_session_handler.cpp b/src/p2p/peer_session_handler.cpp index 2eb5ee86..cf4260da 100644 --- a/src/p2p/peer_session_handler.cpp +++ b/src/p2p/peer_session_handler.cpp @@ -26,8 +26,8 @@ void peer_session_handler::on_connect(sock::socket_sessionflags[sock::SESSION_FLAG::INBOUND]) { - std::lock_guard lock(p2p::peer_connections_mutex); - peer_connections.insert(std::make_pair(session->uniqueid, session)); + std::lock_guard lock(ctx.peer_connections_mutex); + ctx.peer_connections.try_emplace(session->uniqueid, session); LOG_DBG << "Adding peer to list: " << session->uniqueid; } } @@ -70,16 +70,16 @@ void peer_session_handler::on_message(sock::socket_session lock(collected_msgs.proposals_mutex); // Insert proposal with lock. + std::lock_guard lock(ctx.collected_msgs.proposals_mutex); // Insert proposal with lock. - collected_msgs.proposals.push_back( + ctx.collected_msgs.proposals.push_back( p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp())); } else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message { - std::lock_guard lock(collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock. + std::lock_guard lock(ctx.collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock. - collected_msgs.nonunl_proposals.push_back( + ctx.collected_msgs.nonunl_proposals.push_back( p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp())); } else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message @@ -99,8 +99,8 @@ void peer_session_handler::on_message(sock::socket_session *session) { { - std::lock_guard lock(p2p::peer_connections_mutex); - peer_connections.erase(session->uniqueid); + std::lock_guard lock(ctx.peer_connections_mutex); + ctx.peer_connections.erase(session->uniqueid); } LOG_DBG << "Peer disonnected: " << session->uniqueid; }