Improve lifetime management of ledger objects (SLEs) to prevent runaway memory usage: (#4822)

* Add logging for Application.cpp sweep()
* Improve lifetime management of ledger objects (`SLE`s)
* Only store SLE digest in CachedView; get SLEs from CachedSLEs
* Also force release of last ledger used for path finding if there are
  no path finding requests to process
* Count more ST objects (derive from `CountedObject`)
* Track CachedView stats in CountedObjects
* Rename the CachedView counters
* Fix the scope of the digest lookup lock

Before this patch, if you asked "is it caching?", the answer was always yes: `CachedView` kept a `shared_ptr` to every SLE it had ever read, so those objects stayed alive for the lifetime of the view. It now stores only each entry's digest and fetches the object on demand from the application-wide `CachedSLEs` cache, which can expire entries and so keeps memory use bounded.
Author: Ed Hennis
Date: 2024-01-12 12:42:33 -05:00
Committed by: GitHub
Parent: 4308407dc1
Commit: d9f90c84c0
19 changed files with 283 additions and 38 deletions
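
The heart of the change is visible in the `CachedViewImpl` hunks below: the per-view map no longer owns `shared_ptr<SLE const>` values but only remembers each key's digest, while the objects themselves live in the shared, sweepable `CachedSLEs` cache. The following is a minimal sketch of that ownership split, using simplified hypothetical types rather than rippled's real classes:

```cpp
// Hypothetical, simplified sketch -- not rippled's actual types. The point is
// the ownership split: the per-view map keeps only digests, while the shared
// cache owns the objects and may drop them at any time.
#include <cstdint>
#include <functional>
#include <iostream>
#include <memory>
#include <unordered_map>

struct SLE { std::uint64_t key; };
using Digest = std::uint64_t;  // stands in for uint256

// Shared cache keyed by digest; entries can expire (weak_ptr here stands in
// for CachedSLEs' timed eviction).
class SharedSLECache
{
    std::unordered_map<Digest, std::weak_ptr<SLE const>> map_;

public:
    std::shared_ptr<SLE const>
    fetch(Digest d, std::function<std::shared_ptr<SLE const>()> const& load)
    {
        if (auto sp = map_[d].lock())
            return sp;     // still cached
        auto sp = load();  // expired or never seen: re-read from the base view
        map_[d] = sp;
        return sp;
    }
};

// The per-view map records key -> digest only, so reading through the view
// never extends an SLE's lifetime beyond the shared cache's policy.
class DigestView
{
    SharedSLECache& cache_;
    std::unordered_map<std::uint64_t, Digest> seen_;

public:
    explicit DigestView(SharedSLECache& c) : cache_(c)
    {
    }

    std::shared_ptr<SLE const>
    read(std::uint64_t key)
    {
        Digest const d = key * 2654435761u;  // stand-in for base_.digest(key)
        seen_.emplace(key, d);
        return cache_.fetch(
            d, [key] { return std::make_shared<SLE const>(SLE{key}); });
    }
};

int main()
{
    SharedSLECache cache;
    DigestView view(cache);
    std::cout << "read key " << view.read(42)->key << "\n";
}
```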

@@ -83,6 +83,9 @@ public:
virtual void
stop() = 0;
virtual std::size_t
cacheSize() = 0;
};
std::unique_ptr<InboundLedgers>

@@ -125,6 +125,27 @@ public:
void
stop();
std::size_t
tasksSize() const
{
std::lock_guard<std::mutex> lock(mtx_);
return tasks_.size();
}
std::size_t
deltasSize() const
{
std::lock_guard<std::mutex> lock(mtx_);
return deltas_.size();
}
std::size_t
skipListsSize() const
{
std::lock_guard<std::mutex> lock(mtx_);
return skipLists_.size();
}
private:
mutable std::mutex mtx_;
std::vector<std::shared_ptr<LedgerReplayTask>> tasks_;

@@ -411,6 +411,13 @@ public:
mRecentFailures.clear();
}
std::size_t
cacheSize() override
{
ScopedLockType lock(mLock);
return mLedgers.size();
}
private:
clock_type& m_clock;

@@ -1543,6 +1543,7 @@ LedgerMaster::updatePaths()
if (app_.getOPs().isNeedNetworkLedger())
{
--mPathFindThread;
mPathLedger.reset();
JLOG(m_journal.debug()) << "Need network ledger for updating paths";
return;
}
@@ -1568,6 +1569,7 @@ LedgerMaster::updatePaths()
else
{ // Nothing to do
--mPathFindThread;
mPathLedger.reset();
JLOG(m_journal.debug()) << "Nothing to do for updating paths";
return;
}
@@ -1584,6 +1586,7 @@ LedgerMaster::updatePaths()
<< "Published ledger too old for updating paths";
std::lock_guard ml(m_mutex);
--mPathFindThread;
mPathLedger.reset();
return;
}
}
@@ -1596,6 +1599,7 @@ LedgerMaster::updatePaths()
if (!pathRequests.requestsPending())
{
--mPathFindThread;
mPathLedger.reset();
JLOG(m_journal.debug())
<< "No path requests found. Nothing to do for updating "
"paths. "
@@ -1613,6 +1617,7 @@ LedgerMaster::updatePaths()
<< "No path requests left. No need for further updating "
"paths";
--mPathFindThread;
mPathLedger.reset();
return;
}
}
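
The `mPathLedger.reset()` calls above implement the bullet about forcing release of the last path-finding ledger: every early return in `updatePaths()` now drops the member's reference so an entire closed ledger is not kept alive while path finding is idle. A tiny sketch of the idiom, with hypothetical names rather than rippled's classes:

```cpp
// Hypothetical sketch: resetting a member shared_ptr on the "nothing to do"
// paths releases the held ledger immediately instead of at the next update.
#include <iostream>
#include <memory>

struct Ledger
{
    ~Ledger() { std::cout << "ledger released\n"; }
};

struct PathUpdater
{
    std::shared_ptr<Ledger> lastLedger_;  // plays the role of mPathLedger

    void
    update(bool requestsPending)
    {
        lastLedger_ = std::make_shared<Ledger>();
        if (!requestsPending)
        {
            // Without this reset the member would pin the ledger until the
            // next call, which may be a long time on a quiet server.
            lastLedger_.reset();
            return;
        }
        // ... walk pending path requests against lastLedger_ ...
    }
};

int main()
{
    PathUpdater u;
    u.update(false);  // prints "ledger released" right away
}
```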

@@ -1065,20 +1065,172 @@ public:
// VFALCO TODO fix the dependency inversion using an observer,
// have listeners register for "onSweep ()" notification.
nodeFamily_.sweep();
{
std::shared_ptr<FullBelowCache const> const fullBelowCache =
nodeFamily_.getFullBelowCache(0);
std::shared_ptr<TreeNodeCache const> const treeNodeCache =
nodeFamily_.getTreeNodeCache(0);
std::size_t const oldFullBelowSize = fullBelowCache->size();
std::size_t const oldTreeNodeSize = treeNodeCache->size();
nodeFamily_.sweep();
JLOG(m_journal.debug())
<< "NodeFamily::FullBelowCache sweep. Size before: "
<< oldFullBelowSize
<< "; size after: " << fullBelowCache->size();
JLOG(m_journal.debug())
<< "NodeFamily::TreeNodeCache sweep. Size before: "
<< oldTreeNodeSize << "; size after: " << treeNodeCache->size();
}
if (shardFamily_)
{
std::size_t const oldFullBelowSize =
shardFamily_->getFullBelowCacheSize();
std::size_t const oldTreeNodeSize =
shardFamily_->getTreeNodeCacheSize().second;
shardFamily_->sweep();
getMasterTransaction().sweep();
getNodeStore().sweep();
JLOG(m_journal.debug())
<< "ShardFamily::FullBelowCache sweep. Size before: "
<< oldFullBelowSize
<< "; size after: " << shardFamily_->getFullBelowCacheSize();
JLOG(m_journal.debug())
<< "ShardFamily::TreeNodeCache sweep. Size before: "
<< oldTreeNodeSize << "; size after: "
<< shardFamily_->getTreeNodeCacheSize().second;
}
{
TaggedCache<uint256, Transaction> const& masterTxCache =
getMasterTransaction().getCache();
std::size_t const oldMasterTxSize = masterTxCache.size();
getMasterTransaction().sweep();
JLOG(m_journal.debug())
<< "MasterTransaction sweep. Size before: " << oldMasterTxSize
<< "; size after: " << masterTxCache.size();
}
{
// Does not appear to have an associated cache.
getNodeStore().sweep();
}
if (shardStore_)
{
// Does not appear to have an associated cache.
shardStore_->sweep();
getLedgerMaster().sweep();
getTempNodeCache().sweep();
getValidations().expire(m_journal);
getInboundLedgers().sweep();
getLedgerReplayer().sweep();
m_acceptedLedgerCache.sweep();
cachedSLEs_.sweep();
}
{
std::size_t const oldLedgerMasterCacheSize =
getLedgerMaster().getFetchPackCacheSize();
getLedgerMaster().sweep();
JLOG(m_journal.debug())
<< "LedgerMaster sweep. Size before: "
<< oldLedgerMasterCacheSize << "; size after: "
<< getLedgerMaster().getFetchPackCacheSize();
}
{
// NodeCache == TaggedCache<SHAMapHash, Blob>
std::size_t const oldTempNodeCacheSize = getTempNodeCache().size();
getTempNodeCache().sweep();
JLOG(m_journal.debug())
<< "TempNodeCache sweep. Size before: " << oldTempNodeCacheSize
<< "; size after: " << getTempNodeCache().size();
}
{
std::size_t const oldCurrentCacheSize =
getValidations().sizeOfCurrentCache();
std::size_t const oldSizeSeqEnforcesSize =
getValidations().sizeOfSeqEnforcersCache();
std::size_t const oldByLedgerSize =
getValidations().sizeOfByLedgerCache();
std::size_t const oldBySequenceSize =
getValidations().sizeOfBySequenceCache();
getValidations().expire(m_journal);
JLOG(m_journal.debug())
<< "Validations Current expire. Size before: "
<< oldCurrentCacheSize
<< "; size after: " << getValidations().sizeOfCurrentCache();
JLOG(m_journal.debug())
<< "Validations SeqEnforcer expire. Size before: "
<< oldSizeSeqEnforcesSize << "; size after: "
<< getValidations().sizeOfSeqEnforcersCache();
JLOG(m_journal.debug())
<< "Validations ByLedger expire. Size before: "
<< oldByLedgerSize
<< "; size after: " << getValidations().sizeOfByLedgerCache();
JLOG(m_journal.debug())
<< "Validations BySequence expire. Size before: "
<< oldBySequenceSize
<< "; size after: " << getValidations().sizeOfBySequenceCache();
}
{
std::size_t const oldInboundLedgersSize =
getInboundLedgers().cacheSize();
getInboundLedgers().sweep();
JLOG(m_journal.debug())
<< "InboundLedgers sweep. Size before: "
<< oldInboundLedgersSize
<< "; size after: " << getInboundLedgers().cacheSize();
}
{
size_t const oldTasksSize = getLedgerReplayer().tasksSize();
size_t const oldDeltasSize = getLedgerReplayer().deltasSize();
size_t const oldSkipListsSize = getLedgerReplayer().skipListsSize();
getLedgerReplayer().sweep();
JLOG(m_journal.debug())
<< "LedgerReplayer tasks sweep. Size before: " << oldTasksSize
<< "; size after: " << getLedgerReplayer().tasksSize();
JLOG(m_journal.debug())
<< "LedgerReplayer deltas sweep. Size before: "
<< oldDeltasSize
<< "; size after: " << getLedgerReplayer().deltasSize();
JLOG(m_journal.debug())
<< "LedgerReplayer skipLists sweep. Size before: "
<< oldSkipListsSize
<< "; size after: " << getLedgerReplayer().skipListsSize();
}
{
std::size_t const oldAcceptedLedgerSize =
m_acceptedLedgerCache.size();
m_acceptedLedgerCache.sweep();
JLOG(m_journal.debug())
<< "AcceptedLedgerCache sweep. Size before: "
<< oldAcceptedLedgerSize
<< "; size after: " << m_acceptedLedgerCache.size();
}
{
std::size_t const oldCachedSLEsSize = cachedSLEs_.size();
cachedSLEs_.sweep();
JLOG(m_journal.debug())
<< "CachedSLEs sweep. Size before: " << oldCachedSLEsSize
<< "; size after: " << cachedSLEs_.size();
}
#ifdef RIPPLED_REPORTING
if (auto pg = dynamic_cast<PostgresDatabase*>(&*mRelationalDatabase))
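
Each block in the `sweep()` hunk above follows the same shape: capture the cache size, sweep, then log both numbers at debug level. The commit spells the blocks out long-hand; the following is only a hypothetical illustration of that shape, not code from the patch:

```cpp
// Hypothetical helper expressing the repeated "size before / sweep / size
// after" pattern. The real code logs through JLOG(m_journal.debug()); plain
// std::cout is used here to keep the sketch self-contained.
#include <cstddef>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

void
logSweep(
    std::string const& name,
    std::function<std::size_t()> const& size,
    std::function<void()> const& sweep)
{
    std::size_t const before = size();
    sweep();
    std::cout << name << " sweep. Size before: " << before
              << "; size after: " << size() << "\n";
}

int main()
{
    std::vector<int> toyCache{1, 2, 3};
    logSweep(
        "ToyCache",
        [&] { return toyCache.size(); },
        [&] { toyCache.clear(); });
}
```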

@@ -200,6 +200,9 @@ PathRequests::updateAll(std::shared_ptr<ReadView const> const& inLedger)
break;
}
// Hold on to the line cache until after the lock is released, so it can
// be destroyed outside of the lock
std::shared_ptr<RippleLineCache> lastCache;
{
// Get the latest requests, cache, and ledger for next pass
std::lock_guard sl(mLock);
@@ -207,6 +210,7 @@ PathRequests::updateAll(std::shared_ptr<ReadView const> const& inLedger)
if (requests_.empty())
break;
requests = requests_;
lastCache = cache;
cache = getLineCache(cache->getLedger(), false);
}
} while (!app_.getJobQueue().isStopping());
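
The `lastCache` variable above exists only to delay destruction: the previous `RippleLineCache` reference is handed off while `mLock` is held, but the object is actually destroyed after the `lock_guard` goes out of scope, so a potentially expensive teardown never runs under the lock. A minimal sketch of the idiom with hypothetical types:

```cpp
// Hypothetical sketch of "hand off under the lock, destroy outside it".
#include <memory>
#include <mutex>

struct LineCache
{
    // Imagine an expensive destructor (large maps of trust lines).
};

struct Requests
{
    std::mutex mtx_;
    std::shared_ptr<LineCache> cache_;

    void
    refresh()
    {
        std::shared_ptr<LineCache> last;  // declared outside the lock scope
        {
            std::lock_guard lock(mtx_);
            last = std::move(cache_);                // take the old cache
            cache_ = std::make_shared<LineCache>();  // install the new one
        }
        // `last` dies here, after mtx_ is released, so other threads waiting
        // on the mutex are not stalled by the destructor.
    }
};

int main()
{
    Requests r;
    r.refresh();
}
```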

@@ -1142,6 +1142,34 @@ public:
return laggards;
}
std::size_t
sizeOfCurrentCache() const
{
std::lock_guard lock{mutex_};
return current_.size();
}
std::size_t
sizeOfSeqEnforcersCache() const
{
std::lock_guard lock{mutex_};
return seqEnforcers_.size();
}
std::size_t
sizeOfByLedgerCache() const
{
std::lock_guard lock{mutex_};
return byLedger_.size();
}
std::size_t
sizeOfBySequenceCache() const
{
std::lock_guard lock{mutex_};
return bySequence_.size();
}
};
} // namespace ripple

@@ -38,10 +38,7 @@ private:
DigestAwareReadView const& base_;
CachedSLEs& cache_;
std::mutex mutable mutex_;
std::unordered_map<
key_type,
std::shared_ptr<SLE const>,
hardened_hash<>> mutable map_;
std::unordered_map<key_type, uint256, hardened_hash<>> mutable map_;
public:
CachedViewImpl() = delete;

@@ -33,25 +33,40 @@ CachedViewImpl::exists(Keylet const& k) const
std::shared_ptr<SLE const>
CachedViewImpl::read(Keylet const& k) const
{
{
std::lock_guard lock(mutex_);
auto const iter = map_.find(k.key);
if (iter != map_.end())
static CountedObjects::Counter hits{"CachedView::hit"};
static CountedObjects::Counter hitsexpired{"CachedView::hitExpired"};
static CountedObjects::Counter misses{"CachedView::miss"};
bool cacheHit = false;
bool baseRead = false;
auto const digest = [&]() -> std::optional<uint256> {
{
if (!iter->second || !k.check(*iter->second))
return nullptr;
return iter->second;
std::lock_guard lock(mutex_);
auto const iter = map_.find(k.key);
if (iter != map_.end())
{
cacheHit = true;
return iter->second;
}
}
}
auto const digest = base_.digest(k.key);
return base_.digest(k.key);
}();
if (!digest)
return nullptr;
auto sle = cache_.fetch(*digest, [&]() { return base_.read(k); });
auto sle = cache_.fetch(*digest, [&]() {
baseRead = true;
return base_.read(k);
});
if (cacheHit && baseRead)
hitsexpired.increment();
else if (cacheHit)
hits.increment();
else
misses.increment();
std::lock_guard lock(mutex_);
auto const er = map_.emplace(k.key, sle);
auto const& iter = er.first;
auto const er = map_.emplace(k.key, *digest);
bool const inserted = er.second;
if (iter->second && !k.check(*iter->second))
if (sle && !k.check(*sle))
{
if (!inserted)
{
@@ -62,7 +77,7 @@ CachedViewImpl::read(Keylet const& k) const
}
return nullptr;
}
return iter->second;
return sle;
}
} // namespace detail
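
The rewritten `read()` above classifies every lookup into one of three new counters: `CachedView::hit` (digest known and `CachedSLEs` still held the object), `CachedView::hitExpired` (digest known but the object had expired, so `base_.read()` ran), and `CachedView::miss` (first time this view saw the key). A compact sketch of just that classification, with hypothetical plain integers standing in for `CountedObjects::Counter`:

```cpp
// Hypothetical sketch of the hit / hitExpired / miss classification used by
// CachedViewImpl::read() above.
#include <iostream>

struct ReadStats
{
    unsigned hits = 0;         // digest cached, shared cache still had the SLE
    unsigned hitsExpired = 0;  // digest cached, but the SLE had to be re-read
    unsigned misses = 0;       // digest not seen by this view before

    void
    record(bool cacheHit, bool baseRead)
    {
        if (cacheHit && baseRead)
            ++hitsExpired;
        else if (cacheHit)
            ++hits;
        else
            ++misses;
    }
};

int main()
{
    ReadStats s;
    s.record(true, false);  // classic hit
    s.record(true, true);   // digest known, object expired from the cache
    s.record(false, true);  // first read of this key through the view
    std::cout << s.hits << " / " << s.hitsExpired << " / " << s.misses << "\n";
}
```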

@@ -33,7 +33,7 @@ namespace ripple {
// Operations that clients may wish to perform against the network
// Master operational handler, server sequencer, network tracker
class InfoSubRequest
class InfoSubRequest : public CountedObject<InfoSubRequest>
{
public:
using pointer = std::shared_ptr<InfoSubRequest>;
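
This and the `ST*` hunks below all apply the same one-line pattern: derive from `CountedObject<T>` so each class gets a per-type instance count in the CountedObjects report. A minimal sketch of the idea follows (not rippled's actual `CountedObject`, which also registers the class name with a global registry):

```cpp
// Hypothetical CRTP counter illustrating what deriving from CountedObject<T>
// buys: construction, copying, and destruction keep a per-type tally.
#include <atomic>
#include <iostream>

template <class Derived>
class Counted
{
    static inline std::atomic<int> count_{0};

public:
    Counted() { ++count_; }
    Counted(Counted const&) { ++count_; }
    Counted& operator=(Counted const&) = default;
    ~Counted() { --count_; }

    static int
    instances()
    {
        return count_.load();
    }
};

// Opting in is a one-line change to the class declaration:
struct InfoSubLike : Counted<InfoSubLike>
{
};

int main()
{
    InfoSubLike a, b;
    {
        InfoSubLike c(a);  // copies are counted too (see the STBlob copy
                           // constructor change further down)
        std::cout << InfoSubLike::instances() << "\n";  // 3
    }
    std::cout << InfoSubLike::instances() << "\n";  // 2
}
```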

@@ -20,13 +20,15 @@
#ifndef RIPPLE_PROTOCOL_STACCOUNT_H_INCLUDED
#define RIPPLE_PROTOCOL_STACCOUNT_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/protocol/AccountID.h>
#include <ripple/protocol/STBase.h>
#include <string>
namespace ripple {
class STAccount final : public STBase
class STAccount final : public STBase, public CountedObject<STAccount>
{
private:
// The original implementation of STAccount kept the value in an STBlob.

@@ -20,6 +20,7 @@
#ifndef RIPPLE_PROTOCOL_STBITSTRING_H_INCLUDED
#define RIPPLE_PROTOCOL_STBITSTRING_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/beast/utility/Zero.h>
#include <ripple/protocol/STBase.h>
@@ -30,7 +31,7 @@ namespace ripple {
// information of a template parameterized by an unsigned type. This RTTI
// information is needed to write gdb pretty printers.
template <int Bits>
class STBitString final : public STBase
class STBitString final : public STBase, public CountedObject<STBitString<Bits>>
{
static_assert(Bits > 0, "Number of bits must be positive");

@@ -21,8 +21,10 @@
#define RIPPLE_PROTOCOL_STBLOB_H_INCLUDED
#include <ripple/basics/Buffer.h>
#include <ripple/basics/CountedObject.h>
#include <ripple/basics/Slice.h>
#include <ripple/protocol/STBase.h>
#include <cassert>
#include <cstring>
#include <memory>
@@ -30,7 +32,7 @@
namespace ripple {
// variable length byte string
class STBlob : public STBase
class STBlob : public STBase, public CountedObject<STBlob>
{
Buffer value_;
@@ -88,7 +90,7 @@ private:
};
inline STBlob::STBlob(STBlob const& rhs)
: STBase(rhs), value_(rhs.data(), rhs.size())
: STBase(rhs), CountedObject<STBlob>(rhs), value_(rhs.data(), rhs.size())
{
}

@@ -20,12 +20,13 @@
#ifndef RIPPLE_PROTOCOL_STINTEGER_H_INCLUDED
#define RIPPLE_PROTOCOL_STINTEGER_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/protocol/STBase.h>
namespace ripple {
template <typename Integer>
class STInteger : public STBase
class STInteger : public STBase, public CountedObject<STInteger<Integer>>
{
public:
using value_type = Integer;

@@ -28,7 +28,7 @@
namespace ripple {
class STIssue final : public STBase
class STIssue final : public STBase, CountedObject<STIssue>
{
private:
Issue issue_{xrpIssue()};

@@ -20,13 +20,14 @@
#ifndef RIPPLE_PROTOCOL_STVECTOR256_H_INCLUDED
#define RIPPLE_PROTOCOL_STVECTOR256_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/protocol/STBase.h>
#include <ripple/protocol/STBitString.h>
#include <ripple/protocol/STInteger.h>
namespace ripple {
class STVector256 : public STBase
class STVector256 : public STBase, public CountedObject<STVector256>
{
std::vector<uint256> mValue;

@@ -20,6 +20,7 @@
#ifndef RIPPLE_PROTOCOL_STXCHAINBRIDGE_H_INCLUDED
#define RIPPLE_PROTOCOL_STXCHAINBRIDGE_H_INCLUDED
#include <ripple/basics/CountedObject.h>
#include <ripple/protocol/STAccount.h>
#include <ripple/protocol/STBase.h>
#include <ripple/protocol/STIssue.h>
@@ -29,7 +30,7 @@ namespace ripple {
class Serializer;
class STObject;
class STXChainBridge final : public STBase
class STXChainBridge final : public STBase, public CountedObject<STXChainBridge>
{
STAccount lockingChainDoor_{sfLockingChainDoor};
STIssue lockingChainIssue_{sfLockingChainIssue};

@@ -173,6 +173,12 @@ public:
{
}
virtual size_t
cacheSize() override
{
return 0;
}
LedgerMaster& ledgerSource;
LedgerMaster& ledgerSink;
InboundLedgersBehavior bhvr;

@@ -30,7 +30,6 @@
#include <ripple/core/Config.h>
#include <ripple/json/json_value.h>
#include <ripple/json/to_string.h>
#include <ripple/ledger/CachedSLEs.h>
#include <ripple/protocol/Feature.h>
#include <ripple/protocol/Indexes.h>
#include <ripple/protocol/Issue.h>