Compare commits

...

6 Commits

Author SHA1 Message Date
Valentin Balaschenko
78b2dfe631 check last message time 2025-04-17 13:38:02 +01:00
Valentin Balaschenko
9ccad8eab3 clang 2025-04-17 11:27:06 +01:00
Valentin Balaschenko
db0982048a increase max ping 2025-04-17 11:20:28 +01:00
Valentin Balaschenko
cdb9f8c21f increase max ping 2025-04-17 10:54:03 +01:00
Valentin Balaschenko
4008662fd5 clang 2025-04-15 18:40:43 +01:00
Valentin Balaschenko
7463ea643f inrease ping pong threshold 2025-04-15 18:36:59 +01:00
2 changed files with 73 additions and 2 deletions

View File

@@ -58,6 +58,9 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
/** How often we PING the peer to check for latency and sendq probe */ /** How often we PING the peer to check for latency and sendq probe */
std::chrono::seconds constexpr peerTimerInterval{60}; std::chrono::seconds constexpr peerTimerInterval{60};
uint16_t constexpr maxPingNumber = 5;
} // namespace } // namespace
// TODO: Remove this exclusion once unit tests are added after the hotfix // TODO: Remove this exclusion once unit tests are added after the hotfix
@@ -94,6 +97,7 @@ PeerImp::PeerImp(
, publicKey_(publicKey) , publicKey_(publicKey)
, lastPingTime_(clock_type::now()) , lastPingTime_(clock_type::now())
, creationTime_(clock_type::now()) , creationTime_(clock_type::now())
, lastMessageTime_(clock_type::now())
, squelch_(app_.journal("Squelch")) , squelch_(app_.journal("Squelch"))
, usage_(consumer) , usage_(consumer)
, fee_{Resource::feeTrivialPeer, ""} , fee_{Resource::feeTrivialPeer, ""}
@@ -732,8 +736,32 @@ PeerImp::onTimer(error_code const& ec)
// Already waiting for PONG // Already waiting for PONG
if (lastPingSeq_) if (lastPingSeq_)
{ {
fail("Ping Timeout"); pingAttempts_++;
return;
auto now = clock_type::now();
auto lastMessageTimeCopy =
lastMessageTime_.load(std::memory_order_relaxed);
if ((now - lastMessageTimeCopy) < peerTimerInterval)
{
JLOG(journal_.info()) << "Message received within PingPong window, "
"skipping disconnect.";
pingAttempts_ = 0; // Reset attempts
}
else if (pingAttempts_ >= maxPingNumber)
{
fail("Ping Timeout");
return;
}
else
{
JLOG(journal_.info()) << "Missing Pong, sending Ping, attempt "
<< pingAttempts_ << " of " << maxPingNumber;
}
}
else
{
pingAttempts_ = 0;
} }
lastPingTime_ = clock_type::now(); lastPingTime_ = clock_type::now();
@@ -1068,6 +1096,8 @@ PeerImp::onMessageEnd(
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
{ {
lastMessageTime_ = clock_type::now();
auto const s = m->list_size(); auto const s = m->list_size();
if (s == 0) if (s == 0)
@@ -1105,6 +1135,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPing> const& m)
if (m->seq() == lastPingSeq_) if (m->seq() == lastPingSeq_)
{ {
lastPingSeq_.reset(); lastPingSeq_.reset();
pingAttempts_ = 0;
// Update latency estimate // Update latency estimate
auto const rtt = std::chrono::round<std::chrono::milliseconds>( auto const rtt = std::chrono::round<std::chrono::milliseconds>(
@@ -1125,6 +1156,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPing> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
{ {
lastMessageTime_ = clock_type::now();
// VFALCO NOTE I think we should drop the peer immediately // VFALCO NOTE I think we should drop the peer immediately
if (!cluster()) if (!cluster())
{ {
@@ -1197,6 +1230,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
{ {
lastMessageTime_ = clock_type::now();
// Don't allow endpoints from peers that are not known tracking or are // Don't allow endpoints from peers that are not known tracking or are
// not using a version of the message that we support: // not using a version of the message that we support:
if (tracking_.load() != Tracking::converged || m->version() != 2) if (tracking_.load() != Tracking::converged || m->version() != 2)
@@ -1254,6 +1289,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m)
{ {
lastMessageTime_ = clock_type::now();
handleTransaction(m, true, false); handleTransaction(m, true, false);
} }
@@ -1370,6 +1407,8 @@ PeerImp::handleTransaction(
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{ {
lastMessageTime_ = clock_type::now();
auto badData = [&](std::string const& msg) { auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, "get_ledger " + msg); fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg; JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
@@ -1459,6 +1498,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
{ {
lastMessageTime_ = clock_type::now();
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest"; JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
if (!ledgerReplayEnabled_) if (!ledgerReplayEnabled_)
{ {
@@ -1498,6 +1539,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
{ {
lastMessageTime_ = clock_type::now();
if (!ledgerReplayEnabled_) if (!ledgerReplayEnabled_)
{ {
fee_.update( fee_.update(
@@ -1514,6 +1557,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
{ {
lastMessageTime_ = clock_type::now();
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest"; JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
if (!ledgerReplayEnabled_) if (!ledgerReplayEnabled_)
{ {
@@ -1553,6 +1598,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
{ {
lastMessageTime_ = clock_type::now();
if (!ledgerReplayEnabled_) if (!ledgerReplayEnabled_)
{ {
fee_.update( fee_.update(
@@ -1569,6 +1616,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{ {
lastMessageTime_ = clock_type::now();
auto badData = [&](std::string const& msg) { auto badData = [&](std::string const& msg) {
fee_.update(Resource::feeInvalidData, msg); fee_.update(Resource::feeInvalidData, msg);
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg; JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
@@ -1660,6 +1709,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
{ {
lastMessageTime_ = clock_type::now();
protocol::TMProposeSet& set = *m; protocol::TMProposeSet& set = *m;
auto const sig = makeSlice(set.signature()); auto const sig = makeSlice(set.signature());
@@ -1782,6 +1833,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
{ {
lastMessageTime_ = clock_type::now();
JLOG(p_journal_.trace()) << "Status: Change"; JLOG(p_journal_.trace()) << "Status: Change";
if (!m->has_networktime()) if (!m->has_networktime())
@@ -1998,6 +2051,8 @@ PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m)
{ {
lastMessageTime_ = clock_type::now();
if (!stringIsUint256Sized(m->hash())) if (!stringIsUint256Sized(m->hash()))
{ {
fee_.update(Resource::feeMalformedRequest, "bad hash"); fee_.update(Resource::feeMalformedRequest, "bad hash");
@@ -2238,6 +2293,8 @@ PeerImp::onValidatorListMessage(
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m)
{ {
lastMessageTime_ = clock_type::now();
try try
{ {
if (!supportsFeature(ProtocolFeature::ValidatorListPropagation)) if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
@@ -2268,6 +2325,8 @@ void
PeerImp::onMessage( PeerImp::onMessage(
std::shared_ptr<protocol::TMValidatorListCollection> const& m) std::shared_ptr<protocol::TMValidatorListCollection> const& m)
{ {
lastMessageTime_ = clock_type::now();
try try
{ {
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation)) if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
@@ -2307,6 +2366,8 @@ PeerImp::onMessage(
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
{ {
lastMessageTime_ = clock_type::now();
if (m->validation().size() < 50) if (m->validation().size() < 50)
{ {
JLOG(p_journal_.warn()) << "Validation: Too small"; JLOG(p_journal_.warn()) << "Validation: Too small";
@@ -2434,6 +2495,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
{ {
lastMessageTime_ = clock_type::now();
protocol::TMGetObjectByHash& packet = *m; protocol::TMGetObjectByHash& packet = *m;
JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type() JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
@@ -2590,6 +2653,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
{ {
lastMessageTime_ = clock_type::now();
if (!txReduceRelayEnabled()) if (!txReduceRelayEnabled())
{ {
JLOG(p_journal_.error()) JLOG(p_journal_.error())
@@ -2659,6 +2724,8 @@ PeerImp::handleHaveTransactions(
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
{ {
lastMessageTime_ = clock_type::now();
if (!txReduceRelayEnabled()) if (!txReduceRelayEnabled())
{ {
JLOG(p_journal_.error()) JLOG(p_journal_.error())
@@ -2683,6 +2750,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
{ {
lastMessageTime_ = clock_type::now();
using on_message_fn = using on_message_fn =
void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&); void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
if (!strand_.running_in_this_thread()) if (!strand_.running_in_this_thread())

View File

@@ -112,8 +112,10 @@ private:
std::optional<std::chrono::milliseconds> latency_; std::optional<std::chrono::milliseconds> latency_;
std::optional<std::uint32_t> lastPingSeq_; std::optional<std::uint32_t> lastPingSeq_;
uint16_t pingAttempts_ = 0;
clock_type::time_point lastPingTime_; clock_type::time_point lastPingTime_;
clock_type::time_point const creationTime_; clock_type::time_point const creationTime_;
std::atomic<clock_type::time_point> lastMessageTime_;
reduce_relay::Squelch<UptimeClock> squelch_; reduce_relay::Squelch<UptimeClock> squelch_;
inline static std::atomic_bool reduceRelayReady_{false}; inline static std::atomic_bool reduceRelayReady_{false};