From ce3f0fcc235f5503d78fbcfdf88f00267837fede Mon Sep 17 00:00:00 2001 From: Nik Bougalis Date: Tue, 27 Sep 2022 22:39:02 -0700 Subject: [PATCH] Add true partitioning support to TaggedCache: The primary change introduced in this commit is partitioning of the `TaggedCache`, with each partition being indepedent of each other, making it possible to potentially perform multiple cache operations in parallel. In particular, the `sweep` operation is now parallelized by default on systems with at least four cores present. The `TaggedCache` could also be instantiated in 'key-only' mode which complicated the interface significantly but was only used by a single consumer (the `FullBelowCache`). This commit splits the 'key-only' functionality of `TaggedCache`, and incorporates directly into `FullBelowCache`, resulting in simple and cleaner interfaces for both `TaggedCache` and `FullBelowCache` but at a cost: some code duplication. Lastly, this commit includes a medley of changes, including the restructuring of `Transaction`, reducing its size by 48 bytes. --- Builds/CMake/RippledCore.cmake | 1 - Builds/levelization/results/ordering.txt | 1 + src/ripple/app/ledger/ConsensusTransSetSF.cpp | 17 +- src/ripple/app/ledger/ConsensusTransSetSF.h | 2 +- src/ripple/app/ledger/LedgerHistory.cpp | 144 +- src/ripple/app/ledger/LedgerHistory.h | 28 +- src/ripple/app/ledger/LedgerMaster.h | 8 +- src/ripple/app/ledger/TransactionMaster.h | 10 +- src/ripple/app/ledger/impl/LedgerMaster.cpp | 50 +- .../app/ledger/impl/TransactionMaster.cpp | 29 +- src/ripple/app/main/Application.h | 14 +- src/ripple/app/misc/NetworkOPs.cpp | 12 +- src/ripple/app/misc/SHAMapStoreImp.cpp | 19 +- src/ripple/app/misc/SHAMapStoreImp.h | 23 +- src/ripple/app/misc/Transaction.h | 147 +- src/ripple/app/misc/impl/AccountTxPaging.cpp | 13 +- src/ripple/app/misc/impl/Transaction.cpp | 52 +- .../app/rdb/backend/detail/impl/Node.cpp | 41 +- .../app/rdb/backend/detail/impl/Shard.cpp | 2 +- .../app/rdb/backend/impl/PostgresDatabase.cpp | 12 +- src/ripple/basics/KeyCache.h | 32 - src/ripple/basics/TaggedCache.h | 1231 ++++++++--------- src/ripple/beast/utility/atomic_shared_ptr.h | 214 +++ src/ripple/ledger/CachedSLEs.h | 53 +- src/ripple/nodestore/impl/DatabaseNodeImp.cpp | 8 +- src/ripple/nodestore/impl/Shard.cpp | 15 +- src/ripple/nodestore/impl/Shard.h | 3 - src/ripple/protocol/jss.h | 1 - src/ripple/rpc/handlers/AccountTx.cpp | 3 +- src/ripple/rpc/handlers/GetCounts.cpp | 41 +- src/ripple/rpc/handlers/Submit.cpp | 8 +- src/ripple/rpc/handlers/Tx.cpp | 11 +- src/ripple/rpc/handlers/TxHistory.cpp | 2 +- src/ripple/rpc/impl/TransactionSign.cpp | 39 +- src/ripple/shamap/Family.h | 6 +- src/ripple/shamap/FullBelowCache.h | 444 +++++- src/ripple/shamap/NodeFamily.h | 11 +- src/ripple/shamap/SHAMap.h | 1 - src/ripple/shamap/ShardFamily.h | 34 +- src/ripple/shamap/impl/NodeFamily.cpp | 54 +- src/ripple/shamap/impl/SHAMap.cpp | 2 +- src/ripple/shamap/impl/SHAMapSync.cpp | 4 +- src/ripple/shamap/impl/ShardFamily.cpp | 80 +- src/test/basics/KeyCache_test.cpp | 99 -- src/test/basics/TaggedCache_test.cpp | 13 +- src/test/shamap/common.h | 41 +- 46 files changed, 1712 insertions(+), 1363 deletions(-) delete mode 100644 src/ripple/basics/KeyCache.h create mode 100644 src/ripple/beast/utility/atomic_shared_ptr.h delete mode 100644 src/test/basics/KeyCache_test.cpp diff --git a/Builds/CMake/RippledCore.cmake b/Builds/CMake/RippledCore.cmake index 3633742e0..049262e2f 100644 --- a/Builds/CMake/RippledCore.cmake +++ b/Builds/CMake/RippledCore.cmake @@ -764,7 +764,6 @@ if (tests) src/test/basics/Expected_test.cpp src/test/basics/FileUtilities_test.cpp src/test/basics/IOUAmount_test.cpp - src/test/basics/KeyCache_test.cpp src/test/basics/Number_test.cpp src/test/basics/PerfLog_test.cpp src/test/basics/RangeSet_test.cpp diff --git a/Builds/levelization/results/ordering.txt b/Builds/levelization/results/ordering.txt index 92f663ab0..d3d831587 100644 --- a/Builds/levelization/results/ordering.txt +++ b/Builds/levelization/results/ordering.txt @@ -79,6 +79,7 @@ ripple.server > ripple.protocol ripple.shamap > ripple.basics ripple.shamap > ripple.beast ripple.shamap > ripple.crypto +ripple.shamap > ripple.json ripple.shamap > ripple.nodestore ripple.shamap > ripple.protocol test.app > ripple.app diff --git a/src/ripple/app/ledger/ConsensusTransSetSF.cpp b/src/ripple/app/ledger/ConsensusTransSetSF.cpp index 476c75751..2b1f2bc34 100644 --- a/src/ripple/app/ledger/ConsensusTransSetSF.cpp +++ b/src/ripple/app/ledger/ConsensusTransSetSF.cpp @@ -46,7 +46,7 @@ ConsensusTransSetSF::gotNode( if (fromFilter) return; - m_nodeCache.insert(nodeHash, nodeData); + m_nodeCache.insert(nodeHash.as_uint256(), nodeData); if ((type == SHAMapNodeType::tnTRANSACTION_NM) && (nodeData.size() > 16)) { @@ -78,23 +78,20 @@ ConsensusTransSetSF::gotNode( std::optional ConsensusTransSetSF::getNode(SHAMapHash const& nodeHash) const { - Blob nodeData; - if (m_nodeCache.retrieve(nodeHash, nodeData)) - return nodeData; + auto const nh = nodeHash.as_uint256(); - auto txn = - app_.getMasterTransaction().fetch_from_cache(nodeHash.as_uint256()); + if (auto e = m_nodeCache.fetch(nh)) + return *e; // This requires copying the blob. - if (txn) + if (auto txn = app_.getMasterTransaction().fetch_from_cache(nh)) { // this is a transaction, and we have it JLOG(j_.trace()) << "Node in our acquiring TX set is TXN we have"; Serializer s; s.add32(HashPrefix::transactionID); txn->getSTransaction()->add(s); - assert(sha512Half(s.slice()) == nodeHash.as_uint256()); - nodeData = s.peekData(); - return nodeData; + assert(sha512Half(s.slice()) == nh); + return std::move(s.modData()); } return std::nullopt; diff --git a/src/ripple/app/ledger/ConsensusTransSetSF.h b/src/ripple/app/ledger/ConsensusTransSetSF.h index ad5b2a23a..ec353a680 100644 --- a/src/ripple/app/ledger/ConsensusTransSetSF.h +++ b/src/ripple/app/ledger/ConsensusTransSetSF.h @@ -34,7 +34,7 @@ namespace ripple { class ConsensusTransSetSF : public SHAMapSyncFilter { public: - using NodeCache = TaggedCache; + using NodeCache = TaggedCache; ConsensusTransSetSF(Application& app, NodeCache& nodeCache); diff --git a/src/ripple/app/ledger/LedgerHistory.cpp b/src/ripple/app/ledger/LedgerHistory.cpp index ed2ccd074..d79a79e36 100644 --- a/src/ripple/app/ledger/LedgerHistory.cpp +++ b/src/ripple/app/ledger/LedgerHistory.cpp @@ -34,13 +34,13 @@ LedgerHistory::LedgerHistory( : app_(app) , collector_(collector) , mismatch_counter_(collector->make_counter("ledger.history", "mismatch")) - , m_ledgers_by_hash( + , ledgerCache_( "LedgerCache", app_.config().getValueFor(SizedItem::ledgerSize), std::chrono::seconds{app_.config().getValueFor(SizedItem::ledgerAge)}, stopwatch(), app_.journal("TaggedCache")) - , m_consensus_validated( + , consensusValidated_( "ConsensusValidated", 64, std::chrono::minutes{5}, @@ -60,10 +60,11 @@ LedgerHistory::insert( assert(ledger->stateMap().getHash().isNonZero()); - std::unique_lock sl(m_ledgers_by_hash.peekMutex()); + const bool alreadyHad = + ledgerCache_.insert_or_assign(ledger->info().hash, ledger); + + std::unique_lock sl(mutex_); - const bool alreadyHad = m_ledgers_by_hash.canonicalize_replace_cache( - ledger->info().hash, ledger); if (validated) mLedgersByIndex[ledger->info().seq] = ledger->info().hash; @@ -73,9 +74,11 @@ LedgerHistory::insert( LedgerHash LedgerHistory::getLedgerHash(LedgerIndex index) { - std::unique_lock sl(m_ledgers_by_hash.peekMutex()); + std::unique_lock sl(mutex_); + if (auto it = mLedgersByIndex.find(index); it != mLedgersByIndex.end()) return it->second; + return {}; } @@ -83,10 +86,9 @@ std::shared_ptr LedgerHistory::getLedgerBySeq(LedgerIndex index) { { - std::unique_lock sl(m_ledgers_by_hash.peekMutex()); - auto it = mLedgersByIndex.find(index); + std::unique_lock sl(mutex_); - if (it != mLedgersByIndex.end()) + if (auto it = mLedgersByIndex.find(index); it != mLedgersByIndex.end()) { uint256 hash = it->second; sl.unlock(); @@ -100,22 +102,19 @@ LedgerHistory::getLedgerBySeq(LedgerIndex index) return ret; assert(ret->info().seq == index); + assert(ret->isImmutable()); + ledgerCache_.retrieve_or_insert(ret->info().hash, ret); - { - // Add this ledger to the local tracking by index - std::unique_lock sl(m_ledgers_by_hash.peekMutex()); - - assert(ret->isImmutable()); - m_ledgers_by_hash.canonicalize_replace_client(ret->info().hash, ret); - mLedgersByIndex[ret->info().seq] = ret->info().hash; - return (ret->info().seq == index) ? ret : nullptr; - } + // Add this ledger to the local tracking by index + std::lock_guard sl(mutex_); + mLedgersByIndex[ret->info().seq] = ret->info().hash; + return (ret->info().seq == index) ? ret : nullptr; } std::shared_ptr LedgerHistory::getLedgerByHash(LedgerHash const& hash) { - auto ret = m_ledgers_by_hash.fetch(hash); + auto ret = ledgerCache_.fetch(hash); if (ret) { @@ -131,7 +130,7 @@ LedgerHistory::getLedgerByHash(LedgerHash const& hash) assert(ret->isImmutable()); assert(ret->info().hash == hash); - m_ledgers_by_hash.canonicalize_replace_client(ret->info().hash, ret); + ledgerCache_.retrieve_or_insert(ret->info().hash, ret); assert(ret->info().hash == hash); return ret; @@ -299,21 +298,6 @@ log_metadata_difference( } //------------------------------------------------------------------------------ - -// Return list of leaves sorted by key -static std::vector -leaves(SHAMap const& sm) -{ - std::vector v; - for (auto const& item : sm) - v.push_back(&item); - std::sort( - v.begin(), v.end(), [](SHAMapItem const* lhs, SHAMapItem const* rhs) { - return lhs->key() < rhs->key(); - }); - return v; -} - void LedgerHistory::handleMismatch( LedgerHash const& built, @@ -341,9 +325,10 @@ LedgerHistory::handleMismatch( if (auto stream = j_.debug()) { - stream << "Built: " << getJson({*builtLedger, {}}); - stream << "Valid: " << getJson({*validLedger, {}}); - stream << "Consensus: " << consensus; + stream << "Mismatch on " << builtLedger->info().seq << ":\n" + << " Built: " << getJson({*builtLedger, {}}) << "\n" + << " Valid: " << getJson({*validLedger, {}}) << "\n" + << " Consensus: " << consensus; } // Determine the mismatch reason, distinguishing Byzantine @@ -375,6 +360,23 @@ LedgerHistory::handleMismatch( << to_string(*builtConsensusHash); } + // Grab the leaves from the specified SHAMap and sort them by key: + auto leaves = [](SHAMap const& sm) { + std::vector v; + + for (auto const& item : sm) + v.push_back(&item); + + std::sort( + v.begin(), + v.end(), + [](SHAMapItem const* lhs, SHAMapItem const* rhs) { + return lhs->key() < rhs->key(); + }); + + return v; + }; + // Find differences between built and valid ledgers auto const builtTx = leaves(builtLedger->txMap()); auto const validTx = leaves(validLedger->txMap()); @@ -432,10 +434,8 @@ LedgerHistory::builtLedger( LedgerHash hash = ledger->info().hash; assert(!hash.isZero()); - std::unique_lock sl(m_consensus_validated.peekMutex()); - auto entry = std::make_shared(); - m_consensus_validated.canonicalize_replace_client(index, entry); + consensusValidated_.retrieve_or_insert(index, entry); if (entry->validated && !entry->built) { @@ -472,61 +472,65 @@ LedgerHistory::validatedLedger( LedgerHash hash = ledger->info().hash; assert(!hash.isZero()); - std::unique_lock sl(m_consensus_validated.peekMutex()); - auto entry = std::make_shared(); - m_consensus_validated.canonicalize_replace_client(index, entry); + consensusValidated_.retrieve_or_insert(index, entry); - if (entry->built && !entry->validated) + if (entry->built && !entry->validated && entry->built.value() != hash) { - if (entry->built.value() != hash) - { - JLOG(j_.error()) - << "MISMATCH: seq=" << index - << " built:" << entry->built.value() << " then:" << hash; - handleMismatch( - entry->built.value(), - hash, - entry->builtConsensusHash, - consensusHash, - entry->consensus.value()); - } - else - { - // We built a ledger locally and then validated it - JLOG(j_.debug()) << "MATCH: seq=" << index; - } + JLOG(j_.error()) << "Mismatch on validated ledger (seq " << index + << "): built is" << entry->built.value() + << ", validated is:" << hash; + + handleMismatch( + entry->built.value(), + hash, + entry->builtConsensusHash, + consensusHash, + entry->consensus.value()); } entry->validated.emplace(hash); entry->validatedConsensusHash = consensusHash; } -/** Ensure m_ledgers_by_hash doesn't have the wrong hash for a particular index +/** Ensure ledgerCache_ doesn't have the wrong hash for a particular index */ bool LedgerHistory::fixIndex(LedgerIndex ledgerIndex, LedgerHash const& ledgerHash) { - std::unique_lock sl(m_ledgers_by_hash.peekMutex()); - auto it = mLedgersByIndex.find(ledgerIndex); + std::lock_guard sl(mutex_); - if ((it != mLedgersByIndex.end()) && (it->second != ledgerHash)) + if (auto it = mLedgersByIndex.find(ledgerIndex); + (it != mLedgersByIndex.end()) && (it->second != ledgerHash)) { it->second = ledgerHash; return false; } + return true; } void LedgerHistory::clearLedgerCachePrior(LedgerIndex seq) { - for (LedgerHash it : m_ledgers_by_hash.getKeys()) + ledgerCache_.erase_if( + [seq](Ledger const& ledger) { return ledger.info().seq < seq; }); +} + +Json::Value +LedgerHistory::info() const +{ + Json::Value ret(Json::objectValue); + + ret["lc"] = ledgerCache_.info(); + ret["cv"] = consensusValidated_.info(); + { - auto const ledger = getLedgerByHash(it); - if (!ledger || ledger->info().seq < seq) - m_ledgers_by_hash.del(it, false); + std::lock_guard sl(mutex_); + ret["lbi"] = std::to_string(mLedgersByIndex.size()); } + + return ret; } } // namespace ripple diff --git a/src/ripple/app/ledger/LedgerHistory.h b/src/ripple/app/ledger/LedgerHistory.h index 5733ca763..702a63be2 100644 --- a/src/ripple/app/ledger/LedgerHistory.h +++ b/src/ripple/app/ledger/LedgerHistory.h @@ -24,6 +24,7 @@ #include #include #include +#include #include #include @@ -46,15 +47,6 @@ public: bool insert(std::shared_ptr const& ledger, bool validated); - /** Get the ledgers_by_hash cache hit rate - @return the hit rate - */ - float - getCacheHitRate() - { - return m_ledgers_by_hash.getHitRate(); - } - /** Get a ledger given its sequence number */ std::shared_ptr getLedgerBySeq(LedgerIndex ledgerIndex); @@ -75,8 +67,8 @@ public: void sweep() { - m_ledgers_by_hash.sweep(); - m_consensus_validated.sweep(); + ledgerCache_.sweep(); + consensusValidated_.sweep(); } /** Report that we have locally built a particular ledger */ @@ -103,6 +95,9 @@ public: void clearLedgerCachePrior(LedgerIndex seq); + Json::Value + info() const; + private: /** Log details in the case where we build one ledger but validate a different one. @@ -126,9 +121,7 @@ private: beast::insight::Collector::ptr collector_; beast::insight::Counter mismatch_counter_; - using LedgersByHash = TaggedCache; - - LedgersByHash m_ledgers_by_hash; + TaggedCache ledgerCache_; // Maps ledger indexes to the corresponding hashes // For debug and logging purposes @@ -145,8 +138,11 @@ private: // Consensus metadata of built ledger std::optional consensus; }; - using ConsensusValidated = TaggedCache; - ConsensusValidated m_consensus_validated; + + TaggedCache consensusValidated_; + + // Protects mLedgersByIndex + std::mutex mutable mutex_; // Maps ledger indexes to the corresponding hash. std::map mLedgersByIndex; // validated ledgers diff --git a/src/ripple/app/ledger/LedgerMaster.h b/src/ripple/app/ledger/LedgerMaster.h index 1b1dc15dd..e656f175d 100644 --- a/src/ripple/app/ledger/LedgerMaster.h +++ b/src/ripple/app/ledger/LedgerMaster.h @@ -33,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -221,8 +222,6 @@ public: void sweep(); - float - getCacheHitRate(); void checkAccept(std::shared_ptr const& ledger); @@ -296,6 +295,9 @@ public: std::optional txnIDfromIndex(uint32_t ledgerSeq, uint32_t txnIndex); + Json::Value + info() const; + private: void setValidLedger(std::shared_ptr const& l); @@ -368,7 +370,7 @@ private: // A set of transactions to replay during the next close std::unique_ptr replayData; - std::recursive_mutex mCompleteLock; + std::recursive_mutex mutable mCompleteLock; RangeSet mCompleteLedgers; // Publish thread is running. diff --git a/src/ripple/app/ledger/TransactionMaster.h b/src/ripple/app/ledger/TransactionMaster.h index 51441475e..d7e9390c3 100644 --- a/src/ripple/app/ledger/TransactionMaster.h +++ b/src/ripple/app/ledger/TransactionMaster.h @@ -76,14 +76,14 @@ public: bool inLedger(uint256 const& hash, std::uint32_t ledger); - void - canonicalize(std::shared_ptr* pTransaction); + [[nodiscard]] std::shared_ptr + canonicalize(std::shared_ptr tx); void - sweep(void); + sweep(); - TaggedCache& - getCache(); + Json::Value + info() const; private: Application& mApp; diff --git a/src/ripple/app/ledger/impl/LedgerMaster.cpp b/src/ripple/app/ledger/impl/LedgerMaster.cpp index 009c79669..20e8c9260 100644 --- a/src/ripple/app/ledger/impl/LedgerMaster.cpp +++ b/src/ripple/app/ledger/impl/LedgerMaster.cpp @@ -1881,12 +1881,6 @@ LedgerMaster::sweep() fetch_packs_.sweep(); } -float -LedgerMaster::getCacheHitRate() -{ - return mLedgerHistory.getCacheHitRate(); -} - void LedgerMaster::clearPriorLedgers(LedgerIndex seq) { @@ -2143,20 +2137,22 @@ LedgerMaster::doAdvance(std::unique_lock& sl) void LedgerMaster::addFetchPack(uint256 const& hash, std::shared_ptr data) { - fetch_packs_.canonicalize_replace_client(hash, data); + fetch_packs_.retrieve_or_insert(hash, data); } std::optional LedgerMaster::getFetchPack(uint256 const& hash) { - Blob data; - if (fetch_packs_.retrieve(hash, data)) + std::optional ret; + + // We retrieve the data from the cache _and_ remove it. + if (auto bp = fetch_packs_.fetch(hash, true); + bp && hash == sha512Half(makeSlice(*bp))) { - fetch_packs_.del(hash, false); - if (hash == sha512Half(makeSlice(data))) - return data; + ret.emplace(std::move(*bp)); } - return std::nullopt; + + return ret; } void @@ -2366,6 +2362,34 @@ LedgerMaster::getFetchPackCacheSize() const return fetch_packs_.getCacheSize(); } +Json::Value +LedgerMaster::info() const +{ + Json::Value ret(Json::objectValue); + + ret["fetch_packs"] = fetch_packs_.info(); + + { + std::lock_guard sl(mCompleteLock); + ret["complete_ledgers"] = to_string(mCompleteLedgers); + } + + { + std::lock_guard sl(m_mutex); + ret["validated"] = Json::objectValue; + ret["validated"]["hash"] = to_string(mLastValidLedger.first); + ret["validated"]["index"] = mLastValidLedger.second; + } + + ret["history"] = mLedgerHistory.info(); + + ret["fetch_depth"] = fetch_depth_; + ret["ledger_history"] = ledger_history_; + ret["ledger_fetch_size"] = ledger_fetch_size_; + + return ret; +} + // Returns the minimum ledger sequence in SQL database, if any. std::optional LedgerMaster::minSqlSeq() diff --git a/src/ripple/app/ledger/impl/TransactionMaster.cpp b/src/ripple/app/ledger/impl/TransactionMaster.cpp index c3c4fa5cc..c5cff8b34 100644 --- a/src/ripple/app/ledger/impl/TransactionMaster.cpp +++ b/src/ripple/app/ledger/impl/TransactionMaster.cpp @@ -73,7 +73,7 @@ TransactionMaster::fetch(uint256 const& txnID, error_code_i& ec) auto [txn, txnMeta] = std::get(v); if (txn) - mCache.canonicalize_replace_client(txnID, txn); + mCache.retrieve_or_insert(txnID, txn); return std::pair{std::move(txn), std::move(txnMeta)}; } @@ -100,7 +100,7 @@ TransactionMaster::fetch( auto [txn, txnMeta] = std::get(v); if (txn) - mCache.canonicalize_replace_client(txnID, txn); + mCache.retrieve_or_insert(txnID, txn); return std::pair{std::move(txn), std::move(txnMeta)}; } @@ -139,29 +139,26 @@ TransactionMaster::fetch( return txn; } -void -TransactionMaster::canonicalize(std::shared_ptr* pTransaction) +std::shared_ptr +TransactionMaster::canonicalize(std::shared_ptr tx) { - uint256 const tid = (*pTransaction)->getID(); - if (tid != beast::zero) - { - auto txn = *pTransaction; - // VFALCO NOTE canonicalize can change the value of txn! - mCache.canonicalize_replace_client(tid, txn); - *pTransaction = txn; - } + // Note that retrieve_or_insert can change the value of tx + if (uint256 const tid = tx->getID(); tid != beast::zero) + mCache.retrieve_or_insert(tid, tx); + + return tx; } void -TransactionMaster::sweep(void) +TransactionMaster::sweep() { mCache.sweep(); } -TaggedCache& -TransactionMaster::getCache() +Json::Value +TransactionMaster::info() const { - return mCache; + return mCache.info(); } } // namespace ripple diff --git a/src/ripple/app/main/Application.h b/src/ripple/app/main/Application.h index cadf3b149..cadeb1147 100644 --- a/src/ripple/app/main/Application.h +++ b/src/ripple/app/main/Application.h @@ -54,17 +54,13 @@ class ShardArchiveHandler; // VFALCO TODO Fix forward declares required for header dependency loops class AmendmentTable; -template < - class Key, - class T, - bool IsKeyCache, - class Hash, - class KeyEqual, - class Mutex> +template class TaggedCache; + class STLedgerEntry; using SLE = STLedgerEntry; -using CachedSLEs = TaggedCache; + +class CachedSLEs; class CollectorManager; class Family; @@ -105,7 +101,7 @@ class SHAMapStore; class ReportingETL; -using NodeCache = TaggedCache; +using NodeCache = TaggedCache; template class Validations; diff --git a/src/ripple/app/misc/NetworkOPs.cpp b/src/ripple/app/misc/NetworkOPs.cpp index dbfd4d31f..d184e1c8f 100644 --- a/src/ripple/app/misc/NetworkOPs.cpp +++ b/src/ripple/app/misc/NetworkOPs.cpp @@ -1183,9 +1183,7 @@ NetworkOPsImp::submitTransaction(std::shared_ptr const& iTrans) return; } - std::string reason; - - auto tx = std::make_shared(trans, reason, app_); + auto tx = std::make_shared(trans); m_job_queue.addJob(jtTRANSACTION, "submitTxn", [this, tx]() { auto t = tx; @@ -1250,8 +1248,7 @@ NetworkOPsImp::processTransaction( return; } - // canonicalize can change our pointer - app_.getMasterTransaction().canonicalize(&transaction); + transaction = app_.getMasterTransaction().canonicalize(transaction); if (bLocal) doTransactionSync(transaction, bUnlimited, failType); @@ -1454,9 +1451,8 @@ NetworkOPsImp::apply(std::unique_lock& batchLock) auto const txNext = m_ledgerMaster.popAcctTransaction(txCur); if (txNext) { - std::string reason; auto const trans = sterilize(*txNext); - auto t = std::make_shared(trans, reason, app_); + auto t = std::make_shared(trans); submit_held.emplace_back(t, false, false, FailHard::no); t->setApplying(); } @@ -2984,7 +2980,7 @@ NetworkOPsImp::pubLedger(std::shared_ptr const& lpAccepted) if (!alpAccepted) { alpAccepted = std::make_shared(lpAccepted, app_); - app_.getAcceptedLedgerCache().canonicalize_replace_client( + app_.getAcceptedLedgerCache().retrieve_or_insert( lpAccepted->info().hash, alpAccepted); } diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index d5cb07792..733aafe7e 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -290,8 +290,7 @@ SHAMapStoreImp::run() LedgerIndex lastRotated = state_db_.getState().lastRotated; netOPs_ = &app_.getOPs(); ledgerMaster_ = &app_.getLedgerMaster(); - fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache(0)); - treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache(0)); + fullBelowCache_ = app_.getNodeFamily().getFullBelowCache(0); if (advisoryDelete_) canDelete_ = state_db_.getCanDelete(); @@ -388,13 +387,6 @@ SHAMapStoreImp::run() JLOG(journal_.debug()) << "copied ledger " << validatedSeq << " nodecount " << nodeCount; - JLOG(journal_.debug()) << "freshening caches"; - freshenCaches(); - if (healthWait() == stopping) - return; - // Only log if we completed without a "health" abort - JLOG(journal_.debug()) << validatedSeq << " freshened caches"; - JLOG(journal_.trace()) << "Making a new backend"; auto newBackend = makeBackendRotating(); JLOG(journal_.debug()) @@ -604,15 +596,6 @@ SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq) fullBelowCache_->clear(); } -void -SHAMapStoreImp::freshenCaches() -{ - if (freshenCache(*treeNodeCache_)) - return; - if (freshenCache(app_.getMasterTransaction().getCache())) - return; -} - void SHAMapStoreImp::clearPrior(LedgerIndex lastRotated) { diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h index 995ee0267..fea72103c 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.h +++ b/src/ripple/app/misc/SHAMapStoreImp.h @@ -114,8 +114,8 @@ private: // as of run() or before NetworkOPs* netOPs_ = nullptr; LedgerMaster* ledgerMaster_ = nullptr; - FullBelowCache* fullBelowCache_ = nullptr; - TreeNodeCache* treeNodeCache_ = nullptr; + + std::shared_ptr fullBelowCache_; static constexpr auto nodeStoreName_ = "NodeStore"; @@ -188,23 +188,6 @@ private: std::unique_ptr makeBackendRotating(std::string path = std::string()); - template - bool - freshenCache(CacheInstance& cache) - { - std::uint64_t check = 0; - - for (auto const& key : cache.getKeys()) - { - dbRotating_->fetchNodeObject( - key, 0, NodeStore::FetchType::synchronous, true); - if (!(++check % checkHealthInterval_) && healthWait() == stopping) - return true; - } - - return false; - } - /** delete from sqlite table in batches to not lock the db excessively. * Pause briefly to extend access time to other users. * Call with mutex object unlocked. @@ -218,8 +201,6 @@ private: void clearCaches(LedgerIndex validatedSeq); void - freshenCaches(); - void clearPrior(LedgerIndex lastRotated); /** diff --git a/src/ripple/app/misc/Transaction.h b/src/ripple/app/misc/Transaction.h index 07802becf..adce50c92 100644 --- a/src/ripple/app/misc/Transaction.h +++ b/src/ripple/app/misc/Transaction.h @@ -21,7 +21,6 @@ #define RIPPLE_APP_MISC_TRANSACTION_H_INCLUDED #include -#include #include #include #include @@ -42,7 +41,7 @@ class Application; class Database; class Rules; -enum TransStatus { +enum TransStatus : std::uint8_t { NEW = 0, // just received / generated INVALID = 1, // no valid signature, insufficient funds INCLUDED = 2, // added to the current ledger @@ -56,33 +55,28 @@ enum TransStatus { enum class TxSearched { all, some, unknown }; +// The boost::optional parameter is because SOCI requires +// boost::optional (not std::optional) parameters. +TransStatus +sqlTransactionStatus(boost::optional const& status); + // This class is for constructing and examining transactions. // Transactions are static so manipulation functions are unnecessary. class Transaction : public std::enable_shared_from_this, public CountedObject { + /** Flags used for the state of a transaction. */ + constexpr static std::uint8_t const applied_ = 1; + constexpr static std::uint8_t const broadcast_ = 2; + constexpr static std::uint8_t const queued_ = 4; + constexpr static std::uint8_t const kept_ = 8; + constexpr static std::uint8_t const applying_ = 16; + public: - using pointer = std::shared_ptr; - using ref = const pointer&; - - Transaction( - std::shared_ptr const&, - std::string&, - Application&) noexcept; - - // The two boost::optional parameters are because SOCI requires - // boost::optional (not std::optional) parameters. - static Transaction::pointer - transactionFromSQL( - boost::optional const& ledgerSeq, - boost::optional const& status, - Blob const& rawTxn, - Application& app); - - // The boost::optional parameter is because SOCI requires - // boost::optional (not std::optional) parameters. - static TransStatus - sqlTransactionStatus(boost::optional const& status); + Transaction(std::shared_ptr const& stx) noexcept + : mTransaction(stx) + { + } std::shared_ptr const& getSTransaction() @@ -90,10 +84,10 @@ public: return mTransaction; } - uint256 const& + uint256 getID() const { - return mTransactionID; + return mTransaction->getTransactionID(); } LedgerIndex @@ -111,7 +105,7 @@ public: TransStatus getStatus() const { - return mStatus; + return status_; } TER @@ -127,18 +121,16 @@ public: } void - setStatus(TransStatus status, std::uint32_t ledgerSeq); + setStatus(TransStatus status, std::uint32_t ledgerSeq) + { + status_ = status; + mInLedger = ledgerSeq; + } void setStatus(TransStatus status) { - mStatus = status; - } - - void - setLedger(LedgerIndex ledger) - { - mInLedger = ledger; + status_ = status; } /** @@ -147,7 +139,7 @@ public: void setApplying() { - mApplying = true; + state_ |= applying_; } /** @@ -158,7 +150,7 @@ public: bool getApplying() { - return mApplying; + return (state_ & applying_) == applying_; } /** @@ -167,37 +159,33 @@ public: void clearApplying() { - mApplying = false; + state_ &= ~applying_; } - struct SubmitResult + class SubmitResult { - /** - * @brief clear Clear all states - */ - void - clear() + friend class Transaction; + + SubmitResult(std::uint8_t state) + : applied(state & applied_) + , broadcast(state & broadcast_) + , queued(state & queued_) + , kept(state & kept_) { - applied = false; - broadcast = false; - queued = false; - kept = false; } - /** - * @brief any Get true of any state is true - * @return True if any state if true - */ + public: + bool const applied; + bool const broadcast; + bool const queued; + bool const kept; + + /** Returns true if any state if true */ bool any() const { return applied || broadcast || queued || kept; } - - bool applied = false; - bool broadcast = false; - bool queued = false; - bool kept = false; }; /** @@ -207,7 +195,7 @@ public: SubmitResult getSubmitResult() const { - return submitResult_; + return {state_.load()}; } /** @@ -216,43 +204,35 @@ public: void clearSubmitResult() { - submitResult_.clear(); + state_ &= ~(applied_ | broadcast_ | queued_ | kept_); } - /** - * @brief setApplied Set this flag once was applied to open ledger - */ + /** Note that the transaction was applied to open ledger */ void setApplied() { - submitResult_.applied = true; + state_ |= applied_; } - /** - * @brief setQueued Set this flag once was put into heldtxns queue - */ + /** Note that the trnasaction was put into heldtxns queue */ void setQueued() { - submitResult_.queued = true; + state_ |= queued_; } - /** - * @brief setBroadcast Set this flag once was broadcasted via network - */ + /** Note that the transaction was broadcast via network */ void setBroadcast() { - submitResult_.broadcast = true; + state_ |= broadcast_; } - /** - * @brief setKept Set this flag once was put to localtxns queue - */ + /** Note that the transaction was put to localtxns queue */ void setKept() { - submitResult_.kept = true; + state_ |= kept_; } struct CurrentLedgerState @@ -306,7 +286,7 @@ public: } Json::Value - getJson(JsonOptions options, bool binary = false) const; + getJson(Application& app, JsonOptions options, bool binary = false) const; // Information used to locate a transaction. // Contains a nodestore hash and ledger sequence pair if the transaction was @@ -384,21 +364,20 @@ private: std::optional> const& range, error_code_i& ec); - uint256 mTransactionID; - + /** The ledger in which this transaction appears, or 0. */ LedgerIndex mInLedger = 0; - TransStatus mStatus = INVALID; + + /** The result of applying this transaction. */ TER mResult = temUNCERTAIN; - bool mApplying = false; - /** different ways for transaction to be accepted */ - SubmitResult submitResult_; + /** The current state of the transaction. */ + std::atomic state_ = 0; - std::optional currentLedgerState_; + /** The status of the transaction. */ + TransStatus status_ = NEW; std::shared_ptr mTransaction; - Application& mApp; - beast::Journal j_; + std::optional currentLedgerState_; }; } // namespace ripple diff --git a/src/ripple/app/misc/impl/AccountTxPaging.cpp b/src/ripple/app/misc/impl/AccountTxPaging.cpp index 433463e28..e57ed746d 100644 --- a/src/ripple/app/misc/impl/AccountTxPaging.cpp +++ b/src/ripple/app/misc/impl/AccountTxPaging.cpp @@ -40,17 +40,14 @@ convertBlobsToTxResult( { SerialIter it(makeSlice(rawTxn)); auto txn = std::make_shared(it); - std::string reason; - auto tr = std::make_shared(txn, reason, app); + auto tr = std::make_shared(txn); - tr->setStatus(Transaction::sqlTransactionStatus(status)); - tr->setLedger(ledger_index); + tr->setStatus(sqlTransactionStatus(status), ledger_index); - auto metaset = - std::make_shared(tr->getID(), tr->getLedger(), rawMeta); - - to.emplace_back(std::move(tr), metaset); + to.emplace_back( + std::move(tr), + std::make_shared(tr->getID(), tr->getLedger(), rawMeta)); }; void diff --git a/src/ripple/app/misc/impl/Transaction.cpp b/src/ripple/app/misc/impl/Transaction.cpp index 9adef982d..ea385a791 100644 --- a/src/ripple/app/misc/impl/Transaction.cpp +++ b/src/ripple/app/misc/impl/Transaction.cpp @@ -35,38 +35,12 @@ namespace ripple { -Transaction::Transaction( - std::shared_ptr const& stx, - std::string& reason, - Application& app) noexcept - : mTransaction(stx), mApp(app), j_(app.journal("Ledger")) -{ - try - { - mTransactionID = mTransaction->getTransactionID(); - } - catch (std::exception& e) - { - reason = e.what(); - return; - } - - mStatus = NEW; -} - // // Misc. // -void -Transaction::setStatus(TransStatus ts, std::uint32_t lseq) -{ - mStatus = ts; - mInLedger = lseq; -} - TransStatus -Transaction::sqlTransactionStatus(boost::optional const& status) +sqlTransactionStatus(boost::optional const& status) { char const c = (status) ? (*status)[0] : safe_cast(txnSqlUnknown); @@ -88,26 +62,6 @@ Transaction::sqlTransactionStatus(boost::optional const& status) return INVALID; } -Transaction::pointer -Transaction::transactionFromSQL( - boost::optional const& ledgerSeq, - boost::optional const& status, - Blob const& rawTxn, - Application& app) -{ - std::uint32_t const inLedger = - rangeCheckedCast(ledgerSeq.value_or(0)); - - SerialIter it(makeSlice(rawTxn)); - auto txn = std::make_shared(it); - std::string reason; - auto tr = std::make_shared(txn, reason, app); - - tr->setStatus(sqlTransactionStatus(status)); - tr->setLedger(inLedger); - return tr; -} - std::variant< std::pair, std::shared_ptr>, TxSearched> @@ -165,7 +119,7 @@ Transaction::load( // options 1 to include the date of the transaction Json::Value -Transaction::getJson(JsonOptions options, bool binary) const +Transaction::getJson(Application& app, JsonOptions options, bool binary) const { Json::Value ret(mTransaction->getJson(JsonOptions::none, binary)); @@ -176,7 +130,7 @@ Transaction::getJson(JsonOptions options, bool binary) const if (options == JsonOptions::include_date) { - auto ct = mApp.getLedgerMaster().getCloseTimeBySeq(mInLedger); + auto ct = app.getLedgerMaster().getCloseTimeBySeq(mInLedger); if (ct) ret[jss::date] = ct->time_since_epoch().count(); } diff --git a/src/ripple/app/rdb/backend/detail/impl/Node.cpp b/src/ripple/app/rdb/backend/detail/impl/Node.cpp index b3b354ebe..ecfbd2fb5 100644 --- a/src/ripple/app/rdb/backend/detail/impl/Node.cpp +++ b/src/ripple/app/rdb/backend/detail/impl/Node.cpp @@ -231,7 +231,7 @@ saveValidatedLedger( if (!aLedger) { aLedger = std::make_shared(ledger, app); - app.getAcceptedLedgerCache().canonicalize_replace_client( + app.getAcceptedLedgerCache().retrieve_or_insert( ledger->info().hash, aLedger); } } @@ -617,6 +617,36 @@ getHashesByIndex( return res; } +/** Construct a transaction object from the database. + + @param ledgerSeq if set, specifies the ledger in which this transaction was + was included. + @param status if set, specifies the status that the server believes this + transaction achieved. + @param rawTxn The transaction, in serialized format. + @param app The main application object. + + @note The two optional parameters use boost::optional because that's the + interface that SOCI expects. + */ +static std::shared_ptr +transactionFromSQL( + boost::optional const& ledgerSeq, + boost::optional const& status, + Blob const& rawTxn, + Application& app) +{ + std::uint32_t const inLedger = + rangeCheckedCast(ledgerSeq.value_or(0)); + + SerialIter it(makeSlice(rawTxn)); + auto txn = std::make_shared(it); + + auto tr = std::make_shared(txn); + tr->setStatus(sqlTransactionStatus(status), inLedger); + return tr; +} + std::pair>, int> getTxHistory( soci::session& session, @@ -656,8 +686,7 @@ getTxHistory( else rawTxn.clear(); - if (auto trans = Transaction::transactionFromSQL( - ledgerSeq, status, rawTxn, app)) + if (auto trans = transactionFromSQL(ledgerSeq, status, rawTxn, app)) { total++; txs.push_back(trans); @@ -859,8 +888,7 @@ getAccountTxs( else txnMeta.clear(); - auto txn = - Transaction::transactionFromSQL(ledgerSeq, status, rawTxn, app); + auto txn = transactionFromSQL(ledgerSeq, status, rawTxn, app); if (txnMeta.empty()) { // Work around a bug that could leave the metadata missing @@ -1342,8 +1370,7 @@ getTransaction( try { - auto txn = - Transaction::transactionFromSQL(ledgerSeq, status, rawTxn, app); + auto txn = transactionFromSQL(ledgerSeq, status, rawTxn, app); if (!ledgerSeq) return std::pair{std::move(txn), nullptr}; diff --git a/src/ripple/app/rdb/backend/detail/impl/Shard.cpp b/src/ripple/app/rdb/backend/detail/impl/Shard.cpp index f7a0ce457..21d31eae5 100644 --- a/src/ripple/app/rdb/backend/detail/impl/Shard.cpp +++ b/src/ripple/app/rdb/backend/detail/impl/Shard.cpp @@ -78,7 +78,7 @@ saveLedgerMeta( if (!aLedger) { aLedger = std::make_shared(ledger, app); - app.getAcceptedLedgerCache().canonicalize_replace_client( + app.getAcceptedLedgerCache().retrieve_or_insert( ledger->info().hash, aLedger); } diff --git a/src/ripple/app/rdb/backend/impl/PostgresDatabase.cpp b/src/ripple/app/rdb/backend/impl/PostgresDatabase.cpp index 5ee4ce551..774229ad3 100644 --- a/src/ripple/app/rdb/backend/impl/PostgresDatabase.cpp +++ b/src/ripple/app/rdb/backend/impl/PostgresDatabase.cpp @@ -376,10 +376,8 @@ flatFetchTransactions( else { auto& transactions = std::get(ret); - std::string reason; - auto txnRet = std::make_shared(txn, reason, app); - txnRet->setLedger(ledgerSequences[i]); - txnRet->setStatus(COMMITTED); + auto txnRet = std::make_shared(txn); + txnRet->setStatus(COMMITTED, ledgerSequences[i]); auto txMeta = std::make_shared( txnRet->getID(), ledgerSequences[i], *meta); transactions.push_back(std::make_pair(txnRet, txMeta)); @@ -829,10 +827,8 @@ PostgresDatabaseImp::getTxHistory(LedgerIndex startIndex) auto const& [sttx, meta] = txns[i]; assert(sttx); - std::string reason; - auto txn = std::make_shared(sttx, reason, app_); - txn->setLedger(ledgerSequences[i]); - txn->setStatus(COMMITTED); + auto txn = std::make_shared(sttx); + txn->setStatus(COMMITTED, ledgerSequences[i]); ret.push_back(txn); } diff --git a/src/ripple/basics/KeyCache.h b/src/ripple/basics/KeyCache.h deleted file mode 100644 index d8fa4910a..000000000 --- a/src/ripple/basics/KeyCache.h +++ /dev/null @@ -1,32 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2021 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#ifndef RIPPLE_BASICS_KEYCACHE_H -#define RIPPLE_BASICS_KEYCACHE_H - -#include -#include - -namespace ripple { - -using KeyCache = TaggedCache; - -} // namespace ripple - -#endif // RIPPLE_BASICS_KEYCACHE_H diff --git a/src/ripple/basics/TaggedCache.h b/src/ripple/basics/TaggedCache.h index 6765ff16b..ee6405f58 100644 --- a/src/ripple/basics/TaggedCache.h +++ b/src/ripple/basics/TaggedCache.h @@ -22,276 +22,311 @@ #include #include +#include #include #include +#include #include +#include +#include +#include +#include #include #include #include #include #include +#include #include namespace ripple { -/** Map/cache combination. - This class implements a cache and a map. The cache keeps objects alive - in the map. The map allows multiple code paths that reference objects - with the same tag to get the same actual object. +/** Cache that can be used - So long as data is in the cache, it will stay in memory. - If it stays in memory even after it is ejected from the cache, - the map will track it. + Once items are added to the cache, they are kept alive (i.e. the cache keeps + a shared_ptr to the item) until the items time out. At that point the cache + relinquishes the shared_ptr, but continues to keep track of the items using + a weak_ptr. + + The cache keeps objects alive in + the map. The map allows multiple code paths that reference objects with the + same key to get the same actual object. + + So long as data is in the cache, it will stay in memory. If it stays in + memory even after it is ejected from the cache, the map will track it. + + @tparam Key the type used as the key for the cache + @tparam Value the mapped type; may not be `void`. + @tparam N the number of partitions to segment the keyspace in. + @tparam Hash the hash function to use for this cache. + @tparam KeyEqual the equality operator for keys. + + @note The number of partitions N defaults to 64. Evidence suggests that + larger values do not have a measurable impact on performance, but + smaller values may make sense in some cases. @note Callers must not modify data objects that are stored in the cache unless they hold their own lock over all cache operations. */ template < class Key, - class T, - bool IsKeyCache = false, + class Value, + std::size_t N = 64, class Hash = hardened_hash<>, - class KeyEqual = std::equal_to, - class Mutex = std::recursive_mutex> + class KeyEqual = std::equal_to> class TaggedCache { + static_assert( + !std::is_same_v, + "You may not use void as the value for TaggedCache"); + + static_assert( + N != 0 && N <= 256, + "The number of partitions for TaggedCache must be between 1 and 256"); + public: - using mutex_type = Mutex; using key_type = Key; - using mapped_type = T; using clock_type = beast::abstract_clock; +private: + struct Stats + { + template + Stats( + std::string const& prefix, + Handler const& handler, + beast::insight::Collector::ptr const& collector) + : hook(collector->make_hook(handler)) + , size(collector->make_gauge(prefix, "size")) + , hit_rate(collector->make_gauge(prefix, "hit_rate")) + { + } + + beast::insight::Hook hook; + beast::insight::Gauge size; + beast::insight::Gauge hit_rate; + }; + + struct Entry + { + clock_type::time_point last_access; + std::variant, std::weak_ptr> ptr; + + Entry( + clock_type::time_point const& last_access_, + std::shared_ptr const& ptr_) + : last_access(last_access_), ptr(ptr_) + { + } + + void + touch(clock_type::time_point const& now) + { + last_access = now; + } + + /** Returns true if the entry holds a strong pointer. */ + bool + pinned() const + { + return std::holds_alternative>(ptr); + } + + /** Returns true if the data behind this entry is still in memory. */ + bool + valid() const + { + if (std::holds_alternative>(ptr)) + return !std::get>(ptr).expired(); + + return static_cast(std::get>(ptr)); + } + + bool + unique() const + { + if (std::holds_alternative>(ptr)) + return std::get>(ptr).use_count() == 1; + + return false; + } + + /** Returns a (possibly unseated) strong pointer to the data + + @note The item pinned state doesn't change; that is if the item + was previously unpinned (i.e. we had a weak pointer to it) + isn't "pinned" (i.e. + */ + std::shared_ptr + data() + { + if (std::holds_alternative>(ptr)) + return std::get>(ptr).lock(); + + return std::get>(ptr); + } + + /** Returns a strong pointer to the cached data. + + @return A pair of a (possibly unseated) pointer to the data, along + with a boolean that indicates whether the data was unpinned + prior to this operation. + * */ + std::pair, bool> + pin() + { + bool const weak = std::holds_alternative>(ptr); + + if (weak) + { + auto sp = std::get>(ptr).lock(); + + if (!sp) + return {nullptr, weak}; + + ptr = sp; + } + + return {std::get>(ptr), weak}; + } + }; + + Hash hash_; + + // Used for logging + std::string name_; + + beast::WrappedSink sink_; + beast::Journal journal_; + clock_type& clock_; + Stats m_stats; + + // Desired number of cache entries (0 = ignore) + std::size_t const targetSize_; + + // Desired maximum cache age + std::chrono::seconds const targetAge_; + + /** Map partitions: + + The idea is to partition the key space into multiple, independent + maps, each with their own lock. This helps to increase concurrency + since it's possible to operate on multiple partitions at a time. + */ + struct Partition + { + std::mutex mutable mutex; + std::size_t const index; + hardened_hash_map items; + + Partition(std::size_t i) : index(i) + { + } + }; + + /** The partitions that, together, map the entire key space. */ + std::array partitions_; + + /** Number of items where we have either a strong or weak pointer to. */ + std::atomic totalSize_ = 0; + + /** Number of items where that we have a strong pointer to. */ + std::atomic strongSize_ = 0; + + /** The number of times that we found an item in the cache */ + std::atomic hits_ = 0; + std::atomic misses_ = 0; + + template + Partition& + getPartition(K const& key) noexcept + { + if constexpr (std::is_integral_v) + return partitions_[key % partitions_.size()]; + else if constexpr (std::is_same_v< + K, + base_uint>) + return partitions_[*key.data() % partitions_.size()]; + else + return partitions_[hash_(key) % partitions_.size()]; + } + +private: + template + TaggedCache( + std::string name, + std::size_t size, + std::chrono::seconds expiration, + clock_type& clock, + beast::Journal journal, + beast::insight::Collector::ptr const& collector, + std::index_sequence) + : name_(std::move(name)) + , sink_(journal, "[" + name_ + "] ") + , journal_(sink_) + , clock_(clock) + , m_stats( + name_, + [this]() { + m_stats.size.set(getCacheSize()); + m_stats.hit_rate.set([this]() { + auto const h = hits_.load(); + auto const m = misses_.load(); + + auto ret = h + m; + + if (ret != 0) + ret = (h * 100) / ret; + + return ret; + }()); + }, + collector) + , targetSize_(size) + , targetAge_(expiration) + , partitions_{(Is)...} + { + } + public: TaggedCache( - std::string const& name, - int size, - clock_type::duration expiration, + std::string name, + std::size_t size, + std::chrono::seconds expiration, clock_type& clock, beast::Journal journal, beast::insight::Collector::ptr const& collector = beast::insight::NullCollector::New()) - : m_journal(journal) - , m_clock(clock) - , m_stats( - name, - std::bind(&TaggedCache::collect_metrics, this), - collector) - , m_name(name) - , m_target_size(size) - , m_target_age(expiration) - , m_cache_count(0) - , m_hits(0) - , m_misses(0) + : TaggedCache( + std::move(name), + size, + expiration, + clock, + journal, + collector, + std::make_index_sequence{}) { } -public: - /** Return the clock associated with the cache. */ - clock_type& - clock() - { - return m_clock; - } - - /** Returns the number of items in the container. */ + /** Returns the number of cached items that we hold strong pointers to. */ std::size_t - size() const - { - std::lock_guard lock(m_mutex); - return m_cache.size(); - } - - void - setTargetSize(int s) - { - std::lock_guard lock(m_mutex); - m_target_size = s; - - if (s > 0) - { - for (auto& partition : m_cache.map()) - { - partition.rehash(static_cast( - (s + (s >> 2)) / - (partition.max_load_factor() * m_cache.partitions()) + - 1)); - } - } - - JLOG(m_journal.debug()) << m_name << " target size set to " << s; - } - - clock_type::duration - getTargetAge() const - { - std::lock_guard lock(m_mutex); - return m_target_age; - } - - void - setTargetAge(clock_type::duration s) - { - std::lock_guard lock(m_mutex); - m_target_age = s; - JLOG(m_journal.debug()) - << m_name << " target age set to " << m_target_age.count(); - } - - int getCacheSize() const { - std::lock_guard lock(m_mutex); - return m_cache_count; + return strongSize_.load(); } - int + /** Returns the total number cached items. */ + std::size_t getTrackSize() const { - std::lock_guard lock(m_mutex); - return m_cache.size(); + return totalSize_.load(); } - float - getHitRate() + /** Returns the name of the TaggedCache instance. */ + std::string const& + name() const { - std::lock_guard lock(m_mutex); - auto const total = static_cast(m_hits + m_misses); - return m_hits * (100.0f / std::max(1.0f, total)); - } - - void - clear() - { - std::lock_guard lock(m_mutex); - m_cache.clear(); - m_cache_count = 0; - } - - void - reset() - { - std::lock_guard lock(m_mutex); - m_cache.clear(); - m_cache_count = 0; - m_hits = 0; - m_misses = 0; - } - - /** Refresh the last access time on a key if present. - @return `true` If the key was found. - */ - template - bool - touch_if_exists(KeyComparable const& key) - { - std::lock_guard lock(m_mutex); - auto const iter(m_cache.find(key)); - if (iter == m_cache.end()) - { - ++m_stats.misses; - return false; - } - iter->second.touch(m_clock.now()); - ++m_stats.hits; - return true; - } - - using SweptPointersVector = std::pair< - std::vector>, - std::vector>>; - - void - sweep() - { - // Keep references to all the stuff we sweep - // For performance, each worker thread should exit before the swept data - // is destroyed but still within the main cache lock. - std::vector allStuffToSweep(m_cache.partitions()); - - clock_type::time_point const now(m_clock.now()); - clock_type::time_point when_expire; - - auto const start = std::chrono::steady_clock::now(); - { - std::lock_guard lock(m_mutex); - - if (m_target_size == 0 || - (static_cast(m_cache.size()) <= m_target_size)) - { - when_expire = now - m_target_age; - } - else - { - when_expire = - now - m_target_age * m_target_size / m_cache.size(); - - clock_type::duration const minimumAge(std::chrono::seconds(1)); - if (when_expire > (now - minimumAge)) - when_expire = now - minimumAge; - - JLOG(m_journal.trace()) - << m_name << " is growing fast " << m_cache.size() << " of " - << m_target_size << " aging at " - << (now - when_expire).count() << " of " - << m_target_age.count(); - } - - std::vector workers; - workers.reserve(m_cache.partitions()); - std::atomic allRemovals = 0; - - for (std::size_t p = 0; p < m_cache.partitions(); ++p) - { - workers.push_back(sweepHelper( - when_expire, - now, - m_cache.map()[p], - allStuffToSweep[p], - allRemovals, - lock)); - } - for (std::thread& worker : workers) - worker.join(); - - m_cache_count -= allRemovals; - } - // At this point allStuffToSweep will go out of scope outside the lock - // and decrement the reference count on each strong pointer. - JLOG(m_journal.debug()) - << m_name << " TaggedCache sweep lock duration " - << std::chrono::duration_cast( - std::chrono::steady_clock::now() - start) - .count() - << "ms"; - } - - bool - del(const key_type& key, bool valid) - { - // Remove from cache, if !valid, remove from map too. Returns true if - // removed from cache - std::lock_guard lock(m_mutex); - - auto cit = m_cache.find(key); - - if (cit == m_cache.end()) - return false; - - Entry& entry = cit->second; - - bool ret = false; - - if (entry.isCached()) - { - --m_cache_count; - entry.ptr.reset(); - ret = true; - } - - if (!valid || entry.isExpired()) - m_cache.erase(cit); - - return ret; + return name_; } /** Replace aliased objects with originals. @@ -307,490 +342,378 @@ public: @return `true` If the key already existed. */ -public: bool canonicalize( - const key_type& key, - std::shared_ptr& data, - std::function const&)>&& replace) + Key const& key, + std::shared_ptr& data, + std::function const&)>&& replace) { // Return canonical value, store if needed, refresh in cache // Return values: true=we had the data already - std::lock_guard lock(m_mutex); + auto& p = getPartition(key); - auto cit = m_cache.find(key); + std::lock_guard lock(p.mutex); - if (cit == m_cache.end()) + auto cit = p.items.find(key); + + if (cit == p.items.end()) { - m_cache.emplace( + p.items.emplace( std::piecewise_construct, std::forward_as_tuple(key), - std::forward_as_tuple(m_clock.now(), data)); - ++m_cache_count; + std::forward_as_tuple(clock_.now(), data)); + strongSize_.fetch_add(1, std::memory_order_relaxed); + totalSize_.fetch_add(1, std::memory_order_relaxed); return false; } - Entry& entry = cit->second; - entry.touch(m_clock.now()); + auto& entry = cit->second; - if (entry.isCached()) + entry.touch(clock_.now()); + + // If the entry has valid data, check whether we want to replace it. + if (auto curr = entry.pin(); curr.first) { - if (replace(entry.ptr)) - { + if (replace(curr.first)) entry.ptr = data; - entry.weak_ptr = data; - } else - { - data = entry.ptr; - } + data = curr.first; + + // If the entry was freshy pinned, track it. This works even if + // we are replacing the entry: if the original was pinned, we + // already counted it. + if (curr.second) + strongSize_.fetch_add(1, std::memory_order_relaxed); return true; } - auto cachedData = entry.lock(); - - if (cachedData) - { - if (replace(entry.ptr)) - { - entry.ptr = data; - entry.weak_ptr = data; - } - else - { - entry.ptr = cachedData; - data = cachedData; - } - - ++m_cache_count; - return true; - } - + // The entry had an expired weak pointer; take the new data. entry.ptr = data; - entry.weak_ptr = data; - ++m_cache_count; + strongSize_.fetch_add(1, std::memory_order_relaxed); return false; } bool - canonicalize_replace_cache( - const key_type& key, - std::shared_ptr const& data) + insert_or_assign(Key const& key, std::shared_ptr const& data) { return canonicalize( key, - const_cast&>(data), - [](std::shared_ptr const&) { return true; }); + const_cast&>(data), + [](std::shared_ptr const&) { return true; }); } bool - canonicalize_replace_client(const key_type& key, std::shared_ptr& data) + retrieve_or_insert(Key const& key, std::shared_ptr& data) { return canonicalize( - key, data, [](std::shared_ptr const&) { return false; }); + key, data, [](std::shared_ptr const&) { return false; }); } - std::shared_ptr - fetch(const key_type& key) + [[nodiscard]] std::shared_ptr + fetch(Key const& key, bool remove = false) { - std::lock_guard l(m_mutex); - auto ret = initialFetch(key, l); - if (!ret) - ++m_misses; - return ret; + auto& p = getPartition(key); + + std::lock_guard l(p.mutex); + + if (auto cit = p.items.find(key); cit != p.items.end()) + { + // Get a strong pointer to the object, if possible. + auto ret = cit->second.pin(); + + if (ret.first) + { + if (!remove) + { + if (ret.second) + strongSize_.fetch_add(1, std::memory_order_relaxed); + + cit->second.touch(clock_.now()); + } + + ++hits_; + } + + if (!ret.first || remove) + { + // Track state count if the pointer was already pinned. + if (cit->second.pinned()) + strongSize_.fetch_sub(1, std::memory_order_relaxed); + + totalSize_.fetch_sub(1, std::memory_order_relaxed); + ; + p.items.erase(cit); + } + + return ret.first; + } + + misses_++; + return {}; } /** Insert the element into the container. If the key already exists, nothing happens. @return `true` If the element was inserted */ - template - auto - insert(key_type const& key, T const& value) - -> std::enable_if_t - { - auto p = std::make_shared(std::cref(value)); - return canonicalize_replace_client(key, p); - } - - template - auto - insert(key_type const& key) -> std::enable_if_t - { - std::lock_guard lock(m_mutex); - clock_type::time_point const now(m_clock.now()); - auto [it, inserted] = m_cache.emplace( - std::piecewise_construct, - std::forward_as_tuple(key), - std::forward_as_tuple(now)); - if (!inserted) - it->second.last_access = now; - return inserted; - } - - // VFALCO NOTE It looks like this returns a copy of the data in - // the output parameter 'data'. This could be expensive. - // Perhaps it should work like standard containers, which - // simply return an iterator. - // bool - retrieve(const key_type& key, T& data) + insert(Key const& key, Value const& value) { - // retrieve the value of the stored data - auto entry = fetch(key); - - if (!entry) - return false; - - data = *entry; - return true; + auto p = std::make_shared(std::cref(value)); + return retrieve_or_insert(key, p); } - mutex_type& - peekMutex() - { - return m_mutex; - } + /** Erases all elements that match the predicate. - std::vector - getKeys() const - { - std::vector v; + @param pred the predicate that decides whether to delete an item. + @note The predicate function can inspect and modify the state of the + internal object (e.g. by pinning or unpinning it). + * */ + /** @{ */ + template + void + erase_if_impl(Pred pred, std::string op = {}) + { + auto eraser = + [op, this, now = clock_.now(), &pred](Partition& partition) { + // Used to delete items outside the lock, if possible. + std::vector> destroy; + destroy.reserve(1000); + + std::size_t decTotal = 0; + std::size_t decStrong = 0; + std::size_t incStrong = 0; + + std::lock_guard lock(partition.mutex); + + auto it = partition.items.begin(); + + while (it != partition.items.end()) + { + // Opportunistically eliminate any items that + // reference data that is no longer in memory. + if (!it->second.valid()) + { + ++decTotal; + it = partition.items.erase(it); + continue; + } + + // The predicate could pin or unpin an entry and that's + // something we need to track. + bool const wasPinned = it->second.pinned(); + + if (pred(it)) + { + if (it->second.pinned()) + { + destroy.emplace_back(it->second.data()); + + if (destroy.size() == 1000) + { + decltype(destroy) background_destroy; + background_destroy.reserve(1000); + + std::swap(destroy, background_destroy); + + std::thread t( + [](decltype(background_destroy) d) { + // just invoke the destructor of d + }, + std::move(background_destroy)); + t.detach(); + } + + ++decStrong; + } + + if (wasPinned && !it->second.pinned()) + ++decStrong; + else if (!wasPinned && it->second.pinned()) + ++incStrong; + + ++decTotal; + + it = partition.items.erase(it); + continue; + } + + if (wasPinned && !it->second.pinned()) + ++decStrong; + else if (!wasPinned && it->second.pinned()) + ++incStrong; + + // This item is fine. + ++it; + } + + if (decTotal) + totalSize_.fetch_sub(decTotal, std::memory_order_relaxed); + + if (decStrong) + strongSize_.fetch_sub(decStrong, std::memory_order_relaxed); + + if (incStrong) + strongSize_.fetch_add(incStrong, std::memory_order_relaxed); + }; + + auto const start = clock_.now(); + + // For systems with a small number of cores always use the single + // threaded algorithm. + if (N <= 4 || std::thread::hardware_concurrency() <= 4) { - std::lock_guard lock(m_mutex); - v.reserve(m_cache.size()); - for (auto const& _ : m_cache) - v.push_back(_.first); + for (auto& p : partitions_) + eraser(p); } + else + { + // We want to limit the number of threads to avoid resource + // starvation. + std::array threads{}; + + for (std::size_t i = 0; i != threads.size(); ++i) + { + threads[i] = std::thread( + [this, &eraser, step = threads.size()](std::size_t j) { + while (j < partitions_.size()) + { + eraser(partitions_[j]); + j += step; + } + }, + i); + } + + for (auto& t : threads) + t.join(); + } + + if (auto const d = clock_.now() - start; d >= std::chrono::seconds(2)) + { + if (!op.empty()) + op += ": "; + + auto const secs = + std::chrono::duration_cast(d); + auto const usecs = + std::chrono::duration_cast(d) - + std::chrono::duration_cast(secs); + + JLOG(journal_.info()) + << op << "Iteration over " << totalSize_.load() + << " items took " << secs.count() << "." << usecs.count() + << " seconds."; + } + } + + template < + class Pred, + class V = Value, + class = std::enable_if_t>> + void + erase_if(Pred&& pred) + { + erase_if_impl([this, &pred](auto it) { + if (auto d = it->second.data()) + return pred(*d); + + return false; + }); + } + + template < + class Pred, + class = std::enable_if_t>> + void + erase_if(Pred&& pred, std::string op = {}) + { + erase_if_impl([&pred](auto& it) { return pred(it->second); }, op); + } + + template < + class Pred, + class = std::enable_if_t>> + void + erase_if(Pred pred, std::string op = {}) + { + erase_if_impl([&pred](auto it) { return pred(it->first); }, op); + } + /** @} */ + + void + sweep() + { + // Calculate the expiration time + auto const expire = [this]() { + using namespace std::chrono_literals; + + auto exp = targetAge_; + + // If the size of the cache exceeds the target size, then we adjust + // the expiry timeout down to prune data faster. + if (exp != std::chrono::seconds::zero() && targetSize_ != 0) + { + if (auto const size = totalSize_.load(); size > targetSize_) + exp = exp * targetSize_ / size; + } + + return std::max(1s, exp); + }(); + + erase_if( + [this, + expire, + oversized = (totalSize_.load() > targetSize_), + now = clock_.now()](Entry& e) { + // If the item wasn't pinned and the cache is oversized, + // unconditionally remove it. + if (!e.pinned() && oversized) + return true; + + if (targetAge_ != targetAge_.zero() && + e.last_access + expire <= now) + { + // If the cache holds the only strong pointer to this + // item, unconditionally remove it. + if (e.unique()) + return true; + + // Otherwise, relinquish our strong pointer. If the + // cache wasn't oversized when we started sweeping we + // will keep the item and (presumably) get rid of it + // the next time we sweep. + if (e.pinned()) + e.ptr = std::weak_ptr( + std::get>(e.ptr)); + + return oversized || !e.valid(); + } + + return false; + }, + "sweep"); + } + + Json::Value + info() const + { + Json::Value v{Json::objectValue}; + + v["name"] = name_; + v["partitions"] = static_cast(partitions_.size()); + v["total_size"] = std::to_string(totalSize_.load()); + v["cache_size"] = std::to_string(strongSize_.load()); + v["cache_hits"] = std::to_string(hits_.load()); + v["cache_misses"] = std::to_string(misses_.load()); + v["target_size"] = std::to_string(targetSize_); + v["target_age"] = std::to_string(targetAge_.count()); return v; } - - // CachedSLEs functions. - /** Returns the fraction of cache hits. */ - double - rate() const - { - std::lock_guard lock(m_mutex); - auto const tot = m_hits + m_misses; - if (tot == 0) - return 0; - return double(m_hits) / tot; - } - - /** Fetch an item from the cache. - If the digest was not found, Handler - will be called with this signature: - std::shared_ptr(void) - */ - template - std::shared_ptr - fetch(key_type const& digest, Handler const& h) - { - { - std::lock_guard l(m_mutex); - if (auto ret = initialFetch(digest, l)) - return ret; - } - - auto sle = h(); - if (!sle) - return {}; - - std::lock_guard l(m_mutex); - ++m_misses; - auto const [it, inserted] = - m_cache.emplace(digest, Entry(m_clock.now(), std::move(sle))); - if (!inserted) - it->second.touch(m_clock.now()); - return it->second.ptr; - } - // End CachedSLEs functions. - -private: - std::shared_ptr - initialFetch(key_type const& key, std::lock_guard const& l) - { - auto cit = m_cache.find(key); - if (cit == m_cache.end()) - return {}; - - Entry& entry = cit->second; - if (entry.isCached()) - { - ++m_hits; - entry.touch(m_clock.now()); - return entry.ptr; - } - entry.ptr = entry.lock(); - if (entry.isCached()) - { - // independent of cache size, so not counted as a hit - ++m_cache_count; - entry.touch(m_clock.now()); - return entry.ptr; - } - - m_cache.erase(cit); - return {}; - } - - void - collect_metrics() - { - m_stats.size.set(getCacheSize()); - - { - beast::insight::Gauge::value_type hit_rate(0); - { - std::lock_guard lock(m_mutex); - auto const total(m_hits + m_misses); - if (total != 0) - hit_rate = (m_hits * 100) / total; - } - m_stats.hit_rate.set(hit_rate); - } - } - -private: - struct Stats - { - template - Stats( - std::string const& prefix, - Handler const& handler, - beast::insight::Collector::ptr const& collector) - : hook(collector->make_hook(handler)) - , size(collector->make_gauge(prefix, "size")) - , hit_rate(collector->make_gauge(prefix, "hit_rate")) - , hits(0) - , misses(0) - { - } - - beast::insight::Hook hook; - beast::insight::Gauge size; - beast::insight::Gauge hit_rate; - - std::size_t hits; - std::size_t misses; - }; - - class KeyOnlyEntry - { - public: - clock_type::time_point last_access; - - explicit KeyOnlyEntry(clock_type::time_point const& last_access_) - : last_access(last_access_) - { - } - - void - touch(clock_type::time_point const& now) - { - last_access = now; - } - }; - - class ValueEntry - { - public: - std::shared_ptr ptr; - std::weak_ptr weak_ptr; - clock_type::time_point last_access; - - ValueEntry( - clock_type::time_point const& last_access_, - std::shared_ptr const& ptr_) - : ptr(ptr_), weak_ptr(ptr_), last_access(last_access_) - { - } - - bool - isWeak() const - { - return ptr == nullptr; - } - bool - isCached() const - { - return ptr != nullptr; - } - bool - isExpired() const - { - return weak_ptr.expired(); - } - std::shared_ptr - lock() - { - return weak_ptr.lock(); - } - void - touch(clock_type::time_point const& now) - { - last_access = now; - } - }; - - typedef - typename std::conditional::type - Entry; - - using KeyOnlyCacheType = - hardened_partitioned_hash_map; - - using KeyValueCacheType = - hardened_partitioned_hash_map; - - using cache_type = - hardened_partitioned_hash_map; - - [[nodiscard]] std::thread - sweepHelper( - clock_type::time_point const& when_expire, - [[maybe_unused]] clock_type::time_point const& now, - typename KeyValueCacheType::map_type& partition, - SweptPointersVector& stuffToSweep, - std::atomic& allRemovals, - std::lock_guard const&) - { - return std::thread([&, this]() { - int cacheRemovals = 0; - int mapRemovals = 0; - - // Keep references to all the stuff we sweep - // so that we can destroy them outside the lock. - stuffToSweep.first.reserve(partition.size()); - stuffToSweep.second.reserve(partition.size()); - { - auto cit = partition.begin(); - while (cit != partition.end()) - { - if (cit->second.isWeak()) - { - // weak - if (cit->second.isExpired()) - { - stuffToSweep.second.push_back( - std::move(cit->second.weak_ptr)); - ++mapRemovals; - cit = partition.erase(cit); - } - else - { - ++cit; - } - } - else if (cit->second.last_access <= when_expire) - { - // strong, expired - ++cacheRemovals; - if (cit->second.ptr.use_count() == 1) - { - stuffToSweep.first.push_back( - std::move(cit->second.ptr)); - ++mapRemovals; - cit = partition.erase(cit); - } - else - { - // remains weakly cached - cit->second.ptr.reset(); - ++cit; - } - } - else - { - // strong, not expired - ++cit; - } - } - } - - if (mapRemovals || cacheRemovals) - { - JLOG(m_journal.debug()) - << "TaggedCache partition sweep " << m_name - << ": cache = " << partition.size() << "-" << cacheRemovals - << ", map-=" << mapRemovals; - } - - allRemovals += cacheRemovals; - }); - } - - [[nodiscard]] std::thread - sweepHelper( - clock_type::time_point const& when_expire, - clock_type::time_point const& now, - typename KeyOnlyCacheType::map_type& partition, - SweptPointersVector&, - std::atomic& allRemovals, - std::lock_guard const&) - { - return std::thread([&, this]() { - int cacheRemovals = 0; - int mapRemovals = 0; - - // Keep references to all the stuff we sweep - // so that we can destroy them outside the lock. - { - auto cit = partition.begin(); - while (cit != partition.end()) - { - if (cit->second.last_access > now) - { - cit->second.last_access = now; - ++cit; - } - else if (cit->second.last_access <= when_expire) - { - cit = partition.erase(cit); - } - else - { - ++cit; - } - } - } - - if (mapRemovals || cacheRemovals) - { - JLOG(m_journal.debug()) - << "TaggedCache partition sweep " << m_name - << ": cache = " << partition.size() << "-" << cacheRemovals - << ", map-=" << mapRemovals; - } - - allRemovals += cacheRemovals; - }); - }; - - beast::Journal m_journal; - clock_type& m_clock; - Stats m_stats; - - mutex_type mutable m_mutex; - - // Used for logging - std::string m_name; - - // Desired number of cache entries (0 = ignore) - int m_target_size; - - // Desired maximum cache age - clock_type::duration m_target_age; - - // Number of items cached - int m_cache_count; - cache_type m_cache; // Hold strong reference to recent objects - std::uint64_t m_hits; - std::uint64_t m_misses; }; } // namespace ripple diff --git a/src/ripple/beast/utility/atomic_shared_ptr.h b/src/ripple/beast/utility/atomic_shared_ptr.h new file mode 100644 index 000000000..e5aa21f40 --- /dev/null +++ b/src/ripple/beast/utility/atomic_shared_ptr.h @@ -0,0 +1,214 @@ +//------------------------------------------------------------------------------ +/* + This file is part of rippled: https://github.com/ripple/rippled + Copyright (c) 2022 Ripple Labs Inc. + + Permission to use, copy, modify, and/or distribute this software for any + purpose with or without fee is hereby granted, provided that the above + copyright notice and this permission notice appear in all copies. + + THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. +*/ +//============================================================================== + +#ifndef BEAST_UTILITY_ATOMIC_SHARED_PTR_INCLUDED +#define BEAST_UTILITY_ATOMIC_SHARED_PTR_INCLUDED + +// Temporarily shim in support for atomic> + +#ifndef __cpp_lib_atomic_shared_ptr + +namespace std { + +template +struct atomic> +{ +private: + shared_ptr p_; + +public: + constexpr atomic() noexcept = default; + atomic(const atomic&) = delete; + void + operator=(const atomic&) = delete; + + constexpr atomic(nullptr_t) noexcept; + atomic(shared_ptr desired) noexcept; + + using value_type = shared_ptr; + + static constexpr bool is_always_lock_free = false; + bool + is_lock_free() const noexcept; + + shared_ptr + load(memory_order order = memory_order_seq_cst) const noexcept; + operator shared_ptr() const noexcept; + void + store( + shared_ptr desired, + memory_order order = memory_order_seq_cst) noexcept; + void + operator=(shared_ptr desired) noexcept; + + shared_ptr + exchange( + shared_ptr desired, + memory_order order = memory_order_seq_cst) noexcept; + bool + compare_exchange_weak( + shared_ptr& expected, + shared_ptr desired, + memory_order order = memory_order_seq_cst) noexcept; + bool + compare_exchange_weak( + shared_ptr& expected, + shared_ptr desired, + memory_order success, + memory_order failure) noexcept; + bool + compare_exchange_strong( + shared_ptr& expected, + shared_ptr desired, + memory_order order = memory_order_seq_cst) noexcept; + bool + compare_exchange_strong( + shared_ptr& expected, + shared_ptr desired, + memory_order success, + memory_order failure) noexcept; + + // Not supported: + // void wait(shared_ptr old, + // memory_order order = memory_order_seq_cst) const noexcept; + // void notify_one() noexcept; + // void notify_all() noexcept; + +private: + static constexpr memory_order + transform(memory_order order) noexcept; +}; + +template +inline constexpr atomic>::atomic(nullptr_t) noexcept : atomic() +{ +} + +template +inline atomic>::atomic(shared_ptr desired) noexcept + : p_{desired} +{ +} + +template +inline bool +atomic>::is_lock_free() const noexcept +{ + return atomic_is_lock_free(&p_); +} + +template +inline shared_ptr +atomic>::load(memory_order order) const noexcept +{ + return atomic_load_explicit(&p_, order); +} + +template +inline atomic>::operator shared_ptr() const noexcept +{ + return load(); +} + +template +inline void +atomic>::store(shared_ptr desired, memory_order order) noexcept +{ + atomic_store_explicit(&p_, std::move(desired), order); +} + +template +inline void +atomic>::operator=(shared_ptr desired) noexcept +{ + store(std::move(desired)); +} + +template +inline shared_ptr +atomic>::exchange( + shared_ptr desired, + memory_order order) noexcept +{ + return atomic_exchange_explicit(&p_, std::move(desired), order); +} + +template +inline constexpr memory_order +atomic>::transform(memory_order order) noexcept +{ + memory_order fail_order = order; + if (fail_order == memory_order_acq_rel) + order = memory_order_acquire; + else if (fail_order == memory_order_release) + order = memory_order_relaxed; + return order; +} + +template +inline bool +atomic>::compare_exchange_weak( + shared_ptr& expected, + shared_ptr desired, + memory_order order) noexcept +{ + return compare_exchange_weak( + expected, std::move(desired), order, transform(order)); +} + +template +inline bool +atomic>::compare_exchange_weak( + shared_ptr& expected, + shared_ptr desired, + memory_order success, + memory_order failure) noexcept +{ + return atomic_compare_exchange_weak_explicit( + &p_, &expected, std::move(desired), success, failure); +} + +template +inline bool +atomic>::compare_exchange_strong( + shared_ptr& expected, + shared_ptr desired, + memory_order order) noexcept +{ + return compare_exchange_weak( + expected, std::move(desired), order, transform(order)); +} + +template +inline bool +atomic>::compare_exchange_strong( + shared_ptr& expected, + shared_ptr desired, + memory_order success, + memory_order failure) noexcept +{ + return atomic_compare_exchange_strong_explicit( + &p_, &expected, std::move(desired), success, failure); +} + +} // namespace std + +#endif // __cpp_lib_atomic_shared_ptr + +#endif diff --git a/src/ripple/ledger/CachedSLEs.h b/src/ripple/ledger/CachedSLEs.h index d2b04e2cb..d9d1a38f6 100644 --- a/src/ripple/ledger/CachedSLEs.h +++ b/src/ripple/ledger/CachedSLEs.h @@ -25,7 +25,56 @@ #include namespace ripple { -using CachedSLEs = TaggedCache; -} + +class CachedSLEs : public TaggedCache +{ + std::atomic handlerHits_ = 0; + std::atomic handlerMisses_ = 0; + +public: + using TaggedCache::TaggedCache; + + /** Fetch from the cache; if needed, invoke the handler to load the item. */ + template + std::shared_ptr + fetch(uint256 const& digest, Handler const& handler) + { + if (auto ret = TaggedCache::fetch(digest)) + return ret; + + if (auto sle = handler(); sle) + { + if (retrieve_or_insert(digest, sle)) + handlerHits_++; + + return sle; + } + + handlerMisses_++; + return {}; + } + + // Reintroduce the function we just hid. + using TaggedCache::fetch; + + /** Returns the fraction of cache hits. */ + double + rate() const + { + // TODO + return 0; + } + + Json::Value + info() + { + auto ret = TaggedCache::info(); + ret["handler_hits"] = std::to_string(handlerHits_.load()); + ret["handler_misses"] = std::to_string(handlerMisses_.load()); + return ret; + } +}; + +} // namespace ripple #endif // RIPPLE_LEDGER_CACHEDSLES_H_INCLUDED diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.cpp b/src/ripple/nodestore/impl/DatabaseNodeImp.cpp index 9ef878bf3..7651c83c1 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.cpp @@ -105,12 +105,12 @@ DatabaseNodeImp::fetchNodeObject( if (cache_) { if (nodeObject) - cache_->canonicalize_replace_client(hash, nodeObject); + cache_->retrieve_or_insert(hash, nodeObject); else { auto notFound = NodeObject::createObject(hotDUMMY, {}, hash); - cache_->canonicalize_replace_client(hash, notFound); + cache_->retrieve_or_insert(hash, notFound); if (notFound->getType() != hotDUMMY) nodeObject = notFound; } @@ -188,7 +188,7 @@ DatabaseNodeImp::fetchBatch(std::vector const& hashes) { // Ensure all threads get the same object if (cache_) - cache_->canonicalize_replace_client(hash, nObj); + cache_->retrieve_or_insert(hash, nObj); } else { @@ -198,7 +198,7 @@ DatabaseNodeImp::fetchBatch(std::vector const& hashes) if (cache_) { auto notFound = NodeObject::createObject(hotDUMMY, {}, hash); - cache_->canonicalize_replace_client(hash, notFound); + cache_->retrieve_or_insert(hash, notFound); if (notFound->getType() != hotDUMMY) nObj = std::move(notFound); } diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 0c3d56bcf..227dc00d3 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -174,8 +174,7 @@ Shard::tryClose() acquireInfo_.reset(); // Reset caches to reduce memory use - app_.getShardFamily()->getFullBelowCache(lastSeq_)->reset(); - app_.getShardFamily()->getTreeNodeCache(lastSeq_)->reset(); + app_.getShardFamily()->resetCacheFor(lastSeq_); return true; } @@ -647,12 +646,11 @@ Shard::finalize(bool writeSQLite, std::optional const& referenceHash) std::shared_ptr next; auto const lastLedgerHash{hash}; auto& shardFamily{*app_.getShardFamily()}; - auto const fullBelowCache{shardFamily.getFullBelowCache(lastSeq_)}; - auto const treeNodeCache{shardFamily.getTreeNodeCache(lastSeq_)}; - // Reset caches to reduce memory usage - fullBelowCache->reset(); - treeNodeCache->reset(); + shardFamily.resetCacheFor(lastSeq_); + + auto const fullBelowCache = shardFamily.getFullBelowCache(lastSeq_); + auto const treeNodeCache = shardFamily.getTreeNodeCache(lastSeq_); Serializer s; s.add32(version); @@ -720,8 +718,7 @@ Shard::finalize(bool writeSQLite, std::optional const& referenceHash) --ledgerSeq; - fullBelowCache->reset(); - treeNodeCache->reset(); + shardFamily.resetCacheFor(lastSeq_); } JLOG(j_.debug()) << "shard " << index_ << " is valid"; diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index 210bdd54a..0b493e53e 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -23,7 +23,6 @@ #include #include #include -#include #include #include #include @@ -40,8 +39,6 @@ namespace ripple { namespace NodeStore { -using PCache = TaggedCache; -using NCache = KeyCache; class DatabaseShard; /* A range of historical ledgers backed by a node store. diff --git a/src/ripple/protocol/jss.h b/src/ripple/protocol/jss.h index 3c5e36d80..3f2772697 100644 --- a/src/ripple/protocol/jss.h +++ b/src/ripple/protocol/jss.h @@ -390,7 +390,6 @@ JSS(ledger_hash); // in: RPCHelpers, LedgerRequest, // out: NetworkOPs, RPCHelpers, // LedgerClosed, LedgerData, // AccountLines -JSS(ledger_hit_rate); // out: GetCounts JSS(ledger_index); // in/out: many JSS(ledger_index_max); // in, out: AccountTx* JSS(ledger_index_min); // in, out: AccountTx* diff --git a/src/ripple/rpc/handlers/AccountTx.cpp b/src/ripple/rpc/handlers/AccountTx.cpp index 67c80ad9b..5419e2031 100644 --- a/src/ripple/rpc/handlers/AccountTx.cpp +++ b/src/ripple/rpc/handlers/AccountTx.cpp @@ -299,7 +299,8 @@ populateJsonResponse( { Json::Value& jvObj = jvTxns.append(Json::objectValue); - jvObj[jss::tx] = txn->getJson(JsonOptions::include_date); + jvObj[jss::tx] = + txn->getJson(context.app, JsonOptions::include_date); if (txnMeta) { jvObj[jss::meta] = diff --git a/src/ripple/rpc/handlers/GetCounts.cpp b/src/ripple/rpc/handlers/GetCounts.cpp index cf3e72902..24d2b1a99 100644 --- a/src/ripple/rpc/handlers/GetCounts.cpp +++ b/src/ripple/rpc/handlers/GetCounts.cpp @@ -103,21 +103,22 @@ getCountsJson(Application& app, int minObjectCount) } } + ret["ledger_master"] = app.getLedgerMaster().info(); + + auto& caches = (ret["caches"] = Json::arrayValue); + + { + caches.append(app.getNodeFamily().getFullBelowCache(0)->info()); + caches.append(app.getNodeFamily().getTreeNodeCache(0)->info()); + caches.append(app.getTempNodeCache().info()); + caches.append(app.getAcceptedLedgerCache().info()); + caches.append(app.cachedSLEs().info()); + } + ret[jss::write_load] = app.getNodeStore().getWriteLoad(); ret[jss::historical_perminute] = static_cast(app.getInboundLedgers().fetchRate()); - ret[jss::SLE_hit_rate] = app.cachedSLEs().rate(); - ret[jss::ledger_hit_rate] = app.getLedgerMaster().getCacheHitRate(); - ret[jss::AL_size] = Json::UInt(app.getAcceptedLedgerCache().size()); - ret[jss::AL_hit_rate] = app.getAcceptedLedgerCache().getHitRate(); - - ret[jss::fullbelow_size] = - static_cast(app.getNodeFamily().getFullBelowCache(0)->size()); - ret[jss::treenode_cache_size] = - app.getNodeFamily().getTreeNodeCache(0)->getCacheSize(); - ret[jss::treenode_track_size] = - app.getNodeFamily().getTreeNodeCache(0)->getTrackSize(); std::string uptime; auto s = UptimeClock::now(); @@ -132,12 +133,22 @@ getCountsJson(Application& app, int minObjectCount) if (auto shardStore = app.getShardStore()) { auto shardFamily{dynamic_cast(app.getShardFamily())}; - auto const [cacheSz, trackSz] = shardFamily->getTreeNodeCacheSize(); + Json::Value& jv = (ret[jss::shards] = Json::objectValue); - jv[jss::fullbelow_size] = shardFamily->getFullBelowCacheSize(); - jv[jss::treenode_cache_size] = cacheSz; - jv[jss::treenode_track_size] = trackSz; + auto& caches = (ret["caches"] = Json::arrayValue); + + { + shardFamily->forEachFullBelowCache( + [&caches](FullBelowCache const* fbc) { + caches.append(fbc->info()); + }); + shardFamily->forEachTreeNodeCache( + [&caches](TreeNodeCache const* tnc) { + caches.append(tnc->info()); + }); + } + ret[jss::write_load] = shardStore->getWriteLoad(); jv[jss::node_writes] = std::to_string(shardStore->getStoreCount()); jv[jss::node_reads_total] = shardStore->getFetchTotalCount(); diff --git a/src/ripple/rpc/handlers/Submit.cpp b/src/ripple/rpc/handlers/Submit.cpp index 2b5c8bba9..fa63703ad 100644 --- a/src/ripple/rpc/handlers/Submit.cpp +++ b/src/ripple/rpc/handlers/Submit.cpp @@ -116,12 +116,11 @@ doSubmit(RPC::JsonContext& context) } } - std::string reason; - auto tpTrans = std::make_shared(stpTrans, reason, context.app); + auto tpTrans = std::make_shared(stpTrans); if (tpTrans->getStatus() != NEW) { jvResult[jss::error] = "invalidTransaction"; - jvResult[jss::error_exception] = "fails local checks: " + reason; + jvResult[jss::error_exception] = "fails local checks"; return jvResult; } @@ -143,7 +142,8 @@ doSubmit(RPC::JsonContext& context) try { - jvResult[jss::tx_json] = tpTrans->getJson(JsonOptions::none); + jvResult[jss::tx_json] = + tpTrans->getJson(context.app, JsonOptions::none); jvResult[jss::tx_blob] = strHex(tpTrans->getSTransaction()->getSerializer().peekData()); diff --git a/src/ripple/rpc/handlers/Tx.cpp b/src/ripple/rpc/handlers/Tx.cpp index e9c4439f9..c2de48843 100644 --- a/src/ripple/rpc/handlers/Tx.cpp +++ b/src/ripple/rpc/handlers/Tx.cpp @@ -49,7 +49,7 @@ isValidated(LedgerMaster& ledgerMaster, std::uint32_t seq, uint256 const& hash) struct TxResult { - Transaction::pointer txn; + std::shared_ptr txn; std::variant, Blob> meta; bool validated = false; std::optional ctid; @@ -121,10 +121,8 @@ doTxPostgres(RPC::Context& context, TxArgs const& args) assert(false); return {res, {rpcINTERNAL, "Error deserializing SHAMap node"}}; } - std::string reason; - res.txn = std::make_shared(sttx, reason, context.app); - res.txn->setLedger(locator.getLedgerSequence()); - res.txn->setStatus(COMMITTED); + res.txn = std::make_shared(sttx); + res.txn->setStatus(COMMITTED, locator.getLedgerSequence()); if (args.binary) { SerialIter it(item->slice()); @@ -305,7 +303,8 @@ populateJsonResponse( // no errors else if (result.txn) { - response = result.txn->getJson(JsonOptions::include_date, args.binary); + response = result.txn->getJson( + context.app, JsonOptions::include_date, args.binary); // populate binary metadata if (auto blob = std::get_if(&result.meta)) diff --git a/src/ripple/rpc/handlers/TxHistory.cpp b/src/ripple/rpc/handlers/TxHistory.cpp index 4c76bfac0..4432688b8 100644 --- a/src/ripple/rpc/handlers/TxHistory.cpp +++ b/src/ripple/rpc/handlers/TxHistory.cpp @@ -63,7 +63,7 @@ doTxHistory(RPC::JsonContext& context) obj["used_postgres"] = true; for (auto const& t : trans) - txs.append(t->getJson(JsonOptions::none)); + txs.append(t->getJson(context.app, JsonOptions::none)); return obj; } diff --git a/src/ripple/rpc/impl/TransactionSign.cpp b/src/ripple/rpc/impl/TransactionSign.cpp index c903c26f8..4e8f46f73 100644 --- a/src/ripple/rpc/impl/TransactionSign.cpp +++ b/src/ripple/rpc/impl/TransactionSign.cpp @@ -550,23 +550,22 @@ transactionPreProcessImpl( return transactionPreProcessResult{std::move(stpTrans)}; } -static std::pair +static std::pair> transactionConstructImpl( std::shared_ptr const& stpTrans, Rules const& rules, Application& app) { - std::pair ret; + std::pair> ret; // Turn the passed in STTx into a Transaction. - Transaction::pointer tpTrans; + std::shared_ptr tpTrans; { - std::string reason; - tpTrans = std::make_shared(stpTrans, reason, app); + tpTrans = std::make_shared(stpTrans); if (tpTrans->getStatus() != NEW) { - ret.first = RPC::make_error( - rpcINTERNAL, "Unable to construct transaction: " + reason); + ret.first = + RPC::make_error(rpcINTERNAL, "Unable to construct transaction"); return ret; } } @@ -597,9 +596,7 @@ transactionConstructImpl( return ret; } - std::string reason; - auto tpTransNew = - std::make_shared(sttxNew, reason, app); + auto tpTransNew = std::make_shared(sttxNew); if (tpTransNew) { @@ -629,12 +626,14 @@ transactionConstructImpl( } static Json::Value -transactionFormatResultImpl(Transaction::pointer tpTrans) +transactionFormatResultImpl( + std::shared_ptr tpTrans, + Application& app) { Json::Value jvResult; try { - jvResult[jss::tx_json] = tpTrans->getJson(JsonOptions::none); + jvResult[jss::tx_json] = tpTrans->getJson(app, JsonOptions::none); jvResult[jss::tx_blob] = strHex(tpTrans->getSTransaction()->getSerializer().peekData()); @@ -785,13 +784,13 @@ transactionSign( else ledger = app.openLedger().current(); // Make sure the STTx makes a legitimate Transaction. - std::pair txn = + std::pair> txn = transactionConstructImpl(preprocResult.second, ledger->rules(), app); if (!txn.second) return txn.first; - return transactionFormatResultImpl(txn.second); + return transactionFormatResultImpl(txn.second, app); } /** Returns a Json::objectValue. */ @@ -819,7 +818,7 @@ transactionSubmit( return preprocResult.first; // Make sure the STTx makes a legitimate Transaction. - std::pair txn = + std::pair> txn = transactionConstructImpl(preprocResult.second, ledger->rules(), app); if (!txn.second) @@ -837,7 +836,7 @@ transactionSubmit( rpcINTERNAL, "Exception occurred during transaction submission."); } - return transactionFormatResultImpl(txn.second); + return transactionFormatResultImpl(txn.second, app); } namespace detail { @@ -1021,13 +1020,13 @@ transactionSignFor( } // Make sure the STTx makes a legitimate Transaction. - std::pair txn = + std::pair> txn = transactionConstructImpl(sttx, ledger->rules(), app); if (!txn.second) return txn.first; - return transactionFormatResultImpl(txn.second); + return transactionFormatResultImpl(txn.second, app); } /** Returns a Json::objectValue. */ @@ -1201,7 +1200,7 @@ transactionSubmitMultiSigned( return err; // Make sure the SerializedTransaction makes a legitimate Transaction. - std::pair txn = + std::pair> txn = transactionConstructImpl(stpTrans, ledger->rules(), app); if (!txn.second) @@ -1219,7 +1218,7 @@ transactionSubmitMultiSigned( rpcINTERNAL, "Exception occurred during transaction submission."); } - return transactionFormatResultImpl(txn.second); + return transactionFormatResultImpl(txn.second, app); } } // namespace RPC diff --git a/src/ripple/shamap/Family.h b/src/ripple/shamap/Family.h index fea5545d3..47c26d3e6 100644 --- a/src/ripple/shamap/Family.h +++ b/src/ripple/shamap/Family.h @@ -53,6 +53,9 @@ public: virtual beast::Journal const& journal() = 0; + virtual void + resetCacheFor(std::uint32_t ledgerSeq) = 0; + /** Return a pointer to the Family Full Below Cache @param ledgerSeq ledger sequence determines a corresponding shard cache @@ -92,9 +95,6 @@ public: */ virtual void missingNodeAcquireByHash(uint256 const& refHash, std::uint32_t refNum) = 0; - - virtual void - reset() = 0; }; } // namespace ripple diff --git a/src/ripple/shamap/FullBelowCache.h b/src/ripple/shamap/FullBelowCache.h index 6d809d3b9..5f75d248f 100644 --- a/src/ripple/shamap/FullBelowCache.h +++ b/src/ripple/shamap/FullBelowCache.h @@ -20,31 +20,350 @@ #ifndef RIPPLE_SHAMAP_FULLBELOWCACHE_H_INCLUDED #define RIPPLE_SHAMAP_FULLBELOWCACHE_H_INCLUDED -#include -#include +#include +#include #include +#include +#include #include +#include #include +#include +#include +#include +#include #include +#include +#include #include +#include +#include +#include +#include namespace ripple { namespace detail { +template < + class Key, + class Hash = hardened_hash<>, + class KeyEqual = std::equal_to> +class FullBelowCacheImpl +{ +public: + using key_type = Key; + using clock_type = beast::abstract_clock; + +private: + struct Stats + { + template + Stats( + std::string const& prefix, + Handler const& handler, + beast::insight::Collector::ptr const& collector) + : hook(collector->make_hook(handler)) + , size(collector->make_gauge(prefix, "size")) + , hit_rate(collector->make_gauge(prefix, "hit_rate")) + { + } + + beast::insight::Hook hook; + beast::insight::Gauge size; + beast::insight::Gauge hit_rate; + }; + + Hash hash_; + + // Used for logging + std::string name_; + + beast::WrappedSink sink_; + beast::Journal journal_; + clock_type& clock_; + Stats m_stats; + + // Desired number of cache entries (0 = ignore) + std::size_t const targetSize_; + + // Desired maximum cache age + std::chrono::seconds const targetAge_; + + /** Map partitions: + + The idea is to partition the key space into multiple, independent + maps, each with their own lock. This helps to increase concurrency + since it's possible to operate on multiple partitions at a time. + */ + struct Partition + { + std::mutex mutable mutex; + std::size_t const index; + hardened_hash_map items; + + Partition(std::size_t i) : index(i) + { + } + }; + + /** The partitions that, together, map the entire key space. */ + std::array partitions_; + + /** Number of items where we have either a strong or weak pointer to. */ + std::atomic size_ = 0; + + /** The number of times that we found an item in the cache */ + std::atomic hits_ = 0; + std::atomic misses_ = 0; + + Partition& + getPartition(Key const& key) noexcept + { + return partitions_[hash_(key) % partitions_.size()]; + } + +private: + template + FullBelowCacheImpl( + std::string name, + std::size_t targetSize, + std::chrono::seconds expiration, + clock_type& clock, + beast::Journal journal, + beast::insight::Collector::ptr const& collector, + std::index_sequence) + : name_(std::move(name)) + , sink_(journal, "[" + name_ + "] ") + , journal_(sink_) + , clock_(clock) + , m_stats( + name_, + [this]() { + m_stats.size.set(size()); + m_stats.hit_rate.set([this]() { + auto const h = hits_.load(); + auto const m = misses_.load(); + + auto ret = h + m; + + if (ret != 0) + ret = (h * 100) / ret; + + return ret; + }()); + }, + collector) + , targetSize_(targetSize) + , targetAge_(expiration) + , partitions_{(Is)...} + { + } + +public: + FullBelowCacheImpl( + std::string name, + std::size_t targetSize, + std::chrono::seconds expiration, + clock_type& clock, + beast::Journal journal, + beast::insight::Collector::ptr const& collector = + beast::insight::NullCollector::New()) + : FullBelowCacheImpl( + std::move(name), + targetSize, + expiration, + clock, + journal, + collector, + std::make_index_sequence<64>{}) + { + } + + /** Returns the total number cached items. */ + std::size_t + size() const + { + return size_.load(); + } + + /** Returns the name of the FullBelowCacheImpl instance. */ + std::string const& + name() const + { + return name_; + } + + /** Refresh the last access time on a key if present. + @return `true` If the key was found. + */ + bool + touch_if_exists(Key const& key) + { + auto& p = getPartition(key); + + { + std::lock_guard lock(p.mutex); + + if (auto const it = p.items.find(key); it != p.items.end()) + { + it->second = clock_.now(); + ++hits_; + return true; + } + } + + ++misses_; + return false; + } + + bool + insert(Key const& key) + { + auto& p = getPartition(key); + + std::lock_guard l(p.mutex); + + auto const now = clock_.now(); + + auto [it, inserted] = p.items.emplace( + std::piecewise_construct, + std::forward_as_tuple(key), + std::forward_as_tuple(now)); + + if (inserted) + { + size_.fetch_add(1, std::memory_order_relaxed); + return true; + } + + it->second = now; + return false; + } + + /** Erases all elements that match the predicate. */ + void + sweep() + { + // Calculate the expiration time + auto const expire = [this]() { + using namespace std::chrono_literals; + + auto exp = targetAge_; + + // If the size of the cache exceeds the target size, then we adjust + // the expiry timeout down to prune data faster. + if (exp != std::chrono::seconds::zero() && targetSize_ != 0) + { + if (auto const size = size_.load(); size > targetSize_) + exp = exp * targetSize_ / size; + } + + return std::max(1s, exp); + }(); + + auto eraser = [this, expire]( + Partition& partition, + clock_type::time_point const& now) { + std::size_t decTotal = 0; + + std::lock_guard lock(partition.mutex); + + auto it = partition.items.begin(); + + while (it != partition.items.end()) + { + if (it->second + expire <= now) + { + ++decTotal; + it = partition.items.erase(it); + continue; + } + + // This item is fine. + ++it; + } + + if (decTotal) + size_.fetch_sub(decTotal, std::memory_order_relaxed); + }; + + auto const start = clock_.now(); + + // For systems with a small number of cores always use the single + // threaded algorithm. + if (std::thread::hardware_concurrency() <= 4) + { + for (auto& p : partitions_) + eraser(p, start); + } + else + { + // We want to limit the number of threads to avoid resource + // starvation. + std::array threads{}; + + for (std::size_t i = 0; i != threads.size(); ++i) + { + threads[i] = std::thread( + [this, start, &eraser, step = threads.size()]( + std::size_t j) { + while (j < partitions_.size()) + { + eraser(partitions_[j], start); + j += step; + } + }, + i); + } + + for (auto& t : threads) + t.join(); + } + + if (auto const d = clock_.now() - start; d >= std::chrono::seconds(2)) + { + auto const secs = + std::chrono::duration_cast(d); + auto const usecs = + std::chrono::duration_cast(d) - + std::chrono::duration_cast(secs); + + JLOG(journal_.info()) + << "sweep: Iteration over " << size_.load() << " items took " + << secs.count() << "." << usecs.count() << " seconds."; + } + } + + Json::Value + info() const + { + Json::Value v{Json::objectValue}; + + v["name"] = name_; + v["partitions"] = static_cast(partitions_.size()); + v["total_size"] = std::to_string(size_.load()); + v["cache_hits"] = std::to_string(hits_.load()); + v["cache_misses"] = std::to_string(misses_.load()); + v["target_size"] = std::to_string(targetSize_); + v["target_age"] = std::to_string(targetAge_.count()); + + return v; + } +}; + +} // namespace detail + /** Remembers which tree keys have all descendants resident. This optimizes the process of acquiring a complete tree. */ -class BasicFullBelowCache +class FullBelowCache { -private: - using CacheType = KeyCache; + using KeyCache = detail::FullBelowCacheImpl; public: - enum { defaultCacheTargetSize = 0 }; - - using key_type = uint256; - using clock_type = typename CacheType::clock_type; + using key_type = KeyCache::key_type; + using clock_type = KeyCache::clock_type; /** Construct the cache. @@ -53,23 +372,22 @@ public: @param targetSize The cache target size. @param targetExpirationSeconds The expiration time for items. */ - BasicFullBelowCache( + FullBelowCache( std::string const& name, clock_type& clock, beast::Journal j, + std::size_t targetSize, + std::chrono::seconds expiration = std::chrono::minutes{2}, beast::insight::Collector::ptr const& collector = - beast::insight::NullCollector::New(), - std::size_t target_size = defaultCacheTargetSize, - std::chrono::seconds expiration = std::chrono::minutes{2}) - : m_cache(name, target_size, expiration, clock, j, collector), m_gen(1) + beast::insight::NullCollector::New()) + : name_(name) + , clock_(clock) + , journal_(j) + , collector_(collector) + , targetSize_(targetSize) + , expiration_(expiration) { - } - - /** Return the clock associated with the cache. */ - clock_type& - clock() - { - return m_cache.clock(); + clear(); } /** Return the number of elements in the cache. @@ -79,29 +397,39 @@ public: std::size_t size() const { - return m_cache.size(); + if (auto c = cache_.load()) + return c->size(); + + return 0; } /** Remove expired cache items. + Thread safety: Safe to call from any thread. */ void sweep() { - m_cache.sweep(); + if (auto c = cache_.load()) + c->sweep(); } /** Refresh the last access time of an item, if it exists. + Thread safety: Safe to call from any thread. + @param key The key to refresh. @return `true` If the key exists. */ bool touch_if_exists(key_type const& key) { - return m_cache.touch_if_exists(key); + if (auto c = cache_.load()) + return c->touch_if_exists(key); + + return false; } /** Insert a key into the cache. @@ -114,39 +442,75 @@ public: void insert(key_type const& key) { - m_cache.insert(key); + if (auto c = cache_.load()) + c->insert(key); } - /** generation determines whether cached entry is valid */ + /** Returns the generation that can determine if a cached entry is valid. */ std::uint32_t - getGeneration(void) const + generation() const { - return m_gen; + return generation_; } + /** Clears the cache. + + This effectively replaces the cache with an entirely new instance, and + destroys the old instance. + + It is safe to call this method from any thread. + */ void clear() { - m_cache.clear(); - ++m_gen; + auto c = cache_.load(); + + ++generation_; + + cache_.store(std::make_shared( + "FullBelow: " + name_, + targetSize_, + expiration_, + clock_, + journal_, + collector_)); + + if (c) + { + std::thread t( + [](std::shared_ptr cache) { + // just invoke the destructor of cache + }, + std::move(c)); + t.detach(); + } } - void - reset() + Json::Value + info() const { - m_cache.clear(); - m_gen = 1; + if (auto c = cache_.load()) + { + auto ret = c->info(); + ret["generation"] = std::to_string(generation_); + return ret; + } + + return {Json::objectValue}; } private: - CacheType m_cache; - std::atomic m_gen; + std::string const name_; + clock_type& clock_; + beast::Journal journal_; + beast::insight::Collector::ptr collector_; + std::size_t const targetSize_; + std::chrono::seconds const expiration_; + + std::atomic generation_ = 0; + std::atomic> cache_; }; -} // namespace detail - -using FullBelowCache = detail::BasicFullBelowCache; - } // namespace ripple #endif diff --git a/src/ripple/shamap/NodeFamily.h b/src/ripple/shamap/NodeFamily.h index f20abccce..c3799530c 100644 --- a/src/ripple/shamap/NodeFamily.h +++ b/src/ripple/shamap/NodeFamily.h @@ -66,6 +66,11 @@ public: return false; } + void resetCacheFor(std::uint32_t) override + { + return; + } + std::shared_ptr getFullBelowCache(std::uint32_t) override { return fbCache_; @@ -79,9 +84,6 @@ public: void sweep() override; - void - reset() override; - void missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& hash) override; @@ -105,6 +107,9 @@ private: void acquire(uint256 const& hash, std::uint32_t seq); + + void + initCaches(); }; } // namespace ripple diff --git a/src/ripple/shamap/SHAMap.h b/src/ripple/shamap/SHAMap.h index 2d1aa192f..5e90476db 100644 --- a/src/ripple/shamap/SHAMap.h +++ b/src/ripple/shamap/SHAMap.h @@ -32,7 +32,6 @@ #include #include #include -#include #include #include #include diff --git a/src/ripple/shamap/ShardFamily.h b/src/ripple/shamap/ShardFamily.h index de809cf58..5a726e0f9 100644 --- a/src/ripple/shamap/ShardFamily.h +++ b/src/ripple/shamap/ShardFamily.h @@ -66,28 +66,18 @@ public: return true; } + void + resetCacheFor(std::uint32_t ledgerSeq) override; + std::shared_ptr getFullBelowCache(std::uint32_t ledgerSeq) override; - /** Return the number of entries in the cache */ - int - getFullBelowCacheSize(); - std::shared_ptr getTreeNodeCache(std::uint32_t ledgerSeq) override; - /** Return a pair where the first item is the number of items cached - and the second item is the number of entries in the cached - */ - std::pair - getTreeNodeCacheSize(); - void sweep() override; - void - reset() override; - void missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash) override; @@ -98,6 +88,24 @@ public: acquire(hash, seq); } + template + void + forEachFullBelowCache(Func&& f) + { + std::lock_guard lock(fbCacheMutex_); + for (auto const& e : fbCache_) + f(e.second.get()); + } + + template + void + forEachTreeNodeCache(Func&& f) + { + std::lock_guard lock(tnCacheMutex_); + for (auto const& e : tnCache_) + f(e.second.get()); + } + private: Application& app_; NodeStore::Database& db_; diff --git a/src/ripple/shamap/impl/NodeFamily.cpp b/src/ripple/shamap/impl/NodeFamily.cpp index 1752db06a..944861b4c 100644 --- a/src/ripple/shamap/impl/NodeFamily.cpp +++ b/src/ripple/shamap/impl/NodeFamily.cpp @@ -26,24 +26,9 @@ namespace ripple { NodeFamily::NodeFamily(Application& app, CollectorManager& cm) - : app_(app) - , db_(app.getNodeStore()) - , j_(app.journal("NodeFamily")) - , fbCache_(std::make_shared( - "Node family full below cache", - stopwatch(), - app.journal("NodeFamilyFulLBelowCache"), - cm.collector(), - fullBelowTargetSize, - fullBelowExpiration)) - , tnCache_(std::make_shared( - "Node family tree node cache", - app.config().getValueFor(SizedItem::treeCacheSize), - std::chrono::seconds( - app.config().getValueFor(SizedItem::treeCacheAge)), - stopwatch(), - j_)) + : app_(app), db_(app.getNodeStore()), j_(app.journal("NodeFamily")) { + initCaches(); } void @@ -54,21 +39,22 @@ NodeFamily::sweep() } void -NodeFamily::reset() +NodeFamily::acquire(uint256 const& hash, std::uint32_t seq) { + if (hash.isNonZero()) { - std::lock_guard lock(maxSeqMutex_); - maxSeq_ = 0; - } + JLOG(j_.error()) << "Missing node in " << to_string(hash); - fbCache_->reset(); - tnCache_->reset(); + app_.getInboundLedgers().acquire( + hash, seq, InboundLedger::Reason::GENERIC); + } } void NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash) { JLOG(j_.error()) << "Missing node in " << seq; + if (app_.config().reporting()) { std::stringstream ss; @@ -103,15 +89,23 @@ NodeFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash) } void -NodeFamily::acquire(uint256 const& hash, std::uint32_t seq) +NodeFamily::initCaches() { - if (hash.isNonZero()) - { - JLOG(j_.error()) << "Missing node in " << to_string(hash); + fbCache_ = std::make_shared( + "NodeFamily", + stopwatch(), + j_, + fullBelowTargetSize, + fullBelowExpiration, + app_.getCollectorManager().collector()); - app_.getInboundLedgers().acquire( - hash, seq, InboundLedger::Reason::GENERIC); - } + tnCache_ = std::make_shared( + "TreeNodeCache: NodeFamily", + app_.config().getValueFor(SizedItem::treeCacheSize), + std::chrono::seconds( + app_.config().getValueFor(SizedItem::treeCacheAge)), + stopwatch(), + j_); } } // namespace ripple diff --git a/src/ripple/shamap/impl/SHAMap.cpp b/src/ripple/shamap/impl/SHAMap.cpp index dbf0c5d66..8ca0e3963 100644 --- a/src/ripple/shamap/impl/SHAMap.cpp +++ b/src/ripple/shamap/impl/SHAMap.cpp @@ -1177,7 +1177,7 @@ SHAMap::canonicalize( assert(node->getHash() == hash); f_.getTreeNodeCache(ledgerSeq_) - ->canonicalize_replace_client(hash.as_uint256(), node); + ->retrieve_or_insert(hash.as_uint256(), node); } void diff --git a/src/ripple/shamap/impl/SHAMapSync.cpp b/src/ripple/shamap/impl/SHAMapSync.cpp index 7333425df..07c625ba9 100644 --- a/src/ripple/shamap/impl/SHAMapSync.cpp +++ b/src/ripple/shamap/impl/SHAMapSync.cpp @@ -323,7 +323,7 @@ SHAMap::getMissingNodes(int max, SHAMapSyncFilter* filter) max, filter, 512, // number of async reads per pass - f_.getFullBelowCache(ledgerSeq_)->getGeneration()); + f_.getFullBelowCache(ledgerSeq_)->generation()); if (!root_->isInner() || std::static_pointer_cast(root_)->isFullBelow( @@ -579,7 +579,7 @@ SHAMap::addKnownNode( return SHAMapAddNode::duplicate(); } - auto const generation = f_.getFullBelowCache(ledgerSeq_)->getGeneration(); + auto const generation = f_.getFullBelowCache(ledgerSeq_)->generation(); SHAMapNodeID iNodeID; auto iNode = root_.get(); diff --git a/src/ripple/shamap/impl/ShardFamily.cpp b/src/ripple/shamap/impl/ShardFamily.cpp index f22d4152e..19cd52e35 100644 --- a/src/ripple/shamap/impl/ShardFamily.cpp +++ b/src/ripple/shamap/impl/ShardFamily.cpp @@ -44,6 +44,39 @@ ShardFamily::ShardFamily(Application& app, CollectorManager& cm) { } +void +ShardFamily::resetCacheFor(std::uint32_t ledgerSeq) +{ + auto const shardIndex = app_.getShardStore()->seqToShardIndex(ledgerSeq); + + { // Destroy the cache without a lock + std::shared_ptr fbc; + + { + std::lock_guard lock(fbCacheMutex_); + if (auto const it = fbCache_.find(shardIndex); it != fbCache_.end()) + { + fbc = std::move(it->second); + fbCache_.erase(it); + } + } + } + + { // Destroy the cache without a lock + std::shared_ptr tnc; + + { + std::lock_guard lock(tnCacheMutex_); + + if (auto const it = tnCache_.find(shardIndex); it != tnCache_.end()) + { + tnc = std::move(it->second); + tnCache_.erase(it); + } + } + } +} + std::shared_ptr ShardFamily::getFullBelowCache(std::uint32_t ledgerSeq) { @@ -54,25 +87,15 @@ ShardFamily::getFullBelowCache(std::uint32_t ledgerSeq) // Create a cache for the corresponding shard auto fbCache{std::make_shared( - "Shard family full below cache shard " + std::to_string(shardIndex), + "Shard #" + std::to_string(shardIndex), stopwatch(), j_, - cm_.collector(), fullBelowTargetSize, - fullBelowExpiration)}; + fullBelowExpiration, + cm_.collector())}; return fbCache_.emplace(shardIndex, std::move(fbCache)).first->second; } -int -ShardFamily::getFullBelowCacheSize() -{ - size_t sz{0}; - std::lock_guard lock(fbCacheMutex_); - for (auto const& e : fbCache_) - sz += e.second->size(); - return sz; -} - std::shared_ptr ShardFamily::getTreeNodeCache(std::uint32_t ledgerSeq) { @@ -91,20 +114,6 @@ ShardFamily::getTreeNodeCache(std::uint32_t ledgerSeq) return tnCache_.emplace(shardIndex, std::move(tnCache)).first->second; } -std::pair -ShardFamily::getTreeNodeCacheSize() -{ - int cacheSz{0}; - int trackSz{0}; - std::lock_guard lock(tnCacheMutex_); - for (auto const& e : tnCache_) - { - cacheSz += e.second->getCacheSize(); - trackSz += e.second->getTrackSize(); - } - return {cacheSz, trackSz}; -} - void ShardFamily::sweep() { @@ -135,23 +144,6 @@ ShardFamily::sweep() } } -void -ShardFamily::reset() -{ - { - std::lock_guard lock(maxSeqMutex_); - maxSeq_ = 0; - } - - { - std::lock_guard lock(fbCacheMutex_); - fbCache_.clear(); - } - - std::lock_guard lock(tnCacheMutex_); - tnCache_.clear(); -} - void ShardFamily::missingNodeAcquireBySeq(std::uint32_t seq, uint256 const& nodeHash) { diff --git a/src/test/basics/KeyCache_test.cpp b/src/test/basics/KeyCache_test.cpp deleted file mode 100644 index 7f3f13e27..000000000 --- a/src/test/basics/KeyCache_test.cpp +++ /dev/null @@ -1,99 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. -*/ -//============================================================================== - -#include -#include -#include -#include -#include -#include -#include - -namespace ripple { - -class KeyCache_test : public beast::unit_test::suite -{ -public: - void - run() override - { - using namespace std::chrono_literals; - TestStopwatch clock; - clock.set(0); - - using Key = std::string; - using Cache = TaggedCache; - - test::SuiteJournal j("KeyCacheTest", *this); - - // Insert an item, retrieve it, and age it so it gets purged. - { - Cache c("test", LedgerIndex(1), 2s, clock, j); - - BEAST_EXPECT(c.size() == 0); - BEAST_EXPECT(c.insert("one")); - BEAST_EXPECT(!c.insert("one")); - BEAST_EXPECT(c.size() == 1); - BEAST_EXPECT(c.touch_if_exists("one")); - ++clock; - c.sweep(); - BEAST_EXPECT(c.size() == 1); - ++clock; - c.sweep(); - BEAST_EXPECT(c.size() == 0); - BEAST_EXPECT(!c.touch_if_exists("one")); - } - - // Insert two items, have one expire - { - Cache c("test", LedgerIndex(2), 2s, clock, j); - - BEAST_EXPECT(c.insert("one")); - BEAST_EXPECT(c.size() == 1); - BEAST_EXPECT(c.insert("two")); - BEAST_EXPECT(c.size() == 2); - ++clock; - c.sweep(); - BEAST_EXPECT(c.size() == 2); - BEAST_EXPECT(c.touch_if_exists("two")); - ++clock; - c.sweep(); - BEAST_EXPECT(c.size() == 1); - } - - // Insert three items (1 over limit), sweep - { - Cache c("test", LedgerIndex(2), 3s, clock, j); - - BEAST_EXPECT(c.insert("one")); - ++clock; - BEAST_EXPECT(c.insert("two")); - ++clock; - BEAST_EXPECT(c.insert("three")); - ++clock; - BEAST_EXPECT(c.size() == 3); - c.sweep(); - BEAST_EXPECT(c.size() < 3); - } - } -}; - -BEAST_DEFINE_TESTSUITE(KeyCache, common, ripple); - -} // namespace ripple diff --git a/src/test/basics/TaggedCache_test.cpp b/src/test/basics/TaggedCache_test.cpp index 6a5b44299..9aeb06e87 100644 --- a/src/test/basics/TaggedCache_test.cpp +++ b/src/test/basics/TaggedCache_test.cpp @@ -51,9 +51,8 @@ public: using Key = LedgerIndex; using Value = std::string; - using Cache = TaggedCache; - Cache c("test", 1, 1s, clock, journal); + TaggedCache c("test", 1, 1s, clock, journal); // Insert an item, retrieve it, and age it so it gets purged. { @@ -63,12 +62,6 @@ public: BEAST_EXPECT(c.getCacheSize() == 1); BEAST_EXPECT(c.getTrackSize() == 1); - { - std::string s; - BEAST_EXPECT(c.retrieve(1, s)); - BEAST_EXPECT(s == "one"); - } - ++clock; c.sweep(); BEAST_EXPECT(c.getCacheSize() == 0); @@ -105,7 +98,7 @@ public: { auto const p1 = c.fetch(3); auto p2 = std::make_shared("three"); - c.canonicalize_replace_client(3, p2); + c.retrieve_or_insert(3, p2); BEAST_EXPECT(p1.get() == p2.get()); } ++clock; @@ -136,7 +129,7 @@ public: BEAST_EXPECT(c.getTrackSize() == 1); // Canonicalize a new object with the same key auto p2 = std::make_shared("four"); - BEAST_EXPECT(c.canonicalize_replace_client(4, p2)); + BEAST_EXPECT(c.retrieve_or_insert(4, p2)); BEAST_EXPECT(c.getCacheSize() == 1); BEAST_EXPECT(c.getTrackSize() == 1); // Make sure we get the original object diff --git a/src/test/shamap/common.h b/src/test/shamap/common.h index d89acb988..88c06ae25 100644 --- a/src/test/shamap/common.h +++ b/src/test/shamap/common.h @@ -32,6 +32,8 @@ namespace tests { class TestNodeFamily : public Family { private: + beast::Journal const j_; + std::unique_ptr db_; std::shared_ptr fbCache_; @@ -40,21 +42,28 @@ private: TestStopwatch clock_; NodeStore::DummyScheduler scheduler_; - beast::Journal const j_; +private: + std::shared_ptr + initTreeNodeCache() + { + return std::make_shared( + "App family tree node cache", + 65536, + std::chrono::minutes{1}, + clock_, + j_); + } public: TestNodeFamily(beast::Journal j) - : fbCache_(std::make_shared( + : j_(j) + , fbCache_(std::make_shared( "App family full below cache", clock_, - j)) - , tnCache_(std::make_shared( - "App family tree node cache", - 65536, - std::chrono::minutes{1}, - clock_, - j)) - , j_(j) + j, + 100000, + std::chrono::minutes{2})) + , tnCache_(initTreeNodeCache()) { Section testSection; testSection.set("type", "memory"); @@ -81,6 +90,11 @@ public: return j_; } + void resetCacheFor(std::uint32_t) override + { + (void)0; + } + std::shared_ptr getFullBelowCache(std::uint32_t) override { return fbCache_; @@ -118,13 +132,6 @@ public: Throw("missing node"); } - void - reset() override - { - fbCache_->reset(); - tnCache_->reset(); - } - beast::manual_clock clock() {