mirror of
https://github.com/EvernodeXRPL/hpcore.git
synced 2026-04-29 15:37:59 +00:00
Reorganized p2p context objects.
This commit is contained in:
@@ -48,8 +48,8 @@ void consensus()
|
||||
// the candidate proposal set (move and append). This is to have a private working set for the consensus
|
||||
// and avoid threading conflicts with network incoming proposals.
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(p2p::collected_msgs.proposals_mutex);
|
||||
ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::collected_msgs.proposals);
|
||||
std::lock_guard<std::mutex> lock(p2p::ctx.collected_msgs.proposals_mutex);
|
||||
ctx.candidate_proposals.splice(ctx.candidate_proposals.end(), p2p::ctx.collected_msgs.proposals);
|
||||
}
|
||||
|
||||
LOG_DBG << "Started stage " << std::to_string(ctx.stage);
|
||||
@@ -172,7 +172,7 @@ void broadcast_nonunl_proposal()
|
||||
// Construct NUP.
|
||||
p2p::nonunl_proposal nup;
|
||||
|
||||
std::lock_guard<std::mutex> lock(p2p::collected_msgs.nonunl_proposals_mutex);
|
||||
std::lock_guard<std::mutex> lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex);
|
||||
for (auto &[sid, user] : usr::ctx.users)
|
||||
{
|
||||
std::list<usr::user_submitted_message> usermsgs;
|
||||
@@ -198,8 +198,8 @@ void broadcast_nonunl_proposal()
|
||||
void verify_and_populate_candidate_user_inputs()
|
||||
{
|
||||
// Lock the list so any network activity is blocked.
|
||||
std::lock_guard<std::mutex> lock(p2p::collected_msgs.nonunl_proposals_mutex);
|
||||
for (const p2p::nonunl_proposal &p : p2p::collected_msgs.nonunl_proposals)
|
||||
std::lock_guard<std::mutex> lock(p2p::ctx.collected_msgs.nonunl_proposals_mutex);
|
||||
for (const p2p::nonunl_proposal &p : p2p::ctx.collected_msgs.nonunl_proposals)
|
||||
{
|
||||
for (const auto &[pubkey, umsgs] : p.user_messages)
|
||||
{
|
||||
@@ -244,7 +244,7 @@ void verify_and_populate_candidate_user_inputs()
|
||||
}
|
||||
}
|
||||
}
|
||||
p2p::collected_msgs.nonunl_proposals.clear();
|
||||
p2p::ctx.collected_msgs.nonunl_proposals.clear();
|
||||
}
|
||||
|
||||
p2p::proposal create_stage0_proposal()
|
||||
|
||||
@@ -13,47 +13,11 @@ namespace ssl = boost::asio::ssl;
|
||||
namespace p2p
|
||||
{
|
||||
|
||||
/**
|
||||
* Holds all the messages until they are processed by consensus.
|
||||
*/
|
||||
message_collection collected_msgs;
|
||||
// Holds global connected-peers and related objects.
|
||||
connected_context ctx;
|
||||
|
||||
/**
|
||||
* Peer connections exposing to the application
|
||||
*/
|
||||
std::unordered_map<std::string, sock::socket_session<peer_outbound_message> *> peer_connections;
|
||||
std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions.
|
||||
|
||||
/**
|
||||
* Peer session handler instance. This instance's methods will be fired for any peer socket activity.
|
||||
*/
|
||||
p2p::peer_session_handler global_peer_session_handler;
|
||||
|
||||
/**
|
||||
* IO context used by the boost library in creating sockets
|
||||
*/
|
||||
net::io_context ioc;
|
||||
|
||||
/**
|
||||
* SSL context used by the boost library in providing tls support
|
||||
*/
|
||||
ssl::context ctx{ssl::context::tlsv13};
|
||||
|
||||
/**
|
||||
* The thread the peer server and client is running on. (not exposed out of this namespace)
|
||||
* Peer connection watchdog runs on this thread.
|
||||
*/
|
||||
std::thread peer_watchdog_thread;
|
||||
|
||||
/**
|
||||
* The thread the peer listener is running on. (not exposed out of this namespace)
|
||||
*/
|
||||
std::thread peer_thread;
|
||||
|
||||
/**
|
||||
* Used to pass down the default settings to the socket session
|
||||
*/
|
||||
sock::session_options default_sess_opts;
|
||||
// Holds objects used by socket listener.
|
||||
listener_context listener_ctx;
|
||||
|
||||
int init()
|
||||
{
|
||||
@@ -68,42 +32,41 @@ void start_peer_connections()
|
||||
boost::asio::ip::address address = net::ip::make_address(conf::cfg.listenip);
|
||||
|
||||
// Setting up the message max size. Retrieve it from config
|
||||
default_sess_opts.max_socket_read_len = conf::cfg.peermaxsize;
|
||||
default_sess_opts.max_rawbytes_per_minute = conf::cfg.peermaxcpm;
|
||||
default_sess_opts.max_dupmsgs_per_minute = conf::cfg.peermaxdupmpm;
|
||||
default_sess_opts.max_badmsgs_per_minute = conf::cfg.peermaxbadmpm;
|
||||
default_sess_opts.max_badsigmsgs_per_minute = conf::cfg.peermaxbadsigpm;
|
||||
listener_ctx.default_sess_opts.max_socket_read_len = conf::cfg.peermaxsize;
|
||||
listener_ctx.default_sess_opts.max_rawbytes_per_minute = conf::cfg.peermaxcpm;
|
||||
listener_ctx.default_sess_opts.max_dupmsgs_per_minute = conf::cfg.peermaxdupmpm;
|
||||
listener_ctx.default_sess_opts.max_badmsgs_per_minute = conf::cfg.peermaxbadmpm;
|
||||
listener_ctx.default_sess_opts.max_badsigmsgs_per_minute = conf::cfg.peermaxbadsigpm;
|
||||
|
||||
// Start listening to peers
|
||||
std::make_shared<sock::socket_server<peer_outbound_message>>(
|
||||
ioc,
|
||||
ctx,
|
||||
listener_ctx.ioc,
|
||||
listener_ctx.ssl_ctx,
|
||||
tcp::endpoint{address, conf::cfg.peerport},
|
||||
global_peer_session_handler,
|
||||
default_sess_opts)
|
||||
listener_ctx.global_peer_session_handler,
|
||||
listener_ctx.default_sess_opts)
|
||||
->run();
|
||||
|
||||
LOG_INFO << "Started listening for incoming peer connections on " << conf::cfg.listenip << ":" << conf::cfg.peerport;
|
||||
|
||||
// Scan peers and trying to keep up the connections if drop. This action is run on a seperate thread.
|
||||
peer_watchdog_thread = std::thread([&] { peer_connection_watchdog(); });
|
||||
ctx.peer_watchdog_thread = std::thread([&] { peer_connection_watchdog(); });
|
||||
|
||||
// Peer listener thread.
|
||||
peer_thread = std::thread([&] { ioc.run(); });
|
||||
listener_ctx.listener_thread = std::thread([&] { listener_ctx.ioc.run(); });
|
||||
}
|
||||
|
||||
// Scan peer connections continually and attempt to maintain the connection if they drop
|
||||
void peer_connection_watchdog()
|
||||
{
|
||||
//todo: implement exit gracefully.
|
||||
while (true)
|
||||
{
|
||||
for (const auto &[peerid, ipport] : conf::cfg.peers)
|
||||
{
|
||||
if (peer_connections.find(peerid) == peer_connections.end())
|
||||
if (ctx.peer_connections.find(peerid) == ctx.peer_connections.end())
|
||||
{
|
||||
LOG_DBG << "Trying to connect : " << peerid;
|
||||
std::make_shared<sock::socket_client<peer_outbound_message>>(ioc, ctx, global_peer_session_handler, default_sess_opts)
|
||||
std::make_shared<sock::socket_client<peer_outbound_message>>(listener_ctx.ioc, listener_ctx.ssl_ctx, listener_ctx.global_peer_session_handler, listener_ctx.default_sess_opts)
|
||||
->run(ipport.first, ipport.second);
|
||||
}
|
||||
}
|
||||
@@ -117,16 +80,16 @@ void peer_connection_watchdog()
|
||||
*/
|
||||
void broadcast_message(const peer_outbound_message msg)
|
||||
{
|
||||
if (p2p::peer_connections.size() == 0)
|
||||
if (ctx.peer_connections.size() == 0)
|
||||
{
|
||||
LOG_DBG << "No peers to broadcast (not even self). Waiting until at least one peer connects.";
|
||||
while (p2p::peer_connections.size() == 0)
|
||||
while (ctx.peer_connections.size() == 0)
|
||||
util::sleep(100);
|
||||
}
|
||||
|
||||
//Broadcast while locking the peer_connections.
|
||||
std::lock_guard<std::mutex> lock(p2p::peer_connections_mutex);
|
||||
for (const auto &[k, session] : p2p::peer_connections)
|
||||
std::lock_guard<std::mutex> lock(ctx.peer_connections_mutex);
|
||||
for (const auto &[k, session] : ctx.peer_connections)
|
||||
session->send(msg);
|
||||
}
|
||||
|
||||
|
||||
@@ -35,16 +35,37 @@ struct message_collection
|
||||
std::mutex nonunl_proposals_mutex; // Mutex for non-unl proposals access race conditions.
|
||||
};
|
||||
|
||||
/**
|
||||
* Holds all the messages until they are processed by consensus.
|
||||
*/
|
||||
extern message_collection collected_msgs;
|
||||
struct connected_context
|
||||
{
|
||||
// Holds all the messages until they are processed by consensus.
|
||||
message_collection collected_msgs;
|
||||
|
||||
/**
|
||||
* This is used to store active peer connections mapped by the unique key of socket session
|
||||
*/
|
||||
extern std::unordered_map<std::string, sock::socket_session<peer_outbound_message> *> peer_connections;
|
||||
extern std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions.
|
||||
// Set of currently connected outbound peer connections mapped by the uniqueid of socket session.
|
||||
std::unordered_map<std::string, sock::socket_session<peer_outbound_message> *> peer_connections;
|
||||
std::mutex peer_connections_mutex; // Mutex for peer connections access race conditions.
|
||||
|
||||
// Peer connection watchdog runs on this thread.
|
||||
std::thread peer_watchdog_thread;
|
||||
};
|
||||
extern connected_context ctx;
|
||||
|
||||
struct listener_context
|
||||
{
|
||||
// Peer session handler instance. This instance's methods will be fired for any peer socket activity.
|
||||
p2p::peer_session_handler global_peer_session_handler;
|
||||
|
||||
// IO context used by the boost library in creating sockets
|
||||
net::io_context ioc;
|
||||
|
||||
// SSL context used by the boost library in providing tls support
|
||||
ssl::context ssl_ctx{ssl::context::tlsv13};
|
||||
|
||||
// The thread the peer listener is running on.
|
||||
std::thread listener_thread;
|
||||
|
||||
// Used to pass down the default settings to the socket session
|
||||
sock::session_options default_sess_opts;
|
||||
};
|
||||
|
||||
int init();
|
||||
|
||||
|
||||
@@ -26,8 +26,8 @@ void peer_session_handler::on_connect(sock::socket_session<peer_outbound_message
|
||||
{
|
||||
if (!session->flags[sock::SESSION_FLAG::INBOUND])
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(p2p::peer_connections_mutex);
|
||||
peer_connections.insert(std::make_pair(session->uniqueid, session));
|
||||
std::lock_guard<std::mutex> lock(ctx.peer_connections_mutex);
|
||||
ctx.peer_connections.try_emplace(session->uniqueid, session);
|
||||
LOG_DBG << "Adding peer to list: " << session->uniqueid;
|
||||
}
|
||||
}
|
||||
@@ -70,16 +70,16 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
|
||||
return;
|
||||
}
|
||||
|
||||
std::lock_guard<std::mutex> lock(collected_msgs.proposals_mutex); // Insert proposal with lock.
|
||||
std::lock_guard<std::mutex> lock(ctx.collected_msgs.proposals_mutex); // Insert proposal with lock.
|
||||
|
||||
collected_msgs.proposals.push_back(
|
||||
ctx.collected_msgs.proposals.push_back(
|
||||
p2pmsg::create_proposal_from_msg(*content->message_as_Proposal_Message(), container->pubkey(), container->timestamp()));
|
||||
}
|
||||
else if (content_message_type == p2pmsg::Message_NonUnl_Proposal_Message) //message is a non-unl proposal message
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock.
|
||||
std::lock_guard<std::mutex> lock(ctx.collected_msgs.nonunl_proposals_mutex); // Insert non-unl proposal with lock.
|
||||
|
||||
collected_msgs.nonunl_proposals.push_back(
|
||||
ctx.collected_msgs.nonunl_proposals.push_back(
|
||||
p2pmsg::create_nonunl_proposal_from_msg(*content->message_as_NonUnl_Proposal_Message(), container->timestamp()));
|
||||
}
|
||||
else if (content_message_type == p2pmsg::Message_Npl_Message) //message is a NPL message
|
||||
@@ -99,8 +99,8 @@ void peer_session_handler::on_message(sock::socket_session<peer_outbound_message
|
||||
void peer_session_handler::on_close(sock::socket_session<peer_outbound_message> *session)
|
||||
{
|
||||
{
|
||||
std::lock_guard<std::mutex> lock(p2p::peer_connections_mutex);
|
||||
peer_connections.erase(session->uniqueid);
|
||||
std::lock_guard<std::mutex> lock(ctx.peer_connections_mutex);
|
||||
ctx.peer_connections.erase(session->uniqueid);
|
||||
}
|
||||
LOG_DBG << "Peer disonnected: " << session->uniqueid;
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user