Tuning and cleanups for ledger fetching

* Track stats for ledger fetch and output
* Reduce entries queried on timeout
* Allow duplicate node requests on timeout
* Don't query deep on timeout
* Adjust latency tuning
* Change high latency cutoff
* Set absolute limit on reply entries
* Small optimizations
This commit is contained in:
David Schwartz
2015-10-07 13:06:59 -07:00
committed by Nik Bougalis
parent 61e5359231
commit fe89c74e3b
12 changed files with 104 additions and 60 deletions

View File

@@ -93,7 +93,9 @@ public:
// VFALCO TODO Make this the Listener / Observer pattern
bool addOnComplete (std::function<void (InboundLedger::pointer)>);
void trigger (Peer::ptr const&);
enum class TriggerReason { trAdded, trReply, trTimeout };
void trigger (Peer::ptr const&, TriggerReason);
bool tryLocal ();
void addPeers ();
bool checkLocal ();
@@ -109,7 +111,7 @@ public:
// VFALCO TODO Replace uint256 with something semantically meaningful
void filterNodes (
std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>& nodeHashes,
int max, bool aggressive);
TriggerReason reason);
/** Return a Json::objectValue. */
Json::Value getJson (int);
@@ -125,7 +127,7 @@ private:
// For historical nodes, do not trigger too soon
// since a fetch pack is probably coming
if (mReason != fcHISTORY)
trigger (peer);
trigger (peer, TriggerReason::trAdded);
}
std::weak_ptr <PeerSet> pmDowncast ();
@@ -160,6 +162,8 @@ private:
std::set <uint256> mRecentNodes;
SHAMapAddNode mStats;
// Data we have received from peers
std::recursive_mutex mReceivedDataLock;
std::vector <PeerDataPairType> mReceivedData;

View File

@@ -49,7 +49,6 @@ public:
virtual void dropLedger (LedgerHash const& ledgerHash) = 0;
// VFALCO TODO Why is hash passed by value?
// VFALCO TODO Remove the dependency on the Peer object.
//
virtual bool gotLedgerData (LedgerHash const& ledgerHash,

View File

@@ -55,6 +55,15 @@ enum
// How many nodes to consider a fetch "small"
,fetchSmallNodes = 32
// Number of nodes to find initially
,missingNodesFind = 256
// Number of nodes to request for a reply
,reqNodesReply = 128
// Number of nodes to request blindly
,reqNodes = 8
};
InboundLedger::InboundLedger (
@@ -111,7 +120,15 @@ InboundLedger::~InboundLedger ()
if (entry.second->type () == protocol::liAS_NODE)
app_.getInboundLedgers().gotStaleData(entry.second);
}
if (! isDone())
{
JLOG (m_journal.debug) <<
"Acquire " << mHash << " abort " <<
((getTimeouts () == 0) ? std::string() :
(std::string ("timeouts:") +
to_string (getTimeouts ()) + " ")) <<
mStats.get ();
}
}
void InboundLedger::init (ScopedLockType& collectionLock)
@@ -291,7 +308,6 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
{
checkLocal();
mAggressive = true;
mByHash = true;
std::size_t pc = getPeerCount ();
@@ -304,10 +320,10 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&)
// otherwise, we need to trigger before we add
// so each peer gets triggered once
if (mReason != fcHISTORY)
trigger (Peer::ptr ());
trigger (Peer::ptr (), TriggerReason::trTimeout);
addPeers ();
if (mReason == fcHISTORY)
trigger (Peer::ptr ());
trigger (Peer::ptr (), TriggerReason::trTimeout);
}
}
@@ -350,8 +366,13 @@ void InboundLedger::done ()
mSignaled = true;
touch ();
if (m_journal.trace) m_journal.trace <<
"Done acquiring ledger " << mHash;
JLOG (m_journal.debug) <<
"Acquire " << mHash <<
(isFailed () ? " fail " : " ") <<
((getTimeouts () == 0) ? std::string() :
(std::string ("timeouts:") +
to_string (getTimeouts ()) + " ")) <<
mStats.get ();
assert (isComplete () || isFailed ());
@@ -391,7 +412,7 @@ bool InboundLedger::addOnComplete (
/** Request more nodes, perhaps from a specific peer
*/
void InboundLedger::trigger (Peer::ptr const& peer)
void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason)
{
ScopedLockType sl (mLock);
@@ -515,9 +536,16 @@ void InboundLedger::trigger (Peer::ptr const& peer)
if (mLedger)
tmGL.set_ledgerseq (mLedger->info().seq);
// If the peer has high latency, query extra deep
if (peer && peer->isHighLatency ())
if (reason != TriggerReason::trReply)
{
// If we're querying blind, don't query deep
tmGL.set_querydepth (0);
}
else if (peer && peer->isHighLatency ())
{
// If the peer has high latency, query extra deep
tmGL.set_querydepth (2);
}
else
tmGL.set_querydepth (1);
@@ -546,15 +574,14 @@ void InboundLedger::trigger (Peer::ptr const& peer)
{
std::vector<SHAMapNodeID> nodeIDs;
std::vector<uint256> nodeHashes;
// VFALCO Why 256? Make this a constant
nodeIDs.reserve (256);
nodeHashes.reserve (256);
nodeIDs.reserve (missingNodesFind);
nodeHashes.reserve (missingNodesFind);
AccountStateSF filter(app_);
// Release the lock while we process the large state map
sl.unlock();
mLedger->stateMap().getMissingNodes (
nodeIDs, nodeHashes, 256, &filter);
nodeIDs, nodeHashes, missingNodesFind, &filter);
sl.lock();
// Make sure nothing happened while we released the lock
@@ -574,9 +601,7 @@ void InboundLedger::trigger (Peer::ptr const& peer)
}
else
{
// VFALCO Why 128? Make this a constant
if (!mAggressive)
filterNodes (nodeIDs, nodeHashes, 128, !isProgress ());
filterNodes (nodeIDs, nodeHashes, reason);
if (!nodeIDs.empty ())
{
@@ -631,11 +656,11 @@ void InboundLedger::trigger (Peer::ptr const& peer)
{
std::vector<SHAMapNodeID> nodeIDs;
std::vector<uint256> nodeHashes;
nodeIDs.reserve (256);
nodeHashes.reserve (256);
nodeIDs.reserve (missingNodesFind);
nodeHashes.reserve (missingNodesFind);
TransactionStateSF filter(app_);
mLedger->txMap().getMissingNodes (
nodeIDs, nodeHashes, 256, &filter);
nodeIDs, nodeHashes, missingNodesFind, &filter);
if (nodeIDs.empty ())
{
@@ -651,8 +676,7 @@ void InboundLedger::trigger (Peer::ptr const& peer)
}
else
{
if (!mAggressive)
filterNodes (nodeIDs, nodeHashes, 128, !isProgress ());
filterNodes (nodeIDs, nodeHashes, reason);
if (!nodeIDs.empty ())
{
@@ -687,11 +711,16 @@ void InboundLedger::trigger (Peer::ptr const& peer)
}
void InboundLedger::filterNodes (std::vector<SHAMapNodeID>& nodeIDs,
std::vector<uint256>& nodeHashes, int max, bool aggressive)
std::vector<uint256>& nodeHashes, TriggerReason reason)
{
// ask for new nodes in preference to ones we've already asked for
assert (nodeIDs.size () == nodeHashes.size ());
int const max = (reason == TriggerReason::trReply) ?
reqNodesReply : reqNodes;
bool const aggressive =
(reason == TriggerReason::trTimeout);
std::vector<bool> duplicates;
duplicates.reserve (nodeIDs.size ());
@@ -711,7 +740,9 @@ void InboundLedger::filterNodes (std::vector<SHAMapNodeID>& nodeIDs,
if (dupCount == nodeIDs.size ())
{
// all duplicates
if (!aggressive)
// we don't want to send any query at all
// except on a timeout, where we need to query everyone
if (! aggressive)
{
nodeIDs.clear ();
nodeHashes.clear ();
@@ -735,7 +766,8 @@ void InboundLedger::filterNodes (std::vector<SHAMapNodeID>& nodeIDs,
nodeHashes[insertPoint] = nodeHashes[i];
}
++insertPoint;
if (++insertPoint >= max)
break;
}
if (m_journal.trace) m_journal.trace <<
@@ -791,8 +823,6 @@ bool InboundLedger::takeHeader (std::string const& data)
app_.getNodeStore ().store (
hotLEDGER, std::move (s.modData ()), mHash);
progress ();
if (mLedger->info().txHash.isZero ())
mHaveTransactions = true;
@@ -859,7 +889,6 @@ bool InboundLedger::takeTxNode (const std::vector<SHAMapNodeID>& nodeIDs,
}
}
progress ();
return true;
}
@@ -934,7 +963,6 @@ bool InboundLedger::takeAsNode (const std::vector<SHAMapNodeID>& nodeIDs,
}
}
progress ();
return true;
}
@@ -952,7 +980,6 @@ bool InboundLedger::takeAsRootNode (Blob const& data, SHAMapAddNode& san)
if (!mHaveHeader)
{
assert(false);
san.incInvalid();
return false;
}
@@ -976,7 +1003,6 @@ bool InboundLedger::takeTxRootNode (Blob const& data, SHAMapAddNode& san)
if (!mHaveHeader)
{
assert(false);
san.incInvalid();
return false;
}
@@ -1031,6 +1057,9 @@ bool InboundLedger::gotData (std::weak_ptr<Peer> peer,
{
ScopedLockType sl (mReceivedDataLock);
if (isDone ())
return false;
mReceivedData.push_back (PeerDataPairType (peer, data));
if (mReceiveDispatched)
@@ -1093,12 +1122,10 @@ int InboundLedger::processData (std::shared_ptr<Peer> peer,
"Included TX root invalid";
}
if (!san.isInvalid ())
if (san.isUseful ())
progress ();
else
if (m_journal.debug) m_journal.debug <<
"Peer sends invalid base data";
mStats += san;
return san.getGood ();
}
@@ -1136,28 +1163,26 @@ int InboundLedger::processData (std::shared_ptr<Peer> peer,
node.nodedata ().end ()));
}
SHAMapAddNode ret;
SHAMapAddNode san;
if (packet.type () == protocol::liTX_NODE)
{
takeTxNode (nodeIDs, nodeData, ret);
takeTxNode (nodeIDs, nodeData, san);
if (m_journal.debug) m_journal.debug <<
"Ledger TX node stats: " << ret.get();
"Ledger TX node stats: " << san.get();
}
else
{
takeAsNode (nodeIDs, nodeData, ret);
takeAsNode (nodeIDs, nodeData, san);
if (m_journal.debug) m_journal.debug <<
"Ledger AS node stats: " << ret.get();
"Ledger AS node stats: " << san.get();
}
if (!ret.isInvalid ())
if (san.isUseful ())
progress ();
else
if (m_journal.debug) m_journal.debug <<
"Peer sends invalid node data";
return ret.getGood ();
mStats += san;
return san.getGood ();
}
return -1;
@@ -1196,8 +1221,8 @@ void InboundLedger::runData ()
int count = processData (peer, *(entry.second));
if (count > chosenPeerCount)
{
chosenPeer = peer;
chosenPeerCount = count;
chosenPeer = std::move (peer);
}
}
}
@@ -1205,7 +1230,7 @@ void InboundLedger::runData ()
} while (1);
if (chosenPeer)
trigger (chosenPeer);
trigger (chosenPeer, TriggerReason::trReply);
}
Json::Value InboundLedger::getJson (int)

View File

@@ -144,7 +144,6 @@ public:
*/
// means "We got some data from an inbound ledger"
// VFALCO TODO Why is hash passed by value?
// VFALCO TODO Remove the dependency on the Peer object.
/** We received a TMLedgerData from a peer.
*/

View File

@@ -285,6 +285,9 @@ public:
if (!nodeLedger)
{
m_journal.debug << "Ledger " << ledgerIndex << " not available";
app_.getLedgerMaster().clearLedger (ledgerIndex);
app_.getInboundLedgers().acquire(
ledgerHash, ledgerIndex, InboundLedger::fcGENERIC);
return false;
}
@@ -309,6 +312,7 @@ public:
if (doNodes && !nodeLedger->walkLedger(app_.journal ("Ledger")))
{
m_journal.debug << "Ledger " << ledgerIndex << " is missing nodes";
app_.getLedgerMaster().clearLedger (ledgerIndex);
app_.getInboundLedgers().acquire(
ledgerHash, ledgerIndex, InboundLedger::fcGENERIC);
return false;

View File

@@ -81,7 +81,6 @@ public:
void progress ()
{
mProgress = true;
mAggressive = false;
}
void touch ()
@@ -167,7 +166,6 @@ protected:
int mTimeouts;
bool mComplete;
bool mFailed;
bool mAggressive;
bool mTxnData;
clock_type::time_point mLastAction;
bool mProgress;

View File

@@ -2157,7 +2157,9 @@ PeerImp::getLedger (std::shared_ptr<protocol::TMGetLedger> const& m)
(std::min(packet.querydepth(), 3u)) :
(isHighLatency() ? 2 : 1);
for (int i = 0; i < packet.nodeids ().size (); ++i)
for (int i = 0;
(i < packet.nodeids().size() &&
(reply.nodes().size() < Tuning::maxReplyNodes)); ++i)
{
SHAMapNodeID mn (packet.nodeids (i).data (), packet.nodeids (i).size ());
@@ -2248,10 +2250,17 @@ PeerImp::getScore (bool haveItem) const
// Score for being very likely to have the thing we are
// looking for
// Should be roughly spRandom
static const int spHaveItem = 10000;
// Score reduction for each millisecond of latency
static const int spLatency = 100;
// Should be roughly spRandom divided by
// the maximum reasonable latency
static const int spLatency = 30;
// Penalty for unknown latency
// Should be roughly spRandom
static const int spNoLatency = 8000;
int score = rand() % spRandom;
@@ -2266,6 +2275,8 @@ PeerImp::getScore (bool haveItem) const
}
if (latency != std::chrono::milliseconds (-1))
score -= latency.count() * spLatency;
else
score -= spNoLatency;
return score;
}

View File

@@ -45,7 +45,6 @@ PeerSet::PeerSet (Application& app, uint256 const& hash, int interval, bool txnD
, mTimeouts (0)
, mComplete (false)
, mFailed (false)
, mAggressive (false)
, mTxnData (txnData)
, mProgress (false)
, mTimer (app_.getIOService ())

View File

@@ -46,9 +46,13 @@ enum
consider it insane */
insaneLedgerLimit = 128,
/** The maximum number of ledger entries in a single
reply */
maxReplyNodes = 8192,
/** How many milliseconds to consider high latency
on a peer connection */
peerHighLatency = 120,
peerHighLatency = 250,
/** How often we check connections (seconds) */
checkSeconds = 10,

View File

@@ -117,7 +117,8 @@ SHAMapNodeID
SHAMapNodeID::getParentNodeID () const
{
assert (mDepth);
return SHAMapNodeID (mDepth - 1, mNodeID);
return SHAMapNodeID (mDepth - 1,
mNodeID & Masks (mDepth - 1));
}
inline

View File

@@ -61,7 +61,7 @@ SHAMapNodeID::SHAMapNodeID (int depth, uint256 const& hash)
: mNodeID (hash), mDepth (depth)
{
assert ((depth >= 0) && (depth < 65));
mNodeID &= Masks(depth);
assert (mNodeID == (mNodeID & Masks(depth)));
}
SHAMapNodeID::SHAMapNodeID (void const* ptr, int len)

View File

@@ -207,7 +207,7 @@ SHAMap::getMissingNodes(std::vector<SHAMapNodeID>& nodeIDs, std::vector<uint256>
!static_cast<SHAMapInnerNode*>(d)->isFullBelow(generation))
{
stack.push (std::make_tuple (node, nodeID,
firstChild, currentChild, fullBelow));
firstChild, currentChild, fullBelow));
// Switch to processing the child node
node = static_cast<SHAMapInnerNode*>(d);