#include
#include
#include
#include
#include
#include
#include
#include

namespace xrpl {

/** Get the current RippleLineCache, updating it if necessary.
    Get the correct ledger to use.

    The cache is tracked through the weak pointer lineCache_. The existing
    cache is returned unless one of the replacement conditions below holds,
    in which case a new cache is built on `ledger` and returned instead.

    @param ledger Ledger being considered; its sequence is compared with the
                  sequence of the ledger behind the current cache.
    @param authoritative When true, any newer ledger, or a ledger more than
                  8 sequences older than the cached one, replaces the cache.
                  When false, only a missing cache or a forward jump of more
                  than 8 sequences does.
    @return The reused or newly created line cache.
*/
std::shared_ptr
PathRequests::getLineCache(
    std::shared_ptr const& ledger,
    bool authoritative)
{
    std::lock_guard sl(mLock);

    // Promote the weak pointer; lineCache is null if no cache is alive.
    auto lineCache = lineCache_.lock();

    // A sequence of 0 stands for "no cache available".
    std::uint32_t const lineSeq = lineCache ? lineCache->getLedger()->seq() : 0;
    std::uint32_t const lgrSeq = ledger->seq();
    JLOG(mJournal.debug()) << "getLineCache has cache for " << lineSeq
                           << ", considering " << lgrSeq;

    if ((lineSeq == 0) ||                         // no ledger
        (authoritative && (lgrSeq > lineSeq)) ||  // newer authoritative ledger
        (authoritative &&
         ((lgrSeq + 8) < lineSeq)) ||  // we jumped way back for some reason
        (lgrSeq > (lineSeq + 8)))      // we jumped way forward for some reason
    {
        JLOG(mJournal.debug())
            << "getLineCache creating new cache for " << lgrSeq;
        // Assign to the local before the member, because the member is a
        // weak_ptr, and will immediately discard it if there are no other
        // references.
        lineCache_ = lineCache = std::make_shared(
            ledger, registry_.journal("RippleLineCache"));
    }
    return lineCache;
}

/** Run update passes over the pending path requests.

    Takes a snapshot of requests_ and a line cache for `inLedger` under
    mLock, then repeatedly walks the snapshot. Each live request for which
    needsUpdate() is true is either updated and its result sent to its
    subscriber, or, when it has a completion function, updated once.
    Entries whose `remove` flag is still set at the end of an iteration
    (expired pointers, requests without a usable subscriber, one-shot
    requests) are erased from requests_.

    The outer loop repeats while new path requests keep being reported by
    the ledger master, and ends when the job queue is stopping or requests_
    is empty.

    @param inLedger Ledger passed to getLineCache() as the authoritative
                    ledger for the first pass.
*/
void
PathRequests::updateAll(std::shared_ptr const& inLedger)
{
    // Local object, so it stays alive until this function returns.
    auto event = registry_.getJobQueue().makeLoadEvent(
        jtPATH_FIND, "PathRequest::updateAll");

    std::vector requests;
    std::shared_ptr cache;

    // Get the ledger and cache we should be using
    {
        // NOTE(review): getLineCache() acquires mLock as well, so mLock is
        // presumably a recursive mutex - confirm in the class declaration.
        std::lock_guard sl(mLock);
        requests = requests_;
        cache = getLineCache(inLedger, true);
    }

    // Whether this pass is handling new requests; passed on to needsUpdate().
    bool newRequests = registry_.getLedgerMaster().isNewPathRequest();
    // Set when a new request shows up during a pass that was not handling
    // new requests; aborts the inner loop so the pass can be restarted.
    bool mustBreak = false;

    JLOG(mJournal.trace()) << "updateAll seq=" << cache->getLedger()->seq()
                           << ", " << requests.size() << " requests";

    // Counters used only for the summary log line at the end.
    int processed = 0, removed = 0;

    // Return the request's subscriber only if that subscriber still refers
    // back to this same request. Otherwise call doAborting() on the request
    // and return null.
    auto getSubscriber =
        [](PathRequest::pointer const& request) -> InfoSub::pointer {
        if (auto ipSub = request->getSubscriber();
            ipSub && ipSub->getRequest() == request)
        {
            return ipSub;
        }
        request->doAborting();
        return nullptr;
    };

    do
    {
        JLOG(mJournal.trace()) << "updateAll looping";
        for (auto const& wr : requests)
        {
            if (registry_.getJobQueue().isStopping())
                break;

            auto request = wr.lock();
            // Entries are removed unless a branch below decides to keep them.
            bool remove = true;
            JLOG(mJournal.trace())
                << "updateAll request " << (request ? "" : "not ") << "found";

            if (request)
            {
                auto continueCallback = [&getSubscriber, &request]() {
                    // This callback is used by doUpdate to determine whether to
                    // continue working. If getSubscriber returns null, that
                    // indicates that this request is no longer relevant.
                    return (bool)getSubscriber(request);
                };
                // A request that does not need an update is kept untouched.
                if (!request->needsUpdate(
                        newRequests, cache->getLedger()->seq()))
                    remove = false;
                else
                {
                    if (auto ipSub = getSubscriber(request))
                    {
                        // If warn() returns true nothing is done and the
                        // request stays marked for removal.
                        if (!ipSub->getConsumer().warn())
                        {
                            // Release the shared ptr to the subscriber so that
                            // it can be freed if the client disconnects, and
                            // thus fail to lock later.
                            ipSub.reset();

                            Json::Value update = request->doUpdate(
                                cache, false, continueCallback);
                            request->updateComplete();
                            update[jss::type] = "path_find";
                            // Re-acquire the subscriber; the result is sent
                            // and the request kept only if it is still there.
                            if ((ipSub = getSubscriber(request)))
                            {
                                ipSub->send(update, false);
                                remove = false;
                                ++processed;
                            }
                        }
                    }
                    else if (request->hasCompletion())
                    {
                        // One-shot request with completion function
                        // (remove stays true, so it is erased afterwards)
                        request->doUpdate(cache, false);
                        request->updateComplete();
                        ++processed;
                    }
                }
            }

            if (remove)
            {
                std::lock_guard sl(mLock);

                // Remove any dangling weak pointers or weak
                // pointers that refer to this path request.
                auto ret = std::remove_if(
                    requests_.begin(),
                    requests_.end(),
                    [&removed, &request](auto const& wl) {
                        auto r = wl.lock();

                        // Keep entries that are alive and are not the
                        // current request.
                        if (r && r != request)
                            return false;
                        ++removed;
                        return true;
                    });

                requests_.erase(ret, requests_.end());
            }

            mustBreak =
                !newRequests && registry_.getLedgerMaster().isNewPathRequest();

            // We weren't handling new requests and then
            // there was a new request
            if (mustBreak)
                break;
        }

        if (mustBreak)
        {  // a new request came in while we were working
            newRequests = true;
        }
        else if (newRequests)
        {  // we only did new requests, so we always need a last pass
            newRequests = registry_.getLedgerMaster().isNewPathRequest();
        }
        else
        {  // if there are no new requests, we are done
            newRequests = registry_.getLedgerMaster().isNewPathRequest();
            if (!newRequests)
                break;
        }

        // Hold on to the line cache until after the lock is released, so it can
        // be destroyed outside of the lock
        std::shared_ptr lastCache;
        {
            // Get the latest requests, cache, and ledger for next pass
            std::lock_guard sl(mLock);

            if (requests_.empty())
                break;
            requests = requests_;
            lastCache = cache;
            // Non-authoritative refresh based on the ledger of the cache
            // used in the pass that just finished.
            cache = getLineCache(cache->getLedger(), false);
        }
    } while (!registry_.getJobQueue().isStopping());

    JLOG(mJournal.debug()) << "updateAll complete: " << processed
                           << " processed and " << removed << " removed";
}

/** Report whether any path requests are currently registered.

    @return true if requests_ is non-empty. Entries are weak pointers, so a
            non-empty list may still contain expired requests.
*/
bool
PathRequests::requestsPending() const
{
    std::lock_guard sl(mLock);
    return !requests_.empty();
}

/** Add a request to requests_, keeping new requests ahead of handled ones.

    @param req Request to insert. It is placed immediately before the first
               live entry whose isNew() is false, or at the end if there is
               no such entry.
*/
void
PathRequests::insertPathRequest(PathRequest::pointer const& req)
{
    std::lock_guard sl(mLock);

    // Insert after any older unserviced requests but before
    // any serviced requests
    auto ret =
        std::find_if(requests_.begin(), requests_.end(), [](auto const& wl) {
            auto r = wl.lock();

            // We come before handled requests
            return r && !r->isNew();
        });

    requests_.emplace(ret, req);
}

/** Make a new-style path_find request.

    Creates a PathRequest bound to `subscriber`, initialises it through
    doCreate() with a non-authoritative line cache for `inLedger`, and, if
    creation was valid, attaches it to the subscriber, registers it and
    notifies the ledger master.

    @param subscriber Subscriber that owns the request.
    @param inLedger Ledger used to obtain the line cache.
    @param requestJson Request parameters passed to doCreate().
    @return The JSON result produced by doCreate().
*/
Json::Value
PathRequests::makePathRequest(
    std::shared_ptr const& subscriber,
    std::shared_ptr const& inLedger,
    Json::Value const& requestJson)
{
    auto req = std::make_shared(
        setup_, registry_, subscriber, ++mLastIdentifier, *this, mJournal);

    auto [valid, jvRes] =
        req->doCreate(getLineCache(inLedger, false), requestJson);

    if (valid)
    {
        subscriber->setRequest(req);
        insertPathRequest(req);
        // NOTE(review): unlike makeLegacyPathRequest, the result of
        // newPathRequest() is not checked here - confirm this is intended.
        registry_.getLedgerMaster().newPathRequest();
    }
    // jvRes is a structured binding, which is not implicitly moved on
    // return, hence the explicit std::move.
    return std::move(jvRes);
}

/** Make an old-style ripple_path_find request.

    Creates a PathRequest with a completion function and stores it in `req`.
    If doCreate() reports the request as invalid, or the ledger master's
    newPathRequest() returns false, `req` is reset before returning.

    @param req Out-parameter receiving the new request; reset on failure.
    @param completion Completion function handed to the new request.
    @param consumer Resource consumer handed to the new request.
    @param inLedger Ledger used to obtain the line cache.
    @param request Request parameters passed to doCreate().
    @return The JSON result of doCreate(), or rpcError(rpcTOO_BUSY) when
            newPathRequest() returns false.
*/
Json::Value
PathRequests::makeLegacyPathRequest(
    PathRequest::pointer& req,
    std::function completion,
    Resource::Consumer& consumer,
    std::shared_ptr const& inLedger,
    Json::Value const& request)
{
    // This assignment must take place before the
    // completion function is called
    req = std::make_shared(
        setup_, registry_, completion, consumer, ++mLastIdentifier, *this, mJournal);

    auto [valid, jvRes] = req->doCreate(getLineCache(inLedger, false), request);

    if (!valid)
    {
        req.reset();
    }
    else
    {
        insertPathRequest(req);
        if (!registry_.getLedgerMaster().newPathRequest())
        {
            // The newPathRequest failed.  Tell the caller.
            // (The weak entry already inserted into requests_ is left in
            // place; req.reset() only drops this caller-visible reference.)
            jvRes = rpcError(rpcTOO_BUSY);
            req.reset();
        }
    }

    // jvRes is a structured binding, which is not implicitly moved on
    // return, hence the explicit std::move.
    return std::move(jvRes);
}

/** Run a legacy path request synchronously.

    Builds a fresh line cache directly on `inLedger` (not the shared one
    from getLineCache()), creates a request with an empty completion
    function, and, if doCreate() succeeds, performs the update immediately.
    The request is never inserted into requests_.

    @param consumer Resource consumer handed to the new request.
    @param inLedger Ledger the private line cache is built on.
    @param request Request parameters passed to doCreate().
    @return The result of doUpdate() when creation was valid, otherwise the
            result of doCreate().
*/
Json::Value
PathRequests::doLegacyPathRequest(
    Resource::Consumer& consumer,
    std::shared_ptr const& inLedger,
    Json::Value const& request)
{
    auto cache = std::make_shared(
        inLedger, registry_.journal("RippleLineCache"));

    auto req = std::make_shared(
        setup_, registry_, [] {}, consumer, ++mLastIdentifier, *this, mJournal);

    auto [valid, jvRes] = req->doCreate(cache, request);
    if (valid)
        jvRes = req->doUpdate(cache, false);
    // jvRes is a structured binding, which is not implicitly moved on
    // return, hence the explicit std::move.
    return std::move(jvRes);
}

}  // namespace xrpl