Improve peer send queue management:

* Disconnect peers on sustained large send queues
* Disconnect peers on sustained failure to pong
* Refuse some queries if send queue is at target
* Allow latency to exceed ping timer interval
This commit is contained in:
JoelKatz
2015-06-05 10:06:01 -07:00
committed by Nik Bougalis
parent e0907ede4f
commit e95ab65396
3 changed files with 67 additions and 16 deletions

View File

@@ -189,10 +189,22 @@ PeerImp::send (Message::pointer const& m)
return; return;
if(detaching_) if(detaching_)
return; return;
auto sendq_size = send_queue_.size();
if (sendq_size < Tuning::targetSendQueue)
{
// To detect a peer that does not read from their
// side of the connection, we expect a peer to have
// a small senq periodically
large_sendq_ = 0;
}
send_queue_.push(m); send_queue_.push(m);
if(send_queue_.size() > 1)
if(sendq_size != 0)
return; return;
recent_empty_ = true;
boost::asio::async_write (stream_, boost::asio::buffer( boost::asio::async_write (stream_, boost::asio::buffer(
send_queue_.front()->getBuffer()), strand_.wrap(std::bind( send_queue_.front()->getBuffer()), strand_.wrap(std::bind(
&PeerImp::onWriteMessage, shared_from_this(), &PeerImp::onWriteMessage, shared_from_this(),
@@ -452,7 +464,7 @@ PeerImp::setTimer()
{ {
error_code ec; error_code ec;
timer_.expires_from_now( std::chrono::seconds( timer_.expires_from_now( std::chrono::seconds(
(lastPingSeq_ == 0) ? 3 : 15), ec); Tuning::timerSeconds), ec);
if (ec) if (ec)
{ {
@@ -499,25 +511,32 @@ PeerImp::onTimer (error_code const& ec)
return close(); return close();
} }
if (! recent_empty_) if (large_sendq_++ >= Tuning::sendqIntervals)
{ {
fail ("Timeout"); fail ("Large send queue");
return; return;
} }
recent_empty_ = false; if (no_ping_++ >= Tuning::noPing)
{
fail ("No ping reply received");
return;
}
// Make sequence unpredictable enough that a peer if (lastPingSeq_ == 0)
// can't fake their latency {
lastPingSeq_ += (rand() % 8192); // Make sequence unpredictable enough that a peer
lastPingTime_ = clock_type::now(); // can't fake their latency
lastPingSeq_ = (rand() % 65536);
lastPingTime_ = clock_type::now();
protocol::TMPing message; protocol::TMPing message;
message.set_type (protocol::TMPing::ptPING); message.set_type (protocol::TMPing::ptPING);
message.set_seq (lastPingSeq_); message.set_seq (lastPingSeq_);
send (std::make_shared<Message> ( send (std::make_shared<Message> (
message, protocol::mtPING)); message, protocol::mtPING));
}
setTimer(); setTimer();
} }
@@ -828,6 +847,7 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMPing> const& m)
if ((lastPingSeq_ != 0) && (m->seq () == lastPingSeq_)) if ((lastPingSeq_ != 0) && (m->seq () == lastPingSeq_))
{ {
no_ping_ = 0;
auto estimate = std::chrono::duration_cast <std::chrono::milliseconds> auto estimate = std::chrono::duration_cast <std::chrono::milliseconds>
(clock_type::now() - lastPingTime_); (clock_type::now() - lastPingTime_);
if (latency_ == unknownLatency) if (latency_ == unknownLatency)
@@ -1454,6 +1474,14 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m)
if (packet.query ()) if (packet.query ())
{ {
// this is a query // this is a query
if (send_queue_.size() >= Tuning::dropSendQueue)
{
if (p_journal_.debug) p_journal_.debug <<
"GetObject: Large send queue";
return;
}
if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK) if (packet.type () == protocol::TMGetObjectByHash::otFETCH_PACK)
{ {
doFetchPack (m); doFetchPack (m);
@@ -1897,6 +1925,13 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
} }
else else
{ {
if (send_queue_.size() >= Tuning::dropSendQueue)
{
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Large send queue";
return;
}
if (getApp().getFeeTrack().isLoadedLocal() && ! cluster()) if (getApp().getFeeTrack().isLoadedLocal() && ! cluster())
{ {
if (p_journal_.debug) p_journal_.debug << if (p_journal_.debug) p_journal_.debug <<

View File

@@ -156,7 +156,8 @@ private:
beast::asio::streambuf write_buffer_; beast::asio::streambuf write_buffer_;
std::queue<Message::pointer> send_queue_; std::queue<Message::pointer> send_queue_;
bool gracefulClose_ = false; bool gracefulClose_ = false;
bool recent_empty_ = true; int large_sendq_ = 0;
int no_ping_ = 0;
std::unique_ptr <LoadEvent> load_event_; std::unique_ptr <LoadEvent> load_event_;
std::unique_ptr<Validators::Connection> validatorsConnection_; std::unique_ptr<Validators::Connection> validatorsConnection_;
bool hopsAware_ = false; bool hopsAware_ = false;

View File

@@ -52,6 +52,21 @@ enum
/** How often we check connections (seconds) */ /** How often we check connections (seconds) */
checkSeconds = 10, checkSeconds = 10,
/** How often we latency/sendq probe connections */
timerSeconds = 3,
/** How many timer intervals a sendq has to stay large before we disconnect */
sendqIntervals = 3,
/** How many timer intervals we can go without a ping reply */
noPing = 4,
/** How many messages on a send queue before we refuse queries */
dropSendQueue = 5,
/** How many messages we consider reasonable sustained on a send queue */
targetSendQueue = 16,
}; };
} // Tuning } // Tuning