Control query depth based on latency:

This changes TMGetLedger protocol in a backward-compatible way to include
a "query depth" parameter - the number of extra levels in the SHAMap tree
that a server should return in the corresponding TMLedgerData. Depending
on the value or absence of the field, a server may adjust the amount of
returned data based on the observed latency of the requestor: higher
latencies will return larger data sets (to compensate for greater
request/response turnaround times).
This commit is contained in:
JoelKatz
2015-04-29 13:09:16 -07:00
committed by Vinnie Falco
parent d44230b745
commit b1881e798b
10 changed files with 142 additions and 80 deletions

View File

@@ -41,6 +41,7 @@
#include <beast/weak_fn.h>
#include <boost/algorithm/string/predicate.hpp>
#include <boost/asio/io_service.hpp>
#include <algorithm>
#include <functional>
#include <beast/cxx14/memory.h> // <memory>
#include <sstream>
@@ -1833,7 +1834,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
protocol::TMGetLedger& packet = *m;
std::shared_ptr<SHAMap> map;
protocol::TMLedgerData reply;
bool fatLeaves = true, fatRoot = false;
bool fatLeaves = true;
if (packet.has_requestcookie ())
reply.set_requestcookie (packet.requestcookie ());
@@ -1843,13 +1844,14 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (packet.itype () == protocol::liTS_CANDIDATE)
{
// Request is for a transaction candidate set
p_journal_.trace <<
if (p_journal_.trace) p_journal_.trace <<
"GetLedger: Tx candidate set";
if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32))
{
charge (Resource::feeInvalidRequest);
p_journal_.warning << "GetLedger: Tx candidate set invalid";
if (p_journal_.warning) p_journal_.warning <<
"GetLedger: Tx candidate set invalid";
return;
}
@@ -1862,14 +1864,14 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
{
if (packet.has_querytype () && !packet.has_requestcookie ())
{
p_journal_.debug <<
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Routing Tx set request";
auto const v = getPeerWithTree(
overlay_, txHash, this);
if (! v)
{
p_journal_.info <<
if (p_journal_.info) p_journal_.info <<
"GetLedger: Route TX set failed";
return;
}
@@ -1880,7 +1882,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
return;
}
p_journal_.debug <<
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Can't provide map ";
charge (Resource::feeInvalidRequest);
return;
@@ -1890,19 +1892,18 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
reply.set_ledgerhash (txHash.begin (), txHash.size ());
reply.set_type (protocol::liTS_CANDIDATE);
fatLeaves = false; // We'll already have most transactions
fatRoot = true; // Save a pass
}
else
{
if (getApp().getFeeTrack().isLoadedLocal() && ! cluster())
{
p_journal_.debug <<
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Too busy";
return;
}
// Figure out what ledger they want
p_journal_.trace <<
if (p_journal_.trace) p_journal_.trace <<
"GetLedger: Received";
Ledger::pointer ledger;
@@ -1913,7 +1914,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (packet.ledgerhash ().size () != 32)
{
charge (Resource::feeInvalidRequest);
p_journal_.warning <<
if (p_journal_.warning) p_journal_.warning <<
"GetLedger: Invalid request";
return;
}
@@ -1923,8 +1924,8 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
logMe += to_string (ledgerhash);
ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash);
if (!ledger && p_journal_.trace)
p_journal_.trace <<
if (!ledger)
if (p_journal_.trace) p_journal_.trace <<
"GetLedger: Don't have " << ledgerhash;
if (!ledger && (packet.has_querytype () &&
@@ -1939,7 +1940,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
overlay_, ledgerhash, seq, this);
if (! v)
{
p_journal_.trace <<
if (p_journal_.trace) p_journal_.trace <<
"GetLedger: Cannot route";
return;
}
@@ -1947,7 +1948,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
packet.set_requestcookie (id ());
v->send (std::make_shared<Message>(
packet, protocol::mtGET_LEDGER));
p_journal_.debug <<
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Request routed";
return;
}
@@ -1957,14 +1958,14 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (packet.ledgerseq() <
getApp().getLedgerMaster().getEarliestFetch())
{
p_journal_.debug <<
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Early ledger request";
return;
}
ledger = getApp().getLedgerMaster ().getLedgerBySeq (
packet.ledgerseq ());
if (!ledger && p_journal_.debug)
p_journal_.debug <<
if (! ledger)
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Don't have " << packet.ledgerseq ();
}
else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT))
@@ -1982,7 +1983,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
else
{
charge (Resource::feeInvalidRequest);
p_journal_.warning <<
if (p_journal_.warning) p_journal_.warning <<
"GetLedger: Unknown request";
return;
}
@@ -1992,20 +1993,20 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
{
charge (Resource::feeInvalidRequest);
if (p_journal_.warning && ledger)
p_journal_.warning <<
if (ledger)
if (p_journal_.warning) p_journal_.warning <<
"GetLedger: Invalid sequence";
return;
}
if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() <
getApp().getLedgerMaster().getEarliestFetch()))
{
p_journal_.debug <<
"GetLedger: Early ledger request";
return;
}
if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() <
getApp().getLedgerMaster().getEarliestFetch()))
{
if (p_journal_.debug) p_journal_.debug <<
"GetLedger: Early ledger request";
return;
}
// Fill out the reply
uint256 lHash = ledger->getHash ();
@@ -2016,7 +2017,7 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (packet.itype () == protocol::liBASE)
{
// they want the ledger base data
p_journal_.trace <<
if (p_journal_.trace) p_journal_.trace <<
"GetLedger: Base data";
Serializer nData (128);
ledger->addRaw (nData);
@@ -2074,22 +2075,27 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (!map || (packet.nodeids_size () == 0))
{
p_journal_.warning <<
if (p_journal_.warning) p_journal_.warning <<
"GetLedger: Can't find map or empty request";
charge (Resource::feeInvalidRequest);
return;
}
p_journal_.trace <<
if (p_journal_.trace) p_journal_.trace <<
"GetLeder: " << logMe;
auto const depth =
packet.has_querydepth() ?
(std::min(packet.querydepth(), 3u)) :
(isHighLatency() ? 2 : 1);
for (int i = 0; i < packet.nodeids ().size (); ++i)
{
SHAMapNodeID mn (packet.nodeids (i).data (), packet.nodeids (i).size ());
if (!mn.isValid ())
{
p_journal_.warning <<
if (p_journal_.warning) p_journal_.warning <<
"GetLedger: Invalid node " << logMe;
charge (Resource::feeInvalidRequest);
return;
@@ -2100,10 +2106,10 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
try
{
if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves))
if (map->getNodeFat (mn, nodeIDs, rawNodes, fatLeaves, depth))
{
assert (nodeIDs.size () == rawNodes.size ());
p_journal_.trace <<
if (p_journal_.trace) p_journal_.trace <<
"GetLedger: getNodeFat got " << rawNodes.size () << " nodes";
std::vector<SHAMapNodeID>::iterator nodeIDIterator;
std::vector< Blob >::iterator rawNodeIterator;
@@ -2141,11 +2147,15 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
if (!packet.has_ledgerhash ())
info += ", no hash specified";
p_journal_.warning <<
if (p_journal_.warning) p_journal_.warning <<
"getNodeFat( " << mn << ") throws exception: " << info;
}
}
if (p_journal_.info) p_journal_.info <<
"Got request for " << packet.nodeids().size() << " nodes at depth " <<
depth << ", return " << reply.nodes().size() << " nodes";
Message::pointer oPacket = std::make_shared<Message> (
reply, protocol::mtLEDGER_DATA);
send (oPacket);
@@ -2190,4 +2200,11 @@ PeerImp::getScore (bool haveItem)
return score;
}
bool
PeerImp::isHighLatency() const
{
std::lock_guard<std::mutex> sl (recentLock_);
return latency_.count() >= Tuning::peerHighLatency;
}
} // ripple