Compare commits

...

3 Commits

Author SHA1 Message Date
Richard Holland
a1343056df clang 2024-10-12 18:50:30 +11:00
Richard Holland
0e818fc0f4 ensure peekMutex works correctly in the new setup 2024-10-12 18:38:04 +11:00
Richard Holland
f5705fa6db 16-way tagged cache to reduced mutex contention 2024-10-12 17:38:57 +11:00
3 changed files with 275 additions and 17 deletions

View File

@@ -58,12 +58,15 @@ LedgerHistory::insert(std::shared_ptr<Ledger const> ledger, bool validated)
assert(ledger->stateMap().getHash().isNonZero());
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
std::unique_lock sl(m_ledgers_by_hash.peekMutex(ledger->info().hash));
const bool alreadyHad = m_ledgers_by_hash.canonicalize_replace_cache(
ledger->info().hash, ledger);
if (validated)
{
std::unique_lock<std::shared_mutex> lock(mLedgersByIndexMutex);
mLedgersByIndex[ledger->info().seq] = ledger->info().hash;
}
return alreadyHad;
}
@@ -71,7 +74,7 @@ LedgerHistory::insert(std::shared_ptr<Ledger const> ledger, bool validated)
LedgerHash
LedgerHistory::getLedgerHash(LedgerIndex index)
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
std::unique_lock<std::shared_mutex> lock(mLedgersByIndexMutex);
auto it = mLedgersByIndex.find(index);
if (it != mLedgersByIndex.end())
@@ -84,13 +87,13 @@ std::shared_ptr<Ledger const>
LedgerHistory::getLedgerBySeq(LedgerIndex index)
{
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
std::unique_lock<std::shared_mutex> lock(mLedgersByIndexMutex);
auto it = mLedgersByIndex.find(index);
if (it != mLedgersByIndex.end())
{
uint256 hash = it->second;
sl.unlock();
return getLedgerByHash(hash);
}
}
@@ -104,11 +107,17 @@ LedgerHistory::getLedgerBySeq(LedgerIndex index)
{
// Add this ledger to the local tracking by index
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex(ret->info().hash));
assert(ret->isImmutable());
m_ledgers_by_hash.canonicalize_replace_client(
ret->info().hash, ret);
}
assert(ret->isImmutable());
m_ledgers_by_hash.canonicalize_replace_client(ret->info().hash, ret);
mLedgersByIndex[ret->info().seq] = ret->info().hash;
{
std::unique_lock<std::shared_mutex> lock(mLedgersByIndexMutex);
mLedgersByIndex[ret->info().seq] = ret->info().hash;
}
return (ret->info().seq == index) ? ret : nullptr;
}
}
@@ -440,7 +449,7 @@ LedgerHistory::builtLedger(
LedgerHash hash = ledger->info().hash;
assert(!hash.isZero());
std::unique_lock sl(m_consensus_validated.peekMutex());
std::unique_lock sl(m_consensus_validated.peekMutex(index));
auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize_replace_client(index, entry);
@@ -480,7 +489,7 @@ LedgerHistory::validatedLedger(
LedgerHash hash = ledger->info().hash;
assert(!hash.isZero());
std::unique_lock sl(m_consensus_validated.peekMutex());
std::unique_lock sl(m_consensus_validated.peekMutex(index));
auto entry = std::make_shared<cv_entry>();
m_consensus_validated.canonicalize_replace_client(index, entry);
@@ -515,7 +524,7 @@ LedgerHistory::validatedLedger(
bool
LedgerHistory::fixIndex(LedgerIndex ledgerIndex, LedgerHash const& ledgerHash)
{
std::unique_lock sl(m_ledgers_by_hash.peekMutex());
std::unique_lock<std::shared_mutex> lock(mLedgersByIndexMutex);
auto it = mLedgersByIndex.find(ledgerIndex);
if ((it != mLedgersByIndex.end()) && (it->second != ledgerHash))

View File

@@ -27,6 +27,7 @@
#include <ripple/protocol/RippleLedgerHash.h>
#include <optional>
#include <shared_mutex>
namespace ripple {
@@ -150,6 +151,7 @@ private:
// Maps ledger indexes to the corresponding hash.
std::map<LedgerIndex, LedgerHash> mLedgersByIndex; // validated ledgers
std::shared_mutex mLedgersByIndexMutex;
beast::Journal j_;
};

View File

@@ -53,7 +53,7 @@ template <
class Hash = hardened_hash<>,
class KeyEqual = std::equal_to<Key>,
class Mutex = std::recursive_mutex>
class TaggedCache
class TaggedCacheSingle
{
public:
using mutex_type = Mutex;
@@ -62,7 +62,7 @@ public:
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
public:
TaggedCache(
TaggedCacheSingle(
std::string const& name,
int size,
clock_type::duration expiration,
@@ -74,7 +74,7 @@ public:
, m_clock(clock)
, m_stats(
name,
std::bind(&TaggedCache::collect_metrics, this),
std::bind(&TaggedCacheSingle::collect_metrics, this),
collector)
, m_name(name)
, m_target_size(size)
@@ -258,7 +258,7 @@ public:
// At this point allStuffToSweep will go out of scope outside the lock
// and decrement the reference count on each strong pointer.
JLOG(m_journal.debug())
<< m_name << " TaggedCache sweep lock duration "
<< m_name << " TaggedCacheSingle sweep lock duration "
<< std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start)
.count()
@@ -715,7 +715,7 @@ private:
if (mapRemovals || cacheRemovals)
{
JLOG(m_journal.debug())
<< "TaggedCache partition sweep " << m_name
<< "TaggedCacheSingle partition sweep " << m_name
<< ": cache = " << partition.size() << "-" << cacheRemovals
<< ", map-=" << mapRemovals;
}
@@ -762,7 +762,7 @@ private:
if (mapRemovals || cacheRemovals)
{
JLOG(m_journal.debug())
<< "TaggedCache partition sweep " << m_name
<< "TaggedCacheSingle partition sweep " << m_name
<< ": cache = " << partition.size() << "-" << cacheRemovals
<< ", map-=" << mapRemovals;
}
@@ -793,6 +793,253 @@ private:
std::uint64_t m_misses;
};
#include <array>
#include <cstdint>
#include <memory>
#include <string>
#include <type_traits>
template <
class Key,
class T,
bool IsKeyCache = false,
class Hash = hardened_hash<>,
class KeyEqual = std::equal_to<Key>,
class Mutex = std::recursive_mutex>
class TaggedCache
{
private:
static constexpr size_t NUM_CACHES = 16;
using CacheType =
TaggedCacheSingle<Key, T, IsKeyCache, Hash, KeyEqual, Mutex>;
std::array<std::unique_ptr<CacheType>, NUM_CACHES> caches;
// Helper function to get the index of the cache based on the key
size_t
getCacheIndex(const Key& key) const
{
// Assuming Key can be hashed
size_t hash = Hash{}(key);
return hash & 0xF; // Use the least significant nibble
}
public:
using mutex_type = Mutex;
using key_type = Key;
using mapped_type = T;
using clock_type = beast::abstract_clock<std::chrono::steady_clock>;
TaggedCache(
std::string const& name,
int size,
clock_type::duration expiration,
clock_type& clock,
beast::Journal journal,
beast::insight::Collector::ptr const& collector =
beast::insight::NullCollector::New())
{
for (size_t i = 0; i < NUM_CACHES; ++i)
{
caches[i] = std::make_unique<CacheType>(
name + "_" + std::to_string(i),
size / NUM_CACHES, // Distribute size across caches
expiration,
clock,
journal,
collector);
}
}
// Implement all public methods of TaggedCache, delegating to the
// appropriate cache instance
clock_type&
clock()
{
return caches[0]->clock(); // All caches share the same clock
}
std::size_t
size() const
{
std::size_t total = 0;
for (const auto& cache : caches)
total += cache->size();
return total;
}
void
setTargetSize(int s)
{
int sizePerCache = s / NUM_CACHES;
for (auto& cache : caches)
cache->setTargetSize(sizePerCache);
}
clock_type::duration
getTargetAge() const
{
return caches[0]
->getTargetAge(); // All caches share the same target age
}
void
setTargetAge(clock_type::duration s)
{
for (auto& cache : caches)
cache->setTargetAge(s);
}
int
getCacheSize() const
{
int total = 0;
for (const auto& cache : caches)
total += cache->getCacheSize();
return total;
}
int
getTrackSize() const
{
int total = 0;
for (const auto& cache : caches)
total += cache->getTrackSize();
return total;
}
float
getHitRate()
{
float totalHitRate = 0;
for (const auto& cache : caches)
totalHitRate += cache->getHitRate();
return totalHitRate / NUM_CACHES;
}
void
clear()
{
for (auto& cache : caches)
cache->clear();
}
void
reset()
{
for (auto& cache : caches)
cache->reset();
}
template <class KeyComparable>
bool
touch_if_exists(KeyComparable const& key)
{
return caches[getCacheIndex(key)]->touch_if_exists(key);
}
void
sweep()
{
for (auto& cache : caches)
cache->sweep();
}
bool
del(const key_type& key, bool valid)
{
return caches[getCacheIndex(key)]->del(key, valid);
}
bool
canonicalize(
const key_type& key,
std::shared_ptr<T>& data,
std::function<bool(std::shared_ptr<T> const&)>&& replace)
{
return caches[getCacheIndex(key)]->canonicalize(
key, data, std::move(replace));
}
bool
canonicalize_replace_cache(
const key_type& key,
std::shared_ptr<T> const& data)
{
return caches[getCacheIndex(key)]->canonicalize_replace_cache(
key, data);
}
bool
canonicalize_replace_client(const key_type& key, std::shared_ptr<T>& data)
{
return caches[getCacheIndex(key)]->canonicalize_replace_client(
key, data);
}
std::shared_ptr<T>
fetch(const key_type& key)
{
return caches[getCacheIndex(key)]->fetch(key);
}
template <class ReturnType = bool>
auto
insert(key_type const& key, T const& value)
-> std::enable_if_t<!IsKeyCache, ReturnType>
{
return caches[getCacheIndex(key)]->insert(key, value);
}
template <class ReturnType = bool>
auto
insert(key_type const& key) -> std::enable_if_t<IsKeyCache, ReturnType>
{
return caches[getCacheIndex(key)]->insert(key);
}
bool
retrieve(const key_type& key, T& data)
{
return caches[getCacheIndex(key)]->retrieve(key, data);
}
mutex_type&
peekMutex(key_type const& key)
{
return caches[getCacheIndex(key)]->peekMutex();
}
std::vector<key_type>
getKeys() const
{
std::vector<key_type> allKeys;
for (const auto& cache : caches)
{
auto keys = cache->getKeys();
allKeys.insert(allKeys.end(), keys.begin(), keys.end());
}
return allKeys;
}
double
rate() const
{
double totalRate = 0;
for (const auto& cache : caches)
totalRate += cache->rate();
return totalRate / NUM_CACHES;
}
template <class Handler>
std::shared_ptr<T>
fetch(key_type const& digest, Handler const& h)
{
return caches[getCacheIndex(digest)]->fetch(digest, h);
}
};
} // namespace ripple
#endif