mirror of
https://github.com/XRPLF/rippled.git
synced 2025-11-15 00:25:51 +00:00
Compare commits
6 Commits
revert-551
...
vlntb/fix-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
78b2dfe631 | ||
|
|
9ccad8eab3 | ||
|
|
db0982048a | ||
|
|
cdb9f8c21f | ||
|
|
4008662fd5 | ||
|
|
7463ea643f |
@@ -58,6 +58,9 @@ std::chrono::milliseconds constexpr peerHighLatency{300};
|
|||||||
|
|
||||||
/** How often we PING the peer to check for latency and sendq probe */
|
/** How often we PING the peer to check for latency and sendq probe */
|
||||||
std::chrono::seconds constexpr peerTimerInterval{60};
|
std::chrono::seconds constexpr peerTimerInterval{60};
|
||||||
|
|
||||||
|
uint16_t constexpr maxPingNumber = 5;
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
// TODO: Remove this exclusion once unit tests are added after the hotfix
|
||||||
@@ -94,6 +97,7 @@ PeerImp::PeerImp(
|
|||||||
, publicKey_(publicKey)
|
, publicKey_(publicKey)
|
||||||
, lastPingTime_(clock_type::now())
|
, lastPingTime_(clock_type::now())
|
||||||
, creationTime_(clock_type::now())
|
, creationTime_(clock_type::now())
|
||||||
|
, lastMessageTime_(clock_type::now())
|
||||||
, squelch_(app_.journal("Squelch"))
|
, squelch_(app_.journal("Squelch"))
|
||||||
, usage_(consumer)
|
, usage_(consumer)
|
||||||
, fee_{Resource::feeTrivialPeer, ""}
|
, fee_{Resource::feeTrivialPeer, ""}
|
||||||
@@ -732,8 +736,32 @@ PeerImp::onTimer(error_code const& ec)
|
|||||||
// Already waiting for PONG
|
// Already waiting for PONG
|
||||||
if (lastPingSeq_)
|
if (lastPingSeq_)
|
||||||
{
|
{
|
||||||
fail("Ping Timeout");
|
pingAttempts_++;
|
||||||
return;
|
|
||||||
|
auto now = clock_type::now();
|
||||||
|
auto lastMessageTimeCopy =
|
||||||
|
lastMessageTime_.load(std::memory_order_relaxed);
|
||||||
|
|
||||||
|
if ((now - lastMessageTimeCopy) < peerTimerInterval)
|
||||||
|
{
|
||||||
|
JLOG(journal_.info()) << "Message received within PingPong window, "
|
||||||
|
"skipping disconnect.";
|
||||||
|
pingAttempts_ = 0; // Reset attempts
|
||||||
|
}
|
||||||
|
else if (pingAttempts_ >= maxPingNumber)
|
||||||
|
{
|
||||||
|
fail("Ping Timeout");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
JLOG(journal_.info()) << "Missing Pong, sending Ping, attempt "
|
||||||
|
<< pingAttempts_ << " of " << maxPingNumber;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
else
|
||||||
|
{
|
||||||
|
pingAttempts_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
lastPingTime_ = clock_type::now();
|
lastPingTime_ = clock_type::now();
|
||||||
@@ -1068,6 +1096,8 @@ PeerImp::onMessageEnd(
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMManifests> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
auto const s = m->list_size();
|
auto const s = m->list_size();
|
||||||
|
|
||||||
if (s == 0)
|
if (s == 0)
|
||||||
@@ -1105,6 +1135,7 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPing> const& m)
|
|||||||
if (m->seq() == lastPingSeq_)
|
if (m->seq() == lastPingSeq_)
|
||||||
{
|
{
|
||||||
lastPingSeq_.reset();
|
lastPingSeq_.reset();
|
||||||
|
pingAttempts_ = 0;
|
||||||
|
|
||||||
// Update latency estimate
|
// Update latency estimate
|
||||||
auto const rtt = std::chrono::round<std::chrono::milliseconds>(
|
auto const rtt = std::chrono::round<std::chrono::milliseconds>(
|
||||||
@@ -1125,6 +1156,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPing> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
// VFALCO NOTE I think we should drop the peer immediately
|
// VFALCO NOTE I think we should drop the peer immediately
|
||||||
if (!cluster())
|
if (!cluster())
|
||||||
{
|
{
|
||||||
@@ -1197,6 +1230,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMCluster> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
// Don't allow endpoints from peers that are not known tracking or are
|
// Don't allow endpoints from peers that are not known tracking or are
|
||||||
// not using a version of the message that we support:
|
// not using a version of the message that we support:
|
||||||
if (tracking_.load() != Tracking::converged || m->version() != 2)
|
if (tracking_.load() != Tracking::converged || m->version() != 2)
|
||||||
@@ -1254,6 +1289,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMEndpoints> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
handleTransaction(m, true, false);
|
handleTransaction(m, true, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1370,6 +1407,8 @@ PeerImp::handleTransaction(
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
auto badData = [&](std::string const& msg) {
|
auto badData = [&](std::string const& msg) {
|
||||||
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
|
fee_.update(Resource::feeInvalidData, "get_ledger " + msg);
|
||||||
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
|
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
|
||||||
@@ -1459,6 +1498,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
|
JLOG(p_journal_.trace()) << "onMessage, TMProofPathRequest";
|
||||||
if (!ledgerReplayEnabled_)
|
if (!ledgerReplayEnabled_)
|
||||||
{
|
{
|
||||||
@@ -1498,6 +1539,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathRequest> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
if (!ledgerReplayEnabled_)
|
if (!ledgerReplayEnabled_)
|
||||||
{
|
{
|
||||||
fee_.update(
|
fee_.update(
|
||||||
@@ -1514,6 +1557,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProofPathResponse> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
|
JLOG(p_journal_.trace()) << "onMessage, TMReplayDeltaRequest";
|
||||||
if (!ledgerReplayEnabled_)
|
if (!ledgerReplayEnabled_)
|
||||||
{
|
{
|
||||||
@@ -1553,6 +1598,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaRequest> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
if (!ledgerReplayEnabled_)
|
if (!ledgerReplayEnabled_)
|
||||||
{
|
{
|
||||||
fee_.update(
|
fee_.update(
|
||||||
@@ -1569,6 +1616,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
auto badData = [&](std::string const& msg) {
|
auto badData = [&](std::string const& msg) {
|
||||||
fee_.update(Resource::feeInvalidData, msg);
|
fee_.update(Resource::feeInvalidData, msg);
|
||||||
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
|
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
|
||||||
@@ -1660,6 +1709,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
protocol::TMProposeSet& set = *m;
|
protocol::TMProposeSet& set = *m;
|
||||||
|
|
||||||
auto const sig = makeSlice(set.signature());
|
auto const sig = makeSlice(set.signature());
|
||||||
@@ -1782,6 +1833,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMProposeSet> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMStatusChange> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
JLOG(p_journal_.trace()) << "Status: Change";
|
JLOG(p_journal_.trace()) << "Status: Change";
|
||||||
|
|
||||||
if (!m->has_networktime())
|
if (!m->has_networktime())
|
||||||
@@ -1998,6 +2051,8 @@ PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
if (!stringIsUint256Sized(m->hash()))
|
if (!stringIsUint256Sized(m->hash()))
|
||||||
{
|
{
|
||||||
fee_.update(Resource::feeMalformedRequest, "bad hash");
|
fee_.update(Resource::feeMalformedRequest, "bad hash");
|
||||||
@@ -2238,6 +2293,8 @@ PeerImp::onValidatorListMessage(
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMValidatorList> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
|
if (!supportsFeature(ProtocolFeature::ValidatorListPropagation))
|
||||||
@@ -2268,6 +2325,8 @@ void
|
|||||||
PeerImp::onMessage(
|
PeerImp::onMessage(
|
||||||
std::shared_ptr<protocol::TMValidatorListCollection> const& m)
|
std::shared_ptr<protocol::TMValidatorListCollection> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
try
|
try
|
||||||
{
|
{
|
||||||
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
|
if (!supportsFeature(ProtocolFeature::ValidatorList2Propagation))
|
||||||
@@ -2307,6 +2366,8 @@ PeerImp::onMessage(
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
if (m->validation().size() < 50)
|
if (m->validation().size() < 50)
|
||||||
{
|
{
|
||||||
JLOG(p_journal_.warn()) << "Validation: Too small";
|
JLOG(p_journal_.warn()) << "Validation: Too small";
|
||||||
@@ -2434,6 +2495,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMValidation> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
protocol::TMGetObjectByHash& packet = *m;
|
protocol::TMGetObjectByHash& packet = *m;
|
||||||
|
|
||||||
JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
|
JLOG(p_journal_.trace()) << "received TMGetObjectByHash " << packet.type()
|
||||||
@@ -2590,6 +2653,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetObjectByHash> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactions> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
if (!txReduceRelayEnabled())
|
if (!txReduceRelayEnabled())
|
||||||
{
|
{
|
||||||
JLOG(p_journal_.error())
|
JLOG(p_journal_.error())
|
||||||
@@ -2659,6 +2724,8 @@ PeerImp::handleHaveTransactions(
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
if (!txReduceRelayEnabled())
|
if (!txReduceRelayEnabled())
|
||||||
{
|
{
|
||||||
JLOG(p_journal_.error())
|
JLOG(p_journal_.error())
|
||||||
@@ -2683,6 +2750,8 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransactions> const& m)
|
|||||||
void
|
void
|
||||||
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
|
PeerImp::onMessage(std::shared_ptr<protocol::TMSquelch> const& m)
|
||||||
{
|
{
|
||||||
|
lastMessageTime_ = clock_type::now();
|
||||||
|
|
||||||
using on_message_fn =
|
using on_message_fn =
|
||||||
void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
|
void (PeerImp::*)(std::shared_ptr<protocol::TMSquelch> const&);
|
||||||
if (!strand_.running_in_this_thread())
|
if (!strand_.running_in_this_thread())
|
||||||
|
|||||||
@@ -112,8 +112,10 @@ private:
|
|||||||
|
|
||||||
std::optional<std::chrono::milliseconds> latency_;
|
std::optional<std::chrono::milliseconds> latency_;
|
||||||
std::optional<std::uint32_t> lastPingSeq_;
|
std::optional<std::uint32_t> lastPingSeq_;
|
||||||
|
uint16_t pingAttempts_ = 0;
|
||||||
clock_type::time_point lastPingTime_;
|
clock_type::time_point lastPingTime_;
|
||||||
clock_type::time_point const creationTime_;
|
clock_type::time_point const creationTime_;
|
||||||
|
std::atomic<clock_type::time_point> lastMessageTime_;
|
||||||
|
|
||||||
reduce_relay::Squelch<UptimeClock> squelch_;
|
reduce_relay::Squelch<UptimeClock> squelch_;
|
||||||
inline static std::atomic_bool reduceRelayReady_{false};
|
inline static std::atomic_bool reduceRelayReady_{false};
|
||||||
|
|||||||
Reference in New Issue
Block a user