wip lock per partition

This commit is contained in:
Valentin Balaschenko
2025-06-13 18:33:00 +01:00
parent d0f836581b
commit da694c8304
3 changed files with 95 additions and 27 deletions

View File

@@ -195,6 +195,9 @@ private:
void
collect_metrics();
Mutex&
lockPartition(key_type const& key) const;
private:
struct Stats
{
@@ -297,7 +300,8 @@ private:
[[maybe_unused]] clock_type::time_point const& now,
typename KeyValueCacheType::map_type& partition,
SweptPointersVector& stuffToSweep,
std::atomic<int>& allRemoval);
std::atomic<int>& allRemoval,
Mutex& partitionLock);
[[nodiscard]] std::thread
sweepHelper(
@@ -305,7 +309,8 @@ private:
clock_type::time_point const& now,
typename KeyOnlyCacheType::map_type& partition,
SweptPointersVector&,
std::atomic<int>& allRemovals);
std::atomic<int>& allRemovals,
Mutex& partitionLock);
beast::Journal m_journal;
clock_type& m_clock;
@@ -325,6 +330,7 @@ private:
cache_type m_cache; // Hold strong reference to recent objects
std::atomic<std::uint64_t> m_hits;
std::atomic<std::uint64_t> m_misses;
mutable std::vector<mutex_type> partitionLocks_;
};
} // namespace ripple

View File

@@ -60,6 +60,7 @@ inline TaggedCache<
, m_hits(0)
, m_misses(0)
{
partitionLocks_ = std::vector<mutex_type>(m_cache.partitions());
}
template <
@@ -105,7 +106,13 @@ TaggedCache<
KeyEqual,
Mutex>::size() const
{
return m_cache.size();
std::size_t totalSize = 0;
for (size_t i = 0; i < partitionLocks_.size(); ++i)
{
std::lock_guard<Mutex> lock(partitionLocks_[i]);
totalSize += m_cache.map()[i].size();
}
return totalSize;
}
template <
@@ -151,7 +158,7 @@ TaggedCache<
KeyEqual,
Mutex>::getTrackSize() const
{
return m_cache.size();
return size();
}
template <
@@ -200,7 +207,11 @@ TaggedCache<
KeyEqual,
Mutex>::clear()
{
for (auto& mutex : partitionLocks_)
mutex.lock();
m_cache.clear();
for (auto& mutex : partitionLocks_)
mutex.unlock();
m_cache_count.store(0, std::memory_order_relaxed);
}
@@ -224,7 +235,11 @@ TaggedCache<
KeyEqual,
Mutex>::reset()
{
for (auto& mutex : partitionLocks_)
mutex.lock();
m_cache.clear();
for (auto& mutex : partitionLocks_)
mutex.unlock();
m_cache_count.store(0, std::memory_order_relaxed);
m_hits.store(0, std::memory_order_relaxed);
m_misses.store(0, std::memory_order_relaxed);
@@ -251,7 +266,7 @@ TaggedCache<
KeyEqual,
Mutex>::touch_if_exists(KeyComparable const& key)
{
// TOOD: acuiqring iterator should lock the partition
std::lock_guard<Mutex> lock(lockPartition(key));
auto const iter(m_cache.find(key));
if (iter == m_cache.end())
{
@@ -293,8 +308,6 @@ TaggedCache<
auto const start = std::chrono::steady_clock::now();
{
// TODO: lock the cache partition
if (m_target_size == 0 ||
(static_cast<int>(m_cache.size()) <= m_target_size))
{
@@ -325,7 +338,8 @@ TaggedCache<
now,
m_cache.map()[p],
allStuffToSweep[p],
allRemovals));
allRemovals,
partitionLocks_[p]));
}
for (std::thread& worker : workers)
worker.join();
@@ -365,7 +379,9 @@ TaggedCache<
{
// Remove from cache, if !valid, remove from map too. Returns true if
// removed from cache
// TODO: acuiqring iterator should lock the partition
std::lock_guard<Mutex> lock(lockPartition(key));
auto cit = m_cache.find(key);
if (cit == m_cache.end())
@@ -416,9 +432,8 @@ TaggedCache<
// Return canonical value, store if needed, refresh in cache
// Return values: true=we had the data already
// TODO: acuiqring iterator should lock the partition
std::lock_guard<Mutex> lock(lockPartition(key));
auto cit = m_cache.find(key);
if (cit == m_cache.end())
{
m_cache.emplace(
@@ -555,7 +570,8 @@ TaggedCache<
KeyEqual,
Mutex>::fetch(key_type const& key)
{
// TODO: do we need any lock here, since we are returing a shared pointer?
std::lock_guard<Mutex> lock(lockPartition(key));
auto ret = initialFetch(key);
if (!ret)
m_misses.fetch_add(1, std::memory_order_relaxed);
@@ -623,8 +639,8 @@ TaggedCache<
-> std::enable_if_t<IsKeyCache, ReturnType>
{
clock_type::time_point const now(m_clock.now());
auto [it, inserted] =
m_cache.emplace( // TODO: make sure partition is locked
std::lock_guard<Mutex> lock(lockPartition(key));
auto [it, inserted] = m_cache.emplace(
std::piecewise_construct,
std::forward_as_tuple(key),
std::forward_as_tuple(now));
@@ -687,8 +703,12 @@ TaggedCache<
{
v.reserve(m_cache.size());
for (auto const& _ : m_cache) // TODO: make sure partition is locked
v.push_back(_.first);
for (std::size_t i = 0; i < partitionLocks_.size(); ++i)
{
std::lock_guard<Mutex> lock(partitionLocks_[i]);
for (auto const& entry : m_cache.map()[i])
v.push_back(entry.first);
}
}
return v;
@@ -743,11 +763,10 @@ TaggedCache<
KeyEqual,
Mutex>::fetch(key_type const& digest, Handler const& h)
{
// TODO: do we need any lock here, since we are returing a shared pointer?
{ // TODO: potenially remove this scope
std::lock_guard<Mutex> lock(lockPartition(digest));
if (auto ret = initialFetch(digest))
return ret;
}
auto sle = h();
if (!sle)
@@ -782,6 +801,8 @@ TaggedCache<
KeyEqual,
Mutex>::initialFetch(key_type const& key)
{
std::lock_guard<Mutex> lock(lockPartition(key));
auto cit = m_cache.find(key);
if (cit == m_cache.end())
return {};
@@ -793,7 +814,7 @@ TaggedCache<
entry.touch(m_clock.now());
return entry.ptr.getStrong();
}
entry.ptr = entry.lock(); // TODO what is this?
entry.ptr = entry.lock();
if (entry.isCached())
{
// independent of cache size, so not counted as a hit
@@ -866,12 +887,15 @@ TaggedCache<
[[maybe_unused]] clock_type::time_point const& now,
typename KeyValueCacheType::map_type& partition,
SweptPointersVector& stuffToSweep,
std::atomic<int>& allRemovals)
std::atomic<int>& allRemovals,
Mutex& partitionLock)
{
return std::thread([&, this]() {
int cacheRemovals = 0;
int mapRemovals = 0;
std::lock_guard<Mutex> lock(partitionLock);
// Keep references to all the stuff we sweep
// so that we can destroy them outside the lock.
stuffToSweep.reserve(partition.size());
@@ -954,12 +978,15 @@ TaggedCache<
clock_type::time_point const& now,
typename KeyOnlyCacheType::map_type& partition,
SweptPointersVector&,
std::atomic<int>& allRemovals)
std::atomic<int>& allRemovals,
Mutex& partitionLock)
{
return std::thread([&, this]() {
int cacheRemovals = 0;
int mapRemovals = 0;
std::lock_guard<Mutex> lock(partitionLock);
// Keep references to all the stuff we sweep
// so that we can destroy them outside the lock.
{
@@ -994,6 +1021,29 @@ TaggedCache<
});
}
template <
class Key,
class T,
bool IsKeyCache,
class SharedWeakUnionPointer,
class SharedPointerType,
class Hash,
class KeyEqual,
class Mutex>
inline Mutex&
TaggedCache<
Key,
T,
IsKeyCache,
SharedWeakUnionPointer,
SharedPointerType,
Hash,
KeyEqual,
Mutex>::lockPartition(key_type const& key) const
{
return partitionLocks_[m_cache.partition_index(key)];
}
} // namespace ripple
#endif

View File

@@ -277,6 +277,12 @@ public:
return map_;
}
partition_map_type const&
map() const
{
return map_;
}
iterator
begin()
{
@@ -321,6 +327,12 @@ public:
return cend();
}
std::size_t
partition_index(key_type const& key) const
{
return partitioner(key);
}
private:
template <class T>
void