Compare commits

...

12 Commits

Author SHA1 Message Date
Valentin Balaschenko
1cfdf70852 Merge branch 'develop' into vlntb/mem-leak-ledger-history 2025-10-08 19:17:18 +01:00
Valentin Balaschenko
28a01cfe5a decouple LedgerHistory and TaggedCache, renaming 2025-10-08 15:16:35 +01:00
Valentin Balaschenko
d4d5e70cba locking 2025-10-08 14:59:15 +01:00
Valentin Balaschenko
0056f9e023 formatting 2025-10-07 18:30:25 +01:00
Valentin Balaschenko
c7f5d25dac wip: refactore 2025-10-07 18:24:52 +01:00
Valentin Balaschenko
40ea5c6677 Merge branch 'develop' into vlntb/mem-leak-ledger-history 2025-10-07 14:02:12 +01:00
Valentin Balaschenko
5dacb5fb25 Merge branch 'develop' into vlntb/mem-leak-ledger-history 2025-09-25 17:21:58 +01:00
Valentin Balaschenko
501e6aa1df Merge branch 'develop' into vlntb/mem-leak-ledger-history 2025-09-25 16:41:52 +01:00
Valentin Balaschenko
053c057903 Merge branch 'develop' into vlntb/mem-leak-ledger-history 2025-09-24 16:03:31 +01:00
Valentin Balaschenko
2f57222858 fix 2025-06-02 12:15:16 +01:00
Valentin Balaschenko
164c225670 improve cleanup 2025-06-02 12:00:49 +01:00
Valentin Balaschenko
5b38eb3532 lru map for indexes 2025-06-02 11:02:58 +01:00
5 changed files with 276 additions and 55 deletions

View File

@@ -170,9 +170,6 @@ public:
bool
retrieve(key_type const& key, T& data);
mutex_type&
peekMutex();
std::vector<key_type>
getKeys() const;

View File

@@ -668,29 +668,6 @@ TaggedCache<
return true;
}
template <
class Key,
class T,
bool IsKeyCache,
class SharedWeakUnionPointer,
class SharedPointerType,
class Hash,
class KeyEqual,
class Mutex>
inline auto
TaggedCache<
Key,
T,
IsKeyCache,
SharedWeakUnionPointer,
SharedPointerType,
Hash,
KeyEqual,
Mutex>::peekMutex() -> mutex_type&
{
return m_mutex;
}
template <
class Key,
class T,

View File

@@ -27,8 +27,6 @@
namespace ripple {
// FIXME: Need to clean up ledgers by index at some point
LedgerHistory::LedgerHistory(
beast::insight::Collector::ptr const& collector,
Application& app)
@@ -47,6 +45,7 @@ LedgerHistory::LedgerHistory(
std::chrono::minutes{5},
stopwatch(),
app_.journal("TaggedCache"))
, mLedgersByIndex(256)
, j_(app.journal("LedgerHistory"))
{
}
@@ -63,12 +62,16 @@ LedgerHistory::insert(
ledger->stateMap().getHash().isNonZero(),
"ripple::LedgerHistory::insert : nonzero hash");
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
bool const alreadyHad = m_ledgers_by_hash.canonicalize_replace_cache(
ledger->info().hash, ledger);
if (validated)
{
mLedgersByIndex[ledger->info().seq] = ledger->info().hash;
JLOG(j_.info()) << "LedgerHistory::insert: mLedgersByIndex size: "
<< mLedgersByIndex.size() << " , total size: "
<< mLedgersByIndex.size() *
(sizeof(LedgerIndex) + sizeof(LedgerHash));
}
return alreadyHad;
}
@@ -76,25 +79,18 @@ LedgerHistory::insert(
LedgerHash
LedgerHistory::getLedgerHash(LedgerIndex index)
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
if (auto it = mLedgersByIndex.find(index); it != mLedgersByIndex.end())
return it->second;
if (auto p = mLedgersByIndex.get(index))
return *p;
return {};
}
std::shared_ptr<Ledger const>
LedgerHistory::getLedgerBySeq(LedgerIndex index)
{
if (auto p = mLedgersByIndex.get(index))
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
auto it = mLedgersByIndex.find(index);
if (it != mLedgersByIndex.end())
{
uint256 hash = it->second;
sl.unlock();
return getLedgerByHash(hash);
}
uint256 const hash = *p;
return getLedgerByHash(hash);
}
std::shared_ptr<Ledger const> ret = loadByIndex(index, app_);
@@ -108,13 +104,16 @@ LedgerHistory::getLedgerBySeq(LedgerIndex index)
{
// Add this ledger to the local tracking by index
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
XRPL_ASSERT(
ret->isImmutable(),
"ripple::LedgerHistory::getLedgerBySeq : immutable result ledger");
m_ledgers_by_hash.canonicalize_replace_client(ret->info().hash, ret);
mLedgersByIndex[ret->info().seq] = ret->info().hash;
JLOG(j_.info())
<< "LedgerHistory::getLedgerBySeq: mLedgersByIndex size: "
<< mLedgersByIndex.size() << " , total size: "
<< mLedgersByIndex.size() *
(sizeof(LedgerIndex) + sizeof(LedgerHash));
return (ret->info().seq == index) ? ret : nullptr;
}
}
@@ -458,8 +457,6 @@ LedgerHistory::builtLedger(
XRPL_ASSERT(
!hash.isZero(), "ripple::LedgerHistory::builtLedger : nonzero hash");
std::unique_lock sl(m_consensus_validated.peekMutex());
auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize_replace_client(index, entry);
@@ -500,8 +497,6 @@ LedgerHistory::validatedLedger(
!hash.isZero(),
"ripple::LedgerHistory::validatedLedger : nonzero hash");
std::unique_lock sl(m_consensus_validated.peekMutex());
auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize_replace_client(index, entry);
@@ -535,13 +530,13 @@ LedgerHistory::validatedLedger(
bool
LedgerHistory::fixIndex(LedgerIndex ledgerIndex, LedgerHash const& ledgerHash)
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
auto it = mLedgersByIndex.find(ledgerIndex);
if ((it != mLedgersByIndex.end()) && (it->second != ledgerHash))
if (auto cur = mLedgersByIndex.get(ledgerIndex))
{
it->second = ledgerHash;
return false;
if (*cur != ledgerHash)
{
mLedgersByIndex.put(ledgerIndex, ledgerHash);
return false;
}
}
return true;
}

View File

@@ -22,6 +22,7 @@
#include <xrpld/app/ledger/Ledger.h>
#include <xrpld/app/main/Application.h>
#include <xrpld/core/detail/LRUCache.h>
#include <xrpl/beast/insight/Collector.h>
#include <xrpl/protocol/RippleLedgerHash.h>
@@ -149,7 +150,8 @@ private:
ConsensusValidated m_consensus_validated;
// Maps ledger indexes to the corresponding hash.
std::map<LedgerIndex, LedgerHash> mLedgersByIndex; // validated ledgers
LRUCache<LedgerIndex, LedgerHash, concurrency::ExclusiveMutex>
mLedgersByIndex; // validated ledgers
beast::Journal j_;
};

View File

@@ -0,0 +1,250 @@
//------------------------------------------------------------------------------
/*
This file is part of rippled: https://github.com/ripple/rippled
Copyright (c) 2012, 2013 Ripple Labs Inc.
Permission to use, copy, modify, and/or distribute this software for any
purpose with or without fee is hereby granted, provided that the above
copyright notice and this permission notice appear in all copies.
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
#ifndef RIPPLE_APP_LRU_CACHE_H_INCLUDED
#define RIPPLE_APP_LRU_CACHE_H_INCLUDED
#include <list>
#include <mutex>
#include <stdexcept>
#include <type_traits>
#include <unordered_map>
#include <utility>
namespace ripple {
namespace concurrency {
struct SingleThreaded
{
struct mutex_type
{
void
lock() noexcept
{
}
void
unlock() noexcept
{
}
};
using lock_guard = std::lock_guard<mutex_type>;
};
struct ExclusiveMutex
{
using mutex_type = std::mutex;
using lock_guard = std::lock_guard<mutex_type>;
};
} // namespace concurrency
template <
class Key,
class Value,
class Concurrency = concurrency::SingleThreaded>
class LRUCache
{
using List = std::list<Key>; // MRU .. LRU
using DataMap = std::unordered_map<Key, Value>; // Key -> Value
using PosMap =
std::unordered_map<Key, typename List::iterator>; // Key -> pos
// iterator in the
// list
public:
explicit LRUCache(std::size_t capacity) : capacity_(capacity)
{
if (capacity_ == 0)
throw std::invalid_argument("LRUCache capacity must be positive.");
data_.reserve(capacity_);
pos_.reserve(capacity_);
// TODO:
// static_assert(std::is_default_constructible_v<Value>,
// "LRUCache requires Value to be default-constructible for
// operator[]");
// static_assert(std::is_copy_constructible_v<Key> ||
// std::is_move_constructible_v<Key>,
// "LRUCache requires Key to be copy- or move-constructible");
}
LRUCache(LRUCache const&) = delete;
LRUCache&
operator=(LRUCache const&) = delete;
LRUCache(LRUCache&&) = delete;
LRUCache&
operator=(LRUCache&&) = delete;
Value&
operator[](Key const& key)
{
auto g = lock();
return insertOrpromote(key);
}
Value&
operator[](Key&& key)
{
auto g = lock();
auto it = data_.find(key);
if (it != data_.end())
{
promote(key);
return it->second;
}
evictIfFull();
usage_.emplace_front(std::move(key));
pos_.emplace(usage_.front(), usage_.begin());
auto d = data_.emplace(usage_.front(), Value{});
return d.first->second;
}
Value*
get(Key const& key)
{
auto g = lock();
auto it = data_.find(key);
if (it == data_.end())
return nullptr;
promote(key);
return &it->second;
}
template <class... Args>
Value&
put(Key const& key, Args&&... args)
{
auto g = lock();
if (auto it = data_.find(key); it != data_.end())
{
it->second = Value(std::forward<Args>(args)...);
promote(key);
return it->second;
}
evictIfFull();
usage_.emplace_front(key);
pos_.emplace(key, usage_.begin());
auto it = data_.emplace(key, Value(std::forward<Args>(args)...));
return it.first->second;
}
bool
erase(Key const& key)
{
auto g = lock();
auto it = data_.find(key);
if (it == data_.end())
return false;
usage_.erase(pos_.at(key));
pos_.erase(key);
data_.erase(it);
return true;
}
bool
contains(Key const& key) const
{
auto g = lock();
return data_.find(key) != data_.end();
}
std::size_t
size() const noexcept
{
auto g = lock();
return data_.size();
}
std::size_t
capacity() const noexcept
{
return capacity_;
}
bool
empty() const noexcept
{
auto g = lock();
return data_.empty();
}
void
clear()
{
auto g = lock();
data_.clear();
pos_.clear();
usage_.clear();
}
private:
Value&
insertOrpromote(Key const& key)
{
if (auto it = data_.find(key); it != data_.end())
{
promote(key);
return it->second;
}
evictIfFull();
usage_.emplace_front(key);
pos_.emplace(key, usage_.begin());
auto it = data_.emplace(key, Value{});
return it.first->second;
}
void
promote(Key const& key)
{
auto lit = pos_.at(key);
usage_.splice(usage_.begin(), usage_, lit); // O(1)
}
void
evictIfFull()
{
if (data_.size() < capacity_)
return;
auto const& k = usage_.back();
data_.erase(k);
pos_.erase(k);
usage_.pop_back();
}
typename Concurrency::lock_guard
lock() const
{
return typename Concurrency::lock_guard{mtx_};
}
private:
std::size_t const capacity_;
DataMap data_;
PosMap pos_;
List usage_;
mutable typename Concurrency::mutex_type mtx_;
};
} // namespace ripple
#endif