diff --git a/src/xrpld/overlay/detail/PeerImp.cpp b/src/xrpld/overlay/detail/PeerImp.cpp index d376e0c608..fbf418c9b9 100644 --- a/src/xrpld/overlay/detail/PeerImp.cpp +++ b/src/xrpld/overlay/detail/PeerImp.cpp @@ -97,6 +97,7 @@ PeerImp::PeerImp( , publicKey_(publicKey) , lastPingTime_(clock_type::now()) , creationTime_(clock_type::now()) + , lastMessageTime_(clock_type::now()) , squelch_(app_.journal("Squelch")) , usage_(consumer) , fee_{Resource::feeTrivialPeer, ""} @@ -737,14 +738,24 @@ PeerImp::onTimer(error_code const& ec) { pingAttempts_++; - if (pingAttempts_ >= maxPingNumber) + auto now = clock_type::now(); + auto lastMessageTimeCopy = + lastMessageTime_.load(std::memory_order_relaxed); + + if ((now - lastMessageTimeCopy) < peerTimerInterval) + { + JLOG(journal_.info()) << "Message received within PingPong window, " + "skipping disconnect."; + pingAttempts_ = 0; // Reset attempts + } + else if (pingAttempts_ >= maxPingNumber) { fail("Ping Timeout"); return; } else { - JLOG(journal_.info()) << "Missing PONG, sending PING, attempt " + JLOG(journal_.info()) << "Missing Pong, sending Ping, attempt " << pingAttempts_ << " of " << maxPingNumber; } } @@ -1085,6 +1096,8 @@ PeerImp::onMessageEnd( void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + auto const s = m->list_size(); if (s == 0) @@ -1143,6 +1156,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + // VFALCO NOTE I think we should drop the peer immediately if (!cluster()) { @@ -1215,6 +1230,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + // Don't allow endpoints from peers that are not known tracking or are // not using a version of the message that we support: if (tracking_.load() != Tracking::converged || m->version() != 2) @@ -1272,6 +1289,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + handleTransaction(m, true, false); } @@ -1388,6 +1407,8 @@ PeerImp::handleTransaction( void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + auto badData = [&](std::string const& msg) { fee_.update(Resource::feeInvalidData, "get_ledger " + msg); JLOG(p_journal_.warn()) << "TMGetLedger: " << msg; @@ -1477,6 +1498,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest"; if (!ledgerReplayEnabled_) { @@ -1516,6 +1539,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + if (!ledgerReplayEnabled_) { fee_.update( @@ -1532,6 +1557,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest"; if (!ledgerReplayEnabled_) { @@ -1571,6 +1598,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + if (!ledgerReplayEnabled_) { fee_.update( @@ -1587,6 +1616,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + auto badData = [&](std::string const& msg) { fee_.update(Resource::feeInvalidData, msg); JLOG(p_journal_.warn()) << "TMLedgerData: " << msg; @@ -1678,6 +1709,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + protocol::TMProposeSet& set = *m; auto const sig = makeSlice(set.signature()); @@ -1800,6 +1833,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + JLOG(p_journal_.trace()) << "Status: Change"; if (!m->has_networktime()) @@ -2016,6 +2051,8 @@ PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + if (!stringIsUint256Sized(m->hash())) { fee_.update(Resource::feeMalformedRequest, "bad hash"); @@ -2256,6 +2293,8 @@ PeerImp::onValidatorListMessage( void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + try { if (!supportsFeature(ProtocolFeature::ValidatorListPropagation)) @@ -2286,6 +2325,8 @@ void PeerImp::onMessage( std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + try { if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation)) @@ -2325,6 +2366,8 @@ PeerImp::onMessage( void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + if (m->validation().size() < 50) { JLOG(p_journal_.warn()) << "Validation: Too small"; @@ -2452,6 +2495,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + protocol::TMGetObjectByHash& packet = *m; JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type() @@ -2608,6 +2653,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + if (!txReduceRelayEnabled()) { JLOG(p_journal_.error()) @@ -2677,6 +2724,8 @@ PeerImp::handleHaveTransactions( void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + if (!txReduceRelayEnabled()) { JLOG(p_journal_.error()) @@ -2701,6 +2750,8 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { + lastMessageTime_ = clock_type::now(); + using on_message_fn = void (PeerImp::*)(std::shared_ptr const&); if (!strand_.running_in_this_thread()) diff --git a/src/xrpld/overlay/detail/PeerImp.h b/src/xrpld/overlay/detail/PeerImp.h index 4c389be365..2617fb6fd9 100644 --- a/src/xrpld/overlay/detail/PeerImp.h +++ b/src/xrpld/overlay/detail/PeerImp.h @@ -115,6 +115,7 @@ private: uint16_t pingAttempts_ = 0; clock_type::time_point lastPingTime_; clock_type::time_point const creationTime_; + std::atomic lastMessageTime_; reduce_relay::Squelch squelch_; inline static std::atomic_bool reduceRelayReady_{false};