diff --git a/src/comm/comm_server.hpp b/src/comm/comm_server.hpp index 4d178960..99891fd7 100644 --- a/src/comm/comm_server.hpp +++ b/src/comm/comm_server.hpp @@ -42,10 +42,6 @@ namespace comm return 0; } - virtual void custom_connections() - { - } - private: const std::string name; const uint16_t listen_port; @@ -64,9 +60,6 @@ namespace comm // Accept any new incoming connection if available. check_for_new_connection(); - // Any connection logic in inherited classes. - custom_connections(); - std::scoped_lock lock(sessions_mutex); // Initialize any new sessions. diff --git a/src/conf.cpp b/src/conf.cpp index f5631866..9c2488ef 100644 --- a/src/conf.cpp +++ b/src/conf.cpp @@ -180,7 +180,7 @@ namespace conf cfg.user.port = 8080; cfg.user.idle_timeout = 0; - cfg.hpfs.log.log_level = "err"; + cfg.hpfs.log.log_level = "wrn"; cfg.log.max_file_count = 50; cfg.log.max_mbytes_per_file = 10; diff --git a/src/msg/fbuf/p2pmsg_conversion.cpp b/src/msg/fbuf/p2pmsg_conversion.cpp index 2d4ceeff..c7caf6ff 100644 --- a/src/msg/fbuf/p2pmsg_conversion.cpp +++ b/src/msg/fbuf/p2pmsg_conversion.cpp @@ -40,7 +40,8 @@ namespace msg::fbuf::p2pmsg const auto p2p_msg = p2pmsg::GetP2PMsg(message.data()); // Check message timestamp (ignore this for large messages). - if (message.size() <= MAX_SIZE_FOR_TIME_CHECK) + // Don't perform this check for self messages (session==NULL) or sessions which are still in challenge verification stage. + if (session && session->challenge_status == comm::CHALLENGE_STATUS::CHALLENGE_VERIFIED && message.size() <= MAX_SIZE_FOR_TIME_CHECK) { const uint64_t time_now = util::get_epoch_milliseconds(); if (p2p_msg->created_on() < (time_now - (conf::cfg.contract.roundtime * 4))) diff --git a/src/p2p/p2p.cpp b/src/p2p/p2p.cpp index ad938de8..469cd7ae 100644 --- a/src/p2p/p2p.cpp +++ b/src/p2p/p2p.cpp @@ -122,7 +122,8 @@ namespace p2p { std::scoped_lock lock(ctx.server->req_known_remotes_mutex); ctx.server->req_known_remotes.erase(std::remove_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), - [&](const p2p::peer_properties &peer) { + [&](const p2p::peer_properties &peer) + { return peer.ip_port.port == session.known_ipport->port; })); ctx.server->known_remote_count = ctx.server->req_known_remotes.size(); @@ -422,7 +423,8 @@ namespace p2p { std::scoped_lock lock(ctx.server->req_known_remotes_mutex); - const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) { return p.ip_port == ip_port; }); + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) + { return p.ip_port == ip_port; }); if (itr != ctx.server->req_known_remotes.end()) { LOG_DEBUG << "Updating peer available capacity: Host address: " << itr->ip_port.host_address << ":" << itr->ip_port.port << ", Capacity: " << std::to_string(available_capacity); @@ -463,7 +465,8 @@ namespace p2p continue; } - const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) { return p.ip_port == peer.ip_port; }); + const auto itr = std::find_if(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), [&](peer_properties &p) + { return p.ip_port == peer.ip_port; }); // If the new peer is not in the peer list then add to the req_known_remotes // Otherwise if new peer is recently updated (timestamp >) replace with the current one. @@ -496,22 +499,19 @@ namespace p2p * Sorting the known remote list according to the weight value. */ void sort_known_remotes() - { - std::sort(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), - [](const peer_properties &p1, const peer_properties &p2) { - return get_peer_weight(p1) < 0 || get_peer_weight(p1) > get_peer_weight(p2); - }); - } - - /** - * Calculate the weight value for the peer. - * @param peer Properties of the peer. - * @returns -1 if available capacity is unlimited otherwise weight value. - */ - int32_t get_peer_weight(const peer_properties &peer) { const uint64_t time_now = util::get_epoch_milliseconds(); - return peer.available_capacity >= 0 ? peer.available_capacity * 1000 * 60 / ceil(time_now - peer.timestamp) : -1; + for (peer_properties &peer : ctx.server->req_known_remotes) + { + const uint64_t time_diff = (time_now > peer.timestamp) ? (time_now - peer.timestamp) : 1; + peer.weight = peer.available_capacity >= 0 ? (peer.available_capacity * 1000 * 60) / time_diff : -1; + } + + std::sort(ctx.server->req_known_remotes.begin(), ctx.server->req_known_remotes.end(), + [](const peer_properties &p1, const peer_properties &p2) + { + return (p1.weight > p2.weight); + }); } /** diff --git a/src/p2p/p2p.hpp b/src/p2p/p2p.hpp index a55dd916..5d43fd38 100644 --- a/src/p2p/p2p.hpp +++ b/src/p2p/p2p.hpp @@ -29,6 +29,7 @@ namespace p2p conf::peer_ip_port ip_port; int16_t available_capacity = -1; uint64_t timestamp = 0; + int64_t weight = 0; }; struct sequence_hash @@ -268,8 +269,6 @@ namespace p2p void merge_peer_list(const std::vector &peers); - int32_t get_peer_weight(const peer_properties &peer); - void sort_known_remotes(); int16_t get_available_capacity(); diff --git a/src/p2p/peer_comm_server.cpp b/src/p2p/peer_comm_server.cpp index 8dceb019..241e52a1 100644 --- a/src/p2p/peer_comm_server.cpp +++ b/src/p2p/peer_comm_server.cpp @@ -23,13 +23,11 @@ namespace p2p void peer_comm_server::start_custom_jobs() { - // known_peers_thread = std::thread(&peer_comm_server::peer_monitor_loop, this); peer_managing_thread = std::thread(&peer_comm_server::peer_managing_loop, this); } void peer_comm_server::stop_custom_jobs() { - // known_peers_thread.join(); peer_managing_thread.join(); } @@ -38,43 +36,25 @@ namespace p2p return self::process_next_message(); } - void peer_comm_server::custom_connections() - { - if (custom_connection_invocations == 20 || custom_connection_invocations == -1) - { - maintain_known_connections(); - custom_connection_invocations = 0; - } - - custom_connection_invocations++; - } - - // void peer_comm_server::peer_monitor_loop() - // { - // util::mask_signal(); - - // LOG_INFO << "Started peer monitor."; - - // while (!is_shutting_down) - // { - // util::sleep(2000); - // maintain_known_connections(); - // } - - // LOG_INFO << "Stopped peer monitor."; - // } - void peer_comm_server::peer_managing_loop() { util::mask_signal(); LOG_INFO << "Started peer managing thread."; - int peer_managing_counter = 0; + uint16_t peer_managing_counter = 0; + uint16_t known_connections_counter = 0; while (!is_shutting_down) { peer_managing_counter++; + known_connections_counter++; + + if (known_connections_counter % 20 == 0) + { + maintain_known_connections(); + known_connections_counter = 0; + } // Send available peer capacity if peer max connections is configured. if (conf::cfg.mesh.max_connections != 0) @@ -202,9 +182,8 @@ namespace p2p { // Get the count of peers which are unl nodes. // One is added to session list size only if we are a unl node, to reflect the self connection. - const int connected_peer_count = std::count_if(sessions.begin(), sessions.end(), [](const p2p::peer_comm_session &session) { - return session.is_unl; - }) + + const int connected_peer_count = std::count_if(sessions.begin(), sessions.end(), [](const p2p::peer_comm_session &session) + { return session.is_unl; }) + (conf::cfg.node.is_unl ? 1 : 0); const bool current_state = connected_peer_count < (unl::count() * WEAKLY_CONNECTED_THRESHOLD); if (is_weakly_connected != current_state) diff --git a/src/p2p/peer_comm_server.hpp b/src/p2p/peer_comm_server.hpp index 1c0a4703..dea9112d 100644 --- a/src/p2p/peer_comm_server.hpp +++ b/src/p2p/peer_comm_server.hpp @@ -15,8 +15,7 @@ namespace p2p { private: int custom_connection_invocations = -1; - // std::thread known_peers_thread; // Known peers connection establishment thread. - std::thread peer_managing_thread; // Thread to request known peer list from a random peer and announce available capacity. + std::thread peer_managing_thread; // Thread to manage peer connections. uint16_t connected_status_check_counter = 0; void maintain_known_connections(); @@ -27,7 +26,6 @@ namespace p2p void start_custom_jobs(); void stop_custom_jobs(); int process_custom_messages(); - void custom_connections(); public: std::atomic known_remote_count = 0; diff --git a/test/bin/hpfs b/test/bin/hpfs index 33a4a6bd..e28edb39 100755 Binary files a/test/bin/hpfs and b/test/bin/hpfs differ diff --git a/test/vm-cluster/cluster.sh b/test/vm-cluster/cluster.sh index c445d834..111a5ef2 100755 --- a/test/vm-cluster/cluster.sh +++ b/test/vm-cluster/cluster.sh @@ -476,12 +476,12 @@ if [ $nodeid = -1 ]; then hostaddr=${hostaddrs[i]} let n=$i+1 - scp ./cfg/node$n-merged.cfg $sshuser@$hostaddr:$contdir/cfg/hp.cfg & + scp -q ./cfg/node$n-merged.cfg $sshuser@$hostaddr:$contdir/cfg/hp.cfg & done wait else let n=$nodeid+1 - scp ./cfg/node$n-merged.cfg $sshuser@$hostaddr:$contdir/cfg/hp.cfg + scp -q ./cfg/node$n-merged.cfg $sshuser@$hostaddr:$contdir/cfg/hp.cfg fi rm -r ./cfg diff --git a/test/vm-cluster/setup-hp.sh b/test/vm-cluster/setup-hp.sh index bf34fd1a..f9f3fcfa 100755 --- a/test/vm-cluster/setup-hp.sh +++ b/test/vm-cluster/setup-hp.sh @@ -73,7 +73,7 @@ if [ $mode = "new" ] || [ $mode = "reconfig" ]; then sudo chmod +x $contdir/start.sh # Create stop.sh script (sending SIGINT to hpcore) - echo "pids=\$($contdir/getpid.sh hpcore) && [ ! -z \$pids ] && kill -2 \$pids" > $contdir/stop.sh + echo "pids=\$($contdir/getpid.sh hpcore) && [ ! -z \"\$pids\" ] && kill -2 \$pids" > $contdir/stop.sh sudo chmod +x $contdir/stop.sh # Create check.sh script (print pids belonging to this contract dir) @@ -81,7 +81,9 @@ if [ $mode = "new" ] || [ $mode = "reconfig" ]; then sudo chmod +x $contdir/check.sh # Create kill.sh script - echo "pids=\$($contdir/getpid.sh hpcore hpfs hpws) && [ ! -z \$pids ] && sudo kill \$pids" > $contdir/kill.sh + echo "pids=\$($contdir/getpid.sh hpcore hpfs hpws) && [ ! -z \"\$pids\" ] && sudo kill \$pids" > $contdir/kill.sh + echo "fusermount -u $contdir/contract_fs/mnt > /dev/null 2>&1" >> $contdir/kill.sh + echo "fusermount -u $contdir/ledger_fs/mnt > /dev/null 2>&1" >> $contdir/kill.sh sudo chmod +x $contdir/kill.sh # Create lcl.sh script diff --git a/test/vm-cluster/setup-node.sh b/test/vm-cluster/setup-node.sh index 30b141cc..954b6f3c 100755 --- a/test/vm-cluster/setup-node.sh +++ b/test/vm-cluster/setup-node.sh @@ -12,7 +12,7 @@ echo $nodeid. $hostaddr if [ $mode = "new" ] || [ $mode = "updatebin" ]; then echo "Uploading hp files to $basedir..." - scp -o StrictHostKeyChecking=no -rp hpfiles $sshuser@$hostaddr:$basedir/ + scp -q -o StrictHostKeyChecking=no -rp hpfiles $sshuser@$hostaddr:$basedir/ echo "Upload finished." fi @@ -23,5 +23,5 @@ if [ $mode = "new" ] || [ $mode = "reconfig" ]; then fi if [ $mode = "new" ] || [ $mode = "reconfig" ] || [ $mode = "updateconfig" ]; then - scp -o StrictHostKeyChecking=no $sshuser@$hostaddr:$contdir/cfg/hp.cfg ./cfg/node$nodeid.cfg + scp -q -o StrictHostKeyChecking=no $sshuser@$hostaddr:$contdir/cfg/hp.cfg ./cfg/node$nodeid.cfg fi \ No newline at end of file