mirror of
https://github.com/XRPLF/rippled.git
synced 2025-12-06 17:27:55 +00:00
Reduce PING frequency and simplify logic:
The existing code issues a PING to each peer every 8 seconds. While frequent PINGs allow us to estimate a peer's latency with a high degree of accuracy, this "inter-server polka dance" is inefficient and not useful. This commit, if merged, reduces the PING frequency to once every 60 seconds. Additionally, this commit simplifies the PING handling logic and merges the code used to check and disconnect peers which fail to track the network directly into the timer callback.
This commit is contained in:
@@ -126,11 +126,6 @@ public:
|
||||
virtual void
|
||||
checkTracking(std::uint32_t index) = 0;
|
||||
|
||||
/** Calls the check function on each peer
|
||||
*/
|
||||
virtual void
|
||||
check() = 0;
|
||||
|
||||
/** Returns the peer with the matching short id, or null. */
|
||||
virtual std::shared_ptr<Peer>
|
||||
findPeerByShortID(Peer::id_t const& id) const = 0;
|
||||
|
||||
@@ -100,9 +100,6 @@ OverlayImpl::Timer::on_timer(error_code ec)
|
||||
overlay_.sendEndpoints();
|
||||
overlay_.autoConnect();
|
||||
|
||||
if ((++overlay_.timer_count_ % Tuning::checkSeconds) == 0)
|
||||
overlay_.check();
|
||||
|
||||
if ((overlay_.timer_count_ % Tuning::checkIdlePeers) == 0)
|
||||
overlay_.deleteIdlePeers();
|
||||
|
||||
@@ -1187,12 +1184,6 @@ OverlayImpl::checkTracking(std::uint32_t index)
|
||||
[index](std::shared_ptr<PeerImp>&& sp) { sp->checkTracking(index); });
|
||||
}
|
||||
|
||||
void
|
||||
OverlayImpl::check()
|
||||
{
|
||||
for_each([](std::shared_ptr<PeerImp>&& sp) { sp->check(); });
|
||||
}
|
||||
|
||||
std::shared_ptr<Peer>
|
||||
OverlayImpl::findPeerByShortID(Peer::id_t const& id) const
|
||||
{
|
||||
|
||||
@@ -192,9 +192,6 @@ public:
|
||||
PeerSequence
|
||||
getActivePeers() const override;
|
||||
|
||||
void
|
||||
check() override;
|
||||
|
||||
void checkTracking(std::uint32_t) override;
|
||||
|
||||
std::shared_ptr<Peer>
|
||||
|
||||
@@ -55,6 +55,14 @@ using namespace std::chrono_literals;
|
||||
|
||||
namespace ripple {
|
||||
|
||||
namespace {
|
||||
/** The threshold above which we treat a peer connection as high latency */
|
||||
std::chrono::milliseconds constexpr peerHighLatency{300};
|
||||
|
||||
/** How often we PING the peer to check for latency and sendq probe */
|
||||
std::chrono::seconds constexpr peerTimerInterval{60};
|
||||
} // namespace
|
||||
|
||||
PeerImp::PeerImp(
|
||||
Application& app,
|
||||
id_t id,
|
||||
@@ -79,11 +87,12 @@ PeerImp::PeerImp(
|
||||
, timer_(waitable_timer{socket_.get_executor()})
|
||||
, remote_address_(slot->remote_endpoint())
|
||||
, overlay_(overlay)
|
||||
, m_inbound(true)
|
||||
, inbound_(true)
|
||||
, protocol_(protocol)
|
||||
, tracking_(Tracking::unknown)
|
||||
, trackingTime_(clock_type::now())
|
||||
, publicKey_(publicKey)
|
||||
, lastPingTime_(clock_type::now())
|
||||
, creationTime_(clock_type::now())
|
||||
, usage_(consumer)
|
||||
, fee_(Resource::feeLightPeer)
|
||||
@@ -169,7 +178,7 @@ PeerImp::run()
|
||||
previousLedgerHash_ = *previous;
|
||||
}
|
||||
|
||||
if (m_inbound)
|
||||
if (inbound_)
|
||||
doAccept();
|
||||
else
|
||||
doProtocolStart();
|
||||
@@ -194,7 +203,7 @@ PeerImp::stop()
|
||||
// at a higher level, but inbound connections are more numerous and
|
||||
// uncontrolled so to prevent log flooding the severity is reduced.
|
||||
//
|
||||
if (m_inbound)
|
||||
if (inbound_)
|
||||
{
|
||||
JLOG(journal_.debug()) << "Stop";
|
||||
}
|
||||
@@ -295,7 +304,7 @@ PeerImp::cluster() const
|
||||
std::string
|
||||
PeerImp::getVersion() const
|
||||
{
|
||||
if (m_inbound)
|
||||
if (inbound_)
|
||||
return headers_["User-Agent"].to_string();
|
||||
return headers_["Server"].to_string();
|
||||
}
|
||||
@@ -308,7 +317,7 @@ PeerImp::json()
|
||||
ret[jss::public_key] = toBase58(TokenType::NodePublic, publicKey_);
|
||||
ret[jss::address] = remote_address_.to_string();
|
||||
|
||||
if (m_inbound)
|
||||
if (inbound_)
|
||||
ret[jss::inbound] = true;
|
||||
|
||||
if (cluster())
|
||||
@@ -506,7 +515,7 @@ PeerImp::close()
|
||||
timer_.cancel(ec);
|
||||
socket_.close(ec);
|
||||
overlay_.incPeerDisconnect();
|
||||
if (m_inbound)
|
||||
if (inbound_)
|
||||
{
|
||||
JLOG(journal_.debug()) << "Closed";
|
||||
}
|
||||
@@ -593,7 +602,7 @@ void
|
||||
PeerImp::setTimer()
|
||||
{
|
||||
error_code ec;
|
||||
timer_.expires_from_now(std::chrono::seconds(Tuning::timerSeconds), ec);
|
||||
timer_.expires_from_now(peerTimerInterval, ec);
|
||||
|
||||
if (ec)
|
||||
{
|
||||
@@ -646,49 +655,41 @@ PeerImp::onTimer(error_code const& ec)
|
||||
return;
|
||||
}
|
||||
|
||||
bool failedNoPing{false};
|
||||
boost::optional<std::uint32_t> pingSeq;
|
||||
// Operations on lastPingSeq_, lastPingTime_, no_ping_, and latency_
|
||||
// must be guarded by recentLock_.
|
||||
if (auto const t = tracking_.load(); !inbound_ && t != Tracking::converged)
|
||||
{
|
||||
std::lock_guard sl(recentLock_);
|
||||
if (no_ping_++ >= Tuning::noPing)
|
||||
{
|
||||
failedNoPing = true;
|
||||
}
|
||||
else if (!lastPingSeq_)
|
||||
{
|
||||
// Make the sequence unpredictable enough to prevent guessing
|
||||
lastPingSeq_ = rand_int<std::uint32_t>();
|
||||
lastPingTime_ = clock_type::now();
|
||||
pingSeq = lastPingSeq_;
|
||||
}
|
||||
else
|
||||
{
|
||||
// We have an outstanding ping, raise latency
|
||||
auto const minLatency =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
clock_type::now() - lastPingTime_);
|
||||
clock_type::duration duration;
|
||||
|
||||
if (latency_ < minLatency)
|
||||
latency_ = minLatency;
|
||||
{
|
||||
std::lock_guard sl(recentLock_);
|
||||
duration = clock_type::now() - trackingTime_;
|
||||
}
|
||||
|
||||
if ((t == Tracking::diverged &&
|
||||
(duration > app_.config().MAX_DIVERGED_TIME)) ||
|
||||
(t == Tracking::unknown &&
|
||||
(duration > app_.config().MAX_UNKNOWN_TIME)))
|
||||
{
|
||||
overlay_.peerFinder().on_failure(slot_);
|
||||
fail("Not useful");
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
if (failedNoPing)
|
||||
// Already waiting for PONG
|
||||
if (lastPingSeq_)
|
||||
{
|
||||
fail("No ping reply received");
|
||||
fail("Ping Timeout");
|
||||
return;
|
||||
}
|
||||
|
||||
if (pingSeq)
|
||||
{
|
||||
protocol::TMPing message;
|
||||
message.set_type(protocol::TMPing::ptPING);
|
||||
message.set_seq(*pingSeq);
|
||||
lastPingTime_ = clock_type::now();
|
||||
lastPingSeq_ = rand_int<std::uint32_t>();
|
||||
|
||||
send(std::make_shared<Message>(message, protocol::mtPING));
|
||||
}
|
||||
protocol::TMPing message;
|
||||
message.set_type(protocol::TMPing::ptPING);
|
||||
message.set_seq(*lastPingSeq_);
|
||||
|
||||
send(std::make_shared<Message>(message, protocol::mtPING));
|
||||
|
||||
setTimer();
|
||||
}
|
||||
@@ -1044,31 +1045,25 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMPing> const& m)
|
||||
return;
|
||||
}
|
||||
|
||||
if (m->type() == protocol::TMPing::ptPONG)
|
||||
if (m->type() == protocol::TMPing::ptPONG && m->has_seq())
|
||||
{
|
||||
// Operations on lastPingSeq_, lastPingTime_, no_ping_, and latency_
|
||||
// must be guarded by recentLock_.
|
||||
std::lock_guard sl(recentLock_);
|
||||
|
||||
if (m->has_seq() && m->seq() == lastPingSeq_)
|
||||
// Only reset the ping sequence if we actually received a
|
||||
// PONG with the correct cookie. That way, any peers which
|
||||
// respond with incorrect cookies will eventually time out.
|
||||
if (m->seq() == lastPingSeq_)
|
||||
{
|
||||
no_ping_ = 0;
|
||||
|
||||
// Only reset the ping sequence if we actually received a
|
||||
// PONG with the correct cookie. That way, any peers which
|
||||
// respond with incorrect cookies will eventually time out.
|
||||
lastPingSeq_.reset();
|
||||
|
||||
// Update latency estimate
|
||||
auto const estimate =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(
|
||||
clock_type::now() - lastPingTime_);
|
||||
auto const rtt = std::chrono::round<std::chrono::milliseconds>(
|
||||
clock_type::now() - lastPingTime_);
|
||||
|
||||
std::lock_guard sl(recentLock_);
|
||||
|
||||
// Calculate the cumulative moving average of the latency:
|
||||
if (latency_)
|
||||
latency_ = (*latency_ * 7 + estimate) / 8;
|
||||
latency_ = (*latency_ * 7 + rtt) / 8;
|
||||
else
|
||||
latency_ = estimate;
|
||||
latency_ = rtt;
|
||||
}
|
||||
|
||||
return;
|
||||
@@ -1875,46 +1870,6 @@ PeerImp::checkTracking(std::uint32_t seq1, std::uint32_t seq2)
|
||||
}
|
||||
}
|
||||
|
||||
// Should this connection be rejected
|
||||
// and considered a failure
|
||||
void
|
||||
PeerImp::check()
|
||||
{
|
||||
if (m_inbound)
|
||||
return;
|
||||
|
||||
auto const sanity = tracking_.load();
|
||||
|
||||
if (sanity == Tracking::converged)
|
||||
return;
|
||||
|
||||
clock_type::duration duration;
|
||||
|
||||
{
|
||||
std::lock_guard sl(recentLock_);
|
||||
duration = clock_type::now() - trackingTime_;
|
||||
}
|
||||
|
||||
bool reject = false;
|
||||
|
||||
if (sanity == Tracking::diverged)
|
||||
reject = (duration > app_.config().MAX_DIVERGED_TIME);
|
||||
|
||||
if (sanity == Tracking::unknown)
|
||||
reject = (duration > app_.config().MAX_UNKNOWN_TIME);
|
||||
|
||||
if (reject)
|
||||
{
|
||||
overlay_.peerFinder().on_failure(slot_);
|
||||
post(
|
||||
strand_,
|
||||
std::bind(
|
||||
(void (PeerImp::*)(std::string const&)) & PeerImp::fail,
|
||||
shared_from_this(),
|
||||
"Not useful"));
|
||||
}
|
||||
}
|
||||
|
||||
void
|
||||
PeerImp::onMessage(std::shared_ptr<protocol::TMHaveTransactionSet> const& m)
|
||||
{
|
||||
@@ -2989,7 +2944,7 @@ bool
|
||||
PeerImp::isHighLatency() const
|
||||
{
|
||||
std::lock_guard sl(recentLock_);
|
||||
return latency_ >= Tuning::peerHighLatency;
|
||||
return latency_ >= peerHighLatency;
|
||||
}
|
||||
|
||||
void
|
||||
|
||||
@@ -90,7 +90,7 @@ private:
|
||||
// These are up here to prevent warnings about order of initializations
|
||||
//
|
||||
OverlayImpl& overlay_;
|
||||
bool const m_inbound;
|
||||
bool const inbound_;
|
||||
|
||||
// Protocol version to use for this link
|
||||
ProtocolVersion protocol_;
|
||||
@@ -144,9 +144,6 @@ private:
|
||||
//
|
||||
// o name_
|
||||
// o last_status_
|
||||
// o lastPingSeq_
|
||||
// o lastPingTime_
|
||||
// o no_ping_
|
||||
//
|
||||
// June 2019
|
||||
|
||||
@@ -163,7 +160,6 @@ private:
|
||||
std::queue<std::shared_ptr<Message>> send_queue_;
|
||||
bool gracefulClose_ = false;
|
||||
int large_sendq_ = 0;
|
||||
int no_ping_ = 0;
|
||||
std::unique_ptr<LoadEvent> load_event_;
|
||||
// The highest sequence of each PublisherList that has
|
||||
// been sent to or received from this peer.
|
||||
@@ -593,11 +589,12 @@ PeerImp::PeerImp(
|
||||
, timer_(waitable_timer{socket_.get_executor()})
|
||||
, remote_address_(slot->remote_endpoint())
|
||||
, overlay_(overlay)
|
||||
, m_inbound(false)
|
||||
, inbound_(false)
|
||||
, protocol_(protocol)
|
||||
, tracking_(Tracking::unknown)
|
||||
, trackingTime_(clock_type::now())
|
||||
, publicKey_(publicKey)
|
||||
, lastPingTime_(clock_type::now())
|
||||
, creationTime_(clock_type::now())
|
||||
, usage_(usage)
|
||||
, fee_(Resource::feeLightPeer)
|
||||
|
||||
@@ -42,19 +42,10 @@ enum {
|
||||
reply */
|
||||
maxReplyNodes = 8192,
|
||||
|
||||
/** How often we check connections (seconds) */
|
||||
checkSeconds = 32,
|
||||
|
||||
/** How often we latency/sendq probe connections */
|
||||
timerSeconds = 8,
|
||||
|
||||
/** How many timer intervals a sendq has to stay large before we disconnect
|
||||
*/
|
||||
sendqIntervals = 4,
|
||||
|
||||
/** How many timer intervals we can go without a ping reply */
|
||||
noPing = 10,
|
||||
|
||||
/** How many messages on a send queue before we refuse queries */
|
||||
dropSendQueue = 192,
|
||||
|
||||
@@ -68,9 +59,6 @@ enum {
|
||||
checkIdlePeers = 4,
|
||||
};
|
||||
|
||||
/** The threshold above which we treat a peer connection as high latency */
|
||||
std::chrono::milliseconds constexpr peerHighLatency{300};
|
||||
|
||||
} // namespace Tuning
|
||||
|
||||
} // namespace ripple
|
||||
|
||||
Reference in New Issue
Block a user