rippled
Loading...
Searching...
No Matches
DatabaseNodeImp.cpp
1#include <xrpl/nodestore/detail/DatabaseNodeImp.h>
2
3namespace xrpl {
4namespace NodeStore {
5
6void
8{
9 storeStats(1, data.size());
10
11 auto obj = NodeObject::createObject(type, std::move(data), hash);
12 backend_->store(obj);
13 if (cache_)
14 {
15 // After the store, replace a negative cache entry if there is one
16 cache_->canonicalize(hash, obj, [](std::shared_ptr<NodeObject> const& n) { return n->getType() == hotDUMMY; });
17 }
18}
19
20void
22 uint256 const& hash,
23 std::uint32_t ledgerSeq,
24 std::function<void(std::shared_ptr<NodeObject> const&)>&& callback)
25{
26 if (cache_)
27 {
28 std::shared_ptr<NodeObject> obj = cache_->fetch(hash);
29 if (obj)
30 {
31 callback(obj->getType() == hotDUMMY ? nullptr : obj);
32 return;
33 }
34 }
35 Database::asyncFetch(hash, ledgerSeq, std::move(callback));
36}
37
38void
40{
41 if (cache_)
42 cache_->sweep();
43}
44
46DatabaseNodeImp::fetchNodeObject(uint256 const& hash, std::uint32_t, FetchReport& fetchReport, bool duplicate)
47{
48 std::shared_ptr<NodeObject> nodeObject = cache_ ? cache_->fetch(hash) : nullptr;
49
50 if (!nodeObject)
51 {
52 JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record not " << (cache_ ? "cached" : "found");
53
54 Status status;
55
56 try
57 {
58 status = backend_->fetch(hash.data(), &nodeObject);
59 }
60 catch (std::exception const& e)
61 {
62 JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": Exception fetching from backend: " << e.what();
63 Rethrow();
64 }
65
66 switch (status)
67 {
68 case ok:
69 if (cache_)
70 {
71 if (nodeObject)
72 cache_->canonicalize_replace_client(hash, nodeObject);
73 else
74 {
76 cache_->canonicalize_replace_client(hash, notFound);
77 if (notFound->getType() != hotDUMMY)
78 nodeObject = notFound;
79 }
80 }
81 break;
82 case notFound:
83 break;
84 case dataCorrupt:
85 JLOG(j_.fatal()) << "fetchNodeObject " << hash << ": nodestore data is corrupted";
86 break;
87 default:
88 JLOG(j_.warn()) << "fetchNodeObject " << hash << ": backend returns unknown result " << status;
89 break;
90 }
91 }
92 else
93 {
94 JLOG(j_.trace()) << "fetchNodeObject " << hash << ": record found in cache";
95 if (nodeObject->getType() == hotDUMMY)
96 nodeObject.reset();
97 }
98
99 if (nodeObject)
100 fetchReport.wasFound = true;
101
102 return nodeObject;
103}
104
107{
109 using namespace std::chrono;
110 auto const before = steady_clock::now();
112 std::vector<uint256 const*> cacheMisses;
113 uint64_t hits = 0;
114 uint64_t fetches = 0;
115 for (size_t i = 0; i < hashes.size(); ++i)
116 {
117 auto const& hash = hashes[i];
118 // See if the object already exists in the cache
119 auto nObj = cache_ ? cache_->fetch(hash) : nullptr;
120 ++fetches;
121 if (!nObj)
122 {
123 // Try the database
124 indexMap[&hash] = i;
125 cacheMisses.push_back(&hash);
126 }
127 else
128 {
129 results[i] = nObj->getType() == hotDUMMY ? nullptr : nObj;
130 // It was in the cache.
131 ++hits;
132 }
133 }
134
135 JLOG(j_.debug()) << "fetchBatch - cache hits = " << (hashes.size() - cacheMisses.size())
136 << " - cache misses = " << cacheMisses.size();
137 auto dbResults = backend_->fetchBatch(cacheMisses).first;
138
139 for (size_t i = 0; i < dbResults.size(); ++i)
140 {
141 auto nObj = std::move(dbResults[i]);
142 size_t index = indexMap[cacheMisses[i]];
143 auto const& hash = hashes[index];
144
145 if (nObj)
146 {
147 // Ensure all threads get the same object
148 if (cache_)
149 cache_->canonicalize_replace_client(hash, nObj);
150 }
151 else
152 {
153 JLOG(j_.error()) << "fetchBatch - "
154 << "record not found in db or cache. hash = " << strHex(hash);
155 if (cache_)
156 {
158 cache_->canonicalize_replace_client(hash, notFound);
159 if (notFound->getType() != hotDUMMY)
160 nObj = std::move(notFound);
161 }
162 }
163 results[index] = std::move(nObj);
164 }
165
166 auto fetchDurationUs = std::chrono::duration_cast<std::chrono::microseconds>(steady_clock::now() - before).count();
167 updateFetchMetrics(fetches, hits, fetchDurationUs);
168 return results;
169}
170
171} // namespace NodeStore
172} // namespace xrpl
Stream fatal() const
Definition Journal.h:325
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
Stream trace() const
Severity stream access functions.
Definition Journal.h:295
Stream warn() const
Definition Journal.h:313
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
std::shared_ptr< TaggedCache< uint256, NodeObject > > cache_
void store(NodeObjectType type, Blob &&data, uint256 const &hash, std::uint32_t) override
Store the object.
std::shared_ptr< Backend > backend_
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t, FetchReport &fetchReport, bool duplicate) override
void sweep() override
Remove expired entries from the positive and negative caches.
void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback) override
Fetch an object without waiting.
std::vector< std::shared_ptr< NodeObject > > fetchBatch(std::vector< uint256 > const &hashes)
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition Database.h:221
void updateFetchMetrics(uint64_t fetches, uint64_t hits, uint64_t duration)
Definition Database.h:233
beast::Journal const j_
Definition Database.h:200
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition Database.cpp:149
pointer data()
Definition base_uint.h:102
Status
Return codes from Backend operations.
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:6
std::string strHex(FwdIt begin, FwdIt end)
Definition strHex.h:11
NodeObjectType
The types of node objects.
Definition NodeObject.h:13
@ hotDUMMY
Definition NodeObject.h:18
void Rethrow()
Rethrow the exception currently being handled.
Definition contract.h:29
T push_back(T... args)
T reset(T... args)
T size(T... args)
Contains information about a fetch operation.
T what(T... args)