Compare commits

...

6 Commits

Author SHA1 Message Date
Bart
77aa8715f0 Improve log message 2026-06-06 19:06:31 -04:00
Bart
e67a980849 Improve comment 2026-06-06 18:56:19 -04:00
Bart
5771c38406 Limit number of requested nodes to handle 2026-06-06 18:52:14 -04:00
Bart
ae1b5b6bac Post charge to peer strand 2026-06-06 17:21:19 -04:00
Bart
ecd0136844 Improve log message 2026-06-06 09:29:26 -04:00
Bart
02f20331d5 refactor: Deserialize received nodes once and only in job queue 2026-06-06 09:17:26 -04:00
2 changed files with 58 additions and 37 deletions

View File

@@ -61,6 +61,7 @@
#include <xrpl/server/Handoff.h>
#include <xrpl/server/LoadFeeTrack.h>
#include <xrpl/server/NetworkOPs.h>
#include <xrpl/shamap/SHAMap.h>
#include <xrpl/shamap/SHAMapNodeID.h>
#include <xrpl/tx/apply.h>
@@ -1493,23 +1494,12 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
}
}
// Verify ledger node IDs
if (itype != protocol::liBASE)
// Verify ledger node counts. Full parsing of the node IDs is deferred to the job, so the I/O
// thread is not burdened with SHAMapNodeID deserialization for every TMGetLedger message.
if (itype != protocol::liBASE && m->nodeids_size() <= 0)
{
if (m->nodeids_size() <= 0)
{
badData("Invalid ledger node IDs");
return;
}
for (auto const& nodeId : m->nodeids())
{
if (deserializeSHAMapNodeID(nodeId) == std::nullopt)
{
badData("Invalid SHAMap node ID");
return;
}
}
badData("Invalid ledger node IDs");
return;
}
// Verify query type
@@ -1529,11 +1519,40 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
}
}
// Queue a job to process the request
// Queue a job to process the request.
std::weak_ptr<PeerImp> const weak = shared_from_this();
app_.getJobQueue().addJob(JtLedgerReq, "RcvGetLedger", [weak, m]() {
if (auto peer = weak.lock())
peer->processLedgerRequest(m);
app_.getJobQueue().addJob(JtLedgerReq, "RcvGetLedger", [weak, m, itype]() {
auto peer = weak.lock();
if (!peer)
return;
std::vector<SHAMapNodeID> nodeIDs;
if (itype != protocol::liBASE)
{
nodeIDs.reserve(std::min(m->nodeids_size(), Tuning::kSoftMaxReplyNodes));
for (auto const& nodeId : m->nodeids())
{
if (nodeIDs.size() >= Tuning::kSoftMaxReplyNodes)
{
// Charge the peer for sending too many node IDs, but continue processing the
// received node IDs up to the limit. If the request is legitimate then at least
// they will get a response and won't have to resend these nodes in their next
// request.
peer->charge(
Resource::kFeeModerateBurdenPeer, "TMGetLedger: too many node IDs");
break;
}
auto parsed = deserializeSHAMapNodeID(nodeId);
if (!parsed)
{
peer->charge(Resource::kFeeInvalidData, "TMGetLedger: Invalid node ID");
return;
}
nodeIDs.push_back(std::move(*parsed));
}
}
peer->processLedgerRequest(m, std::move(nodeIDs));
});
}
@@ -3242,7 +3261,9 @@ PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
}
void
PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
PeerImp::processLedgerRequest(
std::shared_ptr<protocol::TMGetLedger> const& m,
std::vector<SHAMapNodeID> nodeIDs)
{
// Do not resource charge a peer responding to a relay
if (!m->has_requestcookie())
@@ -3327,26 +3348,23 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
}
// Add requested node data to reply
if (m->nodeids_size() > 0)
if (!nodeIDs.empty())
{
std::uint32_t const defaultDepth = isHighLatency() ? 2 : 1;
auto const queryDepth{m->has_querydepth() ? m->querydepth() : defaultDepth};
std::vector<std::pair<SHAMapNodeID, Blob>> data;
for (int i = 0;
i < m->nodeids_size() && ledgerData.nodes_size() < Tuning::kSoftMaxReplyNodes;
++i)
data.reserve(Tuning::kSoftMaxReplyNodes);
for (auto const& nodeID : nodeIDs)
{
auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
if (ledgerData.nodes_size() >= Tuning::kSoftMaxReplyNodes)
break;
data.clear();
data.reserve(Tuning::kSoftMaxReplyNodes);
try
{
// NOLINTNEXTLINE(bugprone-unchecked-optional-access) nodeids checked in onGetLedger
if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
if (map->getNodeFat(nodeID, data, fatLeaves, queryDepth))
{
JLOG(pJournal_.trace())
<< "processLedgerRequest: getNodeFat got " << data.size() << " nodes";
@@ -3355,9 +3373,9 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
{
if (ledgerData.nodes_size() >= Tuning::kHardMaxReplyNodes)
break;
protocol::TMLedgerNode* node{ledgerData.add_nodes()};
node->set_nodeid(d.first.getRawString());
node->set_nodedata(d.second.data(), d.second.size());
protocol::TMLedgerNode* ledgerNode{ledgerData.add_nodes()};
ledgerNode->set_nodeid(d.first.getRawString());
ledgerNode->set_nodedata(d.second.data(), d.second.size());
}
}
else
@@ -3396,14 +3414,14 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
info += ", no hash specified";
JLOG(pJournal_.warn())
<< "processLedgerRequest: getNodeFat with nodeId " << *shaMapNodeId
<< "processLedgerRequest: getNodeFat with nodeId " << nodeID
<< " and ledger info type " << info << " throws exception: " << e.what();
}
}
JLOG(pJournal_.info()) << "processLedgerRequest: Got request for " << m->nodeids_size()
<< " nodes at depth " << queryDepth << ", return "
<< ledgerData.nodes_size() << " nodes";
<< " node IDs (processed " << nodeIDs.size() << ") at depth "
<< queryDepth << ", return " << ledgerData.nodes_size() << " nodes";
}
if (ledgerData.nodes_size() == 0)

View File

@@ -14,6 +14,7 @@
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/shamap/SHAMapNodeID.h>
#include <boost/circular_buffer.hpp>
#include <boost/endian/conversion.hpp>
@@ -623,7 +624,9 @@ private:
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
processLedgerRequest(
std::shared_ptr<protocol::TMGetLedger> const& m,
std::vector<SHAMapNodeID> nodeIDs);
};
//------------------------------------------------------------------------------