Peer latency tracking (RIPD-879):

Track peer latency, report in RPC, make peer selection for
fetching latency aware.

This also cleans up the PeerImp timer to minimize
resetting. Indirect routing is made latency-aware as well.
This commit is contained in:
JoelKatz
2015-04-28 16:02:52 -07:00
committed by Vinnie Falco
parent c010a85ef5
commit e95bda3bdf
8 changed files with 250 additions and 78 deletions

View File

@@ -38,8 +38,14 @@ namespace ripple {
enum
{
// Number of peers to start with
peerCountStart = 4
// Number of peers to add on a timeout
,peerCountAdd = 2
// milliseconds for each ledger timeout
ledgerAcquireTimeoutMillis = 2500
,ledgerAcquireTimeoutMillis = 2500
// how many timeouts before we give up
,ledgerTimeoutRetriesMax = 10
@@ -296,7 +302,9 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
/** Add more peers to the set, if possible.

    Peer selection is latency-aware: the overlay scores all active
    peers (see ScoreHasLedger / PeerImp::getScore) and offers the
    best ones to this set.
*/
void InboundLedger::addPeers ()
{
    // Request the larger "start" count when we have no peers yet,
    // and the smaller "add" count when topping up after a timeout
    // (per the peerCountStart / peerCountAdd documentation).
    getApp().overlay().selectPeers (*this,
        (getPeerCount() == 0) ? peerCountStart : peerCountAdd,
        ScoreHasLedger (getHash(), mSeq));
}
std::weak_ptr<PeerSet> InboundLedger::pmDowncast ()

View File

@@ -232,50 +232,7 @@ SHAMapAddNode TransactionAcquire::takeNodes (const std::list<SHAMapNodeID>& node
/** Ask the overlay to add up to numPeers peers to this acquire set.

    The previous hand-rolled selection (shuffle peers that claim the
    set, then the rest) is replaced by the overlay's latency-aware
    scoring: ScoreHasTxSet credits peers that claim to have the set,
    and PeerImp::getScore penalizes measured latency.
*/
void TransactionAcquire::addPeers (int numPeers)
{
    getApp().overlay().selectPeers (*this, numPeers, ScoreHasTxSet (getHash()));
}
void TransactionAcquire::init (int numPeers)

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_OVERLAY_OVERLAY_H_INCLUDED
#define RIPPLE_OVERLAY_OVERLAY_H_INCLUDED
#include <ripple/app/peers/PeerSet.h>
#include <ripple/json/json_value.h>
#include <ripple/overlay/Peer.h>
#include <ripple/server/Handoff.h>
@@ -31,7 +32,7 @@
#include <beast/cxx14/type_traits.h> // <type_traits>
#include <boost/asio/buffer.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/optional.hpp>
#include <functional>
namespace boost { namespace asio { namespace ssl { class context; } } }
@@ -203,6 +204,48 @@ public:
for(PeerSequence::const_iterator i = peers.begin(); i != peers.end(); ++i)
f (*i);
}
/** Select from active peers
Scores all active peers.
Tries to accept the highest scoring peers, up to the requested count.
Returns the number of selected peers accepted.
The score function must:
- Be callable as:
bool (PeerImp::ptr)
- Return true if the peer is preferred
The accept function must:
- Be callable as:
bool (PeerImp::ptr)
- Return true if the peer is accepted
*/
virtual
std::size_t
selectPeers (PeerSet& set, std::size_t limit, std::function<
bool(std::shared_ptr<Peer> const&)> score) = 0;
};
/** Scoring functor for Overlay::selectPeers: prefers peers that
    claim to have a specific ledger.
    NOTE: holds the hash by reference — the functor must not outlive
    the hash passed to the constructor.
*/
struct ScoreHasLedger
{
uint256 const& hash_;   // ledger hash we are looking for (borrowed)
std::uint32_t seq_;     // ledger sequence number
// True if the peer claims to have ledger (hash_, seq_).
bool operator()(std::shared_ptr<Peer> const&) const;
ScoreHasLedger (uint256 const& hash, std::uint32_t seq)
: hash_ (hash), seq_ (seq)
{}
};
/** Scoring functor for Overlay::selectPeers: prefers peers that
    claim to have a specific transaction set.
    NOTE: holds the hash by reference — the functor must not outlive
    the hash passed to the constructor.
*/
struct ScoreHasTxSet
{
uint256 const& hash_;   // transaction-set hash we are looking for (borrowed)
// True if the peer claims to have the transaction set hash_.
bool operator()(std::shared_ptr<Peer> const&) const;
ScoreHasTxSet (uint256 const& hash) : hash_ (hash)
{}
};
}

View File

@@ -570,6 +570,33 @@ OverlayImpl::onPeerDeactivate (Peer::id_t id,
m_publicKeyMap.erase(publicKey);
}
// Score every active peer and offer the highest-scoring ones to the
// PeerSet, stopping once `limit` peers have been accepted.  Returns
// the number of peers the set actually accepted.
std::size_t
OverlayImpl::selectPeers (PeerSet& set, std::size_t limit,
std::function<bool(std::shared_ptr<Peer> const&)> score)
{
using item = std::pair<int, std::shared_ptr<PeerImp>>;
// Snapshot (score, peer) pairs under the overlay lock; sorting and
// the accept calls below then run without holding the lock.
std::vector<item> v;
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
v.reserve(m_publicKeyMap.size());
for_each_unlocked ([&](std::shared_ptr<PeerImp> && e)
{
// getScore() mixes a random tie-breaker, a bonus when the
// score functor says the peer has the item, and a latency
// penalty.
v.emplace_back(
e->getScore(score(e)), std::move(e));
});
}
// Highest score first.
std::sort(v.begin(), v.end(),
[](item const& lhs, item const&rhs)
{
return lhs.first > rhs.first;
});
// Offer peers in score order; peerHas() returns true when the set
// accepts the peer.
std::size_t accepted = 0;
for (auto const& e : v)
if (set.peerHas(e.second) && ++accepted >= limit)
break;
return accepted;
}
/** The number of active peers on the network
Active peers are only those peers that have completed the handshake
and are running the Ripple protocol.
@@ -829,6 +856,19 @@ OverlayImpl::sendEndpoints()
}
}
//------------------------------------------------------------------------------
// Score predicate: does this peer claim to have the ledger?
// All overlay peers are expected to be PeerImp, but guard the
// downcast so a null or foreign Peer scores false instead of
// dereferencing a null pointer.  (Also take the cast result by
// value rather than binding a const& to the temporary.)
bool ScoreHasLedger::operator()(std::shared_ptr<Peer> const& bp) const
{
    auto const p = std::dynamic_pointer_cast<PeerImp>(bp);
    return p && p->hasLedger (hash_, seq_);
}
// Score predicate: does this peer claim to have the transaction set?
// Guard the downcast so a null or foreign Peer scores false instead
// of dereferencing a null pointer.
bool ScoreHasTxSet::operator()(std::shared_ptr<Peer> const& bp) const
{
    auto const p = std::dynamic_pointer_cast<PeerImp>(bp);
    return p && p->hasTxSet (hash_);
}
//------------------------------------------------------------------------------

View File

@@ -221,9 +221,8 @@ public:
//
template <class UnaryFunc>
void
for_each (UnaryFunc&& f)
for_each_unlocked (UnaryFunc&& f)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
for (auto const& e : m_publicKeyMap)
{
auto sp = e.second.lock();
@@ -232,6 +231,18 @@ public:
}
}
// Invoke f on every active peer.  Takes the overlay lock and then
// delegates to for_each_unlocked().  Passing f as an lvalue is
// deliberate: the callee's deduced UnaryFunc&& binds by reference.
template <class UnaryFunc>
void
for_each (UnaryFunc&& f)
{
std::lock_guard <decltype(mutex_)> lock (mutex_);
for_each_unlocked(f);
}
std::size_t
selectPeers (PeerSet& set, std::size_t limit, std::function<
bool(std::shared_ptr<Peer> const&)> score) override;
static
bool
isPeerUpgrade (beast::http::message const& request);

View File

@@ -144,6 +144,8 @@ PeerImp::run()
}
doProtocolStart();
}
setTimer();
}
void
@@ -188,7 +190,7 @@ PeerImp::send (Message::pointer const& m)
send_queue_.push(m);
if(send_queue_.size() > 1)
return;
setTimer();
recent_empty_ = true;
boost::asio::async_write (stream_, boost::asio::buffer(
send_queue_.front()->getBuffer()), strand_.wrap(std::bind(
&PeerImp::onWriteMessage, shared_from_this(),
@@ -259,6 +261,17 @@ PeerImp::json()
ret[jss::protocol] = to_string (protocol);
}
{
std::chrono::milliseconds latency;
{
std::lock_guard<std::mutex> sl (recentLock_);
latency = latency_;
}
if (latency != std::chrono::milliseconds (-1))
ret[jss::latency] = static_cast<Json::UInt> (latency.count());
}
std::uint32_t minSeq, maxSeq;
ledgerRange(minSeq, maxSeq);
@@ -436,7 +449,9 @@ void
PeerImp::setTimer()
{
error_code ec;
timer_.expires_from_now(std::chrono::seconds(15), ec);
timer_.expires_from_now( std::chrono::seconds(
(lastPingSeq_ == 0) ? 3 : 15), ec);
if (ec)
{
if (journal_.error) journal_.error <<
@@ -482,7 +497,27 @@ PeerImp::onTimer (error_code const& ec)
return close();
}
fail("Timeout");
if (! recent_empty_)
{
fail ("Timeout");
return;
}
recent_empty_ = false;
// Make sequence unpredictable enough that a peer
// can't fake their latency
lastPingSeq_ += (rand() % 8192);
lastPingTime_ = clock_type::now();
protocol::TMPing message;
message.set_type (protocol::TMPing::ptPING);
message.set_seq (lastPingSeq_);
send (std::make_shared<Message> (
message, protocol::mtPING));
setTimer();
}
void
@@ -585,7 +620,6 @@ PeerImp::makeResponse (bool crawl,
void
PeerImp::onWriteResponse (error_code ec, std::size_t bytes_transferred)
{
cancelTimer();
if(! socket_.is_open())
return;
if(ec == boost::asio::error::operation_aborted)
@@ -604,7 +638,6 @@ PeerImp::onWriteResponse (error_code ec, std::size_t bytes_transferred)
if (write_buffer_.size() == 0)
return doProtocolStart();
setTimer();
stream_.async_write_some (write_buffer_.data(),
strand_.wrap (std::bind (&PeerImp::onWriteResponse,
shared_from_this(), beast::asio::placeholders::error,
@@ -672,7 +705,6 @@ PeerImp::onReadMessage (error_code ec, std::size_t bytes_transferred)
void
PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred)
{
cancelTimer();
if(! socket_.is_open())
return;
if(ec == boost::asio::error::operation_aborted)
@@ -692,7 +724,6 @@ PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred)
if (! send_queue_.empty())
{
// Timeout on writes only
setTimer();
return boost::asio::async_write (stream_, boost::asio::buffer(
send_queue_.front()->getBuffer()), strand_.wrap(std::bind(
&PeerImp::onWriteMessage, shared_from_this(),
@@ -702,7 +733,6 @@ PeerImp::onWriteMessage (error_code ec, std::size_t bytes_transferred)
if (gracefulClose_)
{
setTimer();
return stream_.async_shutdown(strand_.wrap(std::bind(
&PeerImp::onShutdown, shared_from_this(),
beast::asio::placeholders::error)));
@@ -752,9 +782,35 @@ PeerImp::onMessage (std::shared_ptr <protocol::TMPing> const& m)
{
if (m->type () == protocol::TMPing::ptPING)
{
// We have received a ping request, reply with a pong
fee_ = Resource::feeMediumBurdenPeer;
m->set_type (protocol::TMPing::ptPONG);
send (std::make_shared<Message> (*m, protocol::mtPING));
return;
}
if ((m->type () == protocol::TMPing::ptPONG) && m->has_seq ())
{
// We have received a pong, update our latency estimate
auto unknownLatency = std::chrono::milliseconds (-1);
std::lock_guard<std::mutex> sl(recentLock_);
if ((lastPingSeq_ != 0) && (m->seq () == lastPingSeq_))
{
auto estimate = std::chrono::duration_cast <std::chrono::milliseconds>
(clock_type::now() - lastPingTime_);
if (latency_ == unknownLatency)
latency_ = estimate;
else
latency_ = (latency_ * 7 + estimate) / 8;
}
else
latency_ = unknownLatency;
lastPingSeq_ = 0;
return;
}
}
@@ -1720,36 +1776,56 @@ PeerImp::checkValidation (Job&, STValidation::pointer val,
// the TX tree with the specified root hash.
//
static
std::vector<std::shared_ptr<PeerImp>>
getPeersWithTree (OverlayImpl& ov,
std::shared_ptr<PeerImp>
getPeerWithTree (OverlayImpl& ov,
uint256 const& rootHash, PeerImp const* skip)
{
std::vector<std::shared_ptr<PeerImp>> v;
std::shared_ptr<PeerImp> ret;
int retScore = 0;
ov.for_each([&](std::shared_ptr<PeerImp> const& p)
{
if (p->hasTxSet(rootHash) && p.get() != skip)
v.push_back(p);
{
auto score = p->getScore (true);
if (! ret || (score > retScore))
{
ret = p;
retScore = score;
}
}
});
return v;
return ret;
}
// Returns the set of peers that claim
// to have the specified ledger.
//
static
std::vector<std::shared_ptr<PeerImp>>
getPeersWithLedger (OverlayImpl& ov,
std::shared_ptr<PeerImp>
getPeerWithLedger (OverlayImpl& ov,
uint256 const& ledgerHash, LedgerIndex ledger,
PeerImp const* skip)
{
std::vector<std::shared_ptr<PeerImp>> v;
std::shared_ptr<PeerImp> ret;
int retScore = 0;
ov.for_each([&](std::shared_ptr<PeerImp> const& p)
{
if (p->hasLedger(ledgerHash, ledger) &&
p.get() != skip)
v.push_back(p);
{
auto score = p->getScore (true);
if (! ret || (score > retScore))
{
ret = p;
retScore = score;
}
}
});
return v;
return ret;
}
// VFALCO NOTE This function is way too big and cumbersome.
@@ -1791,19 +1867,17 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
p_journal_.debug <<
"GetLedger: Routing Tx set request";
auto const v = getPeersWithTree(
auto const v = getPeerWithTree(
overlay_, txHash, this);
if (v.empty())
if (! v)
{
p_journal_.info <<
"GetLedger: Route TX set failed";
return;
}
auto const& p =
v[rand () % v.size ()];
packet.set_requestcookie (id ());
p->send (std::make_shared<Message> (
v->send (std::make_shared<Message> (
packet, protocol::mtGET_LEDGER));
return;
}
@@ -1863,18 +1937,17 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (packet.has_ledgerseq ())
seq = packet.ledgerseq ();
auto const v = getPeersWithLedger(
auto const v = getPeerWithLedger(
overlay_, ledgerhash, seq, this);
if (v.empty ())
if (! v)
{
p_journal_.trace <<
"GetLedger: Cannot route";
return;
}
auto const& p = v[rand () % v.size ()];
packet.set_requestcookie (id ());
p->send (std::make_shared<Message>(
v->send (std::make_shared<Message>(
packet, protocol::mtGET_LEDGER));
p_journal_.debug <<
"GetLedger: Request routed";
@@ -2088,4 +2161,35 @@ PeerImp::peerTXData (Job&, uint256 const& hash,
getApp().getInboundTransactions().gotData (hash, shared_from_this(), pPacket);
}
// Compute this peer's selection priority for fetching an item.
// Higher is better.  The score combines a random tie-breaker, a
// bonus when the peer is believed to have the item, and a penalty
// proportional to the measured ping latency (when known).
int
PeerImp::getScore (bool haveItem)
{
// Random component of score, used to break ties and avoid
// overloading the "best" peer
static const int spRandom = 10000;
// Score for being very likely to have the thing we are
// looking for
static const int spHaveItem = 10000;
// Score reduction for each millisecond of latency
static const int spLatency = 100;
int score = rand() % spRandom;
if (haveItem)
score += spHaveItem;
// Copy the latency estimate under recentLock_; -1ms is the
// "unknown latency" sentinel and applies no penalty.
std::chrono::milliseconds latency;
{
std::lock_guard<std::mutex> sl (recentLock_);
latency = latency_;
}
if (latency != std::chrono::milliseconds (-1))
score -= latency.count() * spLatency;
return score;
}
} // ripple

View File

@@ -139,6 +139,11 @@ private:
uint256 previousLedgerHash_;
std::deque<uint256> recentLedgers_;
std::deque<uint256> recentTxSets_;
std::chrono::milliseconds latency_ = std::chrono::milliseconds (-1);
std::uint64_t lastPingSeq_ = 0;
clock_type::time_point lastPingTime_;
mutable std::mutex recentLock_;
protocol::TMStatusChange last_status_;
protocol::TMHello hello_;
@@ -151,6 +156,7 @@ private:
beast::asio::streambuf write_buffer_;
std::queue<Message::pointer> send_queue_;
bool gracefulClose_ = false;
bool recent_empty_ = true;
std::unique_ptr <LoadEvent> load_event_;
std::unique_ptr<Validators::Connection> validatorsConnection_;
bool hopsAware_ = false;
@@ -297,6 +303,10 @@ public:
bool
hasRange (std::uint32_t uMin, std::uint32_t uMax) override;
// Called to determine our priority for querying
int
getScore (bool haveItem);
private:
void
close();
@@ -396,8 +406,6 @@ public:
void onMessage (std::shared_ptr <protocol::TMValidation> const& m);
void onMessage (std::shared_ptr <protocol::TMGetObjectByHash> const& m);
//--------------------------------------------------------------------------
private:
State state() const
{

View File

@@ -180,6 +180,7 @@ JSS ( issuer ); // in: RipplePathFind, Subscribe,
// out: paths/Node, STPathSet, STAmount
JSS ( key ); // out: WalletSeed
JSS ( key_type ); // in/out: WalletPropose, TransactionSign
JSS ( latency ); // out: PeerImp
JSS ( last ); // out: RPCVersion
JSS ( last_close ); // out: NetworkOPs
JSS ( ledger ); // in: NetworkOPs, LedgerCleaner,