diff --git a/src/ripple/app/ledger/InboundLedger.h b/src/ripple/app/ledger/InboundLedger.h index 25f644476..287dbaf7f 100644 --- a/src/ripple/app/ledger/InboundLedger.h +++ b/src/ripple/app/ledger/InboundLedger.h @@ -39,9 +39,6 @@ class InboundLedger final : public TimeoutCounter, public: using clock_type = beast::abstract_clock; - using PeerDataPairType = - std::pair, std::shared_ptr>; - // These are the reasons we might acquire a ledger enum class Reason { HISTORY, // Acquiring past ledger @@ -193,7 +190,9 @@ private: // Data we have received from peers std::mutex mReceivedDataLock; - std::vector mReceivedData; + std::vector< + std::pair, std::shared_ptr>> + mReceivedData; bool mReceiveDispatched; std::unique_ptr mPeerSet; }; diff --git a/src/ripple/app/ledger/impl/InboundLedger.cpp b/src/ripple/app/ledger/impl/InboundLedger.cpp index 979c14544..6609759d0 100644 --- a/src/ripple/app/ledger/impl/InboundLedger.cpp +++ b/src/ripple/app/ledger/impl/InboundLedger.cpp @@ -33,7 +33,10 @@ #include #include +#include + #include +#include namespace ripple { @@ -57,15 +60,15 @@ enum { // Number of nodes to find initially , - missingNodesFind = 256 + missingNodesFind = 512 // Number of nodes to request for a reply , - reqNodesReply = 128 + reqNodesReply = 256 // Number of nodes to request blindly , - reqNodes = 8 + reqNodes = 12 }; // millisecond for each ledger timeout @@ -601,7 +604,7 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) tmBH.set_ledgerhash(hash_.begin(), hash_.size()); for (auto const& p : need) { - JLOG(journal_.warn()) << "Want: " << p.second; + JLOG(journal_.debug()) << "Want: " << p.second; if (!typeSet) { @@ -661,15 +664,15 @@ InboundLedger::trigger(std::shared_ptr const& peer, TriggerReason reason) if (reason != TriggerReason::reply) { // If we're querying blind, don't query deep - tmGL.set_querydepth(0); + tmGL.set_querydepth(1); } else if (peer && peer->isHighLatency()) { // If the peer has high latency, query extra deep - 
tmGL.set_querydepth(2); + tmGL.set_querydepth(3); } else - tmGL.set_querydepth(1); + tmGL.set_querydepth(2); // Get the state data first because it's the most likely to be useful // if we wind up abandoning this fetch. @@ -952,22 +955,23 @@ InboundLedger::receiveNode(protocol::TMLedgerData& packet, SHAMapAddNode& san) try { + auto const f = filter.get(); + for (auto const& node : packet.nodes()) { auto const nodeID = deserializeSHAMapNodeID(node.nodeid()); if (!nodeID) - { - san.incInvalid(); - return; - } + throw std::runtime_error("data does not properly deserialize"); if (nodeID->isRoot()) - san += map.addRootNode( - rootHash, makeSlice(node.nodedata()), filter.get()); + { + san += map.addRootNode(rootHash, makeSlice(node.nodedata()), f); + } else - san += map.addKnownNode( - *nodeID, makeSlice(node.nodedata()), filter.get()); + { + san += map.addKnownNode(*nodeID, makeSlice(node.nodedata()), f); + } if (!san.isGood()) { @@ -1120,19 +1124,19 @@ InboundLedger::processData( std::shared_ptr peer, protocol::TMLedgerData& packet) { - ScopedLockType sl(mtx_); - if (packet.type() == protocol::liBASE) { - if (packet.nodes_size() < 1) + if (packet.nodes().empty()) { - JLOG(journal_.warn()) << "Got empty header data"; + JLOG(journal_.warn()) << peer->id() << ": empty header data"; peer->charge(Resource::feeInvalidRequest); return -1; } SHAMapAddNode san; + ScopedLockType sl(mtx_); + try { if (!mHaveHeader) @@ -1177,13 +1181,18 @@ InboundLedger::processData( if ((packet.type() == protocol::liTX_NODE) || (packet.type() == protocol::liAS_NODE)) { - if (packet.nodes().size() == 0) + std::string type = packet.type() == protocol::liTX_NODE ? 
"liTX_NODE: " + : "liAS_NODE: "; + + if (packet.nodes().empty()) { - JLOG(journal_.info()) << "Got response with no nodes"; + JLOG(journal_.info()) << peer->id() << ": response with no nodes"; peer->charge(Resource::feeInvalidRequest); return -1; } + ScopedLockType sl(mtx_); + // Verify node IDs and data are complete for (auto const& node : packet.nodes()) { @@ -1198,14 +1207,10 @@ InboundLedger::processData( SHAMapAddNode san; receiveNode(packet, san); - if (packet.type() == protocol::liTX_NODE) - { - JLOG(journal_.debug()) << "Ledger TX node stats: " << san.get(); - } - else - { - JLOG(journal_.debug()) << "Ledger AS node stats: " << san.get(); - } + JLOG(journal_.debug()) + << "Ledger " + << ((packet.type() == protocol::liTX_NODE) ? "TX" : "AS") + << " node stats: " << san.get(); if (san.isUseful()) progress_ = true; @@ -1217,20 +1222,89 @@ InboundLedger::processData( return -1; } +namespace detail { +// Track the amount of useful data that each peer returns +struct PeerDataCounts +{ + // Map from peer to amount of useful data the peer returned + std::unordered_map, int> counts; + // The largest amount of useful data that any peer returned + int maxCount = 0; + + // Update the data count for a peer + void + update(std::shared_ptr&& peer, int dataCount) + { + if (dataCount <= 0) + return; + maxCount = std::max(maxCount, dataCount); + auto i = counts.find(peer); + if (i == counts.end()) + { + counts.emplace(std::move(peer), dataCount); + return; + } + i->second = std::max(i->second, dataCount); + } + + // Prune all the peers that didn't return enough data. + void + prune() + { + // Remove all the peers that didn't return at least half as much data as + // the best peer + auto const thresh = maxCount / 2; + auto i = counts.begin(); + while (i != counts.end()) + { + if (i->second < thresh) + i = counts.erase(i); + else + ++i; + } + } + + // call F with the `peer` parameter with a random sample of at most n values + // of the counts map. 
+ template + void + sampleN(std::size_t n, F&& f) + { + if (counts.empty()) + return; + + std::minstd_rand rng{std::random_device{}()}; + std::sample( + counts.begin(), + counts.end(), + boost::make_function_output_iterator( + [&f](auto&& v) { f(v.first); }), + n, + rng); + } +}; +} // namespace detail + /** Process pending TMLedgerData - Query the 'best' peer + Query a random sample of the 'best' peers */ void InboundLedger::runData() { - std::shared_ptr chosenPeer; - int chosenPeerCount = -1; + // Maximum number of peers to request data from + constexpr std::size_t maxUsefulPeers = 6; - std::vector data; + decltype(mReceivedData) data; + + // Reserve some memory so the first couple iterations don't reallocate + data.reserve(8); + + detail::PeerDataCounts dataCounts; for (;;) { data.clear(); + { std::lock_guard sl(mReceivedDataLock); @@ -1243,24 +1317,22 @@ InboundLedger::runData() data.swap(mReceivedData); } - // Select the peer that gives us the most nodes that are useful, - // breaking ties in favor of the peer that responded first. 
for (auto& entry : data) { if (auto peer = entry.first.lock()) { int count = processData(peer, *(entry.second)); - if (count > chosenPeerCount) - { - chosenPeerCount = count; - chosenPeer = std::move(peer); - } + dataCounts.update(std::move(peer), count); } } } - if (chosenPeer) - trigger(chosenPeer, TriggerReason::reply); + // Select a random sample of the peers that gives us the most nodes that are + // useful + dataCounts.prune(); + dataCounts.sampleN(maxUsefulPeers, [&](std::shared_ptr const& peer) { + trigger(peer, TriggerReason::reply); + }); } Json::Value diff --git a/src/ripple/app/ledger/impl/InboundLedgers.cpp b/src/ripple/app/ledger/impl/InboundLedgers.cpp index 76681ea0a..7ee49b454 100644 --- a/src/ripple/app/ledger/impl/InboundLedgers.cpp +++ b/src/ripple/app/ledger/impl/InboundLedgers.cpp @@ -74,6 +74,12 @@ public: reason != InboundLedger::Reason::SHARD || (seq != 0 && app_.getShardStore())); + // probably not the right rule + if (app_.getOPs().isNeedNetworkLedger() && + (reason != InboundLedger::Reason::GENERIC) && + (reason != InboundLedger::Reason::CONSENSUS)) + return {}; + bool isNew = true; std::shared_ptr inbound; { @@ -82,6 +88,7 @@ public: { return {}; } + auto it = mLedgers.find(hash); if (it != mLedgers.end()) { diff --git a/src/ripple/core/Job.h b/src/ripple/core/Job.h index 435148062..c4f2eddf3 100644 --- a/src/ripple/core/Job.h +++ b/src/ripple/core/Job.h @@ -60,11 +60,11 @@ enum JobType { jtLEDGER_REQ, // Peer request ledger/txnset data jtPROPOSAL_ut, // A proposal from an untrusted source jtREPLAY_TASK, // A Ledger replay task/subtask - jtLEDGER_DATA, // Received data for a ledger we're acquiring jtTRANSACTION, // A transaction received from the network jtMISSING_TXN, // Request missing transactions jtREQUESTED_TXN, // Reply with requested transactions jtBATCH, // Apply batched transactions + jtLEDGER_DATA, // Received data for a ledger we're acquiring jtADVANCE, // Advance validated/acquired ledgers jtPUBLEDGER, // Publish a 
fully-accepted ledger jtTXN_DATA, // Fetch a proposed set diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index b6e07a0f1..5bdaa0121 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1339,7 +1339,7 @@ PeerImp::onMessage(std::shared_ptr const& m) // case ShardState::finalized: default: return badData("Invalid incomplete shard state"); - }; + } s.add32(incomplete.state()); // Verify progress @@ -3523,8 +3523,8 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) { auto const queryDepth{ m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)}; - std::vector nodeIds; - std::vector rawNodes; + + std::vector> data; for (int i = 0; i < m->nodeids_size() && ledgerData.nodes_size() < Tuning::softMaxReplyNodes; @@ -3532,30 +3532,22 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) { auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))}; - nodeIds.clear(); - rawNodes.clear(); + data.clear(); + data.reserve(Tuning::softMaxReplyNodes); + try { - if (map->getNodeFat( - *shaMapNodeId, - nodeIds, - rawNodes, - fatLeaves, - queryDepth)) + if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth)) { - assert(nodeIds.size() == rawNodes.size()); JLOG(p_journal_.trace()) << "processLedgerRequest: getNodeFat got " - << rawNodes.size() << " nodes"; + << data.size() << " nodes"; - auto rawNodeIter{rawNodes.begin()}; - for (auto const& nodeId : nodeIds) + for (auto const& d : data) { protocol::TMLedgerNode* node{ledgerData.add_nodes()}; - node->set_nodeid(nodeId.getRawString()); - node->set_nodedata( - &rawNodeIter->front(), rawNodeIter->size()); - ++rawNodeIter; + node->set_nodeid(d.first.getRawString()); + node->set_nodedata(d.second.data(), d.second.size()); } } else @@ -3607,9 +3599,7 @@ PeerImp::processLedgerRequest(std::shared_ptr const& m) << ledgerData.nodes_size() << " nodes"; } - auto message{ - std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; - 
send(message); + send(std::make_shared(ledgerData, protocol::mtLEDGER_DATA)); } int diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index b913bd5b1..1d221179c 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -238,7 +238,7 @@ public: void visitDifferences( SHAMap const* have, - std::function) const; + std::function const&) const; /** Visit every leaf node in this SHAMap @@ -267,8 +267,7 @@ public: bool getNodeFat( SHAMapNodeID const& wanted, - std::vector& nodeIDs, - std::vector& rawNodes, + std::vector>& data, bool fatLeaves, std::uint32_t depth) const; diff --git a/src/ripple/shamap/impl/SHAMapSync.cpp b/src/ripple/shamap/impl/SHAMapSync.cpp index 1e233d55f..1bada8513 100644 --- a/src/ripple/shamap/impl/SHAMapSync.cpp +++ b/src/ripple/shamap/impl/SHAMapSync.cpp @@ -52,7 +52,7 @@ SHAMap::visitNodes(std::function const& function) const auto node = std::static_pointer_cast(root_); int pos = 0; - while (1) + while (true) { while (pos < 16) { @@ -99,7 +99,7 @@ SHAMap::visitNodes(std::function const& function) const void SHAMap::visitDifferences( SHAMap const* have, - std::function function) const + std::function const& function) const { // Visit every node in this SHAMap that is not present // in the specified SHAMap @@ -426,8 +426,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter) bool SHAMap::getNodeFat( SHAMapNodeID const& wanted, - std::vector& nodeIDs, - std::vector& rawNodes, + std::vector>& data, bool fatLeaves, std::uint32_t depth) const { @@ -443,16 +442,15 @@ SHAMap::getNodeFat( auto inner = static_cast(node); if (inner->isEmptyBranch(branch)) return false; - node = descendThrow(inner, branch); nodeID = nodeID.getChildNodeID(branch); } if (node == nullptr || wanted != nodeID) { - JLOG(journal_.warn()) << "peer requested node that is not in the map:\n" - << wanted << " but found\n" - << nodeID; + JLOG(journal_.info()) + << "peer requested node that is not in the map: " << wanted + << " but found " << 
nodeID; return false; } @@ -465,18 +463,17 @@ SHAMap::getNodeFat( std::stack> stack; stack.emplace(node, nodeID, depth); + Serializer s(8192); + while (!stack.empty()) { std::tie(node, nodeID, depth) = stack.top(); stack.pop(); - { - // Add this node to the reply - Serializer s; - node->serializeForWire(s); - nodeIDs.push_back(nodeID); - rawNodes.push_back(std::move(s.modData())); - } + // Add this node to the reply + s.erase(); + node->serializeForWire(s); + data.emplace_back(std::make_pair(nodeID, s.getData())); if (node->isInner()) { @@ -484,6 +481,7 @@ SHAMap::getNodeFat( // without decrementing the depth auto inner = static_cast(node); int bc = inner->getBranchCount(); + if ((depth > 0) || (bc == 1)) { // We need to process this node's children @@ -492,7 +490,7 @@ SHAMap::getNodeFat( if (!inner->isEmptyBranch(i)) { auto const childNode = descendThrow(inner, i); - SHAMapNodeID const childID = nodeID.getChildNodeID(i); + auto const childID = nodeID.getChildNodeID(i); if (childNode->isInner() && ((depth > 1) || (bc == 1))) { @@ -506,10 +504,10 @@ SHAMap::getNodeFat( else if (childNode->isInner() || fatLeaves) { // Just include this node - Serializer ns; - childNode->serializeForWire(ns); - nodeIDs.push_back(childID); - rawNodes.push_back(std::move(ns.modData())); + s.erase(); + childNode->serializeForWire(s); + data.emplace_back( + std::make_pair(childID, s.getData())); } } } diff --git a/src/test/shamap/SHAMapSync_test.cpp b/src/test/shamap/SHAMapSync_test.cpp index f262f5f8b..ba32f6e80 100644 --- a/src/test/shamap/SHAMapSync_test.cpp +++ b/src/test/shamap/SHAMapSync_test.cpp @@ -124,24 +124,18 @@ public: destination.setSynching(); { - std::vector gotNodeIDs_a; - std::vector gotNodes_a; + std::vector> a; BEAST_EXPECT(source.getNodeFat( - SHAMapNodeID(), - gotNodeIDs_a, - gotNodes_a, - rand_bool(eng_), - rand_int(eng_, 2))); + SHAMapNodeID(), a, rand_bool(eng_), rand_int(eng_, 2))); - unexpected(gotNodes_a.size() < 1, "NodeSize"); + unexpected(a.size() < 1, 
"NodeSize"); - BEAST_EXPECT(destination - .addRootNode( - source.getHash(), - makeSlice(*gotNodes_a.begin()), - nullptr) - .isGood()); + BEAST_EXPECT( + destination + .addRootNode( + source.getHash(), makeSlice(a[0].second), nullptr) + .isGood()); } do @@ -155,8 +149,7 @@ public: break; // get as many nodes as possible based on this information - std::vector gotNodeIDs_b; - std::vector gotNodes_b; + std::vector> b; for (auto& it : nodesMissing) { @@ -164,29 +157,24 @@ public: // non-deterministic number of times and the number of tests run // should be deterministic if (!source.getNodeFat( - it.first, - gotNodeIDs_b, - gotNodes_b, - rand_bool(eng_), - rand_int(eng_, 2))) + it.first, b, rand_bool(eng_), rand_int(eng_, 2))) fail("", __FILE__, __LINE__); } // Don't use BEAST_EXPECT here b/c it will be called a // non-deterministic number of times and the number of tests run // should be deterministic - if (gotNodeIDs_b.size() != gotNodes_b.size() || - gotNodeIDs_b.empty()) + if (b.empty()) fail("", __FILE__, __LINE__); - for (std::size_t i = 0; i < gotNodeIDs_b.size(); ++i) + for (std::size_t i = 0; i < b.size(); ++i) { // Don't use BEAST_EXPECT here b/c it will be called a // non-deterministic number of times and the number of tests run // should be deterministic if (!destination .addKnownNode( - gotNodeIDs_b[i], makeSlice(gotNodes_b[i]), nullptr) + b[i].first, makeSlice(b[i].second), nullptr) .isUseful()) fail("", __FILE__, __LINE__); }