From 96f11c786ec47eb6fc2784545c806ad601b332b3 Mon Sep 17 00:00:00 2001 From: Miguel Portilla Date: Tue, 6 Apr 2021 10:03:28 -0400 Subject: [PATCH] Refactor GetLedger and LedgerData message handlers: * Verify message fields first * Break up GetLedger handler into several functions --- src/ripple/overlay/impl/PeerImp.cpp | 785 +++++++++++++++++----------- src/ripple/overlay/impl/PeerImp.h | 14 +- src/ripple/overlay/impl/Tuning.h | 3 + 3 files changed, 489 insertions(+), 313 deletions(-) diff --git a/src/ripple/overlay/impl/PeerImp.cpp b/src/ripple/overlay/impl/PeerImp.cpp index 06221b6bb..103d21d80 100644 --- a/src/ripple/overlay/impl/PeerImp.cpp +++ b/src/ripple/overlay/impl/PeerImp.cpp @@ -1530,11 +1530,94 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { - fee_ = Resource::feeMediumBurdenPeer; + auto badData = [&](std::string const& msg) { + charge(Resource::feeBadData); + JLOG(p_journal_.warn()) << "TMGetLedger: " << msg; + }; + auto const itype{m->itype()}; + + // Verify ledger info type + if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE) + return badData("Invalid ledger info type"); + + auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> { + if (m->has_ltype()) + return m->ltype(); + return std::nullopt; + }(); + + if (itype == protocol::liTS_CANDIDATE) + { + if (!m->has_ledgerhash()) + return badData("Invalid TX candidate set, missing TX set hash"); + } + else if ( + !m->has_ledgerhash() && !m->has_ledgerseq() && + !(ltype && *ltype == protocol::ltCLOSED)) + { + return badData("Invalid request"); + } + + // Verify ledger type + if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED)) + return badData("Invalid ledger type"); + + // Verify ledger hash + if (m->has_ledgerhash() && !stringIsUint256Sized(m->ledgerhash())) + return badData("Invalid ledger hash"); + + // Verify ledger sequence + if (m->has_ledgerseq()) + { + auto const ledgerSeq{m->ledgerseq()}; + if (ledgerSeq < app_.getNodeStore().earliestLedgerSeq()) + { + return badData( + "Invalid ledger sequence " + std::to_string(ledgerSeq)); + } + + // Check if within a reasonable range + using namespace std::chrono_literals; + if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s && + ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10) + { + return badData( + "Invalid ledger sequence " + std::to_string(ledgerSeq)); + } + } + + // Verify ledger node IDs + if (itype != protocol::liBASE) + { + if (m->nodeids_size() <= 0) + return badData("Invalid ledger node IDs"); + + for (auto const& nodeId : m->nodeids()) + { + if (deserializeSHAMapNodeID(nodeId) == std::nullopt) + return badData("Invalid SHAMap node ID"); + } + } + + // Verify query type + if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT) + return badData("Invalid query type"); + + // Verify query depth + if (m->has_querydepth()) + { + if (m->querydepth() > Tuning::maxQueryDepth || + itype == protocol::liBASE) + { + return badData("Invalid query depth"); + } + } + + // Queue a job to process the request std::weak_ptr weak = shared_from_this(); app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m](Job&) { if (auto peer = weak.lock()) - peer->getLedger(m); + peer->processLedgerRequest(m); }); } @@ -1639,58 +1722,98 @@ PeerImp::onMessage(std::shared_ptr const& m) void PeerImp::onMessage(std::shared_ptr const& m) { - protocol::TMLedgerData& packet = *m; + auto badData = [&](std::string const& msg) { + fee_ = Resource::feeBadData; + JLOG(p_journal_.warn()) << "TMLedgerData: " << msg; + }; - if (m->nodes().size() <= 0) + // Verify ledger hash + if (!stringIsUint256Sized(m->ledgerhash())) + return badData("Invalid ledger hash"); + + // Verify ledger sequence { - JLOG(p_journal_.warn()) << "Ledger/TXset data with no nodes"; - return; + auto const ledgerSeq{m->ledgerseq()}; + if (m->type() == protocol::liTS_CANDIDATE) + { + if (ledgerSeq != 0) + { + return badData( + "Invalid ledger sequence " + std::to_string(ledgerSeq)); + } + } + else + { + if (ledgerSeq < app_.getNodeStore().earliestLedgerSeq()) + { + return badData( + "Invalid ledger sequence " + std::to_string(ledgerSeq)); + } + + // Check if within a reasonable range + using namespace std::chrono_literals; + if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s && + ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10) + { + return badData( + "Invalid ledger sequence " + std::to_string(ledgerSeq)); + } + } } + // Verify ledger info type + if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE) + return badData("Invalid ledger info type"); + + // Verify ledger nodes + if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::maxReplyNodes) + { + return badData( + "Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size())); + } + + // Verify reply error + if (m->has_error() && + (m->error() < protocol::reNO_LEDGER || + m->error() > protocol::reBAD_REQUEST)) + { + return badData("Invalid reply error"); + } + + // If there is a request cookie, attempt to relay the message if (m->has_requestcookie()) { - std::shared_ptr target = - overlay_.findPeerByShortID(m->requestcookie()); - if (target) + if (auto peer = overlay_.findPeerByShortID(m->requestcookie())) { m->clear_requestcookie(); - target->send( - std::make_shared(packet, protocol::mtLEDGER_DATA)); + peer->send(std::make_shared(*m, protocol::mtLEDGER_DATA)); } else { JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply"; - fee_ = Resource::feeUnwantedData; } return; } - if (!stringIsUint256Sized(m->ledgerhash())) - { - JLOG(p_journal_.warn()) << "TX candidate reply with invalid hash size"; - fee_ = Resource::feeInvalidRequest; - return; - } - - uint256 const hash{m->ledgerhash()}; + uint256 const ledgerHash{m->ledgerhash()}; + // Otherwise check if received data for a candidate transaction set if (m->type() == protocol::liTS_CANDIDATE) { - // got data for a candidate transaction set - std::weak_ptr weak = shared_from_this(); + std::weak_ptr weak{shared_from_this()}; app_.getJobQueue().addJob( - jtTXN_DATA, "recvPeerData", [weak, hash, m](Job&) { + jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m](Job&) { if (auto peer = weak.lock()) - peer->app_.getInboundTransactions().gotData(hash, peer, m); + { + peer->app_.getInboundTransactions().gotData( + ledgerHash, peer, m); + } }); return; } - if (!app_.getInboundLedgers().gotLedgerData(hash, shared_from_this(), m)) - { - JLOG(p_journal_.trace()) << "Got data for unwanted ledger"; - fee_ = Resource::feeUnwantedData; - } + // Consume the message + app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m); } void @@ -2830,334 +2953,372 @@ getPeerWithLedger( return ret; } -// VFALCO NOTE This function is way too big and cumbersome. void +PeerImp::sendLedgerBase( + std::shared_ptr const& ledger, + protocol::TMLedgerData& ledgerData) +{ + JLOG(p_journal_.trace()) << "sendLedgerBase: Base data"; + + Serializer s(sizeof(LedgerInfo)); + addRaw(ledger->info(), s); + ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength()); + + auto const& stateMap{ledger->stateMap()}; + if (stateMap.getHash() != beast::zero) + { + // Return account state root node if possible + Serializer root(768); + + stateMap.serializeRoot(root); + ledgerData.add_nodes()->set_nodedata( + root.getDataPtr(), root.getLength()); + + if (ledger->info().txHash != beast::zero) + { + auto const& txMap{ledger->txMap()}; + if (txMap.getHash() != beast::zero) + { + // Return TX root node if possible + root.erase(); + txMap.serializeRoot(root); + ledgerData.add_nodes()->set_nodedata( + root.getDataPtr(), root.getLength()); + } + } + } + + auto message{ + std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; + send(message); +} + +std::shared_ptr PeerImp::getLedger(std::shared_ptr const& m) { - protocol::TMGetLedger& packet = *m; - std::shared_ptr shared; - SHAMap const* map = nullptr; - protocol::TMLedgerData reply; - bool fatLeaves = true; + JLOG(p_journal_.trace()) << "getLedger: Ledger"; + std::shared_ptr ledger; - if (packet.has_requestcookie()) - reply.set_requestcookie(packet.requestcookie()); - - std::string logMe; - - if (packet.itype() == protocol::liTS_CANDIDATE) + if (m->has_ledgerhash()) { - // Request is for a transaction candidate set - JLOG(p_journal_.trace()) << "GetLedger: Tx candidate set"; - - if (!packet.has_ledgerhash() || - !stringIsUint256Sized(packet.ledgerhash())) + // Attempt to find ledger by hash + uint256 const ledgerHash{m->ledgerhash()}; + ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash); + if (!ledger) { - charge(Resource::feeInvalidRequest); - JLOG(p_journal_.warn()) << "GetLedger: Tx candidate set invalid"; - return; - } - - uint256 const txHash{packet.ledgerhash()}; - - shared = app_.getInboundTransactions().getSet(txHash, false); - map = shared.get(); - - if (!map) - { - if (packet.has_querytype() && !packet.has_requestcookie()) + if (m->has_ledgerseq()) { - JLOG(p_journal_.debug()) << "GetLedger: Routing Tx set request"; - - if (auto const v = getPeerWithTree(overlay_, txHash, this)) + // Attempt to find ledger by sequence in the shard store + if (auto shards = app_.getShardStore()) { - packet.set_requestcookie(id()); - v->send(std::make_shared( - packet, protocol::mtGET_LEDGER)); - return; - } - - JLOG(p_journal_.info()) << "GetLedger: Route TX set failed"; - return; - } - - JLOG(p_journal_.debug()) << "GetLedger: Can't provide map "; - charge(Resource::feeInvalidRequest); - return; - } - - reply.set_ledgerseq(0); - reply.set_ledgerhash(txHash.begin(), txHash.size()); - reply.set_type(protocol::liTS_CANDIDATE); - fatLeaves = false; // We'll already have most transactions - } - else - { - if (send_queue_.size() >= Tuning::dropSendQueue) - { - JLOG(p_journal_.debug()) << "GetLedger: Large send queue"; - return; - } - - if (app_.getFeeTrack().isLoadedLocal() && !cluster()) - { - JLOG(p_journal_.debug()) << "GetLedger: Too busy"; - return; - } - - // Figure out what ledger they want - JLOG(p_journal_.trace()) << "GetLedger: Received"; - - if (packet.has_ledgerhash()) - { - if (!stringIsUint256Sized(packet.ledgerhash())) - { - charge(Resource::feeInvalidRequest); - JLOG(p_journal_.warn()) << "GetLedger: Invalid request"; - return; - } - - uint256 const ledgerhash{packet.ledgerhash()}; - logMe += "LedgerHash:"; - logMe += to_string(ledgerhash); - ledger = app_.getLedgerMaster().getLedgerByHash(ledgerhash); - - if (!ledger && packet.has_ledgerseq()) - { - if (auto shardStore = app_.getShardStore()) - { - auto seq = packet.ledgerseq(); - if (seq >= shardStore->earliestLedgerSeq()) - ledger = shardStore->fetchLedger(ledgerhash, seq); + if (m->ledgerseq() >= shards->earliestLedgerSeq()) + { + ledger = + shards->fetchLedger(ledgerHash, m->ledgerseq()); + } } } if (!ledger) { JLOG(p_journal_.trace()) - << "GetLedger: Don't have " << ledgerhash; - } + << "getLedger: Don't have ledger with hash " << ledgerHash; - if (!ledger && - (packet.has_querytype() && !packet.has_requestcookie())) - { - // We don't have the requested ledger - // Search for a peer who might - auto const v = getPeerWithLedger( - overlay_, - ledgerhash, - packet.has_ledgerseq() ? packet.ledgerseq() : 0, - this); - if (!v) + if (m->has_querytype() && !m->has_requestcookie()) { - JLOG(p_journal_.trace()) << "GetLedger: Cannot route"; - return; - } + // Attempt to relay the request to a peer + if (auto const peer = getPeerWithLedger( + overlay_, + ledgerHash, + m->has_ledgerseq() ? m->ledgerseq() : 0, + this)) + { + m->set_requestcookie(id()); + peer->send(std::make_shared( + *m, protocol::mtGET_LEDGER)); + JLOG(p_journal_.debug()) + << "getLedger: Request relayed to peer"; + return ledger; + } - packet.set_requestcookie(id()); - v->send( - std::make_shared(packet, protocol::mtGET_LEDGER)); - JLOG(p_journal_.debug()) << "GetLedger: Request routed"; - return; + JLOG(p_journal_.trace()) + << "getLedger: Failed to find peer to relay request"; + } } } - else if (packet.has_ledgerseq()) + } + else if (m->has_ledgerseq()) + { + // Attempt to find ledger by sequence + if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch()) { - if (packet.ledgerseq() < app_.getLedgerMaster().getEarliestFetch()) - { - JLOG(p_journal_.debug()) << "GetLedger: Early ledger request"; - return; - } - ledger = app_.getLedgerMaster().getLedgerBySeq(packet.ledgerseq()); - if (!ledger) - { - JLOG(p_journal_.debug()) - << "GetLedger: Don't have " << packet.ledgerseq(); - } - } - else if (packet.has_ltype() && (packet.ltype() == protocol::ltCLOSED)) - { - ledger = app_.getLedgerMaster().getClosedLedger(); - assert(!ledger->open()); - // VFALCO ledger should never be null! - // VFALCO How can the closed ledger be open? -#if 0 - if (ledger && ledger->info().open) - ledger = app_.getLedgerMaster ().getLedgerBySeq ( - ledger->info().seq - 1); -#endif + JLOG(p_journal_.debug()) + << "getLedger: Early ledger sequence request"; } else { - charge(Resource::feeInvalidRequest); - JLOG(p_journal_.warn()) << "GetLedger: Unknown request"; - return; - } - - if ((!ledger) || - (packet.has_ledgerseq() && - (packet.ledgerseq() != ledger->info().seq))) - { - charge(Resource::feeInvalidRequest); - - if (ledger) + ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq()); + if (!ledger) { - JLOG(p_journal_.warn()) << "GetLedger: Invalid sequence"; + JLOG(p_journal_.debug()) + << "getLedger: Don't have ledger with sequence " + << m->ledgerseq(); } - return; - } - - if (!packet.has_ledgerseq() && - (ledger->info().seq < app_.getLedgerMaster().getEarliestFetch())) - { - JLOG(p_journal_.debug()) << "GetLedger: Early ledger request"; - return; - } - - // Fill out the reply - auto const lHash = ledger->info().hash; - reply.set_ledgerhash(lHash.begin(), lHash.size()); - reply.set_ledgerseq(ledger->info().seq); - reply.set_type(packet.itype()); - - if (packet.itype() == protocol::liBASE) - { - // they want the ledger base data - JLOG(p_journal_.trace()) << "GetLedger: Base data"; - Serializer nData(128); - addRaw(ledger->info(), nData); - reply.add_nodes()->set_nodedata( - nData.getDataPtr(), nData.getLength()); - - auto const& stateMap = ledger->stateMap(); - if (stateMap.getHash() != beast::zero) - { - // return account state root node if possible - Serializer rootNode(768); - - stateMap.serializeRoot(rootNode); - reply.add_nodes()->set_nodedata( - rootNode.getDataPtr(), rootNode.getLength()); - - if (ledger->info().txHash != beast::zero) - { - auto const& txMap = ledger->txMap(); - if (txMap.getHash() != beast::zero) - { - rootNode.erase(); - - txMap.serializeRoot(rootNode); - reply.add_nodes()->set_nodedata( - rootNode.getDataPtr(), rootNode.getLength()); - } - } - } - - auto oPacket = - std::make_shared(reply, protocol::mtLEDGER_DATA); - send(oPacket); - return; - } - - if (packet.itype() == protocol::liTX_NODE) - { - map = &ledger->txMap(); - logMe += " TX:"; - logMe += to_string(map->getHash()); - } - else if (packet.itype() == protocol::liAS_NODE) - { - map = &ledger->stateMap(); - logMe += " AS:"; - logMe += to_string(map->getHash()); } } - - if (!map || (packet.nodeids_size() == 0)) + else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED) { - JLOG(p_journal_.warn()) << "GetLedger: Can't find map or empty request"; - charge(Resource::feeInvalidRequest); - return; + ledger = app_.getLedgerMaster().getClosedLedger(); } - JLOG(p_journal_.trace()) << "GetLedger: " << logMe; - - auto const depth = packet.has_querydepth() - ? (std::min(packet.querydepth(), 3u)) - : (isHighLatency() ? 2 : 1); - - for (int i = 0; - (i < packet.nodeids().size() && - (reply.nodes().size() < Tuning::maxReplyNodes)); - ++i) + if (ledger) { - auto const mn = deserializeSHAMapNodeID(packet.nodeids(i)); - - if (!mn) + // Validate retrieved ledger sequence + auto const ledgerSeq{ledger->info().seq}; + if (m->has_ledgerseq()) { - JLOG(p_journal_.warn()) << "GetLedger: Invalid node " << logMe; - charge(Resource::feeBadData); - return; - } - - std::vector nodeIDs; - std::vector rawNodes; - - try - { - if (map->getNodeFat(*mn, nodeIDs, rawNodes, fatLeaves, depth)) + if (ledgerSeq != m->ledgerseq()) { - assert(nodeIDs.size() == rawNodes.size()); - JLOG(p_journal_.trace()) << "GetLedger: getNodeFat got " - << rawNodes.size() << " nodes"; - std::vector::iterator nodeIDIterator; - std::vector::iterator rawNodeIterator; + // Do not resource charge a peer responding to a relay + if (!m->has_requestcookie()) + charge(Resource::feeInvalidRequest); - for (nodeIDIterator = nodeIDs.begin(), - rawNodeIterator = rawNodes.begin(); - nodeIDIterator != nodeIDs.end(); - ++nodeIDIterator, ++rawNodeIterator) - { - protocol::TMLedgerNode* node = reply.add_nodes(); - node->set_nodeid(nodeIDIterator->getRawString()); - node->set_nodedata( - &rawNodeIterator->front(), rawNodeIterator->size()); - } + ledger.reset(); + JLOG(p_journal_.warn()) + << "getLedger: Invalid ledger sequence " << ledgerSeq; + } + } + else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch()) + { + ledger.reset(); + JLOG(p_journal_.debug()) + << "getLedger: Early ledger sequence request " << ledgerSeq; + } + } + else + { + JLOG(p_journal_.warn()) << "getLedger: Unable to find ledger"; + } + + return ledger; +} + +std::shared_ptr +PeerImp::getTxSet(std::shared_ptr const& m) const +{ + JLOG(p_journal_.trace()) << "getTxSet: TX set"; + + uint256 const txSetHash{m->ledgerhash()}; + std::shared_ptr shaMap{ + app_.getInboundTransactions().getSet(txSetHash, false)}; + if (!shaMap) + { + if (m->has_querytype() && !m->has_requestcookie()) + { + // Attempt to relay the request to a peer + if (auto const peer = getPeerWithTree(overlay_, txSetHash, this)) + { + m->set_requestcookie(id()); + peer->send( + std::make_shared(*m, protocol::mtGET_LEDGER)); + JLOG(p_journal_.debug()) << "getTxSet: Request relayed"; } else { - JLOG(p_journal_.warn()) - << "GetLedger: getNodeFat returns false"; + JLOG(p_journal_.debug()) + << "getTxSet: Failed to find relay peer"; } } - catch (std::exception&) + else { - std::string info; - - if (packet.itype() == protocol::liTS_CANDIDATE) - info = "TS candidate"; - else if (packet.itype() == protocol::liBASE) - info = "Ledger base"; - else if (packet.itype() == protocol::liTX_NODE) - info = "TX node"; - else if (packet.itype() == protocol::liAS_NODE) - info = "AS node"; - - if (!packet.has_ledgerhash()) - info += ", no hash specified"; - - JLOG(p_journal_.warn()) - << "getNodeFat( " << *mn << ") throws exception: " << info; + JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set"; } } - JLOG(p_journal_.info()) - << "Got request for " << packet.nodeids().size() << " nodes at depth " - << depth << ", return " << reply.nodes().size() << " nodes"; + return shaMap; +} - auto oPacket = std::make_shared(reply, protocol::mtLEDGER_DATA); - send(oPacket); +void +PeerImp::processLedgerRequest(std::shared_ptr const& m) +{ + // Do not resource charge a peer responding to a relay + if (!m->has_requestcookie()) + charge(Resource::feeMediumBurdenPeer); + + std::shared_ptr ledger; + std::shared_ptr sharedMap; + SHAMap const* map{nullptr}; + protocol::TMLedgerData ledgerData; + bool fatLeaves{true}; + auto const itype{m->itype()}; + + if (itype == protocol::liTS_CANDIDATE) + { + if (sharedMap = getTxSet(m); !sharedMap) + return; + map = sharedMap.get(); + + // Fill out the reply + ledgerData.set_ledgerseq(0); + ledgerData.set_ledgerhash(m->ledgerhash()); + ledgerData.set_type(protocol::liTS_CANDIDATE); + if (m->has_requestcookie()) + ledgerData.set_requestcookie(m->requestcookie()); + + // We'll already have most transactions + fatLeaves = false; + } + else + { + if (send_queue_.size() >= Tuning::dropSendQueue) + { + JLOG(p_journal_.debug()) + << "processLedgerRequest: Large send queue"; + return; + } + if (app_.getFeeTrack().isLoadedLocal() && !cluster()) + { + JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy"; + return; + } + + if (ledger = getLedger(m); !ledger) + return; + + // Fill out the reply + auto const ledgerHash{ledger->info().hash}; + ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size()); + ledgerData.set_ledgerseq(ledger->info().seq); + ledgerData.set_type(itype); + if (m->has_requestcookie()) + ledgerData.set_requestcookie(m->requestcookie()); + + switch (itype) + { + case protocol::liBASE: + sendLedgerBase(ledger, ledgerData); + return; + + case protocol::liTX_NODE: + map = &ledger->txMap(); + JLOG(p_journal_.trace()) << "processLedgerRequest: TX map hash " + << to_string(map->getHash()); + break; + + case protocol::liAS_NODE: + map = &ledger->stateMap(); + JLOG(p_journal_.trace()) + << "processLedgerRequest: Account state map hash " + << to_string(map->getHash()); + break; + + default: + // This case should not be possible here + JLOG(p_journal_.error()) + << "processLedgerRequest: Invalid ledger info type"; + return; + } + } + + if (!map) + { + JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map"; + return; + } + + // Add requested node data to reply + if (m->nodeids_size() > 0) + { + auto const queryDepth{ + m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)}; + std::vector nodeIds; + std::vector rawNodes; + + for (int i = 0; i < m->nodeids_size() && + ledgerData.nodes_size() < Tuning::maxReplyNodes; + ++i) + { + auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))}; + + nodeIds.clear(); + rawNodes.clear(); + try + { + if (map->getNodeFat( + *shaMapNodeId, + nodeIds, + rawNodes, + fatLeaves, + queryDepth)) + { + assert(nodeIds.size() == rawNodes.size()); + JLOG(p_journal_.trace()) + << "processLedgerRequest: getNodeFat got " + << rawNodes.size() << " nodes"; + + auto rawNodeIter{rawNodes.begin()}; + for (auto const& nodeId : nodeIds) + { + protocol::TMLedgerNode* node{ledgerData.add_nodes()}; + node->set_nodeid(nodeId.getRawString()); + node->set_nodedata( + &rawNodeIter->front(), rawNodeIter->size()); + ++rawNodeIter; + } + } + else + { + JLOG(p_journal_.warn()) + << "processLedgerRequest: getNodeFat returns false"; + } + } + catch (std::exception& e) + { + std::string info; + switch (itype) + { + case protocol::liBASE: + // This case should not be possible here + info = "Ledger base"; + break; + + case protocol::liTX_NODE: + info = "TX node"; + break; + + case protocol::liAS_NODE: + info = "AS node"; + break; + + case protocol::liTS_CANDIDATE: + info = "TS candidate"; + break; + + default: + info = "Invalid"; + break; + } + + if (!m->has_ledgerhash()) + info += ", no hash specified"; + + JLOG(p_journal_.error()) + << "processLedgerRequest: getNodeFat with nodeId " + << *shaMapNodeId << " and ledger info type " << info + << " throws exception: " << e.what(); + } + } + + JLOG(p_journal_.info()) + << "processLedgerRequest: Got request for " << m->nodeids_size() + << " nodes at depth " << queryDepth << ", return " + << ledgerData.nodes_size() << " nodes"; + } + + auto message{ + std::make_shared(ledgerData, protocol::mtLEDGER_DATA)}; + send(message); } int diff --git a/src/ripple/overlay/impl/PeerImp.h b/src/ripple/overlay/impl/PeerImp.h index 4630b5393..9c511000c 100644 --- a/src/ripple/overlay/impl/PeerImp.h +++ b/src/ripple/overlay/impl/PeerImp.h @@ -46,6 +46,7 @@ namespace ripple { struct ValidatorBlobInfo; +class SHAMap; class PeerImp : public Peer, public std::enable_shared_from_this, @@ -564,7 +565,18 @@ private: std::shared_ptr const& packet); void - getLedger(std::shared_ptr const& packet); + sendLedgerBase( + std::shared_ptr const& ledger, + protocol::TMLedgerData& ledgerData); + + std::shared_ptr + getLedger(std::shared_ptr const& m); + + std::shared_ptr + getTxSet(std::shared_ptr const& m) const; + + void + processLedgerRequest(std::shared_ptr const& m); }; //------------------------------------------------------------------------------ diff --git a/src/ripple/overlay/impl/Tuning.h b/src/ripple/overlay/impl/Tuning.h index 4d3467ae4..a23d482f2 100644 --- a/src/ripple/overlay/impl/Tuning.h +++ b/src/ripple/overlay/impl/Tuning.h @@ -54,6 +54,9 @@ enum { /** How often we check for idle peers (seconds) */ checkIdlePeers = 4, + + /** The maximum number of levels to search */ + maxQueryDepth = 3, }; /** Size of buffer used to read from the socket. */