From e95ab65396a4e81156e497f7a0da73b2af2471c0 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Fri, 5 Jun 2015 10:06:01 -0700 Subject: [PATCH] Improve peer send queue management: * Disconnect peers on sustained large send queues * Disconnect peers on sustained failure to pong * Refuse some queries if send queue is at target * Allow latency to exceed ping timer interval --- src/ripple/overlay/impl/PeerImp.cpp | 65 ++++++++++++++++++++++------- src/ripple/overlay/impl/PeerImp.h | 3 +- src/ripple/overlay/impl/Tuning.h | 15 +++++++ 3 files changed, 67 insertions(+), 16 deletions(-) diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 10d97b884..5fac48cc7 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -189,10 +189,22 @@ PeerImp::send (Message::pointer const& m) return; if(detaching_) return; + + auto sendq_size = send_queue_.size(); + + if (sendq_size < Tuning::targetSendQueue) + { + // To detect a peer that does not read from their + // side of the connection, we expect a peer to have + // a small sendq periodically + large_sendq_ = 0; + } + send_queue_.push(m); - if(send_queue_.size() > 1) + + if(sendq_size != 0) return; - recent_empty_ = true; + boost::asio::async_write (stream_, boost::asio::buffer( send_queue_.front()->getBuffer()), strand_.wrap(std::bind( &PeerImp::onWriteMessage, shared_from_this(), @@ -452,7 +464,7 @@ PeerImp::setTimer() { error_code ec; timer_.expires_from_now( std::chrono::seconds( - (lastPingSeq_ == 0) ? 3 : 15), ec); + Tuning::timerSeconds), ec); if (ec) { @@ -499,25 +511,32 @@ PeerImp::onTimer (error_code const& ec) return close(); } - if (! 
recent_empty_) + if (large_sendq_++ >= Tuning::sendqIntervals) { - fail ("Timeout"); + fail ("Large send queue"); return; } - recent_empty_ = false; + if (no_ping_++ >= Tuning::noPing) + { + fail ("No ping reply received"); + return; + } - // Make sequence unpredictable enough that a peer - // can't fake their latency - lastPingSeq_ += (rand() % 8192); - lastPingTime_ = clock_type::now(); + if (lastPingSeq_ == 0) + { + // Make sequence unpredictable enough that a peer + // can't fake their latency + lastPingSeq_ = (rand() % 65536); + lastPingTime_ = clock_type::now(); - protocol::TMPing message; - message.set_type (protocol::TMPing::ptPING); - message.set_seq (lastPingSeq_); + protocol::TMPing message; + message.set_type (protocol::TMPing::ptPING); + message.set_seq (lastPingSeq_); - send (std::make_shared ( - message, protocol::mtPING)); + send (std::make_shared ( + message, protocol::mtPING)); + } setTimer(); } @@ -828,6 +847,7 @@ PeerImp::onMessage (std::shared_ptr const& m) if ((lastPingSeq_ != 0) && (m->seq () == lastPingSeq_)) { + no_ping_ = 0; auto estimate = std::chrono::duration_cast (clock_type::now() - lastPingTime_); if (latency_ == unknownLatency) @@ -1454,6 +1474,14 @@ PeerImp::onMessage (std::shared_ptr const& m) if (packet.query ()) { // this is a query + if (send_queue_.size() >= Tuning::dropSendQueue) + { + if (p_journal_.debug) p_journal_.debug << + "GetObject: Large send queue"; + return; + } + + if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK) { doFetchPack (m); @@ -1897,6 +1925,13 @@ PeerImp::getLedger (std::shared_ptr const& m) } else { + if (send_queue_.size() >= Tuning::dropSendQueue) + { + if (p_journal_.debug) p_journal_.debug << + "GetLedger: Large send queue"; + return; + } + if (getApp().getFeeTrack().isLoadedLocal() && ! 
cluster()) { if (p_journal_.debug) p_journal_.debug << diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 18bccf9dd..b6c386151 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -156,7 +156,8 @@ private: beast::asio::streambuf write_buffer_; std::queue send_queue_; bool gracefulClose_ = false; - bool recent_empty_ = true; + int large_sendq_ = 0; + int no_ping_ = 0; std::unique_ptr load_event_; std::unique_ptr validatorsConnection_; bool hopsAware_ = false; diff --git a/src/ripple/overlay/impl/Tuning.h b/src/ripple/overlay/impl/Tuning.h index 8289a0403..2fdd20e41 100644 --- a/src/ripple/overlay/impl/Tuning.h +++ b/src/ripple/overlay/impl/Tuning.h @@ -52,6 +52,21 @@ enum /** How often we check connections (seconds) */ checkSeconds = 10, + + /** How often we probe connections for latency and send queue size (seconds) */ + timerSeconds = 3, + + /** How many timer intervals a sendq has to stay large before we disconnect */ + sendqIntervals = 3, + + /** How many timer intervals we can go without a ping reply */ + noPing = 4, + + /** How many messages on a send queue before we refuse queries */ + dropSendQueue = 5, + + /** How many messages we consider reasonable sustained on a send queue */ + targetSendQueue = 16, }; } // Tuning