Improvements for larger clusters. (#316)

- Cluster script improvements.
- Skipped the "message too old" check for unverified peer sessions.
- Moved known peer maintenance to peer loop.
This commit is contained in:
Ravin Perera
2021-05-30 12:32:51 +05:30
committed by GitHub
parent 2f0a6673d5
commit 07962bc3d5
11 changed files with 41 additions and 69 deletions

View File

@@ -42,10 +42,6 @@ namespace comm
return 0;
}
virtual void custom_connections()
{
}
private:
const std::string name;
const uint16_t listen_port;
@@ -64,9 +60,6 @@ namespace comm
// Accept any new incoming connection if available.
check_for_new_connection();
// Any connection logic in inherited classes.
custom_connections();
std::scoped_lock<std::mutex> lock(sessions_mutex);
// Initialize any new sessions.

View File

@@ -180,7 +180,7 @@ namespace conf
cfg.user.port = 8080;
cfg.user.idle_timeout = 0;
cfg.hpfs.log.log_level = "err";
cfg.hpfs.log.log_level = "wrn";
cfg.log.max_file_count = 50;
cfg.log.max_mbytes_per_file = 10;

View File

@@ -40,7 +40,8 @@ namespace msg::fbuf::p2pmsg
const auto p2p_msg = p2pmsg::GetP2PMsg(message.data());
// Check message timestamp (ignore this for large messages).
if (message.size() <= MAX_SIZE_FOR_TIME_CHECK)
// Don't perform this check for self messages (session==NULL) or sessions which are still in challenge verification stage.
if (session && session->challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED && message.size() <= MAX_SIZE_FOR_TIME_CHECK)
{
const uint64_t time_now = util::get_epoch_milliseconds();
if (p2p_msg->created_on() < (time_now - (conf::cfg.contract.roundtime * 4)))

View File

@@ -122,7 +122,8 @@ namespace p2p
{
std::scoped_lock lock(ctx.server->req_known_remotes_mutex);
ctx.server->req_known_remotes.erase(std::remove_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(),
[&](const p2p::peer_properties &peer) {
[&](const p2p::peer_properties &peer)
{
return peer.ip_port.port == session.known_ipport->port;
}));
ctx.server->known_remote_count = ctx.server->req_known_remotes.size();
@@ -422,7 +423,8 @@ namespace p2p
{
std::scoped_lock<std::mutex> lock(ctx.server->req_known_remotes_mutex);
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) { return p.ip_port == ip_port; });
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p)
{ return p.ip_port == ip_port; });
if (itr != ctx.server->req_known_remotes.end())
{
LOG_DEBUG << "Updating peer available capacity: Host address: " << itr->ip_port.host_address << ":" << itr->ip_port.port << ", Capacity: " << std::to_string(available_capacity);
@@ -463,7 +465,8 @@ namespace p2p
continue;
}
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) { return p.ip_port == peer.ip_port; });
const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p)
{ return p.ip_port == peer.ip_port; });
// If the new peer is not in the peer list then add to the req_known_remotes
// Otherwise if new peer is recently updated (timestamp >) replace with the current one.
@@ -496,22 +499,19 @@ namespace p2p
* Sorting the known remote list according to the weight value.
*/
void sort_known_remotes()
{
std::sort(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(),
[](const peer_properties &p1, const peer_properties &p2) {
return get_peer_weight(p1) < 0 || get_peer_weight(p1) > get_peer_weight(p2);
});
}
/**
* Calculate the weight value for the peer.
* @param peer Properties of the peer.
* @returns -1 if available capacity is unlimited otherwise weight value.
*/
int32_t get_peer_weight(const peer_properties &peer)
{
const uint64_t time_now = util::get_epoch_milliseconds();
return peer.available_capacity >= 0 ? peer.available_capacity * 1000 * 60 / ceil(time_now - peer.timestamp) : -1;
for (peer_properties &peer : ctx.server->req_known_remotes)
{
const uint64_t time_diff = (time_now > peer.timestamp) ? (time_now - peer.timestamp) : 1;
peer.weight = peer.available_capacity >= 0 ? (peer.available_capacity * 1000 * 60) / time_diff : -1;
}
std::sort(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(),
[](const peer_properties &p1, const peer_properties &p2)
{
return (p1.weight > p2.weight);
});
}
/**

View File

@@ -29,6 +29,7 @@ namespace p2p
conf::peer_ip_port ip_port;
int16_t available_capacity = -1;
uint64_t timestamp = 0;
int64_t weight = 0;
};
struct sequence_hash
@@ -268,8 +269,6 @@ namespace p2p
void merge_peer_list(const std::vector<peer_properties> &peers);
int32_t get_peer_weight(const peer_properties &peer);
void sort_known_remotes();
int16_t get_available_capacity();

View File

@@ -23,13 +23,11 @@ namespace p2p
void peer_comm_server::start_custom_jobs()
{
// known_peers_thread = std::thread(&peer_comm_server::peer_monitor_loop, this);
peer_managing_thread = std::thread(&peer_comm_server::peer_managing_loop, this);
}
void peer_comm_server::stop_custom_jobs()
{
// known_peers_thread.join();
peer_managing_thread.join();
}
@@ -38,43 +36,25 @@ namespace p2p
return self::process_next_message();
}
void peer_comm_server::custom_connections()
{
if (custom_connection_invocations == 20 || custom_connection_invocations == -1)
{
maintain_known_connections();
custom_connection_invocations = 0;
}
custom_connection_invocations++;
}
// void peer_comm_server::peer_monitor_loop()
// {
// util::mask_signal();
// LOG_INFO << "Started peer monitor.";
// while (!is_shutting_down)
// {
// util::sleep(2000);
// maintain_known_connections();
// }
// LOG_INFO << "Stopped peer monitor.";
// }
void peer_comm_server::peer_managing_loop()
{
util::mask_signal();
LOG_INFO << "Started peer managing thread.";
int peer_managing_counter = 0;
uint16_t peer_managing_counter = 0;
uint16_t known_connections_counter = 0;
while (!is_shutting_down)
{
peer_managing_counter++;
known_connections_counter++;
if (known_connections_counter % 20 == 0)
{
maintain_known_connections();
known_connections_counter = 0;
}
// Send available peer capacity if peer max connections is configured.
if (conf::cfg.mesh.max_connections != 0)
@@ -202,9 +182,8 @@ namespace p2p
{
// Get the count of peers which are unl nodes.
// One is added to session list size only if we are a unl node, to reflect the self connection.
const int connected_peer_count = std::count_if(sessions.begin(), sessions.end(), [](const p2p::peer_comm_session &session) {
return session.is_unl;
}) +
const int connected_peer_count = std::count_if(sessions.begin(), sessions.end(), [](const p2p::peer_comm_session &session)
{ return session.is_unl; }) +
(conf::cfg.node.is_unl ? 1 : 0);
const bool current_state = connected_peer_count < (unl::count() * WEAKLY_CONNECTED_THRESHOLD);
if (is_weakly_connected != current_state)

View File

@@ -15,8 +15,7 @@ namespace p2p
{
private:
int custom_connection_invocations = -1;
// std::thread known_peers_thread; // Known peers connection establishment thread.
std::thread peer_managing_thread; // Thread to request known peer list from a random peer and announce available capacity.
std::thread peer_managing_thread; // Thread to manage peer connections.
uint16_t connected_status_check_counter = 0;
void maintain_known_connections();
@@ -27,7 +26,6 @@ namespace p2p
void start_custom_jobs();
void stop_custom_jobs();
int process_custom_messages();
void custom_connections();
public:
std::atomic<uint16_t> known_remote_count = 0;

Binary file not shown.

View File

@@ -476,12 +476,12 @@ if [ $nodeid = -1 ]; then
hostaddr=${hostaddrs[i]}
let n=$i+1
scp ./cfg/node$n-merged.cfg $sshuser@$hostaddr:$contdir/cfg/hp.cfg &
scp -q ./cfg/node$n-merged.cfg $sshuser@$hostaddr:$contdir/cfg/hp.cfg &
done
wait
else
let n=$nodeid+1
scp ./cfg/node$n-merged.cfg $sshuser@$hostaddr:$contdir/cfg/hp.cfg
scp -q ./cfg/node$n-merged.cfg $sshuser@$hostaddr:$contdir/cfg/hp.cfg
fi
rm -r ./cfg

View File

@@ -73,7 +73,7 @@ if [ $mode = "new" ] || [ $mode = "reconfig" ]; then
sudo chmod +x $contdir/start.sh
# Create stop.sh script (sending SIGINT to hpcore)
echo "pids=\$($contdir/getpid.sh hpcore) && [ ! -z \$pids ] && kill -2 \$pids" > $contdir/stop.sh
echo "pids=\$($contdir/getpid.sh hpcore) && [ ! -z \"\$pids\" ] && kill -2 \$pids" > $contdir/stop.sh
sudo chmod +x $contdir/stop.sh
# Create check.sh script (print pids belonging to this contract dir)
@@ -81,7 +81,9 @@ if [ $mode = "new" ] || [ $mode = "reconfig" ]; then
sudo chmod +x $contdir/check.sh
# Create kill.sh script
echo "pids=\$($contdir/getpid.sh hpcore hpfs hpws) && [ ! -z \$pids ] && sudo kill \$pids" > $contdir/kill.sh
echo "pids=\$($contdir/getpid.sh hpcore hpfs hpws) && [ ! -z \"\$pids\" ] && sudo kill \$pids" > $contdir/kill.sh
echo "fusermount -u $contdir/contract_fs/mnt > /dev/null 2>&1" >> $contdir/kill.sh
echo "fusermount -u $contdir/ledger_fs/mnt > /dev/null 2>&1" >> $contdir/kill.sh
sudo chmod +x $contdir/kill.sh
# Create lcl.sh script

View File

@@ -12,7 +12,7 @@ echo $nodeid. $hostaddr
if [ $mode = "new" ] || [ $mode = "updatebin" ]; then
echo "Uploading hp files to $basedir..."
scp -o StrictHostKeyChecking=no -rp hpfiles $sshuser@$hostaddr:$basedir/
scp -q -o StrictHostKeyChecking=no -rp hpfiles $sshuser@$hostaddr:$basedir/
echo "Upload finished."
fi
@@ -23,5 +23,5 @@ if [ $mode = "new" ] || [ $mode = "reconfig" ]; then
fi
if [ $mode = "new" ] || [ $mode = "reconfig" ] || [ $mode = "updateconfig" ]; then
scp -o StrictHostKeyChecking=no $sshuser@$hostaddr:$contdir/cfg/hp.cfg ./cfg/node$nodeid.cfg
scp -q -o StrictHostKeyChecking=no $sshuser@$hostaddr:$contdir/cfg/hp.cfg ./cfg/node$nodeid.cfg
fi