mirror of
https://github.com/XRPLF/rippled.git
synced 2026-06-02 16:26:48 +00:00
- Drop xrpl.node.amendment_blocked / xrpl.node.server_state from telemetry surface (constants in SpanNames.h, two filters in tempo.yaml). Operators read the same data via server_info / server_state RPC; OTel SDK 1.18.0 cannot refresh resource attrs at runtime so resource-level emission was not viable either.
- Namespace all pathfind span attributes under pathfind_* (underscore form per Phase 1c rule 5). Renames in PathFindSpanNames.h and call sites in PathRequest.cpp, PathRequestManager.cpp, plus the rule-5 retention xrpl.pathfind.ledger_index -> pathfind_ledger_index.
- Wire pathfind_source_account / pathfind_dest_account on pathfind.request in doPathFind / doRipplePathFind handlers (only when present + string).
- Collapse per-asset pathfind.discover / pathfind.rank spans into one pathfind.discover hoisted around the per-source-asset loop in PathRequest::findPaths. Span count goes from 2N to 1 per RPC call; per-asset breakdown traded for bounded storage and cardinality. Trade-off documented inline.
- Fix pathfind_num_paths semantics: now sums getBestPaths().size() across the loop (paths actually returned) instead of the maxPaths input cap.
- PathRequestManager::updateAll: move span creation after the locked requests_ snapshot, early-return when no active subscriptions exist (avoids empty span on every ledger close), set pathfind_num_requests = requests.size().
- Update Phase2_taskList.md and 02-design-decisions.md to match.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
316 lines
10 KiB
C++
316 lines
10 KiB
C++
#include <xrpld/rpc/detail/PathRequestManager.h>
|
|
|
|
#include <xrpld/app/ledger/LedgerMaster.h>
|
|
#include <xrpld/app/main/Application.h>
|
|
#include <xrpld/rpc/detail/AssetCache.h>
|
|
#include <xrpld/rpc/detail/PathFindSpanNames.h>
|
|
#include <xrpld/rpc/detail/PathRequest.h>
|
|
|
|
#include <xrpl/basics/Log.h>
|
|
#include <xrpl/core/Job.h>
|
|
#include <xrpl/core/JobQueue.h>
|
|
#include <xrpl/json/json_value.h>
|
|
#include <xrpl/ledger/ReadView.h>
|
|
#include <xrpl/protocol/ErrorCodes.h>
|
|
#include <xrpl/protocol/RPCErr.h>
|
|
#include <xrpl/protocol/jss.h>
|
|
#include <xrpl/resource/Consumer.h>
|
|
#include <xrpl/server/InfoSub.h>
|
|
#include <xrpl/telemetry/SpanGuard.h>
|
|
|
|
#include <algorithm>
|
|
#include <cstdint>
|
|
#include <functional>
|
|
#include <memory>
|
|
#include <mutex>
|
|
#include <utility>
|
|
#include <vector>
|
|
|
|
namespace xrpl {
|
|
|
|
/** Get the current AssetCache, updating it if necessary.
|
|
Get the correct ledger to use.
|
|
*/
|
|
std::shared_ptr<AssetCache>
|
|
PathRequestManager::getAssetCache(std::shared_ptr<ReadView const> const& ledger, bool authoritative)
|
|
{
|
|
std::lock_guard const sl(mLock);
|
|
|
|
auto assetCache = assetCache_.lock();
|
|
|
|
std::uint32_t const lineSeq = assetCache ? assetCache->getLedger()->seq() : 0;
|
|
std::uint32_t const lgrSeq = ledger->seq();
|
|
JLOG(mJournal.debug()) << "getAssetCache has cache for " << lineSeq << ", considering "
|
|
<< lgrSeq;
|
|
|
|
if ((lineSeq == 0) || // no ledger
|
|
(authoritative && (lgrSeq > lineSeq)) || // newer authoritative ledger
|
|
(authoritative && ((lgrSeq + 8) < lineSeq)) || // we jumped way back for some reason
|
|
(lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason
|
|
{
|
|
JLOG(mJournal.debug()) << "getAssetCache creating new cache for " << lgrSeq;
|
|
// Assign to the local before the member, because the member is a
|
|
// weak_ptr, and will immediately discard it if there are no other
|
|
// references.
|
|
assetCache_ = assetCache =
|
|
std::make_shared<AssetCache>(ledger, app_.getJournal("AssetCache"));
|
|
}
|
|
return assetCache;
|
|
}
|
|
|
|
void
|
|
PathRequestManager::updateAll(std::shared_ptr<ReadView const> const& inLedger)
|
|
{
|
|
auto event = app_.getJobQueue().makeLoadEvent(jtPATH_FIND, "PathRequest::updateAll");
|
|
|
|
std::vector<PathRequest::wptr> requests;
|
|
std::shared_ptr<AssetCache> cache;
|
|
|
|
// Get the ledger and cache we should be using
|
|
{
|
|
std::lock_guard const sl(mLock);
|
|
requests = requests_;
|
|
cache = getAssetCache(inLedger, true);
|
|
}
|
|
|
|
// updateAll runs on every ledger close; skip span emission entirely when
|
|
// there are no active path subscriptions to avoid a steady stream of empty
|
|
// spans at mainnet close cadence.
|
|
if (requests.empty())
|
|
return;
|
|
|
|
using namespace telemetry;
|
|
auto span = SpanGuard::span(
|
|
TraceCategory::Rpc, pathfind_span::prefix::pathfind, pathfind_span::op::updateAll);
|
|
span.setAttribute(pathfind_span::attr::ledgerIndex, static_cast<int64_t>(inLedger->seq()));
|
|
span.setAttribute(pathfind_span::attr::numRequests, static_cast<int64_t>(requests.size()));
|
|
|
|
bool newRequests = app_.getLedgerMaster().isNewPathRequest();
|
|
bool mustBreak = false;
|
|
|
|
JLOG(mJournal.trace()) << "updateAll seq=" << cache->getLedger()->seq() << ", "
|
|
<< requests.size() << " requests";
|
|
|
|
int processed = 0, removed = 0;
|
|
|
|
auto getSubscriber = [](PathRequest::pointer const& request) -> InfoSub::pointer {
|
|
if (auto ipSub = request->getSubscriber(); ipSub && ipSub->getRequest() == request)
|
|
{
|
|
return ipSub;
|
|
}
|
|
request->doAborting();
|
|
return nullptr;
|
|
};
|
|
|
|
do
|
|
{
|
|
JLOG(mJournal.trace()) << "updateAll looping";
|
|
for (auto const& wr : requests)
|
|
{
|
|
if (app_.getJobQueue().isStopping())
|
|
break;
|
|
|
|
auto request = wr.lock();
|
|
bool remove = true;
|
|
JLOG(mJournal.trace()) << "updateAll request " << (request ? "" : "not ") << "found";
|
|
|
|
if (request)
|
|
{
|
|
auto continueCallback = [&getSubscriber, &request]() {
|
|
// This callback is used by doUpdate to determine whether to
|
|
// continue working. If getSubscriber returns null, that
|
|
// indicates that this request is no longer relevant.
|
|
return (bool)getSubscriber(request);
|
|
};
|
|
if (!request->needsUpdate(newRequests, cache->getLedger()->seq()))
|
|
{
|
|
remove = false;
|
|
}
|
|
else
|
|
{
|
|
if (auto ipSub = getSubscriber(request))
|
|
{
|
|
if (!ipSub->getConsumer().warn())
|
|
{
|
|
// Release the shared ptr to the subscriber so that
|
|
// it can be freed if the client disconnects, and
|
|
// thus fail to lock later.
|
|
ipSub.reset();
|
|
Json::Value update = request->doUpdate(cache, false, continueCallback);
|
|
request->updateComplete();
|
|
update[jss::type] = "path_find";
|
|
if ((ipSub = getSubscriber(request)))
|
|
{
|
|
ipSub->send(update, false);
|
|
remove = false;
|
|
++processed;
|
|
}
|
|
}
|
|
}
|
|
else if (request->hasCompletion())
|
|
{
|
|
// One-shot request with completion function
|
|
request->doUpdate(cache, false);
|
|
request->updateComplete();
|
|
++processed;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (remove)
|
|
{
|
|
std::lock_guard const sl(mLock);
|
|
|
|
// Remove any dangling weak pointers or weak
|
|
// pointers that refer to this path request.
|
|
auto ret = std::ranges::remove_if(requests_, [&removed, &request](auto const& wl) {
|
|
auto r = wl.lock();
|
|
|
|
if (r && r != request)
|
|
return false;
|
|
++removed;
|
|
return true;
|
|
});
|
|
|
|
requests_.erase(ret.begin(), ret.end());
|
|
}
|
|
|
|
mustBreak = !newRequests && app_.getLedgerMaster().isNewPathRequest();
|
|
|
|
// We weren't handling new requests and then
|
|
// there was a new request
|
|
if (mustBreak)
|
|
break;
|
|
}
|
|
|
|
if (mustBreak)
|
|
{ // a new request came in while we were working
|
|
newRequests = true;
|
|
}
|
|
else if (newRequests)
|
|
{ // we only did new requests, so we always need a last pass
|
|
newRequests = app_.getLedgerMaster().isNewPathRequest();
|
|
}
|
|
else
|
|
{ // if there are no new requests, we are done
|
|
newRequests = app_.getLedgerMaster().isNewPathRequest();
|
|
if (!newRequests)
|
|
break;
|
|
}
|
|
|
|
// Hold on to the line cache until after the lock is released, so it can
|
|
// be destroyed outside of the lock
|
|
std::shared_ptr<AssetCache> lastCache;
|
|
{
|
|
// Get the latest requests, cache, and ledger for next pass
|
|
std::lock_guard const sl(mLock);
|
|
|
|
if (requests_.empty())
|
|
break;
|
|
requests = requests_;
|
|
lastCache = cache;
|
|
cache = getAssetCache(cache->getLedger(), false);
|
|
}
|
|
} while (!app_.getJobQueue().isStopping());
|
|
|
|
JLOG(mJournal.debug()) << "updateAll complete: " << processed << " processed and " << removed
|
|
<< " removed";
|
|
}
|
|
|
|
bool
|
|
PathRequestManager::requestsPending() const
|
|
{
|
|
std::lock_guard const sl(mLock);
|
|
return !requests_.empty();
|
|
}
|
|
|
|
void
|
|
PathRequestManager::insertPathRequest(PathRequest::pointer const& req)
|
|
{
|
|
std::lock_guard const sl(mLock);
|
|
|
|
// Insert after any older unserviced requests but before
|
|
// any serviced requests
|
|
auto ret = std::ranges::find_if(requests_, [](auto const& wl) {
|
|
auto r = wl.lock();
|
|
|
|
// We come before handled requests
|
|
return r && !r->isNew();
|
|
});
|
|
|
|
requests_.emplace(ret, req);
|
|
}
|
|
|
|
// Make a new-style path_find request
|
|
Json::Value
|
|
PathRequestManager::makePathRequest(
|
|
std::shared_ptr<InfoSub> const& subscriber,
|
|
std::shared_ptr<ReadView const> const& inLedger,
|
|
Json::Value const& requestJson)
|
|
{
|
|
auto req = std::make_shared<PathRequest>(app_, subscriber, ++mLastIdentifier, *this, mJournal);
|
|
|
|
auto [valid, jvRes] = req->doCreate(getAssetCache(inLedger, false), requestJson);
|
|
|
|
if (valid)
|
|
{
|
|
subscriber->setRequest(req);
|
|
insertPathRequest(req);
|
|
app_.getLedgerMaster().newPathRequest();
|
|
}
|
|
return std::move(jvRes);
|
|
}
|
|
|
|
// Make an old-style ripple_path_find request
|
|
Json::Value
|
|
PathRequestManager::makeLegacyPathRequest(
|
|
PathRequest::pointer& req,
|
|
std::function<void(void)> completion,
|
|
Resource::Consumer& consumer,
|
|
std::shared_ptr<ReadView const> const& inLedger,
|
|
Json::Value const& request)
|
|
{
|
|
// This assignment must take place before the
|
|
// completion function is called
|
|
req = std::make_shared<PathRequest>(
|
|
app_, completion, consumer, ++mLastIdentifier, *this, mJournal);
|
|
|
|
auto [valid, jvRes] = req->doCreate(getAssetCache(inLedger, false), request);
|
|
|
|
if (!valid)
|
|
{
|
|
req.reset();
|
|
}
|
|
else
|
|
{
|
|
insertPathRequest(req);
|
|
if (!app_.getLedgerMaster().newPathRequest())
|
|
{
|
|
// The newPathRequest failed. Tell the caller.
|
|
jvRes = rpcError(rpcTOO_BUSY);
|
|
req.reset();
|
|
}
|
|
}
|
|
|
|
return std::move(jvRes);
|
|
}
|
|
|
|
/** Run a legacy path request to completion in the calling context.

    A private AssetCache is built from `inLedger` (the manager's shared
    cache is not consulted) and the request is neither added to the
    tracked list nor announced to LedgerMaster.

    @param consumer Resource consumer handed to the PathRequest.
    @param inLedger Ledger the private asset cache is built from.
    @param request  Request parameters passed to PathRequest::doCreate.
    @return The doUpdate result when creation succeeds, otherwise the
            JSON produced by doCreate.
*/
Json::Value
PathRequestManager::doLegacyPathRequest(
    Resource::Consumer& consumer,
    std::shared_ptr<ReadView const> const& inLedger,
    Json::Value const& request)
{
    auto cache = std::make_shared<AssetCache>(inLedger, app_.getJournal("AssetCache"));

    // The result is returned directly, so the completion does nothing.
    auto const noCompletion = [] {};
    auto req = std::make_shared<PathRequest>(
        app_, noCompletion, consumer, ++mLastIdentifier, *this, mJournal);

    auto [created, response] = req->doCreate(cache, request);

    if (!created)
        return std::move(response);

    response = req->doUpdate(cache, false);
    return std::move(response);
}
|
|
|
|
} // namespace xrpl
|