Compare commits

..

2 Commits

Author SHA1 Message Date
Bart
ecd0136844 Improve log message 2026-06-06 09:29:26 -04:00
Bart
02f20331d5 refactor: Deserialize received nodes once and only in job queue 2026-06-06 09:17:26 -04:00
4 changed files with 68 additions and 64 deletions

View File

@@ -65,13 +65,9 @@ public:
/**
* @brief called for each ledger entry in the current transaction.
*
* @param isDelete true if the SLE is being deleted.
* @param before ledger entry before modification by the
* transaction.
* @param after ledger entry after modification by the transaction.
* `after` IS NEVER NULL. `isDelete` is the only correct way to check for deletions.
* Check for null defensively, but do not make any logic decisions based on whether `after` is
* set, because it will always be set.
* @param isDelete true if the SLE is being deleted
* @param before ledger entry before modification by the transaction
* @param after ledger entry after modification by the transaction
*/
void
visitEntry(bool isDelete, SLE::const_ref before, SLE::const_ref after);

View File

@@ -135,28 +135,24 @@ XRPNotCreated::visitEntry(bool isDelete, SLE::const_ref before, SLE::const_ref a
}
}
if (!after)
if (after)
{
// LCOV_EXCL_START
UNREACHABLE("xrpl::XRPNotCreated::visitEntry : after can't be null");
return;
// LCOV_EXCL_STOP
}
switch (after->getType())
{
case ltACCOUNT_ROOT:
drops_ += (*after)[sfBalance].xrp().drops();
break;
case ltPAYCHAN:
if (!isDelete)
drops_ += ((*after)[sfAmount] - (*after)[sfBalance]).xrp().drops();
break;
case ltESCROW:
if (!isDelete && isXRP((*after)[sfAmount]))
drops_ += (*after)[sfAmount].xrp().drops();
break;
default:
break;
switch (after->getType())
{
case ltACCOUNT_ROOT:
drops_ += (*after)[sfBalance].xrp().drops();
break;
case ltPAYCHAN:
if (!isDelete)
drops_ += ((*after)[sfAmount] - (*after)[sfBalance]).xrp().drops();
break;
case ltESCROW:
if (!isDelete && isXRP((*after)[sfAmount]))
drops_ += (*after)[sfAmount].xrp().drops();
break;
default:
break;
}
}
}

View File

@@ -61,6 +61,7 @@
#include <xrpl/server/Handoff.h>
#include <xrpl/server/LoadFeeTrack.h>
#include <xrpl/server/NetworkOPs.h>
#include <xrpl/shamap/SHAMap.h>
#include <xrpl/shamap/SHAMapNodeID.h>
#include <xrpl/tx/apply.h>
@@ -1493,23 +1494,11 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
}
}
// Verify ledger node IDs
if (itype != protocol::liBASE)
// Verify ledger node IDs. Full parsing is deferred to the job.
if (itype != protocol::liBASE && m->nodeids_size() <= 0)
{
if (m->nodeids_size() <= 0)
{
badData("Invalid ledger node IDs");
return;
}
for (auto const& nodeId : m->nodeids())
{
if (deserializeSHAMapNodeID(nodeId) == std::nullopt)
{
badData("Invalid SHAMap node ID");
return;
}
}
badData("Invalid ledger node IDs");
return;
}
// Verify query type
@@ -1529,11 +1518,32 @@ PeerImp::onMessage(std::shared_ptr<protocol::TMGetLedger> const& m)
}
}
// Queue a job to process the request
// Queue a job to process the request. Full parsing of the node IDs is
// performed inside the job so the I/O thread is not burdened with
// SHAMapNodeID deserialization for every TMGetLedger message.
std::weak_ptr<PeerImp> const weak = shared_from_this();
app_.getJobQueue().addJob(JtLedgerReq, "RcvGetLedger", [weak, m]() {
if (auto peer = weak.lock())
peer->processLedgerRequest(m);
app_.getJobQueue().addJob(JtLedgerReq, "RcvGetLedger", [weak, m, itype]() {
auto peer = weak.lock();
if (!peer)
return;
std::vector<SHAMapNodeID> nodeIDs;
if (itype != protocol::liBASE)
{
nodeIDs.reserve(m->nodeids_size());
for (auto const& nodeId : m->nodeids())
{
auto parsed = deserializeSHAMapNodeID(nodeId);
if (!parsed)
{
peer->charge(Resource::kFeeInvalidData, "get_ledger invalid node ID");
return;
}
nodeIDs.push_back(std::move(*parsed));
}
}
peer->processLedgerRequest(m, std::move(nodeIDs));
});
}
@@ -3242,7 +3252,9 @@ PeerImp::getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const
}
void
PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
PeerImp::processLedgerRequest(
std::shared_ptr<protocol::TMGetLedger> const& m,
std::vector<SHAMapNodeID> nodeIDs)
{
// Do not resource charge a peer responding to a relay
if (!m->has_requestcookie())
@@ -3327,26 +3339,23 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
}
// Add requested node data to reply
if (m->nodeids_size() > 0)
if (!nodeIDs.empty())
{
std::uint32_t const defaultDepth = isHighLatency() ? 2 : 1;
auto const queryDepth{m->has_querydepth() ? m->querydepth() : defaultDepth};
std::vector<std::pair<SHAMapNodeID, Blob>> data;
for (int i = 0;
i < m->nodeids_size() && ledgerData.nodes_size() < Tuning::kSoftMaxReplyNodes;
++i)
data.reserve(Tuning::kSoftMaxReplyNodes);
for (auto const& nodeID : nodeIDs)
{
auto const shaMapNodeId{deserializeSHAMapNodeID(m->nodeids(i))};
if (ledgerData.nodes_size() >= Tuning::kSoftMaxReplyNodes)
break;
data.clear();
data.reserve(Tuning::kSoftMaxReplyNodes);
try
{
// NOLINTNEXTLINE(bugprone-unchecked-optional-access) nodeids checked in onGetLedger
if (map->getNodeFat(*shaMapNodeId, data, fatLeaves, queryDepth))
if (map->getNodeFat(nodeID, data, fatLeaves, queryDepth))
{
JLOG(pJournal_.trace())
<< "processLedgerRequest: getNodeFat got " << data.size() << " nodes";
@@ -3355,9 +3364,9 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
{
if (ledgerData.nodes_size() >= Tuning::kHardMaxReplyNodes)
break;
protocol::TMLedgerNode* node{ledgerData.add_nodes()};
node->set_nodeid(d.first.getRawString());
node->set_nodedata(d.second.data(), d.second.size());
protocol::TMLedgerNode* ledgerNode{ledgerData.add_nodes()};
ledgerNode->set_nodeid(d.first.getRawString());
ledgerNode->set_nodedata(d.second.data(), d.second.size());
}
}
else
@@ -3396,12 +3405,12 @@ PeerImp::processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m)
info += ", no hash specified";
JLOG(pJournal_.warn())
<< "processLedgerRequest: getNodeFat with nodeId " << *shaMapNodeId
<< "processLedgerRequest: getNodeFat with nodeId " << nodeID
<< " and ledger info type " << info << " throws exception: " << e.what();
}
}
JLOG(pJournal_.info()) << "processLedgerRequest: Got request for " << m->nodeids_size()
JLOG(pJournal_.info()) << "processLedgerRequest: Got request for " << nodeIDs.size()
<< " nodes at depth " << queryDepth << ", return "
<< ledgerData.nodes_size() << " nodes";
}

View File

@@ -14,6 +14,7 @@
#include <xrpl/protocol/STTx.h>
#include <xrpl/protocol/STValidation.h>
#include <xrpl/resource/Fees.h>
#include <xrpl/shamap/SHAMapNodeID.h>
#include <boost/circular_buffer.hpp>
#include <boost/endian/conversion.hpp>
@@ -623,7 +624,9 @@ private:
getTxSet(std::shared_ptr<protocol::TMGetLedger> const& m) const;
void
processLedgerRequest(std::shared_ptr<protocol::TMGetLedger> const& m);
processLedgerRequest(
std::shared_ptr<protocol::TMGetLedger> const& m,
std::vector<SHAMapNodeID> nodeIDs);
};
//------------------------------------------------------------------------------