diff --git a/src/ripple/app/ledger/InboundLedger.h b/src/ripple/app/ledger/InboundLedger.h index 45883abbf7..e76b7a2bbd 100644 --- a/src/ripple/app/ledger/InboundLedger.h +++ b/src/ripple/app/ledger/InboundLedger.h @@ -93,7 +93,9 @@ public: // VFALCO TODO Make this the Listener / Observer pattern bool addOnComplete (std::function); - void trigger (Peer::ptr const&); + enum class TriggerReason { trAdded, trReply, trTimeout }; + void trigger (Peer::ptr const&, TriggerReason); + bool tryLocal (); void addPeers (); bool checkLocal (); @@ -109,7 +111,7 @@ public: // VFALCO TODO Replace uint256 with something semanticallyh meaningful void filterNodes ( std::vector& nodeIDs, std::vector& nodeHashes, - int max, bool aggressive); + TriggerReason reason); /** Return a Json::objectValue. */ Json::Value getJson (int); @@ -125,7 +127,7 @@ private: // For historical nodes, do not trigger too soon // since a fetch pack is probably coming if (mReason != fcHISTORY) - trigger (peer); + trigger (peer, TriggerReason::trAdded); } std::weak_ptr pmDowncast (); @@ -160,6 +162,8 @@ private: std::set mRecentNodes; + SHAMapAddNode mStats; + // Data we have received from peers std::recursive_mutex mReceivedDataLock; std::vector mReceivedData; diff --git a/src/ripple/app/ledger/InboundLedgers.h b/src/ripple/app/ledger/InboundLedgers.h index f74330eede..0823c7a8ec 100644 --- a/src/ripple/app/ledger/InboundLedgers.h +++ b/src/ripple/app/ledger/InboundLedgers.h @@ -49,7 +49,6 @@ public: virtual void dropLedger (LedgerHash const& ledgerHash) = 0; - // VFALCO TODO Why is hash passed by value? // VFALCO TODO Remove the dependency on the Peer object. // virtual bool gotLedgerData (LedgerHash const& ledgerHash, diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index c010c3ea6e..c0862ab470 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -55,6 +55,15 @@ enum // How many nodes to consider a fetch "small" ,fetchSmallNodes = 32 + + // Number of nodes to find initially + ,missingNodesFind = 256 + + // Number of nodes to request for a reply + ,reqNodesReply = 128 + + // Number of nodes to request blindly + ,reqNodes = 8 }; InboundLedger::InboundLedger ( @@ -111,7 +120,15 @@ InboundLedger::~InboundLedger () if (entry.second->type () == protocol::liAS_NODE) app_.getInboundLedgers().gotStaleData(entry.second); } - + if (! isDone()) + { + JLOG (m_journal.debug) << + "Acquire " << mHash << " abort " << + ((getTimeouts () == 0) ? std::string() : + (std::string ("timeouts:") + + to_string (getTimeouts ()) + " ")) << + mStats.get (); + } } void InboundLedger::init (ScopedLockType& collectionLock) @@ -291,7 +308,6 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) { checkLocal(); - mAggressive = true; mByHash = true; std::size_t pc = getPeerCount (); @@ -304,10 +320,10 @@ void InboundLedger::onTimer (bool wasProgress, ScopedLockType&) // otherwise, we need to trigger before we add // so each peer gets triggered once if (mReason != fcHISTORY) - trigger (Peer::ptr ()); + trigger (Peer::ptr (), TriggerReason::trTimeout); addPeers (); if (mReason == fcHISTORY) - trigger (Peer::ptr ()); + trigger (Peer::ptr (), TriggerReason::trTimeout); } } @@ -350,8 +366,13 @@ void InboundLedger::done () mSignaled = true; touch (); - if (m_journal.trace) m_journal.trace << - "Done acquiring ledger " << mHash; + JLOG (m_journal.debug) << + "Acquire " << mHash << + (isFailed () ? " fail " : " ") << + ((getTimeouts () == 0) ? std::string() : + (std::string ("timeouts:") + + to_string (getTimeouts ()) + " ")) << + mStats.get (); assert (isComplete () || isFailed ()); @@ -391,7 +412,7 @@ bool InboundLedger::addOnComplete ( /** Request more nodes, perhaps from a specific peer */ -void InboundLedger::trigger (Peer::ptr const& peer) +void InboundLedger::trigger (Peer::ptr const& peer, TriggerReason reason) { ScopedLockType sl (mLock); @@ -515,9 +536,16 @@ void InboundLedger::trigger (Peer::ptr const& peer) if (mLedger) tmGL.set_ledgerseq (mLedger->info().seq); - // If the peer has high latency, query extra deep - if (peer && peer->isHighLatency ()) + if (reason != TriggerReason::trReply) + { + // If we're querying blind, don't query deep + tmGL.set_querydepth (0); + } + else if (peer && peer->isHighLatency ()) + { + // If the peer has high latency, query extra deep tmGL.set_querydepth (2); + } else tmGL.set_querydepth (1); @@ -546,15 +574,14 @@ void InboundLedger::trigger (Peer::ptr const& peer) { std::vector nodeIDs; std::vector nodeHashes; - // VFALCO Why 256? Make this a constant - nodeIDs.reserve (256); - nodeHashes.reserve (256); + nodeIDs.reserve (missingNodesFind); + nodeHashes.reserve (missingNodesFind); AccountStateSF filter(app_); // Release the lock while we process the large state map sl.unlock(); mLedger->stateMap().getMissingNodes ( - nodeIDs, nodeHashes, 256, &filter); + nodeIDs, nodeHashes, missingNodesFind, &filter); sl.lock(); // Make sure nothing happened while we released the lock @@ -574,9 +601,7 @@ void InboundLedger::trigger (Peer::ptr const& peer) } else { - // VFALCO Why 128? Make this a constant - if (!mAggressive) - filterNodes (nodeIDs, nodeHashes, 128, !isProgress ()); + filterNodes (nodeIDs, nodeHashes, reason); if (!nodeIDs.empty ()) { @@ -631,11 +656,11 @@ void InboundLedger::trigger (Peer::ptr const& peer) { std::vector nodeIDs; std::vector nodeHashes; - nodeIDs.reserve (256); - nodeHashes.reserve (256); + nodeIDs.reserve (missingNodesFind); + nodeHashes.reserve (missingNodesFind); TransactionStateSF filter(app_); mLedger->txMap().getMissingNodes ( - nodeIDs, nodeHashes, 256, &filter); + nodeIDs, nodeHashes, missingNodesFind, &filter); if (nodeIDs.empty ()) { @@ -651,8 +676,7 @@ void InboundLedger::trigger (Peer::ptr const& peer) } else { - if (!mAggressive) - filterNodes (nodeIDs, nodeHashes, 128, !isProgress ()); + filterNodes (nodeIDs, nodeHashes, reason); if (!nodeIDs.empty ()) { @@ -687,11 +711,16 @@ void InboundLedger::trigger (Peer::ptr const& peer) } void InboundLedger::filterNodes (std::vector& nodeIDs, - std::vector& nodeHashes, int max, bool aggressive) + std::vector& nodeHashes, TriggerReason reason) { // ask for new nodes in preference to ones we've already asked for assert (nodeIDs.size () == nodeHashes.size ()); + int const max = (reason == TriggerReason::trReply) ? + reqNodesReply : reqNodes; + bool const aggressive = + (reason == TriggerReason::trTimeout); + std::vector duplicates; duplicates.reserve (nodeIDs.size ()); @@ -711,7 +740,9 @@ void InboundLedger::filterNodes (std::vector& nodeIDs, if (dupCount == nodeIDs.size ()) { // all duplicates - if (!aggressive) + // we don't want to send any query at all + // except on a timeout, where we need to query everyone + if (! aggressive) { nodeIDs.clear (); nodeHashes.clear (); @@ -735,7 +766,8 @@ void InboundLedger::filterNodes (std::vector& nodeIDs, nodeHashes[insertPoint] = nodeHashes[i]; } - ++insertPoint; + if (++insertPoint >= max) + break; } if (m_journal.trace) m_journal.trace << @@ -791,8 +823,6 @@ bool InboundLedger::takeHeader (std::string const& data) app_.getNodeStore ().store ( hotLEDGER, std::move (s.modData ()), mHash); - progress (); - if (mLedger->info().txHash.isZero ()) mHaveTransactions = true; @@ -859,7 +889,6 @@ bool InboundLedger::takeTxNode (const std::vector& nodeIDs, } } - progress (); return true; } @@ -934,7 +963,6 @@ bool InboundLedger::takeAsNode (const std::vector& nodeIDs, } } - progress (); return true; } @@ -952,7 +980,6 @@ bool InboundLedger::takeAsRootNode (Blob const& data, SHAMapAddNode& san) if (!mHaveHeader) { assert(false); - san.incInvalid(); return false; } @@ -976,7 +1003,6 @@ bool InboundLedger::takeTxRootNode (Blob const& data, SHAMapAddNode& san) if (!mHaveHeader) { assert(false); - san.incInvalid(); return false; } @@ -1031,6 +1057,9 @@ bool InboundLedger::gotData (std::weak_ptr peer, { ScopedLockType sl (mReceivedDataLock); + if (isDone ()) + return false; + mReceivedData.push_back (PeerDataPairType (peer, data)); if (mReceiveDispatched) @@ -1093,12 +1122,10 @@ int InboundLedger::processData (std::shared_ptr peer, "Included TX root invalid"; } - if (!san.isInvalid ()) + if (san.isUseful ()) progress (); - else - if (m_journal.debug) m_journal.debug << - "Peer sends invalid base data"; + mStats += san; return san.getGood (); } @@ -1136,28 +1163,26 @@ int InboundLedger::processData (std::shared_ptr peer, node.nodedata ().end ())); } - SHAMapAddNode ret; + SHAMapAddNode san; if (packet.type () == protocol::liTX_NODE) { - takeTxNode (nodeIDs, nodeData, ret); + takeTxNode (nodeIDs, nodeData, san); if (m_journal.debug) m_journal.debug << - "Ledger TX node stats: " << ret.get(); + "Ledger TX node stats: " << san.get(); } else { - takeAsNode (nodeIDs, nodeData, ret); + takeAsNode (nodeIDs, nodeData, san); if (m_journal.debug) m_journal.debug << - "Ledger AS node stats: " << ret.get(); + "Ledger AS node stats: " << san.get(); } - if (!ret.isInvalid ()) + if (san.isUseful ()) progress (); - else - if (m_journal.debug) m_journal.debug << - "Peer sends invalid node data"; - return ret.getGood (); + mStats += san; + return san.getGood (); } return -1; @@ -1196,8 +1221,8 @@ void InboundLedger::runData () int count = processData (peer, *(entry.second)); if (count > chosenPeerCount) { - chosenPeer = peer; chosenPeerCount = count; + chosenPeer = std::move (peer); } } } @@ -1205,7 +1230,7 @@ void InboundLedger::runData () } while (1); if (chosenPeer) - trigger (chosenPeer); + trigger (chosenPeer, TriggerReason::trReply); } Json::Value InboundLedger::getJson (int) diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp index 89d795b09f..2639a08d6f 100644 --- a/src/ripple/app/ledger/impl/InboundLedgers.cpp +++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp @@ -144,7 +144,6 @@ public: */ // means "We got some data from an inbound ledger" - // VFALCO TODO Why is hash passed by value? // VFALCO TODO Remove the dependency on the Peer object. /** We received a TMLedgerData from a peer. */ diff --git a/src/ripple/app/ledger/impl/LedgerCleaner.cpp b/src/ripple/app/ledger/impl/LedgerCleaner.cpp index 19c56a2943..d8e51f6ac0 100644 --- a/src/ripple/app/ledger/impl/LedgerCleaner.cpp +++ b/src/ripple/app/ledger/impl/LedgerCleaner.cpp @@ -285,6 +285,9 @@ public: if (!nodeLedger) { m_journal.debug << "Ledger " << ledgerIndex << " not available"; + app_.getLedgerMaster().clearLedger (ledgerIndex); + app_.getInboundLedgers().acquire( + ledgerHash, ledgerIndex, InboundLedger::fcGENERIC); return false; } @@ -309,6 +312,7 @@ public: if (doNodes && !nodeLedger->walkLedger(app_.journal ("Ledger"))) { m_journal.debug << "Ledger " << ledgerIndex << " is missing nodes"; + app_.getLedgerMaster().clearLedger (ledgerIndex); app_.getInboundLedgers().acquire( ledgerHash, ledgerIndex, InboundLedger::fcGENERIC); return false; diff --git a/src/ripple/overlay/PeerSet.h b/src/ripple/overlay/PeerSet.h index 0a8dba3738..6eb62aef83 100644 --- a/src/ripple/overlay/PeerSet.h +++ b/src/ripple/overlay/PeerSet.h @@ -81,7 +81,6 @@ public: void progress () { mProgress = true; - mAggressive = false; } void touch () @@ -167,7 +166,6 @@ protected: int mTimeouts; bool mComplete; bool mFailed; - bool mAggressive; bool mTxnData; clock_type::time_point mLastAction; bool mProgress; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 1074a47b8e..65ebd3227a 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -2157,7 +2157,9 @@ PeerImp::getLedger (std::shared_ptr const& m) (std::min(packet.querydepth(), 3u)) : (isHighLatency() ? 2 : 1); - for (int i = 0; i < packet.nodeids ().size (); ++i) + for (int i = 0; + (i < packet.nodeids().size() && + (reply.nodes().size() < Tuning::maxReplyNodes)); ++i) { SHAMapNodeID mn (packet.nodeids (i).data (), packet.nodeids (i).size ()); @@ -2248,10 +2250,17 @@ PeerImp::getScore (bool haveItem) const // Score for being very likely to have the thing we are // look for + // Should be roughly spRandom static const int spHaveItem = 10000; // Score reduction for each millisecond of latency - static const int spLatency = 100; + // Should be roughly spRandom divided by + // the maximum reasonable latency + static const int spLatency = 30; + + // Penalty for unknown latency + // Should be roughly spRandom + static const int spNoLatency = 8000; int score = rand() % spRandom; @@ -2266,6 +2275,8 @@ PeerImp::getScore (bool haveItem) const } if (latency != std::chrono::milliseconds (-1)) score -= latency.count() * spLatency; + else + score -= spNoLatency; return score; } diff --git a/src/ripple/overlay/impl/PeerSet.cpp b/src/ripple/overlay/impl/PeerSet.cpp index 5cedee1587..fd3ad7c3d6 100644 --- a/src/ripple/overlay/impl/PeerSet.cpp +++ b/src/ripple/overlay/impl/PeerSet.cpp @@ -45,7 +45,6 @@ PeerSet::PeerSet (Application& app, uint256 const& hash, int interval, bool txnD , mTimeouts (0) , mComplete (false) , mFailed (false) - , mAggressive (false) , mTxnData (txnData) , mProgress (false) , mTimer (app_.getIOService ()) diff --git a/src/ripple/overlay/impl/Tuning.h b/src/ripple/overlay/impl/Tuning.h index 58c88f7eb6..76e1f1b7a3 100644 --- a/src/ripple/overlay/impl/Tuning.h +++ b/src/ripple/overlay/impl/Tuning.h @@ -46,9 +46,13 @@ enum consider it insane */ insaneLedgerLimit = 128, + /** The maximum number of ledger entries in a single + reply */ + maxReplyNodes = 8192, + /** How many milliseconds to consider high latency on a peer connection */ - peerHighLatency = 120, + peerHighLatency = 250, /** How often we check connections (seconds) */ checkSeconds = 10, diff --git a/src/ripple/shamap/SHAMapNodeID.h b/src/ripple/shamap/SHAMapNodeID.h index e98b0b7778..715c5d5e8d 100644 --- a/src/ripple/shamap/SHAMapNodeID.h +++ b/src/ripple/shamap/SHAMapNodeID.h @@ -117,7 +117,8 @@ SHAMapNodeID SHAMapNodeID::getParentNodeID () const { assert (mDepth); - return SHAMapNodeID (mDepth - 1, mNodeID); + return SHAMapNodeID (mDepth - 1, + mNodeID & Masks (mDepth - 1)); } inline diff --git a/src/ripple/shamap/impl/SHAMapNodeID.cpp b/src/ripple/shamap/impl/SHAMapNodeID.cpp index cdcbd0ce1a..0e3547f802 100644 --- a/src/ripple/shamap/impl/SHAMapNodeID.cpp +++ b/src/ripple/shamap/impl/SHAMapNodeID.cpp @@ -61,7 +61,7 @@ SHAMapNodeID::SHAMapNodeID (int depth, uint256 const& hash) : mNodeID (hash), mDepth (depth) { assert ((depth >= 0) && (depth < 65)); - mNodeID &= Masks(depth); + assert (mNodeID == (mNodeID & Masks(depth))); } SHAMapNodeID::SHAMapNodeID (void const* ptr, int len) diff --git a/src/ripple/shamap/impl/SHAMapSync.cpp b/src/ripple/shamap/impl/SHAMapSync.cpp index d004912000..fb1ecb45ea 100644 --- a/src/ripple/shamap/impl/SHAMapSync.cpp +++ b/src/ripple/shamap/impl/SHAMapSync.cpp @@ -207,7 +207,7 @@ SHAMap::getMissingNodes(std::vector& nodeIDs, std::vector !static_cast(d)->isFullBelow(generation)) { stack.push (std::make_tuple (node, nodeID, - firstChild, currentChild, fullBelow)); + firstChild, currentChild, fullBelow)); // Switch to processing the child node node = static_cast(d);