Incremental improvements to path finding memory usage:

* Abort background path finding when closed or disconnected
* Exit pathfinding job thread if there are no requests left
* Don't bother creating the path find job if there are no requests
* Refactor to remove circular dependency between InfoSub and PathRequest
This commit is contained in:
Edward Hennis
2022-01-26 19:41:36 -05:00
committed by Nik Bougalis
parent 4d5459d041
commit e7e672c3f8
26 changed files with 550 additions and 108 deletions

View File

@@ -733,6 +733,7 @@ if (tests)
src/test/basics/contract_test.cpp
src/test/basics/FeeUnits_test.cpp
src/test/basics/hardened_hash_test.cpp
src/test/basics/join_test.cpp
src/test/basics/mulDiv_test.cpp
src/test/basics/tagged_integer_test.cpp
#[===============================[

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_APP_LEDGER_LEDGERHOLDER_H_INCLUDED
#define RIPPLE_APP_LEDGER_LEDGERHOLDER_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/basics/contract.h>
#include <mutex>
@@ -35,7 +36,7 @@ namespace ripple {
way the object always holds a value. We can use the
genesis ledger in all cases.
*/
class LedgerHolder
class LedgerHolder : public CountedObject<LedgerHolder>
{
public:
// Update the held ledger

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_APP_LEDGER_LEDGERREPLAY_H_INCLUDED
#define RIPPLE_APP_LEDGER_LEDGERREPLAY_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <cstdint>
#include <map>
#include <memory>
@@ -29,7 +30,7 @@ namespace ripple {
class Ledger;
class STTx;
class LedgerReplay
class LedgerReplay : public CountedObject<LedgerReplay>
{
std::shared_ptr<Ledger const> parent_;
std::shared_ptr<Ledger const> replay_;

View File

@@ -261,8 +261,13 @@ LedgerMaster::getPublishedLedgerAge()
std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch();
ret -= pubClose;
ret = (ret > 0s) ? ret : 0s;
static std::chrono::seconds lastRet = -1s;
JLOG(m_journal.trace()) << "Published ledger age is " << ret.count();
if (ret != lastRet)
{
JLOG(m_journal.trace()) << "Published ledger age is " << ret.count();
lastRet = ret;
}
return ret;
}
@@ -287,8 +292,13 @@ LedgerMaster::getValidatedLedgerAge()
std::chrono::seconds ret = app_.timeKeeper().closeTime().time_since_epoch();
ret -= valClose;
ret = (ret > 0s) ? ret : 0s;
static std::chrono::seconds lastRet = -1s;
JLOG(m_journal.trace()) << "Validated ledger age is " << ret.count();
if (ret != lastRet)
{
JLOG(m_journal.trace()) << "Validated ledger age is " << ret.count();
lastRet = ret;
}
return ret;
}
@@ -1483,12 +1493,14 @@ LedgerMaster::updatePaths()
if (app_.getOPs().isNeedNetworkLedger())
{
--mPathFindThread;
JLOG(m_journal.debug()) << "Need network ledger for updating paths";
return;
}
}
while (!app_.getJobQueue().isStopping())
{
JLOG(m_journal.debug()) << "updatePaths running";
std::shared_ptr<ReadView const> lastLedger;
{
std::lock_guard ml(m_mutex);
@@ -1506,6 +1518,7 @@ LedgerMaster::updatePaths()
else
{ // Nothing to do
--mPathFindThread;
JLOG(m_journal.debug()) << "Nothing to do for updating paths";
return;
}
}
@@ -1527,7 +1540,31 @@ LedgerMaster::updatePaths()
try
{
app_.getPathRequests().updateAll(lastLedger);
auto& pathRequests = app_.getPathRequests();
{
std::lock_guard ml(m_mutex);
if (!pathRequests.requestsPending())
{
--mPathFindThread;
JLOG(m_journal.debug())
<< "No path requests found. Nothing to do for updating "
"paths. "
<< mPathFindThread << " jobs remaining";
return;
}
}
JLOG(m_journal.debug()) << "Updating paths";
pathRequests.updateAll(lastLedger);
std::lock_guard ml(m_mutex);
if (!pathRequests.requestsPending())
{
JLOG(m_journal.debug())
<< "No path requests left. No need for further updating "
"paths";
--mPathFindThread;
return;
}
}
catch (SHAMapMissingNode const& mn)
{
@@ -1587,8 +1624,11 @@ LedgerMaster::newPFWork(
const char* name,
std::unique_lock<std::recursive_mutex>&)
{
if (mPathFindThread < 2)
if (mPathFindThread < 2 && app_.getPathRequests().requestsPending())
{
JLOG(m_journal.debug())
<< "newPFWork: Creating job. path find threads: "
<< mPathFindThread;
if (app_.getJobQueue().addJob(
jtUPDATE_PF, name, [this]() { updatePaths(); }))
{

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_APP_MISC_CANONICALTXSET_H_INCLUDED
#define RIPPLE_APP_MISC_CANONICALTXSET_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <ripple/protocol/STTx.h>
#include <ripple/protocol/SeqProxy.h>
@@ -34,7 +35,7 @@ namespace ripple {
*/
// VFALCO TODO rename to SortedTxSet
class CanonicalTXSet
class CanonicalTXSet : public CountedObject<CanonicalTXSet>
{
private:
class Key

View File

@@ -20,10 +20,12 @@
#ifndef RIPPLE_APP_MISC_ORDERBOOK_H_INCLUDED
#define RIPPLE_APP_MISC_ORDERBOOK_H_INCLUDED
#include <ripple/basics/CountedObject.h>
namespace ripple {
/** Describes a serialized ledger entry for an order book. */
class OrderBook
class OrderBook : public CountedObject<OrderBook>
{
public:
using pointer = std::shared_ptr<OrderBook>;

View File

@@ -441,7 +441,7 @@ PathRequest::parseJson(Json::Value const& jvParams)
}
Json::Value
PathRequest::doClose(Json::Value const&)
PathRequest::doClose()
{
JLOG(m_journal.debug()) << iIdentifier << " closed";
std::lock_guard sl(mLock);
@@ -457,13 +457,20 @@ PathRequest::doStatus(Json::Value const&)
return jvStatus;
}
void
PathRequest::doAborting() const
{
JLOG(m_journal.info()) << iIdentifier << " aborting early";
}
std::unique_ptr<Pathfinder> const&
PathRequest::getPathFinder(
std::shared_ptr<RippleLineCache> const& cache,
hash_map<Currency, std::unique_ptr<Pathfinder>>& currency_map,
Currency const& currency,
STAmount const& dst_amount,
int const level)
int const level,
std::function<bool(void)> const& continueCallback)
{
auto i = currency_map.find(currency);
if (i != currency_map.end())
@@ -477,8 +484,8 @@ PathRequest::getPathFinder(
dst_amount,
saSendMax,
app_);
if (pathfinder->findPaths(level))
pathfinder->computePathRanks(max_paths_);
if (pathfinder->findPaths(level, continueCallback))
pathfinder->computePathRanks(max_paths_, continueCallback);
else
pathfinder.reset(); // It's a bad request - clear it.
return currency_map[currency] = std::move(pathfinder);
@@ -488,7 +495,8 @@ bool
PathRequest::findPaths(
std::shared_ptr<RippleLineCache> const& cache,
int const level,
Json::Value& jvArray)
Json::Value& jvArray,
std::function<bool(void)> const& continueCallback)
{
auto sourceCurrencies = sciSourceCurrencies;
if (sourceCurrencies.empty() && saSendMax)
@@ -515,22 +523,33 @@ PathRequest::findPaths(
hash_map<Currency, std::unique_ptr<Pathfinder>> currency_map;
for (auto const& issue : sourceCurrencies)
{
if (continueCallback && !continueCallback())
break;
JLOG(m_journal.debug())
<< iIdentifier
<< " Trying to find paths: " << STAmount(issue, 1).getFullText();
auto& pathfinder = getPathFinder(
cache, currency_map, issue.currency, dst_amount, level);
cache,
currency_map,
issue.currency,
dst_amount,
level,
continueCallback);
if (!pathfinder)
{
assert(false);
assert(continueCallback && !continueCallback());
JLOG(m_journal.debug()) << iIdentifier << " No paths found";
continue;
}
STPath fullLiquidityPath;
auto ps = pathfinder->getBestPaths(
max_paths_, fullLiquidityPath, mContext[issue], issue.account);
max_paths_,
fullLiquidityPath,
mContext[issue],
issue.account,
continueCallback);
mContext[issue] = ps;
auto& sourceAccount = !isXRP(issue.account)
@@ -628,7 +647,10 @@ PathRequest::findPaths(
}
Json::Value
PathRequest::doUpdate(std::shared_ptr<RippleLineCache> const& cache, bool fast)
PathRequest::doUpdate(
std::shared_ptr<RippleLineCache> const& cache,
bool fast,
std::function<bool(void)> const& continueCallback)
{
using namespace std::chrono;
JLOG(m_journal.debug())
@@ -699,7 +721,7 @@ PathRequest::doUpdate(std::shared_ptr<RippleLineCache> const& cache, bool fast)
JLOG(m_journal.debug()) << iIdentifier << " processing at level " << iLevel;
Json::Value jvArray = Json::arrayValue;
if (findPaths(cache, iLevel, jvArray))
if (findPaths(cache, iLevel, jvArray, continueCallback))
{
bLastSuccess = jvArray.size() != 0;
newStatus[jss::alternatives] = std::move(jvArray);
@@ -730,7 +752,7 @@ PathRequest::doUpdate(std::shared_ptr<RippleLineCache> const& cache, bool fast)
}
InfoSub::pointer
PathRequest::getSubscriber()
PathRequest::getSubscriber() const
{
return wpSubscriber.lock();
}

View File

@@ -43,10 +43,10 @@ class PathRequests;
// Return values from parseJson <0 = invalid, >0 = valid
#define PFR_PJ_INVALID -1
#define PFR_PJ_NOCHANGE 0
#define PFR_PJ_CHANGE 1
class PathRequest : public std::enable_shared_from_this<PathRequest>,
public CountedObject<PathRequest>
class PathRequest final : public InfoSubRequest,
public std::enable_shared_from_this<PathRequest>,
public CountedObject<PathRequest>
{
public:
using wptr = std::weak_ptr<PathRequest>;
@@ -55,8 +55,6 @@ public:
using wref = const wptr&;
public:
// VFALCO TODO Break the cyclic dependency on InfoSub
// path_find semantics
// Subscriber is updated
PathRequest(
@@ -91,15 +89,20 @@ public:
doCreate(std::shared_ptr<RippleLineCache> const&, Json::Value const&);
Json::Value
doClose(Json::Value const&);
doClose() override;
Json::Value
doStatus(Json::Value const&);
doStatus(Json::Value const&) override;
void
doAborting() const;
// update jvStatus
Json::Value
doUpdate(std::shared_ptr<RippleLineCache> const&, bool fast);
doUpdate(
std::shared_ptr<RippleLineCache> const&,
bool fast,
std::function<bool(void)> const& continueCallback = {});
InfoSub::pointer
getSubscriber();
getSubscriber() const;
bool
hasCompletion();
@@ -113,13 +116,18 @@ private:
hash_map<Currency, std::unique_ptr<Pathfinder>>&,
Currency const&,
STAmount const&,
int const);
int const,
std::function<bool(void)> const&);
/** Finds and sets a PathSet in the JSON argument.
Returns false if the source currencies are inavlid.
*/
bool
findPaths(std::shared_ptr<RippleLineCache> const&, int const, Json::Value&);
findPaths(
std::shared_ptr<RippleLineCache> const&,
int const,
Json::Value&,
std::function<bool(void)> const&);
int
parseJson(Json::Value const&);
@@ -156,7 +164,7 @@ private:
int iLevel;
bool bLastSuccess;
int iIdentifier;
int const iIdentifier;
std::chrono::steady_clock::time_point const created_;
std::chrono::steady_clock::time_point quick_reply_;

View File

@@ -40,8 +40,12 @@ PathRequests::getLineCache(
{
std::lock_guard sl(mLock);
std::uint32_t lineSeq = mLineCache ? mLineCache->getLedger()->seq() : 0;
std::uint32_t lgrSeq = ledger->seq();
auto lineCache = lineCache_.lock();
std::uint32_t const lineSeq = lineCache ? lineCache->getLedger()->seq() : 0;
std::uint32_t const lgrSeq = ledger->seq();
JLOG(mJournal.debug()) << "getLineCache has cache for " << lineSeq
<< ", considering " << lgrSeq;
if ((lineSeq == 0) || // no ledger
(authoritative && (lgrSeq > lineSeq)) || // newer authoritative ledger
@@ -49,9 +53,15 @@ PathRequests::getLineCache(
((lgrSeq + 8) < lineSeq)) || // we jumped way back for some reason
(lgrSeq > (lineSeq + 8))) // we jumped way forward for some reason
{
mLineCache = std::make_shared<RippleLineCache>(ledger);
JLOG(mJournal.debug())
<< "getLineCache creating new cache for " << lgrSeq;
// Assign to the local before the member, because the member is a
// weak_ptr, and will immediately discard it if there are no other
// references.
lineCache_ = lineCache = std::make_shared<RippleLineCache>(
ledger, app_.journal("RippleLineCache"));
}
return mLineCache;
return lineCache;
}
void
@@ -78,8 +88,20 @@ PathRequests::updateAll(std::shared_ptr<ReadView const> const& inLedger)
int processed = 0, removed = 0;
auto getSubscriber =
[](PathRequest::pointer const& request) -> InfoSub::pointer {
if (auto ipSub = request->getSubscriber();
ipSub && ipSub->getRequest() == request)
{
return ipSub;
}
request->doAborting();
return nullptr;
};
do
{
JLOG(mJournal.trace()) << "updateAll looping";
for (auto const& wr : requests)
{
if (app_.getJobQueue().isStopping())
@@ -87,25 +109,40 @@ PathRequests::updateAll(std::shared_ptr<ReadView const> const& inLedger)
auto request = wr.lock();
bool remove = true;
JLOG(mJournal.trace())
<< "updateAll request " << (request ? "" : "not ") << "found";
if (request)
{
auto continueCallback = [&getSubscriber, &request]() {
// This callback is used by doUpdate to determine whether to
// continue working. If getSubscriber returns null, that
// indicates that this request is no longer relevant.
return (bool)getSubscriber(request);
};
if (!request->needsUpdate(
newRequests, cache->getLedger()->seq()))
remove = false;
else
{
if (auto ipSub = request->getSubscriber())
if (auto ipSub = getSubscriber(request))
{
if (!ipSub->getConsumer().warn())
{
Json::Value update =
request->doUpdate(cache, false);
// Release the shared ptr to the subscriber so that
// it can be freed if the client disconnects, and
// thus fail to lock later.
ipSub.reset();
Json::Value update = request->doUpdate(
cache, false, continueCallback);
request->updateComplete();
update[jss::type] = "path_find";
ipSub->send(update, false);
remove = false;
++processed;
if ((ipSub = getSubscriber(request)))
{
ipSub->send(update, false);
remove = false;
++processed;
}
}
}
else if (request->hasCompletion())
@@ -178,6 +215,13 @@ PathRequests::updateAll(std::shared_ptr<ReadView const> const& inLedger)
<< " processed and " << removed << " removed";
}
bool
PathRequests::requestsPending() const
{
std::lock_guard sl(mLock);
return !requests_.empty();
}
void
PathRequests::insertPathRequest(PathRequest::pointer const& req)
{
@@ -211,7 +255,7 @@ PathRequests::makePathRequest(
if (valid)
{
subscriber->setPathRequest(req);
subscriber->setRequest(req);
insertPathRequest(req);
app_.getLedgerMaster().newPathRequest();
}
@@ -258,7 +302,8 @@ PathRequests::doLegacyPathRequest(
std::shared_ptr<ReadView const> const& inLedger,
Json::Value const& request)
{
auto cache = std::make_shared<RippleLineCache>(inLedger);
auto cache = std::make_shared<RippleLineCache>(
inLedger, app_.journal("RippleLineCache"));
auto req = std::make_shared<PathRequest>(
app_, [] {}, consumer, ++mLastIdentifier, *this, mJournal);

View File

@@ -51,6 +51,9 @@ public:
void
updateAll(std::shared_ptr<ReadView const> const& ledger);
bool
requestsPending() const;
std::shared_ptr<RippleLineCache>
getLineCache(
std::shared_ptr<ReadView const> const& ledger,
@@ -109,11 +112,11 @@ private:
std::vector<PathRequest::wptr> requests_;
// Use a RippleLineCache
std::shared_ptr<RippleLineCache> mLineCache;
std::weak_ptr<RippleLineCache> lineCache_;
std::atomic<int> mLastIdentifier;
std::recursive_mutex mLock;
std::recursive_mutex mutable mLock;
};
} // namespace ripple

View File

@@ -24,6 +24,7 @@
#include <ripple/app/paths/RippleLineCache.h>
#include <ripple/app/paths/impl/PathfinderUtils.h>
#include <ripple/basics/Log.h>
#include <ripple/basics/join.h>
#include <ripple/core/Config.h>
#include <ripple/core/JobQueue.h>
#include <ripple/json/to_string.h>
@@ -191,8 +192,11 @@ Pathfinder::Pathfinder(
}
bool
Pathfinder::findPaths(int searchLevel)
Pathfinder::findPaths(
int searchLevel,
std::function<bool(void)> const& continueCallback)
{
JLOG(j_.trace()) << "findPaths start";
if (mDstAmount == beast::zero)
{
// No need to send zero money.
@@ -316,10 +320,13 @@ Pathfinder::findPaths(int searchLevel)
// Now iterate over all paths for that paymentType.
for (auto const& costedPath : mPathTable[paymentType])
{
if (continueCallback && !continueCallback())
return false;
// Only use paths with at most the current search level.
if (costedPath.searchLevel <= searchLevel)
{
addPathsForType(costedPath.type);
JLOG(j_.trace()) << "findPaths trying payment type " << paymentType;
addPathsForType(costedPath.type, continueCallback);
if (mCompletePaths.size() > PATHFINDER_MAX_COMPLETE_PATHS)
break;
@@ -401,7 +408,9 @@ Pathfinder::getPathLiquidity(
}
void
Pathfinder::computePathRanks(int maxPaths)
Pathfinder::computePathRanks(
int maxPaths,
std::function<bool(void)> const& continueCallback)
{
mRemainingAmount = convertAmount(mDstAmount, convert_all_);
@@ -439,7 +448,7 @@ Pathfinder::computePathRanks(int maxPaths)
JLOG(j_.debug()) << "Default path causes exception";
}
rankPaths(maxPaths, mCompletePaths, mPathRanks);
rankPaths(maxPaths, mCompletePaths, mPathRanks, continueCallback);
}
static bool
@@ -480,8 +489,11 @@ void
Pathfinder::rankPaths(
int maxPaths,
STPathSet const& paths,
std::vector<PathRank>& rankedPaths)
std::vector<PathRank>& rankedPaths,
std::function<bool(void)> const& continueCallback)
{
JLOG(j_.trace()) << "rankPaths with " << paths.size() << " candidates, and "
<< maxPaths << " maximum";
rankedPaths.clear();
rankedPaths.reserve(paths.size());
@@ -499,6 +511,8 @@ Pathfinder::rankPaths(
for (int i = 0; i < paths.size(); ++i)
{
if (continueCallback && !continueCallback())
return;
auto const& currentPath = paths[i];
if (!currentPath.empty())
{
@@ -554,7 +568,8 @@ Pathfinder::getBestPaths(
int maxPaths,
STPath& fullLiquidityPath,
STPathSet const& extraPaths,
AccountID const& srcIssuer)
AccountID const& srcIssuer,
std::function<bool(void)> const& continueCallback)
{
JLOG(j_.debug()) << "findPaths: " << mCompletePaths.size() << " paths and "
<< extraPaths.size() << " extras";
@@ -567,7 +582,7 @@ Pathfinder::getBestPaths(
isXRP(mSrcCurrency) || (srcIssuer == mSrcAccount);
std::vector<PathRank> extraPathRanks;
rankPaths(maxPaths, extraPaths, extraPathRanks);
rankPaths(maxPaths, extraPaths, extraPathRanks, continueCallback);
STPathSet bestPaths;
@@ -582,6 +597,8 @@ Pathfinder::getBestPaths(
while (pathsIterator != mPathRanks.end() ||
extraPathsIterator != extraPathRanks.end())
{
if (continueCallback && !continueCallback())
break;
bool usePath = false;
bool useExtraPath = false;
@@ -692,7 +709,8 @@ Pathfinder::getPathsOut(
Currency const& currency,
AccountID const& account,
bool isDstCurrency,
AccountID const& dstAccount)
AccountID const& dstAccount,
std::function<bool(void)> const& continueCallback)
{
Issue const issue(currency, account);
@@ -755,17 +773,26 @@ void
Pathfinder::addLinks(
STPathSet const& currentPaths, // The paths to build from
STPathSet& incompletePaths, // The set of partial paths we add to
int addFlags)
int addFlags,
std::function<bool(void)> const& continueCallback)
{
JLOG(j_.debug()) << "addLink< on " << currentPaths.size()
<< " source(s), flags=" << addFlags;
for (auto const& path : currentPaths)
addLink(path, incompletePaths, addFlags);
{
if (continueCallback && !continueCallback())
return;
addLink(path, incompletePaths, addFlags, continueCallback);
}
}
STPathSet&
Pathfinder::addPathsForType(PathType const& pathType)
Pathfinder::addPathsForType(
PathType const& pathType,
std::function<bool(void)> const& continueCallback)
{
JLOG(j_.warn()) << "addPathsForType "
<< CollectionAndDelimiter(pathType, ", ");
// See if the set of paths for this type already exists.
auto it = mPaths.find(pathType);
if (it != mPaths.end())
@@ -774,13 +801,16 @@ Pathfinder::addPathsForType(PathType const& pathType)
// Otherwise, if the type has no nodes, return the empty path.
if (pathType.empty())
return mPaths[pathType];
if (continueCallback && !continueCallback())
return mPaths[{}];
// Otherwise, get the paths for the parent PathType by calling
// addPathsForType recursively.
PathType parentPathType = pathType;
parentPathType.pop_back();
STPathSet const& parentPaths = addPathsForType(parentPathType);
STPathSet const& parentPaths =
addPathsForType(parentPathType, continueCallback);
STPathSet& pathsOut = mPaths[pathType];
JLOG(j_.debug()) << "getPaths< adding onto '"
@@ -800,26 +830,38 @@ Pathfinder::addPathsForType(PathType const& pathType)
break;
case nt_ACCOUNTS:
addLinks(parentPaths, pathsOut, afADD_ACCOUNTS);
addLinks(parentPaths, pathsOut, afADD_ACCOUNTS, continueCallback);
break;
case nt_BOOKS:
addLinks(parentPaths, pathsOut, afADD_BOOKS);
addLinks(parentPaths, pathsOut, afADD_BOOKS, continueCallback);
break;
case nt_XRP_BOOK:
addLinks(parentPaths, pathsOut, afADD_BOOKS | afOB_XRP);
addLinks(
parentPaths,
pathsOut,
afADD_BOOKS | afOB_XRP,
continueCallback);
break;
case nt_DEST_BOOK:
addLinks(parentPaths, pathsOut, afADD_BOOKS | afOB_LAST);
addLinks(
parentPaths,
pathsOut,
afADD_BOOKS | afOB_LAST,
continueCallback);
break;
case nt_DESTINATION:
// FIXME: What if a different issuer was specified on the
// destination amount?
// TODO(tom): what does this even mean? Should it be a JIRA?
addLinks(parentPaths, pathsOut, afADD_ACCOUNTS | afAC_LAST);
addLinks(
parentPaths,
pathsOut,
afADD_ACCOUNTS | afAC_LAST,
continueCallback);
break;
}
@@ -890,7 +932,8 @@ void
Pathfinder::addLink(
const STPath& currentPath, // The path to build from
STPathSet& incompletePaths, // The set of partial paths we add to
int addFlags)
int addFlags,
std::function<bool(void)> const& continueCallback)
{
auto const& pathEnd = currentPath.empty() ? mSource : currentPath.back();
auto const& uEndCurrency = pathEnd.getCurrency();
@@ -903,7 +946,8 @@ Pathfinder::addLink(
// rather than the ultimate destination?
bool const hasEffectiveDestination = mEffectiveDst != mDstAccount;
JLOG(j_.trace()) << "addLink< flags=" << addFlags << " onXRP=" << bOnXRP;
JLOG(j_.trace()) << "addLink< flags=" << addFlags << " onXRP=" << bOnXRP
<< " completePaths size=" << mCompletePaths.size();
JLOG(j_.trace()) << currentPath.getJson(JsonOptions::none);
if (addFlags & afADD_ACCOUNTS)
@@ -939,6 +983,8 @@ Pathfinder::addLink(
for (auto const& rs : rippleLines)
{
if (continueCallback && !continueCallback())
return;
auto const& acct = rs.getAccountIDPeer();
if (hasEffectiveDestination && (acct == mDstAccount))
@@ -1002,7 +1048,8 @@ Pathfinder::addLink(
uEndCurrency,
acct,
bIsEndCurrency,
mEffectiveDst);
mEffectiveDst,
continueCallback);
if (out)
candidates.push_back({out, acct});
}
@@ -1030,6 +1077,8 @@ Pathfinder::addLink(
auto it = candidates.begin();
while (count-- != 0)
{
if (continueCallback && !continueCallback())
return;
// Add accounts to incompletePaths
STPathElement pathElement(
STPathElement::typeAccount,
@@ -1074,6 +1123,8 @@ Pathfinder::addLink(
for (auto const& book : books)
{
if (continueCallback && !continueCallback())
return;
if (!currentPath.hasSeen(
xrpAccount(),
book->getCurrencyOut(),

View File

@@ -22,6 +22,7 @@
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/paths/RippleLineCache.h>
#include <ripple/basics/CountedObject.h>
#include <ripple/core/LoadEvent.h>
#include <ripple/protocol/STAmount.h>
#include <ripple/protocol/STPathSet.h>
@@ -34,7 +35,7 @@ namespace ripple {
@see RippleCalc
*/
class Pathfinder
class Pathfinder : public CountedObject<Pathfinder>
{
public:
/** Construct a pathfinder without an issuer.*/
@@ -56,11 +57,15 @@ public:
initPathTable();
bool
findPaths(int searchLevel);
findPaths(
int searchLevel,
std::function<bool(void)> const& continueCallback = {});
/** Compute the rankings of the paths. */
void
computePathRanks(int maxPaths);
computePathRanks(
int maxPaths,
std::function<bool(void)> const& continueCallback = {});
/* Get the best paths, up to maxPaths in number, from mCompletePaths.
@@ -72,7 +77,8 @@ public:
int maxPaths,
STPath& fullLiquidityPath,
STPathSet const& extraPaths,
AccountID const& srcIssuer);
AccountID const& srcIssuer,
std::function<bool(void)> const& continueCallback = {});
enum NodeType {
nt_SOURCE, // The source account: with an issuer account, if needed.
@@ -127,7 +133,9 @@ private:
// Add all paths of one type to mCompletePaths.
STPathSet&
addPathsForType(PathType const& type);
addPathsForType(
PathType const& type,
std::function<bool(void)> const& continueCallback);
bool
issueMatchesOrigin(Issue const&);
@@ -137,20 +145,23 @@ private:
Currency const& currency,
AccountID const& account,
bool isDestCurrency,
AccountID const& dest);
AccountID const& dest,
std::function<bool(void)> const& continueCallback);
void
addLink(
STPath const& currentPath,
STPathSet& incompletePaths,
int addFlags);
int addFlags,
std::function<bool(void)> const& continueCallback);
// Call addLink() for each path in currentPaths.
void
addLinks(
STPathSet const& currentPaths,
STPathSet& incompletePaths,
int addFlags);
int addFlags,
std::function<bool(void)> const& continueCallback);
// Compute the liquidity for a path. Return tesSUCCESS if it has has enough
// liquidity to be worth keeping, otherwise an error.
@@ -178,7 +189,8 @@ private:
rankPaths(
int maxPaths,
STPathSet const& paths,
std::vector<PathRank>& rankedPaths);
std::vector<PathRank>& rankedPaths,
std::function<bool(void)> const& continueCallback);
AccountID mSrcAccount;
AccountID mDstAccount;

View File

@@ -23,12 +23,22 @@
namespace ripple {
RippleLineCache::RippleLineCache(std::shared_ptr<ReadView const> const& ledger)
RippleLineCache::RippleLineCache(
std::shared_ptr<ReadView const> const& ledger,
beast::Journal j)
: journal_(j)
{
// We want the caching that OpenView provides
// And we need to own a shared_ptr to the input view
// VFALCO TODO This should be a CachedLedger
mLedger = std::make_shared<OpenView>(&*ledger, ledger);
mLedger = ledger;
JLOG(journal_.debug()) << "RippleLineCache created for ledger "
<< mLedger->info().seq;
}
RippleLineCache::~RippleLineCache()
{
JLOG(journal_.debug()) << "~RippleLineCache destroyed for ledger "
<< mLedger->info().seq << " with " << lines_.size()
<< " accounts";
}
std::vector<PathFindTrustLine> const&
@@ -43,6 +53,13 @@ RippleLineCache::getRippleLines(AccountID const& accountID)
if (inserted)
it->second = PathFindTrustLine::getItems(accountID, *mLedger);
JLOG(journal_.debug()) << "RippleLineCache getRippleLines for ledger "
<< mLedger->info().seq << " found "
<< it->second.size() << " lines for "
<< (inserted ? "new " : "existing ") << accountID
<< " out of a total of " << lines_.size()
<< " accounts";
return it->second;
}

View File

@@ -33,10 +33,13 @@
namespace ripple {
// Used by Pathfinder
class RippleLineCache : public CountedObject<RippleLineCache>
class RippleLineCache final : public CountedObject<RippleLineCache>
{
public:
explicit RippleLineCache(std::shared_ptr<ReadView const> const& l);
explicit RippleLineCache(
std::shared_ptr<ReadView const> const& l,
beast::Journal j);
~RippleLineCache();
std::shared_ptr<ReadView const> const&
getLedger() const
@@ -53,7 +56,9 @@ private:
ripple::hardened_hash<> hasher_;
std::shared_ptr<ReadView const> mLedger;
struct AccountKey
beast::Journal journal_;
struct AccountKey final : public CountedObject<AccountKey>
{
AccountID account_;
std::size_t hash_value_;

108
src/ripple/basics/join.h Normal file
View File

@@ -0,0 +1,108 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2022 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
#ifndef JOIN_H_INCLUDED
#define JOIN_H_INCLUDED
#include <string>
namespace ripple {
template <class Stream, class Iter>
Stream&
join(Stream& s, Iter iter, Iter end, std::string const& delimiter)
{
if (iter == end)
return s;
s << *iter;
for (++iter; iter != end; ++iter)
s << delimiter << *iter;
return s;
}
template <class Collection>
class CollectionAndDelimiter
{
public:
Collection const& collection;
std::string const delimiter;
explicit CollectionAndDelimiter(Collection const& c, std::string delim)
: collection(c), delimiter(std::move(delim))
{
}
template <class Stream>
friend Stream&
operator<<(Stream& s, CollectionAndDelimiter const& cd)
{
return join(
s,
std::begin(cd.collection),
std::end(cd.collection),
cd.delimiter);
}
};
template <class Collection, std::size_t N>
class CollectionAndDelimiter<Collection[N]>
{
public:
Collection const* collection;
std::string const delimiter;
explicit CollectionAndDelimiter(Collection const c[N], std::string delim)
: collection(c), delimiter(std::move(delim))
{
}
template <class Stream>
friend Stream&
operator<<(Stream& s, CollectionAndDelimiter const& cd)
{
return join(s, cd.collection, cd.collection + N, cd.delimiter);
}
};
// Specialization for const char* strings
template <std::size_t N>
class CollectionAndDelimiter<char[N]>
{
public:
char const* collection;
std::string const delimiter;
explicit CollectionAndDelimiter(char const c[N], std::string delim)
: collection(c), delimiter(std::move(delim))
{
}
template <class Stream>
friend Stream&
operator<<(Stream& s, CollectionAndDelimiter const& cd)
{
auto end = cd.collection + N;
if (N > 0 && *(end - 1) == '\0')
--end;
return join(s, cd.collection, end, cd.delimiter);
}
};
} // namespace ripple
#endif

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_CORE_JOB_H_INCLUDED
#define RIPPLE_CORE_JOB_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/core/ClosureCounter.h>
#include <ripple/core/LoadMonitor.h>
#include <functional>
@@ -92,7 +93,7 @@ enum JobType {
jtNS_WRITE,
};
class Job
class Job : public CountedObject<Job>
{
public:
using clock_type = std::chrono::steady_clock;

View File

@@ -33,7 +33,18 @@ namespace ripple {
// Operations that clients may wish to perform against the network
// Master operational handler, server sequencer, network tracker
class PathRequest;
class InfoSubRequest
{
public:
using pointer = std::shared_ptr<InfoSubRequest>;
virtual ~InfoSubRequest() = default;
virtual Json::Value
doClose() = 0;
virtual Json::Value
doStatus(Json::Value const&) = 0;
};
/** Manages a client's subscription to data feeds.
*/
@@ -205,13 +216,13 @@ public:
deleteSubAccountHistory(AccountID const& account);
void
clearPathRequest();
clearRequest();
void
setPathRequest(const std::shared_ptr<PathRequest>& req);
setRequest(const std::shared_ptr<InfoSubRequest>& req);
std::shared_ptr<PathRequest> const&
getPathRequest();
std::shared_ptr<InfoSubRequest> const&
getRequest();
protected:
std::mutex mLock;
@@ -221,7 +232,7 @@ private:
Source& m_source;
hash_set<AccountID> realTimeSubscriptions_;
hash_set<AccountID> normalSubscriptions_;
std::shared_ptr<PathRequest> mPathRequest;
std::shared_ptr<InfoSubRequest> request_;
std::uint64_t mSeq;
hash_set<AccountID> accountHistorySubscriptions_;

View File

@@ -119,21 +119,21 @@ InfoSub::deleteSubAccountHistory(AccountID const& account)
}
void
InfoSub::clearPathRequest()
InfoSub::clearRequest()
{
mPathRequest.reset();
request_.reset();
}
void
InfoSub::setPathRequest(const std::shared_ptr<PathRequest>& req)
InfoSub::setRequest(const std::shared_ptr<InfoSubRequest>& req)
{
mPathRequest = req;
request_ = req;
}
const std::shared_ptr<PathRequest>&
InfoSub::getPathRequest()
const std::shared_ptr<InfoSubRequest>&
InfoSub::getRequest()
{
return mPathRequest;
return request_;
}
} // namespace ripple

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_PROTOCOL_BOOK_H_INCLUDED
#define RIPPLE_PROTOCOL_BOOK_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/protocol/Issue.h>
#include <boost/utility/base_from_member.hpp>
@@ -29,7 +30,7 @@ namespace ripple {
The order book is a pair of Issues called in and out.
@see Issue.
*/
class Book
class Book final : public CountedObject<Book>
{
public:
Issue in;

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_PROTOCOL_STAMOUNT_H_INCLUDED
#define RIPPLE_PROTOCOL_STAMOUNT_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/basics/IOUAmount.h>
#include <ripple/basics/LocalValue.h>
#include <ripple/basics/XRPAmount.h>
@@ -40,7 +41,7 @@ namespace ripple {
// Wire form:
// High 8 bits are (offset+142), legal range is, 80 to 22 inclusive
// Low 56 bits are value, legal range is 10^15 to (10^16 - 1) inclusive
class STAmount : public STBase
class STAmount final : public STBase, public CountedObject<STAmount>
{
public:
using mantissa_type = std::uint64_t;

View File

@@ -20,6 +20,7 @@
#ifndef RIPPLE_PROTOCOL_STPATHSET_H_INCLUDED
#define RIPPLE_PROTOCOL_STPATHSET_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/json/json_value.h>
#include <ripple/protocol/SField.h>
#include <ripple/protocol/STBase.h>
@@ -30,7 +31,7 @@
namespace ripple {
class STPathElement
class STPathElement final : public CountedObject<STPathElement>
{
unsigned int mType;
AccountID mAccountID;
@@ -114,7 +115,7 @@ private:
get_hash(STPathElement const& element);
};
class STPath
class STPath final : public CountedObject<STPath>
{
std::vector<STPathElement> mPath;
@@ -172,7 +173,7 @@ public:
//------------------------------------------------------------------------------
// A set of zero or more payment paths
class STPathSet final : public STBase
class STPathSet final : public STBase, public CountedObject<STPathSet>
{
std::vector<STPath> value;

View File

@@ -51,25 +51,25 @@ doPathFind(RPC::JsonContext& context)
if (sSubCommand == "create")
{
context.loadType = Resource::feeHighBurdenRPC;
context.infoSub->clearPathRequest();
context.infoSub->clearRequest();
return context.app.getPathRequests().makePathRequest(
context.infoSub, lpLedger, context.params);
}
if (sSubCommand == "close")
{
PathRequest::pointer request = context.infoSub->getPathRequest();
InfoSubRequest::pointer request = context.infoSub->getRequest();
if (!request)
return rpcError(rpcNO_PF_REQUEST);
context.infoSub->clearPathRequest();
return request->doClose(context.params);
context.infoSub->clearRequest();
return request->doClose();
}
if (sSubCommand == "status")
{
PathRequest::pointer request = context.infoSub->getPathRequest();
InfoSubRequest::pointer request = context.infoSub->getRequest();
if (!request)
return rpcError(rpcNO_PF_REQUEST);

View File

@@ -223,7 +223,8 @@ checkPayment(
if (auto ledger = app.openLedger().current())
{
Pathfinder pf(
std::make_shared<RippleLineCache>(ledger),
std::make_shared<RippleLineCache>(
ledger, app.journal("RippleLineCache")),
srcAddressID,
*dstAccountID,
sendMax.issue().currency,

View File

@@ -0,0 +1,105 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2022 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#include <test/jtx/Account.h>
#include <ripple/basics/join.h>
#include <ripple/beast/unit_test.h>
namespace ripple {
namespace test {
struct join_test : beast::unit_test::suite
{
void
run() override
{
auto test = [this](auto collectionanddelimiter, std::string expected) {
std::stringstream ss;
// Put something else in the buffer before and after to ensure that
// the << operator returns the stream correctly.
ss << "(" << collectionanddelimiter << ")";
auto const str = ss.str();
BEAST_EXPECT(str.substr(1, str.length() - 2) == expected);
BEAST_EXPECT(str.front() == '(');
BEAST_EXPECT(str.back() == ')');
};
// C++ array
test(
CollectionAndDelimiter(std::array<int, 4>{2, -1, 5, 10}, "/"),
"2/-1/5/10");
// One item C++ array edge case
test(
CollectionAndDelimiter(std::array<std::string, 1>{"test"}, " & "),
"test");
// Empty C++ array edge case
test(CollectionAndDelimiter(std::array<int, 0>{}, ","), "");
{
// C-style array
char letters[4]{'w', 'a', 's', 'd'};
test(CollectionAndDelimiter(letters, std::to_string(0)), "w0a0s0d");
}
{
// Auto sized C-style array
std::string words[]{"one", "two", "three", "four"};
test(CollectionAndDelimiter(words, "\n"), "one\ntwo\nthree\nfour");
}
{
// One item C-style array edge case
std::string words[]{"thing"};
test(CollectionAndDelimiter(words, "\n"), "thing");
}
// Initializer list
test(
CollectionAndDelimiter(std::initializer_list<size_t>{19, 25}, "+"),
"19+25");
// vector
test(
CollectionAndDelimiter(std::vector<int>{0, 42}, std::to_string(99)),
"09942");
{
// vector with one item edge case
using namespace jtx;
test(
CollectionAndDelimiter(
std::vector<Account>{Account::master}, "xxx"),
Account::master.human());
}
// empty vector edge case
test(CollectionAndDelimiter(std::vector<uint256>{}, ","), "");
// C-style string
test(CollectionAndDelimiter("string", " "), "s t r i n g");
// Empty C-style string edge case
test(CollectionAndDelimiter("", "*"), "");
// Single char C-style string edge case
test(CollectionAndDelimiter("x", "*"), "x");
// std::string
test(CollectionAndDelimiter(std::string{"string"}, "-"), "s-t-r-i-n-g");
// Empty std::string edge case
test(CollectionAndDelimiter(std::string{""}, "*"), "");
// Single char std::string edge case
test(CollectionAndDelimiter(std::string{"y"}, "*"), "y");
}
}; // namespace test
BEAST_DEFINE_TESTSUITE(join, ripple_basics, ripple);
} // namespace test
} // namespace ripple

View File

@@ -33,7 +33,8 @@ paths::operator()(Env& env, JTx& jt) const
auto const to = env.lookup(jv[jss::Destination].asString());
auto const amount = amountFromJson(sfAmount, jv[jss::Amount]);
Pathfinder pf(
std::make_shared<RippleLineCache>(env.current()),
std::make_shared<RippleLineCache>(
env.current(), env.app().journal("RippleLineCache")),
from,
to,
in_.currency,

View File

@@ -35,6 +35,9 @@ class GetCounts_test : public beast::unit_test::suite
Json::Value result;
{
using namespace std::chrono_literals;
// Add a little delay so the App's "uptime" will have a value.
std::this_thread::sleep_for(1s);
// check counts with no transactions posted
result = env.rpc("get_counts")[jss::result];
BEAST_EXPECT(result[jss::status] == "success");