Negative cache support for node store

Adds support to TaggedCache to support smart replacement
(Needed to avoid race conditions with negative caching.)

Create a "hotDUMMY" object that represents the absence
of an object.

Allow DatabaseNodeImp::asyncFetch to complete immediately
if object is in cache (positive or negative).

Fix a bug in asyncFetch where the object returned may not
be the correct canonical version because we stash the
object in the results array before we canonicalize it.
This commit is contained in:
JoelKatz
2021-12-29 23:10:22 -08:00
committed by manojsdoshi
parent b9903bbcc4
commit 3eb8aa8b80
5 changed files with 76 additions and 20 deletions

View File

@@ -300,19 +300,16 @@ public:
@param key The key corresponding to the object @param key The key corresponding to the object
@param data A shared pointer to the data corresponding to the object. @param data A shared pointer to the data corresponding to the object.
@param replace `true` if `data` is the up to date version of the object. @param replace Function that decides if cache should be replaced
@return `true` If the key already existed. @return `true` If the key already existed.
*/ */
private: public:
template <bool replace>
bool bool
canonicalize( canonicalize(
const key_type& key, const key_type& key,
std::conditional_t< std::shared_ptr<T>& data,
replace, std::function<bool(std::shared_ptr<T> const&)>&& replace)
std::shared_ptr<T> const,
std::shared_ptr<T>>& data)
{ {
// Return canonical value, store if needed, refresh in cache // Return canonical value, store if needed, refresh in cache
// Return values: true=we had the data already // Return values: true=we had the data already
@@ -335,7 +332,7 @@ private:
if (entry.isCached()) if (entry.isCached())
{ {
if constexpr (replace) if (replace(entry.ptr))
{ {
entry.ptr = data; entry.ptr = data;
entry.weak_ptr = data; entry.weak_ptr = data;
@@ -352,7 +349,7 @@ private:
if (cachedData) if (cachedData)
{ {
if constexpr (replace) if (replace(entry.ptr))
{ {
entry.ptr = data; entry.ptr = data;
entry.weak_ptr = data; entry.weak_ptr = data;
@@ -374,19 +371,22 @@ private:
return false; return false;
} }
public:
bool bool
canonicalize_replace_cache( canonicalize_replace_cache(
const key_type& key, const key_type& key,
std::shared_ptr<T> const& data) std::shared_ptr<T> const& data)
{ {
return canonicalize<true>(key, data); return canonicalize(
key,
const_cast<std::shared_ptr<T>&>(data),
[](std::shared_ptr<T> const&) { return true; });
} }
bool bool
canonicalize_replace_client(const key_type& key, std::shared_ptr<T>& data) canonicalize_replace_client(const key_type& key, std::shared_ptr<T>& data)
{ {
return canonicalize<false>(key, data); return canonicalize(
key, data, [](std::shared_ptr<T> const&) { return false; });
} }
std::shared_ptr<T> std::shared_ptr<T>

View File

@@ -156,7 +156,7 @@ public:
object is stored, used by the shard store. object is stored, used by the shard store.
@param callback Callback function when read completes @param callback Callback function when read completes
*/ */
void virtual void
asyncFetch( asyncFetch(
uint256 const& hash, uint256 const& hash,
std::uint32_t ledgerSeq, std::uint32_t ledgerSeq,

View File

@@ -33,7 +33,8 @@ enum NodeObjectType : std::uint32_t {
hotUNKNOWN = 0, hotUNKNOWN = 0,
hotLEDGER = 1, hotLEDGER = 1,
hotACCOUNT_NODE = 3, hotACCOUNT_NODE = 3,
hotTRANSACTION_NODE = 4 hotTRANSACTION_NODE = 4,
hotDUMMY = 512 // an invalid or missing object
}; };
/** A simple object that the Ledger uses to store entries. /** A simple object that the Ledger uses to store entries.

View File

@@ -33,7 +33,34 @@ DatabaseNodeImp::store(
{ {
storeStats(1, data.size()); storeStats(1, data.size());
backend_->store(NodeObject::createObject(type, std::move(data), hash)); auto obj = NodeObject::createObject(type, std::move(data), hash);
backend_->store(obj);
if (cache_)
{
// After the store, replace a negative cache entry if there is one
cache_->canonicalize(
hash, obj, [](std::shared_ptr<NodeObject> const& n) {
return n->getType() == hotDUMMY;
});
}
}
void
DatabaseNodeImp::asyncFetch(
uint256 const& hash,
std::uint32_t ledgerSeq,
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
{
if (cache_)
{
std::shared_ptr<NodeObject> obj = cache_->fetch(hash);
if (obj)
{
callback(obj->getType() == hotDUMMY ? nullptr : obj);
return;
}
}
Database::asyncFetch(hash, ledgerSeq, std::move(callback));
} }
void void
@@ -75,8 +102,19 @@ DatabaseNodeImp::fetchNodeObject(
switch (status) switch (status)
{ {
case ok: case ok:
if (nodeObject && cache_) if (cache_)
{
if (nodeObject)
cache_->canonicalize_replace_client(hash, nodeObject); cache_->canonicalize_replace_client(hash, nodeObject);
else
{
auto notFound =
NodeObject::createObject(hotDUMMY, {}, hash);
cache_->canonicalize_replace_client(hash, notFound);
if (notFound->getType() != hotDUMMY)
nodeObject = notFound;
}
}
break; break;
case notFound: case notFound:
break; break;
@@ -95,6 +133,8 @@ DatabaseNodeImp::fetchNodeObject(
{ {
JLOG(j_.trace()) << "fetchNodeObject " << hash JLOG(j_.trace()) << "fetchNodeObject " << hash
<< ": record found in cache"; << ": record found in cache";
if (nodeObject->getType() == hotDUMMY)
nodeObject.reset();
} }
if (nodeObject) if (nodeObject)
@@ -127,7 +167,7 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
} }
else else
{ {
results[i] = nObj; results[i] = nObj->getType() == hotDUMMY ? nullptr : nObj;
// It was in the cache. // It was in the cache.
++hits; ++hits;
} }
@@ -140,9 +180,8 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
for (size_t i = 0; i < dbResults.size(); ++i) for (size_t i = 0; i < dbResults.size(); ++i)
{ {
auto nObj = dbResults[i]; auto nObj = std::move(dbResults[i]);
size_t index = indexMap[cacheMisses[i]]; size_t index = indexMap[cacheMisses[i]];
results[index] = nObj;
auto const& hash = hashes[index]; auto const& hash = hashes[index];
if (nObj) if (nObj)
@@ -156,8 +195,16 @@ DatabaseNodeImp::fetchBatch(std::vector<uint256> const& hashes)
JLOG(j_.error()) JLOG(j_.error())
<< "fetchBatch - " << "fetchBatch - "
<< "record not found in db or cache. hash = " << strHex(hash); << "record not found in db or cache. hash = " << strHex(hash);
if (cache_)
{
auto notFound = NodeObject::createObject(hotDUMMY, {}, hash);
cache_->canonicalize_replace_client(hash, notFound);
if (notFound->getType() != hotDUMMY)
nObj = std::move(notFound);
} }
} }
results[index] = std::move(nObj);
}
auto fetchDurationUs = auto fetchDurationUs =
std::chrono::duration_cast<std::chrono::microseconds>( std::chrono::duration_cast<std::chrono::microseconds>(

View File

@@ -111,6 +111,7 @@ public:
// only one database // only one database
return true; return true;
} }
void void
sync() override sync() override
{ {
@@ -120,6 +121,13 @@ public:
std::vector<std::shared_ptr<NodeObject>> std::vector<std::shared_ptr<NodeObject>>
fetchBatch(std::vector<uint256> const& hashes); fetchBatch(std::vector<uint256> const& hashes);
void
asyncFetch(
uint256 const& hash,
std::uint32_t ledgerSeq,
std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
override;
bool bool
storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override storeLedger(std::shared_ptr<Ledger const> const& srcLedger) override
{ {