Refactor GetLedger and LedgerData message handlers:

* Verify message fields first
* Break up GetLedger handler into several functions
This commit is contained in:
Miguel Portilla
2021-04-06 10:03:28 -04:00
committed by manojsdoshi
parent 9202197354
commit 96f11c786e
3 changed files with 489 additions and 313 deletions

View File

@@ -1530,11 +1530,94 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMTransaction> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
{ {
fee_ = Resource::feeMediumBurdenPeer; auto badData = [&](std::string const& msg) {
charge(Resource::feeBadData);
JLOG(p_journal_.warn()) << "TMGetLedger: " << msg;
};
auto const itype{m->itype()};
// Verify ledger info type
if (itype < protocol::liBASE || itype > protocol::liTS_CANDIDATE)
return badData("Invalid ledger info type");
auto const ltype = [&m]() -> std::optional<::protocol::TMLedgerType> {
if (m->has_ltype())
return m->ltype();
return std::nullopt;
}();
if (itype == protocol::liTS_CANDIDATE)
{
if (!m->has_ledgerhash())
return badData("Invalid TX candidate set, missing TX set hash");
}
else if (
!m->has_ledgerhash() && !m->has_ledgerseq() &&
!(ltype && *ltype == protocol::ltCLOSED))
{
return badData("Invalid request");
}
// Verify ledger type
if (ltype && (*ltype < protocol::ltACCEPTED || *ltype > protocol::ltCLOSED))
return badData("Invalid ledger type");
// Verify ledger hash
if (m->has_ledgerhash() && !stringIsUint256Sized(m->ledgerhash()))
return badData("Invalid ledger hash");
// Verify ledger sequence
if (m->has_ledgerseq())
{
auto const ledgerSeq{m->ledgerseq()};
if (ledgerSeq < app_.getNodeStore().earliestLedgerSeq())
{
return badData(
"Invalid ledger sequence " + std::to_string(ledgerSeq));
}
// Check if within a reasonable range
using namespace std::chrono_literals;
if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
{
return badData(
"Invalid ledger sequence " + std::to_string(ledgerSeq));
}
}
// Verify ledger node IDs
if (itype != protocol::liBASE)
{
if (m->nodeids_size() <= 0)
return badData("Invalid ledger node IDs");
for (auto const& nodeId : m->nodeids())
{
if (deserializeSHAMapNodeID(nodeId) == std::nullopt)
return badData("Invalid SHAMap node ID");
}
}
// Verify query type
if (m->has_querytype() && m->querytype() != protocol::qtINDIRECT)
return badData("Invalid query type");
// Verify query depth
if (m->has_querydepth())
{
if (m->querydepth() > Tuning::maxQueryDepth ||
itype == protocol::liBASE)
{
return badData("Invalid query depth");
}
}
// Queue a job to process the request
std::weak_ptr<PeerImp> weak = shared_from_this(); std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m](Job&) { app_.getJobQueue().addJob(jtLEDGER_REQ, "recvGetLedger", [weak, m](Job&) {
if (auto peer = weak.lock()) if (auto peer = weak.lock())
peer->getLedger(m); peer->processLedgerRequest(m);
}); });
} }
@@ -1639,58 +1722,98 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMReplayDeltaResponse> const& m)
void void
PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m) PeerImp::onMessage(std::shared_ptr<protocol::TMLedgerData> const& m)
{ {
protocol::TMLedgerData& packet = *m; auto badData = [&](std::string const& msg) {
fee_ = Resource::feeBadData;
JLOG(p_journal_.warn()) << "TMLedgerData: " << msg;
};
if (m->nodes().size() <= 0) // Verify ledger hash
if (!stringIsUint256Sized(m->ledgerhash()))
return badData("Invalid ledger hash");
// Verify ledger sequence
{ {
JLOG(p_journal_.warn()) << "Ledger/TXset data with no nodes"; auto const ledgerSeq{m->ledgerseq()};
return; if (m->type() == protocol::liTS_CANDIDATE)
{
if (ledgerSeq != 0)
{
return badData(
"Invalid ledger sequence " + std::to_string(ledgerSeq));
}
}
else
{
if (ledgerSeq < app_.getNodeStore().earliestLedgerSeq())
{
return badData(
"Invalid ledger sequence " + std::to_string(ledgerSeq));
}
// Check if within a reasonable range
using namespace std::chrono_literals;
if (app_.getLedgerMaster().getValidatedLedgerAge() <= 10s &&
ledgerSeq > app_.getLedgerMaster().getValidLedgerIndex() + 10)
{
return badData(
"Invalid ledger sequence " + std::to_string(ledgerSeq));
}
}
} }
// Verify ledger info type
if (m->type() < protocol::liBASE || m->type() > protocol::liTS_CANDIDATE)
return badData("Invalid ledger info type");
// Verify ledger nodes
if (m->nodes_size() <= 0 || m->nodes_size() > Tuning::maxReplyNodes)
{
return badData(
"Invalid Ledger/TXset nodes " + std::to_string(m->nodes_size()));
}
// Verify reply error
if (m->has_error() &&
(m->error() < protocol::reNO_LEDGER ||
m->error() > protocol::reBAD_REQUEST))
{
return badData("Invalid reply error");
}
// If there is a request cookie, attempt to relay the message
if (m->has_requestcookie()) if (m->has_requestcookie())
{ {
std::shared_ptr<Peer> target = if (auto peer = overlay_.findPeerByShortID(m->requestcookie()))
overlay_.findPeerByShortID(m->requestcookie());
if (target)
{ {
m->clear_requestcookie(); m->clear_requestcookie();
target->send( peer->send(std::make_shared<Message>(*m, protocol::mtLEDGER_DATA));
std::make_shared<Message>(packet, protocol::mtLEDGER_DATA));
} }
else else
{ {
JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply"; JLOG(p_journal_.info()) << "Unable to route TX/ledger data reply";
fee_ = Resource::feeUnwantedData;
} }
return; return;
} }
if (!stringIsUint256Sized(m->ledgerhash())) uint256 const ledgerHash{m->ledgerhash()};
{
JLOG(p_journal_.warn()) << "TX candidate reply with invalid hash size";
fee_ = Resource::feeInvalidRequest;
return;
}
uint256 const hash{m->ledgerhash()};
// Otherwise check if received data for a candidate transaction set
if (m->type() == protocol::liTS_CANDIDATE) if (m->type() == protocol::liTS_CANDIDATE)
{ {
// got data for a candidate transaction set std::weak_ptr<PeerImp> weak{shared_from_this()};
std::weak_ptr<PeerImp> weak = shared_from_this();
app_.getJobQueue().addJob( app_.getJobQueue().addJob(
jtTXN_DATA, "recvPeerData", [weak, hash, m](Job&) { jtTXN_DATA, "recvPeerData", [weak, ledgerHash, m](Job&) {
if (auto peer = weak.lock()) if (auto peer = weak.lock())
peer->app_.getInboundTransactions().gotData(hash, peer, m); {
peer->app_.getInboundTransactions().gotData(
ledgerHash, peer, m);
}
}); });
return; return;
} }
if (!app_.getInboundLedgers().gotLedgerData(hash, shared_from_this(), m)) // Consume the message
{ app_.getInboundLedgers().gotLedgerData(ledgerHash, shared_from_this(), m);
JLOG(p_journal_.trace()) << "Got data for unwanted ledger";
fee_ = Resource::feeUnwantedData;
}
} }
void void
@@ -2830,334 +2953,372 @@ getPeerWithLedger(
return ret; return ret;
} }
// VFALCO NOTE This function is way too big and cumbersome.
void void
PeerImp::sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData)
{
JLOG(p_journal_.trace()) << "sendLedgerBase: Base data";
Serializer s(sizeof(LedgerInfo));
addRaw(ledger->info(), s);
ledgerData.add_nodes()->set_nodedata(s.getDataPtr(), s.getLength());
auto const& stateMap{ledger->stateMap()};
if (stateMap.getHash() != beast::zero)
{
// Return account state root node if possible
Serializer root(768);
stateMap.serializeRoot(root);
ledgerData.add_nodes()->set_nodedata(
root.getDataPtr(), root.getLength());
if (ledger->info().txHash != beast::zero)
{
auto const& txMap{ledger->txMap()};
if (txMap.getHash() != beast::zero)
{
// Return TX root node if possible
root.erase();
txMap.serializeRoot(root);
ledgerData.add_nodes()->set_nodedata(
root.getDataPtr(), root.getLength());
}
}
}
auto message{
std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
send(message);
}
std::shared_ptr<Ledger const>
PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m) PeerImp::getLedger(std::shared_ptr<protocol::TMGetLedger> const& m)
{ {
protocol::TMGetLedger& packet = *m; JLOG(p_journal_.trace()) << "getLedger: Ledger";
std::shared_ptr<SHAMap> shared;
SHAMap const* map = nullptr;
protocol::TMLedgerData reply;
bool fatLeaves = true;
std::shared_ptr<Ledger const> ledger; std::shared_ptr<Ledger const> ledger;
if (packet.has_requestcookie()) if (m->has_ledgerhash())
reply.set_requestcookie(packet.requestcookie());
std::string logMe;
if (packet.itype() == protocol::liTS_CANDIDATE)
{ {
// Request is for a transaction candidate set // Attempt to find ledger by hash
JLOG(p_journal_.trace()) << "GetLedger: Tx candidate set"; uint256 const ledgerHash{m->ledgerhash()};
ledger = app_.getLedgerMaster().getLedgerByHash(ledgerHash);
if (!packet.has_ledgerhash() || if (!ledger)
!stringIsUint256Sized(packet.ledgerhash()))
{ {
charge(Resource::feeInvalidRequest); if (m->has_ledgerseq())
JLOG(p_journal_.warn()) << "GetLedger: Tx candidate set invalid";
return;
}
uint256 const txHash{packet.ledgerhash()};
shared = app_.getInboundTransactions().getSet(txHash, false);
map = shared.get();
if (!map)
{
if (packet.has_querytype() && !packet.has_requestcookie())
{ {
JLOG(p_journal_.debug()) << "GetLedger: Routing Tx set request"; // Attempt to find ledger by sequence in the shard store
if (auto shards = app_.getShardStore())
if (auto const v = getPeerWithTree(overlay_, txHash, this))
{ {
packet.set_requestcookie(id()); if (m->ledgerseq() >= shards->earliestLedgerSeq())
v->send(std::make_shared<Message>( {
packet, protocol::mtGET_LEDGER)); ledger =
return; shards->fetchLedger(ledgerHash, m->ledgerseq());
} }
JLOG(p_journal_.info()) << "GetLedger: Route TX set failed";
return;
}
JLOG(p_journal_.debug()) << "GetLedger: Can't provide map ";
charge(Resource::feeInvalidRequest);
return;
}
reply.set_ledgerseq(0);
reply.set_ledgerhash(txHash.begin(), txHash.size());
reply.set_type(protocol::liTS_CANDIDATE);
fatLeaves = false; // We'll already have most transactions
}
else
{
if (send_queue_.size() >= Tuning::dropSendQueue)
{
JLOG(p_journal_.debug()) << "GetLedger: Large send queue";
return;
}
if (app_.getFeeTrack().isLoadedLocal() && !cluster())
{
JLOG(p_journal_.debug()) << "GetLedger: Too busy";
return;
}
// Figure out what ledger they want
JLOG(p_journal_.trace()) << "GetLedger: Received";
if (packet.has_ledgerhash())
{
if (!stringIsUint256Sized(packet.ledgerhash()))
{
charge(Resource::feeInvalidRequest);
JLOG(p_journal_.warn()) << "GetLedger: Invalid request";
return;
}
uint256 const ledgerhash{packet.ledgerhash()};
logMe += "LedgerHash:";
logMe += to_string(ledgerhash);
ledger = app_.getLedgerMaster().getLedgerByHash(ledgerhash);
if (!ledger && packet.has_ledgerseq())
{
if (auto shardStore = app_.getShardStore())
{
auto seq = packet.ledgerseq();
if (seq >= shardStore->earliestLedgerSeq())
ledger = shardStore->fetchLedger(ledgerhash, seq);
} }
} }
if (!ledger) if (!ledger)
{ {
JLOG(p_journal_.trace()) JLOG(p_journal_.trace())
<< "GetLedger: Don't have " << ledgerhash; << "getLedger: Don't have ledger with hash " << ledgerHash;
}
if (!ledger && if (m->has_querytype() && !m->has_requestcookie())
(packet.has_querytype() && !packet.has_requestcookie()))
{
// We don't have the requested ledger
// Search for a peer who might
auto const v = getPeerWithLedger(
overlay_,
ledgerhash,
packet.has_ledgerseq() ? packet.ledgerseq() : 0,
this);
if (!v)
{ {
JLOG(p_journal_.trace()) << "GetLedger: Cannot route"; // Attempt to relay the request to a peer
return; if (auto const peer = getPeerWithLedger(
} overlay_,
ledgerHash,
m->has_ledgerseq() ? m->ledgerseq() : 0,
this))
{
m->set_requestcookie(id());
peer->send(std::make_shared<Message>(
*m, protocol::mtGET_LEDGER));
JLOG(p_journal_.debug())
<< "getLedger: Request relayed to peer";
return ledger;
}
packet.set_requestcookie(id()); JLOG(p_journal_.trace())
v->send( << "getLedger: Failed to find peer to relay request";
std::make_shared<Message>(packet, protocol::mtGET_LEDGER)); }
JLOG(p_journal_.debug()) << "GetLedger: Request routed";
return;
} }
} }
else if (packet.has_ledgerseq()) }
else if (m->has_ledgerseq())
{
// Attempt to find ledger by sequence
if (m->ledgerseq() < app_.getLedgerMaster().getEarliestFetch())
{ {
if (packet.ledgerseq() < app_.getLedgerMaster().getEarliestFetch()) JLOG(p_journal_.debug())
{ << "getLedger: Early ledger sequence request";
JLOG(p_journal_.debug()) << "GetLedger: Early ledger request";
return;
}
ledger = app_.getLedgerMaster().getLedgerBySeq(packet.ledgerseq());
if (!ledger)
{
JLOG(p_journal_.debug())
<< "GetLedger: Don't have " << packet.ledgerseq();
}
}
else if (packet.has_ltype() && (packet.ltype() == protocol::ltCLOSED))
{
ledger = app_.getLedgerMaster().getClosedLedger();
assert(!ledger->open());
// VFALCO ledger should never be null!
// VFALCO How can the closed ledger be open?
#if 0
if (ledger && ledger->info().open)
ledger = app_.getLedgerMaster ().getLedgerBySeq (
ledger->info().seq - 1);
#endif
} }
else else
{ {
charge(Resource::feeInvalidRequest); ledger = app_.getLedgerMaster().getLedgerBySeq(m->ledgerseq());
JLOG(p_journal_.warn()) << "GetLedger: Unknown request"; if (!ledger)
return;
}
if ((!ledger) ||
(packet.has_ledgerseq() &&
(packet.ledgerseq() != ledger->info().seq)))
{
charge(Resource::feeInvalidRequest);
if (ledger)
{ {
JLOG(p_journal_.warn()) << "GetLedger: Invalid sequence"; JLOG(p_journal_.debug())
<< "getLedger: Don't have ledger with sequence "
<< m->ledgerseq();
} }
return;
}
if (!packet.has_ledgerseq() &&
(ledger->info().seq < app_.getLedgerMaster().getEarliestFetch()))
{
JLOG(p_journal_.debug()) << "GetLedger: Early ledger request";
return;
}
// Fill out the reply
auto const lHash = ledger->info().hash;
reply.set_ledgerhash(lHash.begin(), lHash.size());
reply.set_ledgerseq(ledger->info().seq);
reply.set_type(packet.itype());
if (packet.itype() == protocol::liBASE)
{
// they want the ledger base data
JLOG(p_journal_.trace()) << "GetLedger: Base data";
Serializer nData(128);
addRaw(ledger->info(), nData);
reply.add_nodes()->set_nodedata(
nData.getDataPtr(), nData.getLength());
auto const& stateMap = ledger->stateMap();
if (stateMap.getHash() != beast::zero)
{
// return account state root node if possible
Serializer rootNode(768);
stateMap.serializeRoot(rootNode);
reply.add_nodes()->set_nodedata(
rootNode.getDataPtr(), rootNode.getLength());
if (ledger->info().txHash != beast::zero)
{
auto const& txMap = ledger->txMap();
if (txMap.getHash() != beast::zero)
{
rootNode.erase();
txMap.serializeRoot(rootNode);
reply.add_nodes()->set_nodedata(
rootNode.getDataPtr(), rootNode.getLength());
}
}
}
auto oPacket =
std::make_shared<Message>(reply, protocol::mtLEDGER_DATA);
send(oPacket);
return;
}
if (packet.itype() == protocol::liTX_NODE)
{
map = &ledger->txMap();
logMe += " TX:";
logMe += to_string(map->getHash());
}
else if (packet.itype() == protocol::liAS_NODE)
{
map = &ledger->stateMap();
logMe += " AS:";
logMe += to_string(map->getHash());
} }
} }
else if (m->has_ltype() && m->ltype() == protocol::ltCLOSED)
if (!map || (packet.nodeids_size() == 0))
{ {
JLOG(p_journal_.warn()) << "GetLedger: Can't find map or empty request"; ledger = app_.getLedgerMaster().getClosedLedger();
charge(Resource::feeInvalidRequest);
return;
} }
JLOG(p_journal_.trace()) << "GetLedger: " << logMe; if (ledger)
auto const depth = packet.has_querydepth()
? (std::min(packet.querydepth(), 3u))
: (isHighLatency() ? 2 : 1);
for (int i = 0;
(i < packet.nodeids().size() &&
(reply.nodes().size() < Tuning::maxReplyNodes));
++i)
{ {
auto const mn = deserializeSHAMapNodeID(packet.nodeids(i)); // Validate retrieved ledger sequence
auto const ledgerSeq{ledger->info().seq};
if (!mn) if (m->has_ledgerseq())
{ {
JLOG(p_journal_.warn()) << "GetLedger: Invalid node " << logMe; if (ledgerSeq != m->ledgerseq())
charge(Resource::feeBadData);
return;
}
std::vector<SHAMapNodeID> nodeIDs;
std::vector<Blob> rawNodes;
try
{
if (map->getNodeFat(*mn, nodeIDs, rawNodes, fatLeaves, depth))
{ {
assert(nodeIDs.size() == rawNodes.size()); // Do not resource charge a peer responding to a relay
JLOG(p_journal_.trace()) << "GetLedger: getNodeFat got " if (!m->has_requestcookie())
<< rawNodes.size() << " nodes"; charge(Resource::feeInvalidRequest);
std::vector<SHAMapNodeID>::iterator nodeIDIterator;
std::vector<Blob>::iterator rawNodeIterator;
for (nodeIDIterator = nodeIDs.begin(), ledger.reset();
rawNodeIterator = rawNodes.begin(); JLOG(p_journal_.warn())
nodeIDIterator != nodeIDs.end(); << "getLedger: Invalid ledger sequence " << ledgerSeq;
++nodeIDIterator, ++rawNodeIterator) }
{ }
protocol::TMLedgerNode* node = reply.add_nodes(); else if (ledgerSeq < app_.getLedgerMaster().getEarliestFetch())
node->set_nodeid(nodeIDIterator->getRawString()); {
node->set_nodedata( ledger.reset();
&rawNodeIterator->front(), rawNodeIterator->size()); JLOG(p_journal_.debug())
} << "getLedger: Early ledger sequence request " << ledgerSeq;
}
}
else
{
JLOG(p_journal_.warn()) << "getLedger: Unable to find ledger";
}
return ledger;
}
std::shared_ptr<SHAMap const>
PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
{
JLOG(p_journal_.trace()) << "getTxSet: TX set";
uint256 const txSetHash{m->ledgerhash()};
std::shared_ptr<SHAMap> shaMap{
app_.getInboundTransactions().getSet(txSetHash, false)};
if (!shaMap)
{
if (m->has_querytype() && !m->has_requestcookie())
{
// Attempt to relay the request to a peer
if (auto const peer = getPeerWithTree(overlay_, txSetHash, this))
{
m->set_requestcookie(id());
peer->send(
std::make_shared<Message>(*m, protocol::mtGET_LEDGER));
JLOG(p_journal_.debug()) << "getTxSet: Request relayed";
} }
else else
{ {
JLOG(p_journal_.warn()) JLOG(p_journal_.debug())
<< "GetLedger: getNodeFat returns false"; << "getTxSet: Failed to find relay peer";
} }
} }
catch (std::exception&) else
{ {
std::string info; JLOG(p_journal_.debug()) << "getTxSet: Failed to find TX set";
if (packet.itype() == protocol::liTS_CANDIDATE)
info = "TS candidate";
else if (packet.itype() == protocol::liBASE)
info = "Ledger base";
else if (packet.itype() == protocol::liTX_NODE)
info = "TX node";
else if (packet.itype() == protocol::liAS_NODE)
info = "AS node";
if (!packet.has_ledgerhash())
info += ", no hash specified";
JLOG(p_journal_.warn())
<< "getNodeFat( " << *mn << ") throws exception: " << info;
} }
} }
JLOG(p_journal_.info()) return shaMap;
<< "Got request for " << packet.nodeids().size() << " nodes at depth " }
<< depth << ", return " << reply.nodes().size() << " nodes";
auto oPacket = std::make_shared<Message>(reply, protocol::mtLEDGER_DATA); void
send(oPacket); PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
{
// Do not resource charge a peer responding to a relay
if (!m->has_requestcookie())
charge(Resource::feeMediumBurdenPeer);
std::shared_ptr<Ledger const> ledger;
std::shared_ptr<SHAMap const> sharedMap;
SHAMap const* map{nullptr};
protocol::TMLedgerData ledgerData;
bool fatLeaves{true};
auto const itype{m->itype()};
if (itype == protocol::liTS_CANDIDATE)
{
if (sharedMap = getTxSet(m); !sharedMap)
return;
map = sharedMap.get();
// Fill out the reply
ledgerData.set_ledgerseq(0);
ledgerData.set_ledgerhash(m->ledgerhash());
ledgerData.set_type(protocol::liTS_CANDIDATE);
if (m->has_requestcookie())
ledgerData.set_requestcookie(m->requestcookie());
// We'll already have most transactions
fatLeaves = false;
}
else
{
if (send_queue_.size() >= Tuning::dropSendQueue)
{
JLOG(p_journal_.debug())
<< "processLedgerRequest: Large send queue";
return;
}
if (app_.getFeeTrack().isLoadedLocal() && !cluster())
{
JLOG(p_journal_.debug()) << "processLedgerRequest: Too busy";
return;
}
if (ledger = getLedger(m); !ledger)
return;
// Fill out the reply
auto const ledgerHash{ledger->info().hash};
ledgerData.set_ledgerhash(ledgerHash.begin(), ledgerHash.size());
ledgerData.set_ledgerseq(ledger->info().seq);
ledgerData.set_type(itype);
if (m->has_requestcookie())
ledgerData.set_requestcookie(m->requestcookie());
switch (itype)
{
case protocol::liBASE:
sendLedgerBase(ledger, ledgerData);
return;
case protocol::liTX_NODE:
map = &ledger->txMap();
JLOG(p_journal_.trace()) << "processLedgerRequest: TX map hash "
<< to_string(map->getHash());
break;
case protocol::liAS_NODE:
map = &ledger->stateMap();
JLOG(p_journal_.trace())
<< "processLedgerRequest: Account state map hash "
<< to_string(map->getHash());
break;
default:
// This case should not be possible here
JLOG(p_journal_.error())
<< "processLedgerRequest: Invalid ledger info type";
return;
}
}
if (!map)
{
JLOG(p_journal_.warn()) << "processLedgerRequest: Unable to find map";
return;
}
// Add requested node data to reply
if (m->nodeids_size() > 0)
{
auto const queryDepth{
m->has_querydepth() ? m->querydepth() : (isHighLatency() ? 2 : 1)};
std::vector<SHAMapNodeID> nodeIds;
std::vector<Blob> rawNodes;
for (int i = 0; i < m->nodeids_size() &&
ledgerData.nodes_size() < Tuning::maxReplyNodes;
++i)
{
auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
nodeIds.clear();
rawNodes.clear();
try
{
if (map->getNodeFat(
*shaMapNodeId,
nodeIds,
rawNodes,
fatLeaves,
queryDepth))
{
assert(nodeIds.size() == rawNodes.size());
JLOG(p_journal_.trace())
<< "processLedgerRequest: getNodeFat got "
<< rawNodes.size() << " nodes";
auto rawNodeIter{rawNodes.begin()};
for (auto const& nodeId : nodeIds)
{
protocol::TMLedgerNode* node{ledgerData.add_nodes()};
node->set_nodeid(nodeId.getRawString());
node->set_nodedata(
&rawNodeIter->front(), rawNodeIter->size());
++rawNodeIter;
}
}
else
{
JLOG(p_journal_.warn())
<< "processLedgerRequest: getNodeFat returns false";
}
}
catch (std::exception& e)
{
std::string info;
switch (itype)
{
case protocol::liBASE:
// This case should not be possible here
info = "Ledger base";
break;
case protocol::liTX_NODE:
info = "TX node";
break;
case protocol::liAS_NODE:
info = "AS node";
break;
case protocol::liTS_CANDIDATE:
info = "TS candidate";
break;
default:
info = "Invalid";
break;
}
if (!m->has_ledgerhash())
info += ", no hash specified";
JLOG(p_journal_.error())
<< "processLedgerRequest: getNodeFat with nodeId "
<< *shaMapNodeId << " and ledger info type " << info
<< " throws exception: " << e.what();
}
}
JLOG(p_journal_.info())
<< "processLedgerRequest: Got request for " << m->nodeids_size()
<< " nodes at depth " << queryDepth << ", return "
<< ledgerData.nodes_size() << " nodes";
}
auto message{
std::make_shared<Message>(ledgerData, protocol::mtLEDGER_DATA)};
send(message);
} }
int int

View File

@@ -46,6 +46,7 @@
namespace ripple { namespace ripple {
struct ValidatorBlobInfo; struct ValidatorBlobInfo;
class SHAMap;
class PeerImp : public Peer, class PeerImp : public Peer,
public std::enable_shared_from_this<PeerImp>, public std::enable_shared_from_this<PeerImp>,
@@ -564,7 +565,18 @@ private:
std::shared_ptr<protocol::TMValidation> const& packet); std::shared_ptr<protocol::TMValidation> const& packet);
void void
getLedger(std::shared_ptr<protocol::TMGetLedger> const& packet); sendLedgerBase(
std::shared_ptr<Ledger const> const& ledger,
protocol::TMLedgerData& ledgerData);
std::shared_ptr<Ledger const>
getLedger(std::shared_ptr<protocol::TMGetLedger> const& m);
std::shared_ptr<SHAMap const>
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
}; };
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------

View File

@@ -54,6 +54,9 @@ enum {
/** How often we check for idle peers (seconds) */ /** How often we check for idle peers (seconds) */
checkIdlePeers = 4, checkIdlePeers = 4,
/** The maximum number of levels to search */
maxQueryDepth = 3,
}; };
/** Size of buffer used to read from the socket. */ /** Size of buffer used to read from the socket. */