diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 739416009..378f59092 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -733,6 +733,7 @@ if (tests) src/test/basics/contract_test.cpp src/test/basics/FeeUnits_test.cpp src/test/basics/hardened_hash_test.cpp + src/test/basics/join_test.cpp src/test/basics/mulDiv_test.cpp src/test/basics/tagged_integer_test.cpp #[===============================[ diff --git a/src/ripple/app/ledger/LedgerHolder.h b/src/ripple/app/ledger/LedgerHolder.h index 449cff9ab..93d67400e 100644 --- a/src/ripple/app/ledger/LedgerHolder.h +++ b/src/ripple/app/ledger/LedgerHolder.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_APP_LEDGER_LEDGERHOLDER_H_INCLUDED #define RIPPLE_APP_LEDGER_LEDGERHOLDER_H_INCLUDED +#include #include #include @@ -35,7 +36,7 @@ namespace ripple { way the object always holds a value. We can use the genesis ledger in all cases. */ -class LedgerHolder +class LedgerHolder : public CountedObject { public: // Update the held ledger diff --git a/src/ripple/app/ledger/LedgerReplay.h b/src/ripple/app/ledger/LedgerReplay.h index 96af63c13..0365dea1b 100644 --- a/src/ripple/app/ledger/LedgerReplay.h +++ b/src/ripple/app/ledger/LedgerReplay.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_APP_LEDGER_LEDGERREPLAY_H_INCLUDED #define RIPPLE_APP_LEDGER_LEDGERREPLAY_H_INCLUDED +#include #include #include #include @@ -29,7 +30,7 @@ namespace ripple { class Ledger; class STTx; -class LedgerReplay +class LedgerReplay : public CountedObject { std::shared_ptr parent_; std::shared_ptr replay_; diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 0dd0ba1ee..cea77c621 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -261,8 +261,13 @@ LedgerMaster::getPublishedLedgerAge() std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch(); ret -= pubClose; ret = (ret > 0s) ? ret : 0s; + static std::chrono::seconds lastRet = -1s; - JLOG(m_journal.trace()) << "Published ledger age is " << ret.count(); + if (ret != lastRet) + { + JLOG(m_journal.trace()) << "Published ledger age is " << ret.count(); + lastRet = ret; + } return ret; } @@ -287,8 +292,13 @@ LedgerMaster::getValidatedLedgerAge() std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch(); ret -= valClose; ret = (ret > 0s) ? ret : 0s; + static std::chrono::seconds lastRet = -1s; - JLOG(m_journal.trace()) << "Validated ledger age is " << ret.count(); + if (ret != lastRet) + { + JLOG(m_journal.trace()) << "Validated ledger age is " << ret.count(); + lastRet = ret; + } return ret; } @@ -1483,12 +1493,14 @@ LedgerMaster::updatePaths() if (app_.getOPs().isNeedNetworkLedger()) { --mPathFindThread; + JLOG(m_journal.debug()) << "Need network ledger for updating paths"; return; } } while (!app_.getJobQueue().isStopping()) { + JLOG(m_journal.debug()) << "updatePaths running"; std::shared_ptr lastLedger; { std::lock_guard ml(m_mutex); @@ -1506,6 +1518,7 @@ LedgerMaster::updatePaths() else { // Nothing to do --mPathFindThread; + JLOG(m_journal.debug()) << "Nothing to do for updating paths"; return; } } @@ -1527,7 +1540,31 @@ LedgerMaster::updatePaths() try { - app_.getPathRequests().updateAll(lastLedger); + auto& pathRequests = app_.getPathRequests(); + { + std::lock_guard ml(m_mutex); + if (!pathRequests.requestsPending()) + { + --mPathFindThread; + JLOG(m_journal.debug()) + << "No path requests found. Nothing to do for updating " + "paths. " + << mPathFindThread << " jobs remaining"; + return; + } + } + JLOG(m_journal.debug()) << "Updating paths"; + pathRequests.updateAll(lastLedger); + + std::lock_guard ml(m_mutex); + if (!pathRequests.requestsPending()) + { + JLOG(m_journal.debug()) + << "No path requests left. No need for further updating " + "paths"; + --mPathFindThread; + return; + } } catch (SHAMapMissingNode const& mn) { @@ -1587,8 +1624,11 @@ LedgerMaster::newPFWork( const char* name, std::unique_lock&) { - if (mPathFindThread < 2) + if (mPathFindThread < 2 && app_.getPathRequests().requestsPending()) { + JLOG(m_journal.debug()) + << "newPFWork: Creating job. path find threads: " + << mPathFindThread; if (app_.getJobQueue().addJob( jtUPDATE_PF, name, [this]() { updatePaths(); })) { diff --git a/src/ripple/app/misc/CanonicalTXSet.h b/src/ripple/app/misc/CanonicalTXSet.h index d0dfd97e3..3ca217944 100644 --- a/src/ripple/app/misc/CanonicalTXSet.h +++ b/src/ripple/app/misc/CanonicalTXSet.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_APP_MISC_CANONICALTXSET_H_INCLUDED #define RIPPLE_APP_MISC_CANONICALTXSET_H_INCLUDED +#include #include #include #include @@ -34,7 +35,7 @@ namespace ripple { */ // VFALCO TODO rename to SortedTxSet -class CanonicalTXSet +class CanonicalTXSet : public CountedObject { private: class Key diff --git a/src/ripple/app/misc/OrderBook.h b/src/ripple/app/misc/OrderBook.h index fb96bd5c0..ce8e0db9f 100644 --- a/src/ripple/app/misc/OrderBook.h +++ b/src/ripple/app/misc/OrderBook.h @@ -20,10 +20,12 @@ #ifndef RIPPLE_APP_MISC_ORDERBOOK_H_INCLUDED #define RIPPLE_APP_MISC_ORDERBOOK_H_INCLUDED +#include + namespace ripple { /** Describes a serialized ledger entry for an order book. */ -class OrderBook +class OrderBook : public CountedObject { public: using pointer = std::shared_ptr; diff --git a/src/ripple/app/paths/PathRequest.cpp b/src/ripple/app/paths/PathRequest.cpp index adb038528..e5b15fd9d 100644 --- a/src/ripple/app/paths/PathRequest.cpp +++ b/src/ripple/app/paths/PathRequest.cpp @@ -441,7 +441,7 @@ PathRequest::parseJson(Json::Value const& jvParams) } Json::Value -PathRequest::doClose(Json::Value const&) +PathRequest::doClose() { JLOG(m_journal.debug()) << iIdentifier << " closed"; std::lock_guard sl(mLock); @@ -457,13 +457,20 @@ PathRequest::doStatus(Json::Value const&) return jvStatus; } +void +PathRequest::doAborting() const +{ + JLOG(m_journal.info()) << iIdentifier << " aborting early"; +} + std::unique_ptr const& PathRequest::getPathFinder( std::shared_ptr const& cache, hash_map>& currency_map, Currency const& currency, STAmount const& dst_amount, - int const level) + int const level, + std::function const& continueCallback) { auto i = currency_map.find(currency); if (i != currency_map.end()) @@ -477,8 +484,8 @@ PathRequest::getPathFinder( dst_amount, saSendMax, app_); - if (pathfinder->findPaths(level)) - pathfinder->computePathRanks(max_paths_); + if (pathfinder->findPaths(level, continueCallback)) + pathfinder->computePathRanks(max_paths_, continueCallback); else pathfinder.reset(); // It's a bad request - clear it. return currency_map[currency] = std::move(pathfinder); @@ -488,7 +495,8 @@ bool PathRequest::findPaths( std::shared_ptr const& cache, int const level, - Json::Value& jvArray) + Json::Value& jvArray, + std::function const& continueCallback) { auto sourceCurrencies = sciSourceCurrencies; if (sourceCurrencies.empty() && saSendMax) @@ -515,22 +523,33 @@ PathRequest::findPaths( hash_map> currency_map; for (auto const& issue : sourceCurrencies) { + if (continueCallback && !continueCallback()) + break; JLOG(m_journal.debug()) << iIdentifier << " Trying to find paths: " << STAmount(issue, 1).getFullText(); auto& pathfinder = getPathFinder( - cache, currency_map, issue.currency, dst_amount, level); + cache, + currency_map, + issue.currency, + dst_amount, + level, + continueCallback); if (!pathfinder) { - assert(false); + assert(continueCallback && !continueCallback()); JLOG(m_journal.debug()) << iIdentifier << " No paths found"; continue; } STPath fullLiquidityPath; auto ps = pathfinder->getBestPaths( - max_paths_, fullLiquidityPath, mContext[issue], issue.account); + max_paths_, + fullLiquidityPath, + mContext[issue], + issue.account, + continueCallback); mContext[issue] = ps; auto& sourceAccount = !isXRP(issue.account) @@ -628,7 +647,10 @@ PathRequest::findPaths( } Json::Value -PathRequest::doUpdate(std::shared_ptr const& cache, bool fast) +PathRequest::doUpdate( + std::shared_ptr const& cache, + bool fast, + std::function const& continueCallback) { using namespace std::chrono; JLOG(m_journal.debug()) @@ -699,7 +721,7 @@ PathRequest::doUpdate(std::shared_ptr const& cache, bool fast) JLOG(m_journal.debug()) << iIdentifier << " processing at level " << iLevel; Json::Value jvArray = Json::arrayValue; - if (findPaths(cache, iLevel, jvArray)) + if (findPaths(cache, iLevel, jvArray, continueCallback)) { bLastSuccess = jvArray.size() != 0; newStatus[jss::alternatives] = std::move(jvArray); @@ -730,7 +752,7 @@ PathRequest::doUpdate(std::shared_ptr const& cache, bool fast) } InfoSub::pointer -PathRequest::getSubscriber() +PathRequest::getSubscriber() const { return wpSubscriber.lock(); } diff --git a/src/ripple/app/paths/PathRequest.h b/src/ripple/app/paths/PathRequest.h index 704414d03..70c286d6e 100644 --- a/src/ripple/app/paths/PathRequest.h +++ b/src/ripple/app/paths/PathRequest.h @@ -43,10 +43,10 @@ class PathRequests; // Return values from parseJson <0 = invalid, >0 = valid #define PFR_PJ_INVALID -1 #define PFR_PJ_NOCHANGE 0 -#define PFR_PJ_CHANGE 1 -class PathRequest : public std::enable_shared_from_this, - public CountedObject +class PathRequest final : public InfoSubRequest, + public std::enable_shared_from_this, + public CountedObject { public: using wptr = std::weak_ptr; @@ -55,8 +55,6 @@ public: using wref = const wptr&; public: - // VFALCO TODO Break the cyclic dependency on InfoSub - // path_find semantics // Subscriber is updated PathRequest( @@ -91,15 +89,20 @@ public: doCreate(std::shared_ptr const&, Json::Value const&); Json::Value - doClose(Json::Value const&); + doClose() override; Json::Value - doStatus(Json::Value const&); + doStatus(Json::Value const&) override; + void + doAborting() const; // update jvStatus Json::Value - doUpdate(std::shared_ptr const&, bool fast); + doUpdate( + std::shared_ptr const&, + bool fast, + std::function const& continueCallback = {}); InfoSub::pointer - getSubscriber(); + getSubscriber() const; bool hasCompletion(); @@ -113,13 +116,18 @@ private: hash_map>&, Currency const&, STAmount const&, - int const); + int const, + std::function const&); /** Finds and sets a PathSet in the JSON argument. Returns false if the source currencies are inavlid. */ bool - findPaths(std::shared_ptr const&, int const, Json::Value&); + findPaths( + std::shared_ptr const&, + int const, + Json::Value&, + std::function const&); int parseJson(Json::Value const&); @@ -156,7 +164,7 @@ private: int iLevel; bool bLastSuccess; - int iIdentifier; + int const iIdentifier; std::chrono::steady_clock::time_point const created_; std::chrono::steady_clock::time_point quick_reply_; diff --git a/src/ripple/app/paths/PathRequests.cpp b/src/ripple/app/paths/PathRequests.cpp index 50e591eb1..951f55dc8 100644 --- a/src/ripple/app/paths/PathRequests.cpp +++ b/src/ripple/app/paths/PathRequests.cpp @@ -40,8 +40,12 @@ PathRequests::getLineCache( { std::lock_guard sl(mLock); - std::uint32_t lineSeq = mLineCache ? mLineCache->getLedger()->seq() : 0; - std::uint32_t lgrSeq = ledger->seq(); + auto lineCache = lineCache_.lock(); + + std::uint32_t const lineSeq = lineCache ? lineCache->getLedger()->seq() : 0; + std::uint32_t const lgrSeq = ledger->seq(); + JLOG(mJournal.debug()) << "getLineCache has cache for " << lineSeq + << ", considering " << lgrSeq; if ((lineSeq == 0) || // no ledger (authoritative && (lgrSeq > lineSeq)) || // newer authoritative ledger @@ -49,9 +53,15 @@ PathRequests::getLineCache( ((lgrSeq + 8) < lineSeq)) || // we jumped way back for some reason (lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason { - mLineCache = std::make_shared(ledger); + JLOG(mJournal.debug()) + << "getLineCache creating new cache for " << lgrSeq; + // Assign to the local before the member, because the member is a + // weak_ptr, and will immediately discard it if there are no other + // references. + lineCache_ = lineCache = std::make_shared( + ledger, app_.journal("RippleLineCache")); } - return mLineCache; + return lineCache; } void @@ -78,8 +88,20 @@ PathRequests::updateAll(std::shared_ptr const& inLedger) int processed = 0, removed = 0; + auto getSubscriber = + [](PathRequest::pointer const& request) -> InfoSub::pointer { + if (auto ipSub = request->getSubscriber(); + ipSub && ipSub->getRequest() == request) + { + return ipSub; + } + request->doAborting(); + return nullptr; + }; + do { + JLOG(mJournal.trace()) << "updateAll looping"; for (auto const& wr : requests) { if (app_.getJobQueue().isStopping()) @@ -87,25 +109,40 @@ PathRequests::updateAll(std::shared_ptr const& inLedger) auto request = wr.lock(); bool remove = true; + JLOG(mJournal.trace()) + << "updateAll request " << (request ? "" : "not ") << "found"; if (request) { + auto continueCallback = [&getSubscriber, &request]() { + // This callback is used by doUpdate to determine whether to + // continue working. If getSubscriber returns null, that + // indicates that this request is no longer relevant. + return (bool)getSubscriber(request); + }; if (!request->needsUpdate( newRequests, cache->getLedger()->seq())) remove = false; else { - if (auto ipSub = request->getSubscriber()) + if (auto ipSub = getSubscriber(request)) { if (!ipSub->getConsumer().warn()) { - Json::Value update = - request->doUpdate(cache, false); + // Release the shared ptr to the subscriber so that + // it can be freed if the client disconnects, and + // thus fail to lock later. + ipSub.reset(); + Json::Value update = request->doUpdate( + cache, false, continueCallback); request->updateComplete(); update[jss::type] = "path_find"; - ipSub->send(update, false); - remove = false; - ++processed; + if ((ipSub = getSubscriber(request))) + { + ipSub->send(update, false); + remove = false; + ++processed; + } } } else if (request->hasCompletion()) @@ -178,6 +215,13 @@ PathRequests::updateAll(std::shared_ptr const& inLedger) << " processed and " << removed << " removed"; } +bool +PathRequests::requestsPending() const +{ + std::lock_guard sl(mLock); + return !requests_.empty(); +} + void PathRequests::insertPathRequest(PathRequest::pointer const& req) { @@ -211,7 +255,7 @@ PathRequests::makePathRequest( if (valid) { - subscriber->setPathRequest(req); + subscriber->setRequest(req); insertPathRequest(req); app_.getLedgerMaster().newPathRequest(); } @@ -258,7 +302,8 @@ PathRequests::doLegacyPathRequest( std::shared_ptr const& inLedger, Json::Value const& request) { - auto cache = std::make_shared(inLedger); + auto cache = std::make_shared( + inLedger, app_.journal("RippleLineCache")); auto req = std::make_shared( app_, [] {}, consumer, ++mLastIdentifier, *this, mJournal); diff --git a/src/ripple/app/paths/PathRequests.h b/src/ripple/app/paths/PathRequests.h index 75b852867..db683ee4c 100644 --- a/src/ripple/app/paths/PathRequests.h +++ b/src/ripple/app/paths/PathRequests.h @@ -51,6 +51,9 @@ public: void updateAll(std::shared_ptr const& ledger); + bool + requestsPending() const; + std::shared_ptr getLineCache( std::shared_ptr const& ledger, @@ -109,11 +112,11 @@ private: std::vector requests_; // Use a RippleLineCache - std::shared_ptr mLineCache; + std::weak_ptr lineCache_; std::atomic mLastIdentifier; - std::recursive_mutex mLock; + std::recursive_mutex mutable mLock; }; } // namespace ripple diff --git a/src/ripple/app/paths/Pathfinder.cpp b/src/ripple/app/paths/Pathfinder.cpp index 548336327..158077c57 100644 --- a/src/ripple/app/paths/Pathfinder.cpp +++ b/src/ripple/app/paths/Pathfinder.cpp @@ -24,6 +24,7 @@ #include #include #include +#include #include #include #include @@ -191,8 +192,11 @@ Pathfinder::Pathfinder( } bool -Pathfinder::findPaths(int searchLevel) +Pathfinder::findPaths( + int searchLevel, + std::function const& continueCallback) { + JLOG(j_.trace()) << "findPaths start"; if (mDstAmount == beast::zero) { // No need to send zero money. @@ -316,10 +320,13 @@ Pathfinder::findPaths(int searchLevel) // Now iterate over all paths for that paymentType. for (auto const& costedPath : mPathTable[paymentType]) { + if (continueCallback && !continueCallback()) + return false; // Only use paths with at most the current search level. if (costedPath.searchLevel <= searchLevel) { - addPathsForType(costedPath.type); + JLOG(j_.trace()) << "findPaths trying payment type " << paymentType; + addPathsForType(costedPath.type, continueCallback); if (mCompletePaths.size() > PATHFINDER_MAX_COMPLETE_PATHS) break; @@ -401,7 +408,9 @@ Pathfinder::getPathLiquidity( } void -Pathfinder::computePathRanks(int maxPaths) +Pathfinder::computePathRanks( + int maxPaths, + std::function const& continueCallback) { mRemainingAmount = convertAmount(mDstAmount, convert_all_); @@ -439,7 +448,7 @@ Pathfinder::computePathRanks(int maxPaths) JLOG(j_.debug()) << "Default path causes exception"; } - rankPaths(maxPaths, mCompletePaths, mPathRanks); + rankPaths(maxPaths, mCompletePaths, mPathRanks, continueCallback); } static bool @@ -480,8 +489,11 @@ void Pathfinder::rankPaths( int maxPaths, STPathSet const& paths, - std::vector& rankedPaths) + std::vector& rankedPaths, + std::function const& continueCallback) { + JLOG(j_.trace()) << "rankPaths with " << paths.size() << " candidates, and " + << maxPaths << " maximum"; rankedPaths.clear(); rankedPaths.reserve(paths.size()); @@ -499,6 +511,8 @@ Pathfinder::rankPaths( for (int i = 0; i < paths.size(); ++i) { + if (continueCallback && !continueCallback()) + return; auto const& currentPath = paths[i]; if (!currentPath.empty()) { @@ -554,7 +568,8 @@ Pathfinder::getBestPaths( int maxPaths, STPath& fullLiquidityPath, STPathSet const& extraPaths, - AccountID const& srcIssuer) + AccountID const& srcIssuer, + std::function const& continueCallback) { JLOG(j_.debug()) << "findPaths: " << mCompletePaths.size() << " paths and " << extraPaths.size() << " extras"; @@ -567,7 +582,7 @@ Pathfinder::getBestPaths( isXRP(mSrcCurrency) || (srcIssuer == mSrcAccount); std::vector extraPathRanks; - rankPaths(maxPaths, extraPaths, extraPathRanks); + rankPaths(maxPaths, extraPaths, extraPathRanks, continueCallback); STPathSet bestPaths; @@ -582,6 +597,8 @@ Pathfinder::getBestPaths( while (pathsIterator != mPathRanks.end() || extraPathsIterator != extraPathRanks.end()) { + if (continueCallback && !continueCallback()) + break; bool usePath = false; bool useExtraPath = false; @@ -692,7 +709,8 @@ Pathfinder::getPathsOut( Currency const& currency, AccountID const& account, bool isDstCurrency, - AccountID const& dstAccount) + AccountID const& dstAccount, + std::function const& continueCallback) { Issue const issue(currency, account); @@ -755,17 +773,26 @@ void Pathfinder::addLinks( STPathSet const& currentPaths, // The paths to build from STPathSet& incompletePaths, // The set of partial paths we add to - int addFlags) + int addFlags, + std::function const& continueCallback) { JLOG(j_.debug()) << "addLink< on " << currentPaths.size() << " source(s), flags=" << addFlags; for (auto const& path : currentPaths) - addLink(path, incompletePaths, addFlags); + { + if (continueCallback && !continueCallback()) + return; + addLink(path, incompletePaths, addFlags, continueCallback); + } } STPathSet& -Pathfinder::addPathsForType(PathType const& pathType) +Pathfinder::addPathsForType( + PathType const& pathType, + std::function const& continueCallback) { + JLOG(j_.warn()) << "addPathsForType " + << CollectionAndDelimiter(pathType, ", "); // See if the set of paths for this type already exists. auto it = mPaths.find(pathType); if (it != mPaths.end()) @@ -774,13 +801,16 @@ Pathfinder::addPathsForType(PathType const& pathType) // Otherwise, if the type has no nodes, return the empty path. if (pathType.empty()) return mPaths[pathType]; + if (continueCallback && !continueCallback()) + return mPaths[{}]; // Otherwise, get the paths for the parent PathType by calling // addPathsForType recursively. PathType parentPathType = pathType; parentPathType.pop_back(); - STPathSet const& parentPaths = addPathsForType(parentPathType); + STPathSet const& parentPaths = + addPathsForType(parentPathType, continueCallback); STPathSet& pathsOut = mPaths[pathType]; JLOG(j_.debug()) << "getPaths< adding onto '" @@ -800,26 +830,38 @@ Pathfinder::addPathsForType(PathType const& pathType) break; case nt_ACCOUNTS: - addLinks(parentPaths, pathsOut, afADD_ACCOUNTS); + addLinks(parentPaths, pathsOut, afADD_ACCOUNTS, continueCallback); break; case nt_BOOKS: - addLinks(parentPaths, pathsOut, afADD_BOOKS); + addLinks(parentPaths, pathsOut, afADD_BOOKS, continueCallback); break; case nt_XRP_BOOK: - addLinks(parentPaths, pathsOut, afADD_BOOKS | afOB_XRP); + addLinks( + parentPaths, + pathsOut, + afADD_BOOKS | afOB_XRP, + continueCallback); break; case nt_DEST_BOOK: - addLinks(parentPaths, pathsOut, afADD_BOOKS | afOB_LAST); + addLinks( + parentPaths, + pathsOut, + afADD_BOOKS | afOB_LAST, + continueCallback); break; case nt_DESTINATION: // FIXME: What if a different issuer was specified on the // destination amount? // TODO(tom): what does this even mean? Should it be a JIRA? - addLinks(parentPaths, pathsOut, afADD_ACCOUNTS | afAC_LAST); + addLinks( + parentPaths, + pathsOut, + afADD_ACCOUNTS | afAC_LAST, + continueCallback); break; } @@ -890,7 +932,8 @@ void Pathfinder::addLink( const STPath& currentPath, // The path to build from STPathSet& incompletePaths, // The set of partial paths we add to - int addFlags) + int addFlags, + std::function const& continueCallback) { auto const& pathEnd = currentPath.empty() ? mSource : currentPath.back(); auto const& uEndCurrency = pathEnd.getCurrency(); @@ -903,7 +946,8 @@ Pathfinder::addLink( // rather than the ultimate destination? bool const hasEffectiveDestination = mEffectiveDst != mDstAccount; - JLOG(j_.trace()) << "addLink< flags=" << addFlags << " onXRP=" << bOnXRP; + JLOG(j_.trace()) << "addLink< flags=" << addFlags << " onXRP=" << bOnXRP + << " completePaths size=" << mCompletePaths.size(); JLOG(j_.trace()) << currentPath.getJson(JsonOptions::none); if (addFlags & afADD_ACCOUNTS) @@ -939,6 +983,8 @@ Pathfinder::addLink( for (auto const& rs : rippleLines) { + if (continueCallback && !continueCallback()) + return; auto const& acct = rs.getAccountIDPeer(); if (hasEffectiveDestination && (acct == mDstAccount)) @@ -1002,7 +1048,8 @@ Pathfinder::addLink( uEndCurrency, acct, bIsEndCurrency, - mEffectiveDst); + mEffectiveDst, + continueCallback); if (out) candidates.push_back({out, acct}); } @@ -1030,6 +1077,8 @@ Pathfinder::addLink( auto it = candidates.begin(); while (count-- != 0) { + if (continueCallback && !continueCallback()) + return; // Add accounts to incompletePaths STPathElement pathElement( STPathElement::typeAccount, @@ -1074,6 +1123,8 @@ Pathfinder::addLink( for (auto const& book : books) { + if (continueCallback && !continueCallback()) + return; if (!currentPath.hasSeen( xrpAccount(), book->getCurrencyOut(), diff --git a/src/ripple/app/paths/Pathfinder.h b/src/ripple/app/paths/Pathfinder.h index aa40b1432..45da9ec11 100644 --- a/src/ripple/app/paths/Pathfinder.h +++ b/src/ripple/app/paths/Pathfinder.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -34,7 +35,7 @@ namespace ripple { @see RippleCalc */ -class Pathfinder +class Pathfinder : public CountedObject { public: /** Construct a pathfinder without an issuer.*/ @@ -56,11 +57,15 @@ public: initPathTable(); bool - findPaths(int searchLevel); + findPaths( + int searchLevel, + std::function const& continueCallback = {}); /** Compute the rankings of the paths. */ void - computePathRanks(int maxPaths); + computePathRanks( + int maxPaths, + std::function const& continueCallback = {}); /* Get the best paths, up to maxPaths in number, from mCompletePaths. @@ -72,7 +77,8 @@ public: int maxPaths, STPath& fullLiquidityPath, STPathSet const& extraPaths, - AccountID const& srcIssuer); + AccountID const& srcIssuer, + std::function const& continueCallback = {}); enum NodeType { nt_SOURCE, // The source account: with an issuer account, if needed. @@ -127,7 +133,9 @@ private: // Add all paths of one type to mCompletePaths. STPathSet& - addPathsForType(PathType const& type); + addPathsForType( + PathType const& type, + std::function const& continueCallback); bool issueMatchesOrigin(Issue const&); @@ -137,20 +145,23 @@ private: Currency const& currency, AccountID const& account, bool isDestCurrency, - AccountID const& dest); + AccountID const& dest, + std::function const& continueCallback); void addLink( STPath const& currentPath, STPathSet& incompletePaths, - int addFlags); + int addFlags, + std::function const& continueCallback); // Call addLink() for each path in currentPaths. void addLinks( STPathSet const& currentPaths, STPathSet& incompletePaths, - int addFlags); + int addFlags, + std::function const& continueCallback); // Compute the liquidity for a path. Return tesSUCCESS if it has has enough // liquidity to be worth keeping, otherwise an error. @@ -178,7 +189,8 @@ private: rankPaths( int maxPaths, STPathSet const& paths, - std::vector& rankedPaths); + std::vector& rankedPaths, + std::function const& continueCallback); AccountID mSrcAccount; AccountID mDstAccount; diff --git a/src/ripple/app/paths/RippleLineCache.cpp b/src/ripple/app/paths/RippleLineCache.cpp index 28e429a4b..a0b26ba28 100644 --- a/src/ripple/app/paths/RippleLineCache.cpp +++ b/src/ripple/app/paths/RippleLineCache.cpp @@ -23,12 +23,22 @@ namespace ripple { -RippleLineCache::RippleLineCache(std::shared_ptr const& ledger) +RippleLineCache::RippleLineCache( + std::shared_ptr const& ledger, + beast::Journal j) + : journal_(j) { - // We want the caching that OpenView provides - // And we need to own a shared_ptr to the input view - // VFALCO TODO This should be a CachedLedger - mLedger = std::make_shared(&*ledger, ledger); + mLedger = ledger; + + JLOG(journal_.debug()) << "RippleLineCache created for ledger " + << mLedger->info().seq; +} + +RippleLineCache::~RippleLineCache() +{ + JLOG(journal_.debug()) << "~RippleLineCache destroyed for ledger " + << mLedger->info().seq << " with " << lines_.size() + << " accounts"; } std::vector const& @@ -43,6 +53,13 @@ RippleLineCache::getRippleLines(AccountID const& accountID) if (inserted) it->second = PathFindTrustLine::getItems(accountID, *mLedger); + JLOG(journal_.debug()) << "RippleLineCache getRippleLines for ledger " + << mLedger->info().seq << " found " + << it->second.size() << " lines for " + << (inserted ? "new " : "existing ") << accountID + << " out of a total of " << lines_.size() + << " accounts"; + return it->second; } diff --git a/src/ripple/app/paths/RippleLineCache.h b/src/ripple/app/paths/RippleLineCache.h index 04c7e743a..e7a7e0f74 100644 --- a/src/ripple/app/paths/RippleLineCache.h +++ b/src/ripple/app/paths/RippleLineCache.h @@ -33,10 +33,13 @@ namespace ripple { // Used by Pathfinder -class RippleLineCache : public CountedObject +class RippleLineCache final : public CountedObject { public: - explicit RippleLineCache(std::shared_ptr const& l); + explicit RippleLineCache( + std::shared_ptr const& l, + beast::Journal j); + ~RippleLineCache(); std::shared_ptr const& getLedger() const @@ -53,7 +56,9 @@ private: ripple::hardened_hash<> hasher_; std::shared_ptr mLedger; - struct AccountKey + beast::Journal journal_; + + struct AccountKey final : public CountedObject { AccountID account_; std::size_t hash_value_; diff --git a/src/ripple/basics/join.h b/src/ripple/basics/join.h new file mode 100644 index 000000000..dde52bc9e --- /dev/null +++ b/src/ripple/basics/join.h @@ -0,0 +1,108 @@ +//------------------------------------------------------------------------------ +/* +This file is part of rippled: https://github.com/ripple/rippled +Copyright (c) 2022 Ripple Labs Inc. + +Permission to use, copy, modify, and/or distribute this software for any +purpose with or without fee is hereby granted, provided that the above +copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ + +#ifndef JOIN_H_INCLUDED +#define JOIN_H_INCLUDED + +#include + +namespace ripple { + +template +Stream& +join(Stream& s, Iter iter, Iter end, std::string const& delimiter) +{ + if (iter == end) + return s; + s << *iter; + for (++iter; iter != end; ++iter) + s << delimiter << *iter; + return s; +} + +template +class CollectionAndDelimiter +{ +public: + Collection const& collection; + std::string const delimiter; + + explicit CollectionAndDelimiter(Collection const& c, std::string delim) + : collection(c), delimiter(std::move(delim)) + { + } + + template + friend Stream& + operator<<(Stream& s, CollectionAndDelimiter const& cd) + { + return join( + s, + std::begin(cd.collection), + std::end(cd.collection), + cd.delimiter); + } +}; + +template +class CollectionAndDelimiter +{ +public: + Collection const* collection; + std::string const delimiter; + + explicit CollectionAndDelimiter(Collection const c[N], std::string delim) + : collection(c), delimiter(std::move(delim)) + { + } + + template + friend Stream& + operator<<(Stream& s, CollectionAndDelimiter const& cd) + { + return join(s, cd.collection, cd.collection + N, cd.delimiter); + } +}; + +// Specialization for const char* strings +template +class CollectionAndDelimiter +{ +public: + char const* collection; + std::string const delimiter; + + explicit CollectionAndDelimiter(char const c[N], std::string delim) + : collection(c), delimiter(std::move(delim)) + { + } + + template + friend Stream& + operator<<(Stream& s, CollectionAndDelimiter const& cd) + { + auto end = cd.collection + N; + if (N > 0 && *(end - 1) == '\0') + --end; + return join(s, cd.collection, end, cd.delimiter); + } +}; + +} // namespace ripple + +#endif diff --git a/src/ripple/core/Job.h b/src/ripple/core/Job.h index 0200daf27..435148062 100644 --- a/src/ripple/core/Job.h +++ b/src/ripple/core/Job.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_CORE_JOB_H_INCLUDED #define RIPPLE_CORE_JOB_H_INCLUDED +#include #include #include #include @@ -92,7 +93,7 @@ enum JobType { jtNS_WRITE, }; -class Job +class Job : public CountedObject { public: using clock_type = std::chrono::steady_clock; diff --git a/src/ripple/net/InfoSub.h b/src/ripple/net/InfoSub.h index bc6460ea8..3c170669b 100644 --- a/src/ripple/net/InfoSub.h +++ b/src/ripple/net/InfoSub.h @@ -33,7 +33,18 @@ namespace ripple { // Operations that clients may wish to perform against the network // Master operational handler, server sequencer, network tracker -class PathRequest; +class InfoSubRequest +{ +public: + using pointer = std::shared_ptr; + + virtual ~InfoSubRequest() = default; + + virtual Json::Value + doClose() = 0; + virtual Json::Value + doStatus(Json::Value const&) = 0; +}; /** Manages a client's subscription to data feeds. */ @@ -205,13 +216,13 @@ public: deleteSubAccountHistory(AccountID const& account); void - clearPathRequest(); + clearRequest(); void - setPathRequest(const std::shared_ptr& req); + setRequest(const std::shared_ptr& req); - std::shared_ptr const& - getPathRequest(); + std::shared_ptr const& + getRequest(); protected: std::mutex mLock; @@ -221,7 +232,7 @@ private: Source& m_source; hash_set realTimeSubscriptions_; hash_set normalSubscriptions_; - std::shared_ptr mPathRequest; + std::shared_ptr request_; std::uint64_t mSeq; hash_set accountHistorySubscriptions_; diff --git a/src/ripple/net/impl/InfoSub.cpp b/src/ripple/net/impl/InfoSub.cpp index 26849f29c..9ea5962fa 100644 --- a/src/ripple/net/impl/InfoSub.cpp +++ b/src/ripple/net/impl/InfoSub.cpp @@ -119,21 +119,21 @@ InfoSub::deleteSubAccountHistory(AccountID const& account) } void -InfoSub::clearPathRequest() +InfoSub::clearRequest() { - mPathRequest.reset(); + request_.reset(); } void -InfoSub::setPathRequest(const std::shared_ptr& req) +InfoSub::setRequest(const std::shared_ptr& req) { - mPathRequest = req; + request_ = req; } -const std::shared_ptr& -InfoSub::getPathRequest() +const std::shared_ptr& +InfoSub::getRequest() { - return mPathRequest; + return request_; } } // namespace ripple diff --git a/src/ripple/protocol/Book.h b/src/ripple/protocol/Book.h index 8a0867fe3..1469b60dd 100644 --- a/src/ripple/protocol/Book.h +++ b/src/ripple/protocol/Book.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_PROTOCOL_BOOK_H_INCLUDED #define RIPPLE_PROTOCOL_BOOK_H_INCLUDED +#include #include #include @@ -29,7 +30,7 @@ namespace ripple { The order book is a pair of Issues called in and out. @see Issue. */ -class Book +class Book final : public CountedObject { public: Issue in; diff --git a/src/ripple/protocol/STAmount.h b/src/ripple/protocol/STAmount.h index 0f3023aaf..d0add30db 100644 --- a/src/ripple/protocol/STAmount.h +++ b/src/ripple/protocol/STAmount.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_PROTOCOL_STAMOUNT_H_INCLUDED #define RIPPLE_PROTOCOL_STAMOUNT_H_INCLUDED +#include #include #include #include @@ -40,7 +41,7 @@ namespace ripple { // Wire form: // High 8 bits are (offset+142), legal range is, 80 to 22 inclusive // Low 56 bits are value, legal range is 10^15 to (10^16 - 1) inclusive -class STAmount : public STBase +class STAmount final : public STBase, public CountedObject { public: using mantissa_type = std::uint64_t; diff --git a/src/ripple/protocol/STPathSet.h b/src/ripple/protocol/STPathSet.h index 3ac2c0797..8102bc76e 100644 --- a/src/ripple/protocol/STPathSet.h +++ b/src/ripple/protocol/STPathSet.h @@ -20,6 +20,7 @@ #ifndef RIPPLE_PROTOCOL_STPATHSET_H_INCLUDED #define RIPPLE_PROTOCOL_STPATHSET_H_INCLUDED +#include #include #include #include @@ -30,7 +31,7 @@ namespace ripple { -class STPathElement +class STPathElement final : public CountedObject { unsigned int mType; AccountID mAccountID; @@ -114,7 +115,7 @@ private: get_hash(STPathElement const& element); }; -class STPath +class STPath final : public CountedObject { std::vector mPath; @@ -172,7 +173,7 @@ public: //------------------------------------------------------------------------------ // A set of zero or more payment paths -class STPathSet final : public STBase +class STPathSet final : public STBase, public CountedObject { std::vector value; diff --git a/src/ripple/rpc/handlers/PathFind.cpp b/src/ripple/rpc/handlers/PathFind.cpp index 656744b68..9d6e0cff1 100644 --- a/src/ripple/rpc/handlers/PathFind.cpp +++ b/src/ripple/rpc/handlers/PathFind.cpp @@ -51,25 +51,25 @@ doPathFind(RPC::JsonContext& context) if (sSubCommand == "create") { context.loadType = Resource::feeHighBurdenRPC; - context.infoSub->clearPathRequest(); + context.infoSub->clearRequest(); return context.app.getPathRequests().makePathRequest( context.infoSub, lpLedger, context.params); } if (sSubCommand == "close") { - PathRequest::pointer request = context.infoSub->getPathRequest(); + InfoSubRequest::pointer request = context.infoSub->getRequest(); if (!request) return rpcError(rpcNO_PF_REQUEST); - context.infoSub->clearPathRequest(); - return request->doClose(context.params); + context.infoSub->clearRequest(); + return request->doClose(); } if (sSubCommand == "status") { - PathRequest::pointer request = context.infoSub->getPathRequest(); + InfoSubRequest::pointer request = context.infoSub->getRequest(); if (!request) return rpcError(rpcNO_PF_REQUEST); diff --git a/src/ripple/rpc/impl/TransactionSign.cpp b/src/ripple/rpc/impl/TransactionSign.cpp index 8dff9ee62..ca24b6874 100644 --- a/src/ripple/rpc/impl/TransactionSign.cpp +++ b/src/ripple/rpc/impl/TransactionSign.cpp @@ -223,7 +223,8 @@ checkPayment( if (auto ledger = app.openLedger().current()) { Pathfinder pf( - std::make_shared(ledger), + std::make_shared( + ledger, app.journal("RippleLineCache")), srcAddressID, *dstAccountID, sendMax.issue().currency, diff --git a/src/test/basics/join_test.cpp b/src/test/basics/join_test.cpp new file mode 100644 index 000000000..730fcb693 --- /dev/null +++ b/src/test/basics/join_test.cpp @@ -0,0 +1,105 @@ +//------------------------------------------------------------------------------ +/* +This file is part of rippled: https://github.com/ripple/rippled +Copyright (c) 2022 Ripple Labs Inc. + +Permission to use, copy, modify, and/or distribute this software for any +purpose with or without fee is hereby granted, provided that the above +copyright notice and this permission notice appear in all copies. + +THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES +WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF +MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR +ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN +ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF +OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#include + +#include +#include + +namespace ripple { +namespace test { + +struct join_test : beast::unit_test::suite +{ + void + run() override + { + auto test = [this](auto collectionanddelimiter, std::string expected) { + std::stringstream ss; + // Put something else in the buffer before and after to ensure that + // the << operator returns the stream correctly. + ss << "(" << collectionanddelimiter << ")"; + auto const str = ss.str(); + BEAST_EXPECT(str.substr(1, str.length() - 2) == expected); + BEAST_EXPECT(str.front() == '('); + BEAST_EXPECT(str.back() == ')'); + }; + + // C++ array + test( + CollectionAndDelimiter(std::array{2, -1, 5, 10}, "/"), + "2/-1/5/10"); + // One item C++ array edge case + test( + CollectionAndDelimiter(std::array{"test"}, " & "), + "test"); + // Empty C++ array edge case + test(CollectionAndDelimiter(std::array{}, ","), ""); + { + // C-style array + char letters[4]{'w', 'a', 's', 'd'}; + test(CollectionAndDelimiter(letters, std::to_string(0)), "w0a0s0d"); + } + { + // Auto sized C-style array + std::string words[]{"one", "two", "three", "four"}; + test(CollectionAndDelimiter(words, "\n"), "one\ntwo\nthree\nfour"); + } + { + // One item C-style array edge case + std::string words[]{"thing"}; + test(CollectionAndDelimiter(words, "\n"), "thing"); + } + // Initializer list + test( + CollectionAndDelimiter(std::initializer_list{19, 25}, "+"), + "19+25"); + // vector + test( + CollectionAndDelimiter(std::vector{0, 42}, std::to_string(99)), + "09942"); + { + // vector with one item edge case + using namespace jtx; + test( + CollectionAndDelimiter( + std::vector{Account::master}, "xxx"), + Account::master.human()); + } + // empty vector edge case + test(CollectionAndDelimiter(std::vector{}, ","), ""); + // C-style string + test(CollectionAndDelimiter("string", " "), "s t r i n g"); + // Empty C-style string edge case + test(CollectionAndDelimiter("", "*"), ""); + // Single char C-style string edge case + test(CollectionAndDelimiter("x", "*"), "x"); + // std::string + test(CollectionAndDelimiter(std::string{"string"}, "-"), "s-t-r-i-n-g"); + // Empty std::string edge case + test(CollectionAndDelimiter(std::string{""}, "*"), ""); + // Single char std::string edge case + test(CollectionAndDelimiter(std::string{"y"}, "*"), "y"); + } +}; // namespace test + +BEAST_DEFINE_TESTSUITE(join, ripple_basics, ripple); + +} // namespace test +} // namespace ripple diff --git a/src/test/jtx/impl/paths.cpp b/src/test/jtx/impl/paths.cpp index e27d2789b..1b9bf52fc 100644 --- a/src/test/jtx/impl/paths.cpp +++ b/src/test/jtx/impl/paths.cpp @@ -33,7 +33,8 @@ paths::operator()(Env& env, JTx& jt) const auto const to = env.lookup(jv[jss::Destination].asString()); auto const amount = amountFromJson(sfAmount, jv[jss::Amount]); Pathfinder pf( - std::make_shared(env.current()), + std::make_shared( + env.current(), env.app().journal("RippleLineCache")), from, to, in_.currency, diff --git a/src/test/rpc/GetCounts_test.cpp b/src/test/rpc/GetCounts_test.cpp index a3b0b7162..52b645ed7 100644 --- a/src/test/rpc/GetCounts_test.cpp +++ b/src/test/rpc/GetCounts_test.cpp @@ -35,6 +35,9 @@ class GetCounts_test : public beast::unit_test::suite Json::Value result; { + using namespace std::chrono_literals; + // Add a little delay so the App's "uptime" will have a value. + std::this_thread::sleep_for(1s); // check counts with no transactions posted result = env.rpc("get_counts")[jss::result]; BEAST_EXPECT(result[jss::status] == "success");