diff --git a/src/ripple/app/ledger/InboundLedger.cpp b/src/ripple/app/ledger/InboundLedger.cpp index a9c2b7fa3..8aeb6ccbc 100644 --- a/src/ripple/app/ledger/InboundLedger.cpp +++ b/src/ripple/app/ledger/InboundLedger.cpp @@ -52,6 +52,9 @@ enum // how many timeouts before we get aggressive ,ledgerBecomeAggressiveThreshold = 6 + + // How many nodes to consider a fetch "small" + ,fetchSmallNodes = 32 }; InboundLedger::InboundLedger (uint256 const& hash, std::uint32_t seq, fcReason reason, @@ -505,6 +508,12 @@ void InboundLedger::trigger (Peer::ptr const& peer) if (mLedger) tmGL.set_ledgerseq (mLedger->getLedgerSeq ()); + // If the peer has high latency, query extra deep + if (peer && peer->isHighLatency ()) + tmGL.set_querydepth (2); + else + tmGL.set_querydepth (1); + // Get the state data first because it's the most likely to be useful // if we wind up abandoning this fetch. if (mHaveHeader && !mHaveState && !mFailed) @@ -568,6 +577,12 @@ void InboundLedger::trigger (Peer::ptr const& peer) { * (tmGL.add_nodeids ()) = id.getRawString (); } + + // If we're not querying for a lot of entries, + // query extra deep + if (nodeIDs.size() <= fetchSmallNodes) + tmGL.set_querydepth (tmGL.querydepth() + 1); + if (m_journal.trace) m_journal.trace << "Sending AS node " << nodeIDs.size () << " request to " << ( diff --git a/src/ripple/app/tx/TransactionAcquire.cpp b/src/ripple/app/tx/TransactionAcquire.cpp index e80b25a7e..73aa0d047 100644 --- a/src/ripple/app/tx/TransactionAcquire.cpp +++ b/src/ripple/app/tx/TransactionAcquire.cpp @@ -124,6 +124,7 @@ void TransactionAcquire::trigger (Peer::ptr const& peer) protocol::TMGetLedger tmGL; tmGL.set_ledgerhash (mHash.begin (), mHash.size ()); tmGL.set_itype (protocol::liTS_CANDIDATE); + tmGL.set_querydepth (3); // We probably need the whole thing if (getTimeouts () != 0) tmGL.set_querytype (protocol::qtINDIRECT); diff --git a/src/ripple/overlay/Peer.h b/src/ripple/overlay/Peer.h index f60bac171..e97045a4a 100644 --- a/src/ripple/overlay/Peer.h +++ b/src/ripple/overlay/Peer.h @@ -77,6 +77,10 @@ public: bool cluster() const = 0; + virtual + bool + isHighLatency() const = 0; + virtual RippleAddress const& getNodePublic() const = 0; diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 8cce3d351..d39239513 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -41,6 +41,7 @@ #include #include #include +#include #include #include // #include @@ -1833,7 +1834,7 @@ PeerImp::getLedger (std::shared_ptr const& m) protocol::TMGetLedger& packet = *m; std::shared_ptr map; protocol::TMLedgerData reply; - bool fatLeaves = true, fatRoot = false; + bool fatLeaves = true; if (packet.has_requestcookie ()) reply.set_requestcookie (packet.requestcookie ()); @@ -1843,13 +1844,14 @@ PeerImp::getLedger (std::shared_ptr const& m) if (packet.itype () == protocol::liTS_CANDIDATE) { // Request is for a transaction candidate set - p_journal_.trace << + if (p_journal_.trace) p_journal_.trace << "GetLedger: Tx candidate set"; if ((!packet.has_ledgerhash () || packet.ledgerhash ().size () != 32)) { charge (Resource::feeInvalidRequest); - p_journal_.warning << "GetLedger: Tx candidate set invalid"; + if (p_journal_.warning) p_journal_.warning << + "GetLedger: Tx candidate set invalid"; return; } @@ -1862,14 +1864,14 @@ PeerImp::getLedger (std::shared_ptr const& m) { if (packet.has_querytype () && !packet.has_requestcookie ()) { - p_journal_.debug << + if (p_journal_.debug) p_journal_.debug << "GetLedger: Routing Tx set request"; auto const v = getPeerWithTree( overlay_, txHash, this); if (! v) { - p_journal_.info << + if (p_journal_.info) p_journal_.info << "GetLedger: Route TX set failed"; return; } @@ -1880,7 +1882,7 @@ PeerImp::getLedger (std::shared_ptr const& m) return; } - p_journal_.debug << + if (p_journal_.debug) p_journal_.debug << "GetLedger: Can't provide map "; charge (Resource::feeInvalidRequest); return; @@ -1890,19 +1892,18 @@ PeerImp::getLedger (std::shared_ptr const& m) reply.set_ledgerhash (txHash.begin (), txHash.size ()); reply.set_type (protocol::liTS_CANDIDATE); fatLeaves = false; // We'll already have most transactions - fatRoot = true; // Save a pass } else { if (getApp().getFeeTrack().isLoadedLocal() && ! cluster()) { - p_journal_.debug << + if (p_journal_.debug) p_journal_.debug << "GetLedger: Too busy"; return; } // Figure out what ledger they want - p_journal_.trace << + if (p_journal_.trace) p_journal_.trace << "GetLedger: Received"; Ledger::pointer ledger; @@ -1913,7 +1914,7 @@ PeerImp::getLedger (std::shared_ptr const& m) if (packet.ledgerhash ().size () != 32) { charge (Resource::feeInvalidRequest); - p_journal_.warning << + if (p_journal_.warning) p_journal_.warning << "GetLedger: Invalid request"; return; } @@ -1923,8 +1924,8 @@ PeerImp::getLedger (std::shared_ptr const& m) logMe += to_string (ledgerhash); ledger = getApp().getLedgerMaster ().getLedgerByHash (ledgerhash); - if (!ledger && p_journal_.trace) - p_journal_.trace << + if (!ledger) + if (p_journal_.trace) p_journal_.trace << "GetLedger: Don't have " << ledgerhash; if (!ledger && (packet.has_querytype () && @@ -1939,7 +1940,7 @@ PeerImp::getLedger (std::shared_ptr const& m) overlay_, ledgerhash, seq, this); if (! v) { - p_journal_.trace << + if (p_journal_.trace) p_journal_.trace << "GetLedger: Cannot route"; return; } @@ -1947,7 +1948,7 @@ PeerImp::getLedger (std::shared_ptr const& m) packet.set_requestcookie (id ()); v->send (std::make_shared( packet, protocol::mtGET_LEDGER)); - p_journal_.debug << + if (p_journal_.debug) p_journal_.debug << "GetLedger: Request routed"; return; } @@ -1957,14 +1958,14 @@ PeerImp::getLedger (std::shared_ptr const& m) if (packet.ledgerseq() < getApp().getLedgerMaster().getEarliestFetch()) { - p_journal_.debug << + if (p_journal_.debug) p_journal_.debug << "GetLedger: Early ledger request"; return; } ledger = getApp().getLedgerMaster ().getLedgerBySeq ( packet.ledgerseq ()); - if (!ledger && p_journal_.debug) - p_journal_.debug << + if (! ledger) + if (p_journal_.debug) p_journal_.debug << "GetLedger: Don't have " << packet.ledgerseq (); } else if (packet.has_ltype () && (packet.ltype () == protocol::ltCURRENT)) @@ -1982,7 +1983,7 @@ PeerImp::getLedger (std::shared_ptr const& m) else { charge (Resource::feeInvalidRequest); - p_journal_.warning << + if (p_journal_.warning) p_journal_.warning << "GetLedger: Unknown request"; return; } @@ -1992,20 +1993,20 @@ PeerImp::getLedger (std::shared_ptr const& m) { charge (Resource::feeInvalidRequest); - if (p_journal_.warning && ledger) - p_journal_.warning << + if (ledger) + if (p_journal_.warning) p_journal_.warning << "GetLedger: Invalid sequence"; return; } - if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < - getApp().getLedgerMaster().getEarliestFetch())) - { - p_journal_.debug << - "GetLedger: Early ledger request"; - return; - } + if (!packet.has_ledgerseq() && (ledger->getLedgerSeq() < + getApp().getLedgerMaster().getEarliestFetch())) + { + if (p_journal_.debug) p_journal_.debug << + "GetLedger: Early ledger request"; + return; + } // Fill out the reply uint256 lHash = ledger->getHash (); @@ -2016,7 +2017,7 @@ PeerImp::getLedger (std::shared_ptr const& m) if (packet.itype () == protocol::liBASE) { // they want the ledger base data - p_journal_.trace << + if (p_journal_.trace) p_journal_.trace << "GetLedger: Base data"; Serializer nData (128); ledger->addRaw (nData); @@ -2074,22 +2075,27 @@ PeerImp::getLedger (std::shared_ptr const& m) if (!map || (packet.nodeids_size () == 0)) { - p_journal_.warning << + if (p_journal_.warning) p_journal_.warning << "GetLedger: Can't find map or empty request"; charge (Resource::feeInvalidRequest); return; } - p_journal_.trace << + if (p_journal_.trace) p_journal_.trace << "GetLeder: " << logMe; + auto const depth = + packet.has_querydepth() ? + (std::min(packet.querydepth(), 3u)) : + (isHighLatency() ? 2 : 1); + for (int i = 0; i < packet.nodeids ().size (); ++i) { SHAMapNodeID mn (packet.nodeids (i).data (), packet.nodeids (i).size ()); if (!mn.isValid ()) { - p_journal_.warning << + if (p_journal_.warning) p_journal_.warning << "GetLedger: Invalid node " << logMe; charge (Resource::feeInvalidRequest); return; @@ -2100,10 +2106,10 @@ PeerImp::getLedger (std::shared_ptr const& m) try { - if (map->getNodeFat (mn, nodeIDs, rawNodes, fatRoot, fatLeaves)) + if (map->getNodeFat (mn, nodeIDs, rawNodes, fatLeaves, depth)) { assert (nodeIDs.size () == rawNodes.size ()); - p_journal_.trace << + if (p_journal_.trace) p_journal_.trace << "GetLedger: getNodeFat got " << rawNodes.size () << " nodes"; std::vector::iterator nodeIDIterator; std::vector< Blob >::iterator rawNodeIterator; @@ -2141,11 +2147,15 @@ PeerImp::getLedger (std::shared_ptr const& m) if (!packet.has_ledgerhash ()) info += ", no hash specified"; - p_journal_.warning << + if (p_journal_.warning) p_journal_.warning << "getNodeFat( " << mn << ") throws exception: " << info; } } + if (p_journal_.info) p_journal_.info << + "Got request for " << packet.nodeids().size() << " nodes at depth " << + depth << ", return " << reply.nodes().size() << " nodes"; + Message::pointer oPacket = std::make_shared ( reply, protocol::mtLEDGER_DATA); send (oPacket); @@ -2190,4 +2200,11 @@ PeerImp::getScore (bool haveItem) return score; } +bool +PeerImp::isHighLatency() const +{ + std::lock_guard sl (recentLock_); + return latency_.count() >= Tuning::peerHighLatency; +} + } // ripple diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 6945eecb6..618e3b426 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -144,7 +144,7 @@ private: std::uint64_t lastPingSeq_ = 0; clock_type::time_point lastPingTime_; - mutable std::mutex recentLock_; + std::mutex mutable recentLock_; protocol::TMStatusChange last_status_; protocol::TMHello hello_; Resource::Consumer usage_; @@ -307,6 +307,9 @@ public: int getScore (bool haveItem); + bool + isHighLatency() const override; + private: void close(); diff --git a/src/ripple/overlay/impl/Tuning.h b/src/ripple/overlay/impl/Tuning.h index d3b49ada0..8289a0403 100644 --- a/src/ripple/overlay/impl/Tuning.h +++ b/src/ripple/overlay/impl/Tuning.h @@ -46,6 +46,10 @@ enum consider it insane */ insaneLedgerLimit = 128, + /** How many milliseconds to consider high latency + on a peer connection */ + peerHighLatency = 120, + /** How often we check connections (seconds) */ checkSeconds = 10, }; diff --git a/src/ripple/proto/ripple.proto b/src/ripple/proto/ripple.proto index e47ff3057..dad8e1037 100644 --- a/src/ripple/proto/ripple.proto +++ b/src/ripple/proto/ripple.proto @@ -300,6 +300,7 @@ message TMGetLedger repeated bytes nodeIDs = 5; optional uint64 requestCookie = 6; optional TMQueryType queryType = 7; + optional uint32 queryDepth = 8; // How deep to go, number of extra levels } enum TMReplyError diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index 2fac043b8..198bedcf8 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -146,8 +146,12 @@ public: // comparison/sync functions void getMissingNodes (std::vector& nodeIDs, std::vector& hashes, int max, SHAMapSyncFilter * filter); - bool getNodeFat (SHAMapNodeID node, std::vector& nodeIDs, - std::vector& rawNode, bool fatRoot, bool fatLeaves) const; + + bool getNodeFat (SHAMapNodeID node, + std::vector& nodeIDs, + std::vector& rawNode, + bool fatLeaves, std::uint32_t depth) const; + bool getRootNode (Serializer & s, SHANodeFormat format) const; std::vector getNeededHashes (int max, SHAMapSyncFilter * filter); SHAMapAddNode addRootNode (uint256 const& hash, Blob const& rootNode, SHANodeFormat format, diff --git a/src/ripple/shamap/impl/SHAMapSync.cpp b/src/ripple/shamap/impl/SHAMapSync.cpp index e6f5a9099..b52fd68b0 100644 --- a/src/ripple/shamap/impl/SHAMapSync.cpp +++ b/src/ripple/shamap/impl/SHAMapSync.cpp @@ -302,18 +302,24 @@ std::vector SHAMap::getNeededHashes (int max, SHAMapSyncFilter* filter) return nodeHashes; } -bool SHAMap::getNodeFat (SHAMapNodeID wanted, std::vector& nodeIDs, - std::vector& rawNodes, bool fatRoot, bool fatLeaves) const +bool SHAMap::getNodeFat (SHAMapNodeID wanted, + std::vector& nodeIDs, + std::vector& rawNodes, bool fatLeaves, + std::uint32_t depth) const { // Gets a node and some of its children + // to a specified depth SHAMapTreeNode* node = root_.get (); - SHAMapNodeID nodeID; while (node && node->isInner () && (nodeID.getDepth() < wanted.getDepth())) { int branch = nodeID.selectBranch (wanted.getNodeID()); + + if (node->isEmptyBranch (branch)) + return false; + node = descendThrow (node, branch); nodeID = nodeID.getChildNodeID (branch); } @@ -322,7 +328,7 @@ bool SHAMap::getNodeFat (SHAMapNodeID wanted, std::vector& nodeIDs { if (journal_.warning) journal_.warning << "peer requested node that is not in the map: " << wanted; - throw std::runtime_error ("Peer requested node not in map"); + return false; } if (node->isInner () && node->isEmpty ()) @@ -332,51 +338,56 @@ bool SHAMap::getNodeFat (SHAMapNodeID wanted, std::vector& nodeIDs return false; } - int count; - bool skipNode = false; - do + std::stack> stack; + stack.emplace (node, nodeID, depth); + + while (! stack.empty ()) { + std::tie (node, nodeID, depth) = stack.top (); + stack.pop (); - if (skipNode) - skipNode = false; - else + // Add this node to the reply + Serializer s; + node->addRaw (s, snfWIRE); + nodeIDs.push_back (nodeID); + rawNodes.push_back (std::move (s.peekData())); + + if (node->isInner()) { - Serializer s; - node->addRaw (s, snfWIRE); - nodeIDs.push_back (wanted); - rawNodes.push_back (std::move (s.peekData ())); - } - - if ((!fatRoot && wanted.isRoot ()) || node->isLeaf ()) // don't get a fat root_, can't get a fat leaf - return true; - - SHAMapTreeNode* nextNode = nullptr; - SHAMapNodeID nextNodeID; - - count = 0; - for (int i = 0; i < 16; ++i) - { - if (!node->isEmptyBranch (i)) + // We descend inner nodes with only a single child + // without decrementing the depth + int bc = node->getBranchCount(); + if ((depth > 0) || (bc == 1)) { - SHAMapNodeID nextNodeID = wanted.getChildNodeID (i); - nextNode = descendThrow (node, i); - ++count; - if (fatLeaves || nextNode->isInner ()) + // We need to process this node's children + for (int i = 0; i < 16; ++i) { - Serializer s; - nextNode->addRaw (s, snfWIRE); - nodeIDs.push_back (nextNodeID); - rawNodes.push_back (std::move (s.peekData ())); - skipNode = true; // Don't add this node again if we loop + if (! node->isEmptyBranch (i)) + { + SHAMapNodeID childID = nodeID.getChildNodeID (i); + SHAMapTreeNode* childNode = descendThrow (node, i); + + if (childNode->isInner () && + ((depth > 1) || (bc == 1))) + { + // If there's more than one child, reduce the depth + // If only one child, follow the chain + stack.emplace (childNode, childID, + (bc > 1) ? (depth - 1) : depth); + } + else if (childNode->isInner() || fatLeaves) + { + // Just include this node + Serializer s; + childNode->addRaw (s, snfWIRE); + nodeIDs.push_back (childID); + rawNodes.push_back (std::move (s.peekData ())); + } + } } } } - - node = nextNode; - wanted = nextNodeID; - - // So long as there's exactly one inner node, we take it - } while ((count == 1) && node->isInner()); + } return true; } diff --git a/src/ripple/shamap/tests/SHAMapSync.test.cpp b/src/ripple/shamap/tests/SHAMapSync.test.cpp index 7ec02b4bd..f22d591e4 100644 --- a/src/ripple/shamap/tests/SHAMapSync.test.cpp +++ b/src/ripple/shamap/tests/SHAMapSync.test.cpp @@ -124,7 +124,8 @@ public: destination.setSynching (); - unexpected (!source.getNodeFat (SHAMapNodeID (), nodeIDs, gotNodes, (rand () % 2) == 0, (rand () % 2) == 0), + unexpected (!source.getNodeFat (SHAMapNodeID (), nodeIDs, gotNodes, + (rand () % 2) == 0, rand () % 3), "GetNodeFat"); unexpected (gotNodes.size () < 1, "NodeSize"); @@ -152,7 +153,8 @@ public: // get as many nodes as possible based on this information for (nodeIDIterator = nodeIDs.begin (); nodeIDIterator != nodeIDs.end (); ++nodeIDIterator) { - if (!source.getNodeFat (*nodeIDIterator, gotNodeIDs, gotNodes, (rand () % 2) == 0, (rand () % 2) == 0)) + if (!source.getNodeFat (*nodeIDIterator, gotNodeIDs, gotNodes, + (rand () % 2) == 0, rand () % 3)) { fail ("GetNodeFat"); }