From e95bda3bdf1f5237a091fbefcc8c9e3864e1ec64 Mon Sep 17 00:00:00 2001 From: JoelKatz Date: Tue, 28 Apr 2015 16:02:52 -0700 Subject: [PATCH] Peer latency tracking (RIPD-879): Track peer latency, report in RPC, make peer selection for fetching latency aware. This also cleans up the PeerImp timer to minimize resetting. Indirect routing is made latency-aware as well. --- src/ripple/app/ledger/InboundLedger.cpp | 12 +- src/ripple/app/tx/TransactionAcquire.cpp | 45 +------ src/ripple/overlay/Overlay.h | 45 ++++++- src/ripple/overlay/impl/OverlayImpl.cpp | 40 ++++++ src/ripple/overlay/impl/OverlayImpl.h | 15 ++- src/ripple/overlay/impl/PeerImp.cpp | 158 +++++++++++++++++++---- src/ripple/overlay/impl/PeerImp.h | 12 +- src/ripple/protocol/JsonFields.h | 1 + 8 files changed, 250 insertions(+), 78 deletions(-) diff --git a/src/ripple/app/ledger/InboundLedger.cpp b/src/ripple/app/ledger/InboundLedger.cpp index 69f82ff01..233756ac9 100644 --- a/src/ripple/app/ledger/InboundLedger.cpp +++ b/src/ripple/app/ledger/InboundLedger.cpp @@ -38,8 +38,14 @@ namespace ripple { enum { + // Number of peers to start with + peerCountStart = 4 + + // Number of peers to add on a timeout + ,peerCountAdd = 2 + // millisecond for each ledger timeout - ledgerAcquireTimeoutMillis = 2500 + ,ledgerAcquireTimeoutMillis = 2500 // how many timeouts before we giveup ,ledgerTimeoutRetriesMax = 10 @@ -296,7 +302,9 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) /** Add more peers to the set, if possible */ void InboundLedger::addPeers () { - getApp().overlay().selectPeers (&this, 6, ScoreHasLedger (getHash(), mSeq)); + getApp().overlay().selectPeers (*this, + (getPeerCount() > 0) ? peerCountStart : peerCountAdd, + ScoreHasLedger (getHash(), mSeq)); } std::weak_ptr InboundLedger::pmDowncast () diff --git a/src/ripple/app/tx/TransactionAcquire.cpp b/src/ripple/app/tx/TransactionAcquire.cpp index e34eec4c3..e80b25a7e 100644 --- a/src/ripple/app/tx/TransactionAcquire.cpp +++ b/src/ripple/app/tx/TransactionAcquire.cpp @@ -232,50 +232,7 @@ SHAMapAddNode TransactionAcquire::takeNodes (const std::list& node void TransactionAcquire::addPeers (int numPeers) { - std::vector peerVec1, peerVec2; - - { - auto peers = getApp().overlay().getActivePeers(); - for (auto const& peer : peers) - { - if (peer->hasTxSet (mHash)) - peerVec1.push_back (peer); - else - peerVec2.push_back (peer); - } - } - - WriteLog (lsDEBUG, TransactionAcquire) << peerVec1.size() << " peers known to have " << mHash; - - if (peerVec1.size() != 0) - { - // First try peers known to have the set - std::random_shuffle (peerVec1.begin (), peerVec1.end ()); - - for (auto const& peer : peerVec1) - { - if (peerHas (peer)) - { - if (--numPeers <= 0) - return; - } - } - } - - if (peerVec2.size() != 0) - { - // Then try peers not known to have the set - std::random_shuffle (peerVec2.begin (), peerVec2.end ()); - - for (auto const& peer : peerVec2) - { - if (peerHas (peer)) - { - if (--numPeers <= 0) - return; - } - } - } + getApp().overlay().selectPeers (*this, numPeers, ScoreHasTxSet (getHash())); } void TransactionAcquire::init (int numPeers) diff --git a/src/ripple/overlay/Overlay.h b/src/ripple/overlay/Overlay.h index e28c47b6e..7b014c30a 100644 --- a/src/ripple/overlay/Overlay.h +++ b/src/ripple/overlay/Overlay.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_OVERLAY_OVERLAY_H_INCLUDED #define RIPPLE_OVERLAY_OVERLAY_H_INCLUDED +#include #include #include #include @@ -31,7 +32,7 @@ #include // #include #include -#include +#include namespace boost { namespace asio { namespace ssl { class context; } } } @@ -203,6 +204,48 @@ public: for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i) f (*i); } + + /** Select from active peers + + Scores all active peers. + Tries to accept the highest scoring peers, up to the requested count, + Returns the number of selected peers accepted. + + The score function must: + - Be callable as: + bool (PeerImp::ptr) + - Return a true if the peer is prefered + + The accept function must: + - Be callable as: + bool (PeerImp::ptr) + - Return a true if the peer is accepted + + */ + virtual + std::size_t + selectPeers (PeerSet& set, std::size_t limit, std::function< + bool(std::shared_ptr const&)> score) = 0; +}; + +struct ScoreHasLedger +{ + uint256 const& hash_; + std::uint32_t seq_; + bool operator()(std::shared_ptr const&) const; + + ScoreHasLedger (uint256 const& hash, std::uint32_t seq) + : hash_ (hash), seq_ (seq) + {} +}; + +struct ScoreHasTxSet +{ + uint256 const& hash_; + bool operator()(std::shared_ptr const&) const; + + ScoreHasTxSet (uint256 const& hash) : hash_ (hash) + {} }; } diff --git a/src/ripple/overlay/impl/OverlayImpl.cpp b/src/ripple/overlay/impl/OverlayImpl.cpp index 7bcf43708..032845ec5 100644 --- a/src/ripple/overlay/impl/OverlayImpl.cpp +++ b/src/ripple/overlay/impl/OverlayImpl.cpp @@ -570,6 +570,33 @@ OverlayImpl::onPeerDeactivate (Peer::id_t id, m_publicKeyMap.erase(publicKey); } +std::size_t +OverlayImpl::selectPeers (PeerSet& set, std::size_t limit, + std::function const&)> score) +{ + using item = std::pair>; + std::vector v; + { + std::lock_guard lock (mutex_); + v.reserve(m_publicKeyMap.size()); + for_each_unlocked ([&](std::shared_ptr && e) + { + v.emplace_back( + e->getScore(score(e)), std::move(e)); + }); + } + std::sort(v.begin(), v.end(), + [](item const& lhs, item const&rhs) + { + return lhs.first > rhs.first; + }); + std::size_t accepted = 0; + for (auto const& e : v) + if (set.peerHas(e.second) && ++accepted >= limit) + break; + return accepted; +} + /** The number of active peers on the network Active peers are only those peers that have completed the handshake and are running the Ripple protocol. @@ -829,6 +856,19 @@ OverlayImpl::sendEndpoints() } } +//------------------------------------------------------------------------------ + +bool ScoreHasLedger::operator()(std::shared_ptr const& bp) const +{ + auto const& p = std::dynamic_pointer_cast(bp); + return p->hasLedger (hash_, seq_); +} + +bool ScoreHasTxSet::operator()(std::shared_ptr const& bp) const +{ + auto const& p = std::dynamic_pointer_cast(bp); + return p->hasTxSet (hash_); +} //------------------------------------------------------------------------------ diff --git a/src/ripple/overlay/impl/OverlayImpl.h b/src/ripple/overlay/impl/OverlayImpl.h index 5b1ce9521..6378b6988 100644 --- a/src/ripple/overlay/impl/OverlayImpl.h +++ b/src/ripple/overlay/impl/OverlayImpl.h @@ -221,9 +221,8 @@ public: // template void - for_each (UnaryFunc&& f) + for_each_unlocked (UnaryFunc&& f) { - std::lock_guard lock (mutex_); for (auto const& e : m_publicKeyMap) { auto sp = e.second.lock(); @@ -232,6 +231,18 @@ public: } } + template + void + for_each (UnaryFunc&& f) + { + std::lock_guard lock (mutex_); + for_each_unlocked(f); + } + + std::size_t + selectPeers (PeerSet& set, std::size_t limit, std::function< + bool(std::shared_ptr const&)> score) override; + static bool isPeerUpgrade (beast::http::message const& request); diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 3bd6f3dac..602199960 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -144,6 +144,8 @@ PeerImp::run() } doProtocolStart(); } + + setTimer(); } void @@ -188,7 +190,7 @@ PeerImp::send (Message::pointer const& m) send_queue_.push(m); if(send_queue_.size() > 1) return; - setTimer(); + recent_empty_ = true; boost::asio::async_write (stream_, boost::asio::buffer( send_queue_.front()->getBuffer()), strand_.wrap(std::bind( &PeerImp::onWriteMessage, shared_from_this(), @@ -259,6 +261,17 @@ PeerImp::json() ret[jss::protocol] = to_string (protocol); } + { + std::chrono::milliseconds latency; + { + std::lock_guard sl (recentLock_); + latency = latency_; + } + + if (latency != std::chrono::milliseconds (-1)) + ret[jss::latency] = static_cast (latency.count()); + } + std::uint32_t minSeq, maxSeq; ledgerRange(minSeq, maxSeq); @@ -436,7 +449,9 @@ void PeerImp::setTimer() { error_code ec; - timer_.expires_from_now(std::chrono::seconds(15), ec); + timer_.expires_from_now( std::chrono::seconds( + (lastPingSeq_ == 0) ? 3 : 15), ec); + if (ec) { if (journal_.error) journal_.error << @@ -482,7 +497,27 @@ PeerImp::onTimer (error_code const& ec) return close(); } - fail("Timeout"); + if (! recent_empty_) + { + fail ("Timeout"); + return; + } + + recent_empty_ = false; + + // Make sequence unpredictable enough that a peer + // can't fake their latency + lastPingSeq_ += (rand() % 8192); + lastPingTime_ = clock_type::now(); + + protocol::TMPing message; + message.set_type (protocol::TMPing::ptPING); + message.set_seq (lastPingSeq_); + + send (std::make_shared ( + message, protocol::mtPING)); + + setTimer(); } void @@ -585,7 +620,6 @@ PeerImp::makeResponse (bool crawl, void PeerImp::onWriteResponse (error_code ec, std::size_t bytes_transferred) { - cancelTimer(); if(! socket_.is_open()) return; if(ec == boost::asio::error::operation_aborted) @@ -604,7 +638,6 @@ PeerImp::onWriteResponse (error_code ec, std::size_t bytes_transferred) if (write_buffer_.size() == 0) return doProtocolStart(); - setTimer(); stream_.async_write_some (write_buffer_.data(), strand_.wrap (std::bind (&PeerImp::onWriteResponse, shared_from_this(), beast::asio::placeholders::error, @@ -672,7 +705,6 @@ PeerImp::onReadMessage (error_code ec, std::size_t bytes_transferred) void PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred) { - cancelTimer(); if(! socket_.is_open()) return; if(ec == boost::asio::error::operation_aborted) @@ -692,7 +724,6 @@ PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred) if (! send_queue_.empty()) { // Timeout on writes only - setTimer(); return boost::asio::async_write (stream_, boost::asio::buffer( send_queue_.front()->getBuffer()), strand_.wrap(std::bind( &PeerImp::onWriteMessage, shared_from_this(), @@ -702,7 +733,6 @@ PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred) if (gracefulClose_) { - setTimer(); return stream_.async_shutdown(strand_.wrap(std::bind( &PeerImp::onShutdown, shared_from_this(), beast::asio::placeholders::error))); @@ -752,9 +782,35 @@ PeerImp::onMessage (std::shared_ptr const& m) { if (m->type () == protocol::TMPing::ptPING) { + // We have received a ping request, reply with a pong fee_ = Resource::feeMediumBurdenPeer; m->set_type (protocol::TMPing::ptPONG); send (std::make_shared (*m, protocol::mtPING)); + + return; + } + + if ((m->type () == protocol::TMPing::ptPONG) && m->has_seq ()) + { + // We have received a pong, update our latency estimate + auto unknownLatency = std::chrono::milliseconds (-1); + + std::lock_guard sl(recentLock_); + + if ((lastPingSeq_ != 0) && (m->seq () == lastPingSeq_)) + { + auto estimate = std::chrono::duration_cast + (clock_type::now() - lastPingTime_); + if (latency_ == unknownLatency) + latency_ = estimate; + else + latency_ = (latency_ * 7 + estimate) / 8; + } + else + latency_ = unknownLatency; + lastPingSeq_ = 0; + + return; } } @@ -1720,36 +1776,56 @@ PeerImp::checkValidation (Job&, STValidation::pointer val, // the TX tree with the specified root hash. // static -std::vector> -getPeersWithTree (OverlayImpl& ov, +std::shared_ptr +getPeerWithTree (OverlayImpl& ov, uint256 const& rootHash, PeerImp const* skip) { - std::vector> v; + std::shared_ptr ret; + int retScore = 0; + ov.for_each([&](std::shared_ptr const& p) { if (p->hasTxSet(rootHash) && p.get() != skip) - v.push_back(p); + { + auto score = p->getScore (true); + if (! ret || (score > retScore)) + { + ret = p; + retScore = score; + } + } }); - return v; + + return ret; } // Returns the set of peers that claim // to have the specified ledger. // static -std::vector> -getPeersWithLedger (OverlayImpl& ov, +std::shared_ptr +getPeerWithLedger (OverlayImpl& ov, uint256 const& ledgerHash, LedgerIndex ledger, PeerImp const* skip) { - std::vector> v; + std::shared_ptr ret; + int retScore = 0; + ov.for_each([&](std::shared_ptr const& p) { if (p->hasLedger(ledgerHash, ledger) && p.get() != skip) - v.push_back(p); + { + auto score = p->getScore (true); + if (! ret || (score > retScore)) + { + ret = p; + retScore = score; + } + } }); - return v; + + return ret; } // VFALCO NOTE This function is way too big and cumbersome. @@ -1791,19 +1867,17 @@ PeerImp::getLedger (std::shared_ptr const& m) p_journal_.debug << "GetLedger: Routing Tx set request"; - auto const v = getPeersWithTree( + auto const v = getPeerWithTree( overlay_, txHash, this); - if (v.empty()) + if (! v) { p_journal_.info << "GetLedger: Route TX set failed"; return; } - auto const& p = - v[rand () % v.size ()]; packet.set_requestcookie (id ()); - p->send (std::make_shared ( + v->send (std::make_shared ( packet, protocol::mtGET_LEDGER)); return; } @@ -1863,18 +1937,17 @@ PeerImp::getLedger (std::shared_ptr const& m) if (packet.has_ledgerseq ()) seq = packet.ledgerseq (); - auto const v = getPeersWithLedger( + auto const v = getPeerWithLedger( overlay_, ledgerhash, seq, this); - if (v.empty ()) + if (! v) { p_journal_.trace << "GetLedger: Cannot route"; return; } - auto const& p = v[rand () % v.size ()]; packet.set_requestcookie (id ()); - p->send (std::make_shared( + v->send (std::make_shared( packet, protocol::mtGET_LEDGER)); p_journal_.debug << "GetLedger: Request routed"; @@ -2088,4 +2161,35 @@ PeerImp::peerTXData (Job&, uint256 const& hash, getApp().getInboundTransactions().gotData (hash, shared_from_this(), pPacket); } +int +PeerImp::getScore (bool haveItem) +{ + // Random component of score, used to break ties and avoid + // overloading the "best" peer + static const int spRandom = 10000; + + // Score for being very likely to have the thing we are + // look for + static const int spHaveItem = 10000; + + // Score reduction for each millisecond of latency + static const int spLatency = 100; + + int score = rand() % spRandom; + + if (haveItem) + score += spHaveItem; + + std::chrono::milliseconds latency; + { + std::lock_guard sl (recentLock_); + + latency = latency_; + } + if (latency != std::chrono::milliseconds (-1)) + score -= latency.count() * spLatency; + + return score; +} + } // ripple diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 4e245bab1..6945eecb6 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -139,6 +139,11 @@ private: uint256 previousLedgerHash_; std::deque recentLedgers_; std::deque recentTxSets_; + + std::chrono::milliseconds latency_ = std::chrono::milliseconds (-1); + std::uint64_t lastPingSeq_ = 0; + clock_type::time_point lastPingTime_; + mutable std::mutex recentLock_; protocol::TMStatusChange last_status_; protocol::TMHello hello_; @@ -151,6 +156,7 @@ private: beast::asio::streambuf write_buffer_; std::queue send_queue_; bool gracefulClose_ = false; + bool recent_empty_ = true; std::unique_ptr load_event_; std::unique_ptr validatorsConnection_; bool hopsAware_ = false; @@ -297,6 +303,10 @@ public: bool hasRange (std::uint32_t uMin, std::uint32_t uMax) override; + // Called to determine our priority for querying + int + getScore (bool haveItem); + private: void close(); @@ -396,8 +406,6 @@ public: void onMessage (std::shared_ptr const& m); void onMessage (std::shared_ptr const& m); - //-------------------------------------------------------------------------- - private: State state() const { diff --git a/src/ripple/protocol/JsonFields.h b/src/ripple/protocol/JsonFields.h index f298efa16..826320751 100644 --- a/src/ripple/protocol/JsonFields.h +++ b/src/ripple/protocol/JsonFields.h @@ -180,6 +180,7 @@ JSS ( issuer ); // in: RipplePathFind, Subscribe, // out: paths/Node, STPathSet, STAmount JSS ( key ); // out: WalletSeed JSS ( key_type ); // in/out: WalletPropose, TransactionSign +JSS ( latency ); // out: PeerImp JSS ( last ); // out: RPCVersion JSS ( last_close ); // out: NetworkOPs JSS ( ledger ); // in: NetworkOPs, LedgerCleaner,