From da694c830490ed2e81e760df74dd54f342db98ae Mon Sep 17 00:00:00 2001 From: Valentin Balaschenko <13349202+vlntb@users.noreply.github.com> Date: Fri, 13 Jun 2025 18:33:00 +0100 Subject: [PATCH] wip lock per partition --- include/xrpl/basics/TaggedCache.h | 10 +- include/xrpl/basics/TaggedCache.ipp | 100 +++++++++++++----- .../xrpl/basics/partitioned_unordered_map.h | 12 +++ 3 files changed, 95 insertions(+), 27 deletions(-) diff --git a/include/xrpl/basics/TaggedCache.h b/include/xrpl/basics/TaggedCache.h index 05c493c186..d1e2c46253 100644 --- a/include/xrpl/basics/TaggedCache.h +++ b/include/xrpl/basics/TaggedCache.h @@ -195,6 +195,9 @@ private: void collect_metrics(); + Mutex& + lockPartition(key_type const& key) const; + private: struct Stats { @@ -297,7 +300,8 @@ private: [[maybe_unused]] clock_type::time_point const& now, typename KeyValueCacheType::map_type& partition, SweptPointersVector& stuffToSweep, - std::atomic& allRemoval); + std::atomic& allRemoval, + Mutex& partitionLock); [[nodiscard]] std::thread sweepHelper( @@ -305,7 +309,8 @@ private: clock_type::time_point const& now, typename KeyOnlyCacheType::map_type& partition, SweptPointersVector&, - std::atomic& allRemovals); + std::atomic& allRemovals, + Mutex& partitionLock); beast::Journal m_journal; clock_type& m_clock; @@ -325,6 +330,7 @@ private: cache_type m_cache; // Hold strong reference to recent objects std::atomic m_hits; std::atomic m_misses; + mutable std::vector partitionLocks_; }; } // namespace ripple diff --git a/include/xrpl/basics/TaggedCache.ipp b/include/xrpl/basics/TaggedCache.ipp index 02e7ae2203..41b13297cf 100644 --- a/include/xrpl/basics/TaggedCache.ipp +++ b/include/xrpl/basics/TaggedCache.ipp @@ -60,6 +60,7 @@ inline TaggedCache< , m_hits(0) , m_misses(0) { + partitionLocks_ = std::vector(m_cache.partitions()); } template < @@ -105,7 +106,13 @@ TaggedCache< KeyEqual, Mutex>::size() const { - return m_cache.size(); + std::size_t totalSize = 0; + for (size_t i = 0; i < partitionLocks_.size(); ++i) + { + std::lock_guard lock(partitionLocks_[i]); + totalSize += m_cache.map()[i].size(); + } + return totalSize; } template < @@ -151,7 +158,7 @@ TaggedCache< KeyEqual, Mutex>::getTrackSize() const { - return m_cache.size(); + return size(); } template < @@ -200,7 +207,11 @@ TaggedCache< KeyEqual, Mutex>::clear() { + for (auto& mutex : partitionLocks_) + mutex.lock(); m_cache.clear(); + for (auto& mutex : partitionLocks_) + mutex.unlock(); m_cache_count.store(0, std::memory_order_relaxed); } @@ -224,7 +235,11 @@ TaggedCache< KeyEqual, Mutex>::reset() { + for (auto& mutex : partitionLocks_) + mutex.lock(); m_cache.clear(); + for (auto& mutex : partitionLocks_) + mutex.unlock(); m_cache_count.store(0, std::memory_order_relaxed); m_hits.store(0, std::memory_order_relaxed); m_misses.store(0, std::memory_order_relaxed); @@ -251,7 +266,7 @@ TaggedCache< KeyEqual, Mutex>::touch_if_exists(KeyComparable const& key) { - // TOOD: acuiqring iterator should lock the partition + std::lock_guard lock(lockPartition(key)); auto const iter(m_cache.find(key)); if (iter == m_cache.end()) { @@ -293,8 +308,6 @@ TaggedCache< auto const start = std::chrono::steady_clock::now(); { - // TODO: lock the cache partition - if (m_target_size == 0 || (static_cast(m_cache.size()) <= m_target_size)) { @@ -325,7 +338,8 @@ TaggedCache< now, m_cache.map()[p], allStuffToSweep[p], - allRemovals)); + allRemovals, + partitionLocks_[p])); } for (std::thread& worker : workers) worker.join(); @@ -365,7 +379,9 @@ TaggedCache< { // Remove from cache, if !valid, remove from map too. Returns true if // removed from cache - // TODO: acuiqring iterator should lock the partition + + std::lock_guard lock(lockPartition(key)); + auto cit = m_cache.find(key); if (cit == m_cache.end()) @@ -416,9 +432,8 @@ TaggedCache< // Return canonical value, store if needed, refresh in cache // Return values: true=we had the data already - // TODO: acuiqring iterator should lock the partition + std::lock_guard lock(lockPartition(key)); auto cit = m_cache.find(key); - if (cit == m_cache.end()) { m_cache.emplace( @@ -555,7 +570,8 @@ TaggedCache< KeyEqual, Mutex>::fetch(key_type const& key) { - // TODO: do we need any lock here, since we are returing a shared pointer? + std::lock_guard lock(lockPartition(key)); + auto ret = initialFetch(key); if (!ret) m_misses.fetch_add(1, std::memory_order_relaxed); @@ -623,11 +639,11 @@ TaggedCache< -> std::enable_if_t { clock_type::time_point const now(m_clock.now()); - auto [it, inserted] = - m_cache.emplace( // TODO: make sure partition is locked - std::piecewise_construct, - std::forward_as_tuple(key), - std::forward_as_tuple(now)); + std::lock_guard lock(lockPartition(key)); + auto [it, inserted] = m_cache.emplace( + std::piecewise_construct, + std::forward_as_tuple(key), + std::forward_as_tuple(now)); if (!inserted) it->second.last_access = now; return inserted; @@ -687,8 +703,12 @@ TaggedCache< { v.reserve(m_cache.size()); - for (auto const& _ : m_cache) // TODO: make sure partition is locked - v.push_back(_.first); + for (std::size_t i = 0; i < partitionLocks_.size(); ++i) + { + std::lock_guard lock(partitionLocks_[i]); + for (auto const& entry : m_cache.map()[i]) + v.push_back(entry.first); + } } return v; @@ -743,11 +763,10 @@ TaggedCache< KeyEqual, Mutex>::fetch(key_type const& digest, Handler const& h) { - // TODO: do we need any lock here, since we are returing a shared pointer? - { // TODO: potenially remove this scope - if (auto ret = initialFetch(digest)) - return ret; - } + std::lock_guard lock(lockPartition(digest)); + + if (auto ret = initialFetch(digest)) + return ret; auto sle = h(); if (!sle) @@ -782,6 +801,8 @@ TaggedCache< KeyEqual, Mutex>::initialFetch(key_type const& key) { + std::lock_guard lock(lockPartition(key)); + auto cit = m_cache.find(key); if (cit == m_cache.end()) return {}; @@ -793,7 +814,7 @@ TaggedCache< entry.touch(m_clock.now()); return entry.ptr.getStrong(); } - entry.ptr = entry.lock(); // TODO what is this? + entry.ptr = entry.lock(); if (entry.isCached()) { // independent of cache size, so not counted as a hit @@ -866,12 +887,15 @@ TaggedCache< [[maybe_unused]] clock_type::time_point const& now, typename KeyValueCacheType::map_type& partition, SweptPointersVector& stuffToSweep, - std::atomic& allRemovals) + std::atomic& allRemovals, + Mutex& partitionLock) { return std::thread([&, this]() { int cacheRemovals = 0; int mapRemovals = 0; + std::lock_guard lock(partitionLock); + // Keep references to all the stuff we sweep // so that we can destroy them outside the lock. stuffToSweep.reserve(partition.size()); @@ -954,12 +978,15 @@ TaggedCache< clock_type::time_point const& now, typename KeyOnlyCacheType::map_type& partition, SweptPointersVector&, - std::atomic& allRemovals) + std::atomic& allRemovals, + Mutex& partitionLock) { return std::thread([&, this]() { int cacheRemovals = 0; int mapRemovals = 0; + std::lock_guard lock(partitionLock); + // Keep references to all the stuff we sweep // so that we can destroy them outside the lock. { @@ -994,6 +1021,29 @@ TaggedCache< }); } +template < + class Key, + class T, + bool IsKeyCache, + class SharedWeakUnionPointer, + class SharedPointerType, + class Hash, + class KeyEqual, + class Mutex> +inline Mutex& +TaggedCache< + Key, + T, + IsKeyCache, + SharedWeakUnionPointer, + SharedPointerType, + Hash, + KeyEqual, + Mutex>::lockPartition(key_type const& key) const +{ + return partitionLocks_[m_cache.partition_index(key)]; +} + } // namespace ripple #endif diff --git a/include/xrpl/basics/partitioned_unordered_map.h b/include/xrpl/basics/partitioned_unordered_map.h index ec0fb298b8..d45221e4fb 100644 --- a/include/xrpl/basics/partitioned_unordered_map.h +++ b/include/xrpl/basics/partitioned_unordered_map.h @@ -277,6 +277,12 @@ public: return map_; } + partition_map_type const& + map() const + { + return map_; + } + iterator begin() { @@ -321,6 +327,12 @@ public: return cend(); } + std::size_t + partition_index(key_type const& key) const + { + return partitioner(key); + } + private: template void