rippled
Loading...
Searching...
No Matches
Database.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpld/nodestore/Database.h>
21#include <xrpl/basics/chrono.h>
22#include <xrpl/beast/core/CurrentThreadName.h>
23#include <xrpl/json/json_value.h>
24#include <xrpl/protocol/HashPrefix.h>
25#include <xrpl/protocol/jss.h>
26#include <chrono>
27
28namespace ripple {
29namespace NodeStore {
30
// Constructor parameters, member initializers and body.
// NOTE(review): the embedded listing numbers jump from 30 to 32, so the
// declarator line that opens this parameter list is missing from this listing.
32 Scheduler& scheduler,
33 int readThreads,
34 Section const& config,
35 beast::Journal journal)
36 : j_(journal)
37 , scheduler_(scheduler)
// "earliest_seq" falls back to XRP_LEDGER_EARLIEST_SEQ when absent from config.
38 , earliestLedgerSeq_(
39 get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
// "rq_bundle" falls back to 4; it caps how many queued entries a worker
// extracts per pass (see the extraction loop below).
40 , requestBundle_(get<int>(config, "rq_bundle", 4))
// The stored thread count is clamped to at least 1, whatever the caller passed.
41 , readThreads_(std::max(1, readThreads))
42{
// Asserts on the raw argument; readThreads_ itself was already clamped above.
43 XRPL_ASSERT(
44 readThreads,
45 "ripple::NodeStore::Database::Database : nonzero threads input");
46
// Reject an earliest ledger sequence of zero.
47 if (earliestLedgerSeq_ < 1)
48 Throw<std::runtime_error>("Invalid earliest_seq");
49
// The bundle size must lie in [1, 64].
50 if (requestBundle_ < 1 || requestBundle_ > 64)
51 Throw<std::runtime_error>("Invalid rq_bundle");
52
// One iteration per configured read thread; `i` is passed into the lambda
// and used in the "db prefetch #<i>" string.
// NOTE(review): embedded line 55, which presumably declares the `t` that is
// detached at the end of the loop body, is missing from this listing.
53 for (int i = readThreads_.load(); i != 0; --i)
54 {
56 [this](int i) {
// NOTE(review): embedded lines 57 and 59 are missing; the string below is
// an argument to a call whose opening line is not visible here.
58
60 "db prefetch #" + std::to_string(i));
61
// Local container of the same type as read_; filled under the inner scope
// below and serviced afterwards.
62 decltype(read_) read;
63
64 while (true)
65 {
66 {
// NOTE(review): embedded line 67 is missing; the `lock` handed to
// readCondVar_.wait(lock) below is presumably declared there.
68
69 if (isStopping())
70 break;
71
// Nothing queued: block on the condition variable.
// NOTE(review): embedded lines 74 and 76 (either side of the wait) are
// missing from this listing.
72 if (read_.empty())
73 {
75 readCondVar_.wait(lock);
77 }
78
// Re-check after a possible wait, since a stop may have been requested.
79 if (isStopping())
80 break;
81
82 // extract multiple objects at a time to minimize the
83 // overhead of acquiring the mutex.
84 for (int cnt = 0;
85 !read_.empty() && cnt != requestBundle_;
86 ++cnt)
87 read.insert(read_.extract(read_.begin()));
88 }
89
// Service every extracted entry: key is the hash, value is the list of
// (ledger sequence, callback) requests for that hash.
90 for (auto it = read.begin(); it != read.end(); ++it)
91 {
92 XRPL_ASSERT(
93 !it->second.empty(),
94 "ripple::NodeStore::Database::Database : non-empty "
95 "data");
96
97 auto const& hash = it->first;
98 auto const& data = it->second;
// Sequence of the first request; used as the reference for isSameDB below.
99 auto const seqn = data[0].first;
100
// NOTE(review): embedded line 102 holding the initializer of `obj` is
// missing from this listing.
101 auto obj =
103
104 // This could be further optimized: if there are
105 // multiple requests for sequence numbers mapping to
106 // multiple databases by sorting requests such that all
107 // indices mapping to the same database are grouped
108 // together and serviced by a single read.
109 for (auto const& req : data)
110 {
// Invoke the callback with `obj` when the request has the same sequence as
// the first one or isSameDB() says so.
// NOTE(review): embedded line 114, the start of the alternative branch, is
// missing; only its arguments (hash, req.first, FetchType::async) are visible.
111 req.second(
112 (seqn == req.first) || isSameDB(req.first, seqn)
113 ? obj
115 hash, req.first, FetchType::async));
116 }
117 }
118
119 read.clear();
120 }
121
// Reached after a break from the loop above. stop() spins until readThreads_
// reaches zero, so this decrement signals that this worker is done.
// NOTE(review): embedded line 122 is missing from this listing.
123 --readThreads_;
124 },
125 i);
126 t.detach();
127 }
128}
129
131{
// Destructor body. NOTE(review): the signature line (embedded line 130) is
// missing from this listing.
132 // NOTE!
133 // Any derived class should call the stop() method in its
134 // destructor. Otherwise, occasionally, the derived class may
135 // crash during shutdown when its members are accessed by one of
136 // these threads after the derived class is destroyed but before
137 // this base class is destroyed.
// stop() sets readStopping_, clears read_ the first time it is called, and
// then waits until readThreads_ is zero.
138 stop();
139}
140
// Returns the current value of the readStopping_ flag.
// NOTE(review): embedded line 142 with the function name is missing from this
// listing; presumably this is isStopping(), which the worker loop and the
// fetch-queueing code call.
141bool
143{
// Relaxed load: only the flag value itself is read here.
144 return readStopping_.load(std::memory_order_relaxed);
145}
146
// Requests shutdown of the read workers and waits for them to finish.
// NOTE(review): embedded line 148 with the function name is missing from this
// listing; presumably this is stop(), which the destructor calls.
147void
149{
150 {
// NOTE(review): embedded line 151 is missing; presumably a lock guarding
// read_ is taken here - confirm against the full file.
152
// exchange() returns the previous value, so only the first call that flips
// the flag clears the queue.
153 if (!readStopping_.exchange(true, std::memory_order_relaxed))
154 {
155 JLOG(j_.debug()) << "Clearing read queue because of stop request";
156 read_.clear();
// NOTE(review): embedded line 157 is missing from this listing.
158 }
159 }
160
161 JLOG(j_.debug()) << "Waiting for stop request to complete...";
162
163 using namespace std::chrono;
164
165 auto const start = steady_clock::now();
166
// Spin until every worker has decremented readThreads_; the assertion flags a
// wait of 30 seconds or longer.
// NOTE(review): embedded line 172 (the rest of the loop body) is missing;
// presumably a yield - confirm against the full file.
167 while (readThreads_.load() != 0)
168 {
169 XRPL_ASSERT(
170 steady_clock::now() - start < 30s,
171 "ripple::NodeStore::Database::stop : maximum stop duration");
173 }
174
// NOTE(review): "millseconds" in the message below is a typo for
// "milliseconds"; left unchanged here because it is runtime output.
175 JLOG(j_.debug()) << "Stop request completed in "
176 << duration_cast<std::chrono::milliseconds>(
177 steady_clock::now() - start)
178 .count()
179 << " millseconds";
180}
181
// Queues a fetch request: appends (ledgerSeq, cb) to the request list stored
// under `hash` in read_, unless a stop has been requested.
// NOTE(review): embedded lines 183 (function name) and 186 (the parameter that
// declares `cb`) are missing from this listing.
182void
184 uint256 const& hash,
185 std::uint32_t ledgerSeq,
187{
// NOTE(review): embedded line 188 is missing; presumably a lock guarding read_
// is taken here - confirm against the full file.
189
// Requests arriving after a stop was requested are silently dropped.
190 if (!isStopping())
191 {
// operator[] creates the entry for `hash` if it does not exist yet.
192 read_[hash].emplace_back(ledgerSeq, std::move(cb));
// NOTE(review): embedded line 193 is missing from this listing.
194 }
195}
196
// Copies every object visited by srcDB.for_each into dstBackend, writing in
// batches of batchWritePreallocationSize objects.
// NOTE(review): embedded line 198 with the signature is missing from this
// listing; the body uses `dstBackend` and `srcDB`.
197void
199{
200 Batch batch;
// NOTE(review): embedded line 201 is missing from this listing.
// Helper that writes the current batch, records statistics and empties it.
// On a std::exception it logs the error and returns early; in that case
// storeStats() and batch.clear() are both skipped, so the batch keeps its
// contents.
202 auto storeBatch = [&, fname = __func__]() {
203 try
204 {
205 dstBackend.storeBatch(batch);
206 }
207 catch (std::exception const& e)
208 {
209 JLOG(j_.error()) << "Exception caught in function " << fname
210 << ". Error: " << e.what();
211 return;
212 }
213
// Total payload size of the objects just written, for storeStats().
214 std::uint64_t sz{0};
215 for (auto const& nodeObject : batch)
216 sz += nodeObject->getData().size();
217 storeStats(batch.size(), sz);
218 batch.clear();
219 };
220
221 srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
222 XRPL_ASSERT(
223 nodeObject,
224 "ripple::NodeStore::Database::importInternal : non-null node");
225 if (!nodeObject) // This should never happen
226 return;
227
228 batch.emplace_back(std::move(nodeObject));
// Flush once the batch reaches the preallocation size.
229 if (batch.size() >= batchWritePreallocationSize)
230 storeBatch();
231 });
232
// Flush whatever is left after the final visit.
233 if (!batch.empty())
234 storeBatch();
235}
236
237// Perform a fetch and report the time it took
// NOTE(review): embedded lines 238 and 239 (return type and function name) are
// missing from this listing.
// Delegates to the fetchNodeObject overload that takes a FetchReport, adds the
// elapsed time to fetchDurationUs_, and passes the report to
// scheduler_.onFetch() before returning the fetched object.
240 uint256 const& hash,
241 std::uint32_t ledgerSeq,
242 FetchType fetchType,
243 bool duplicate)
244{
245 FetchReport fetchReport(fetchType);
246
247 using namespace std::chrono;
248 auto const begin{steady_clock::now()};
249
250 auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
251 auto dur = steady_clock::now() - begin;
// The duration is accumulated in microseconds.
252 fetchDurationUs_ += duration_cast<microseconds>(dur).count();
253 if (nodeObject)
254 {
// NOTE(review): embedded line 255 is missing from this listing.
// The byte counter only grows when the fetch returned an object.
256 fetchSz_ += nodeObject->getData().size();
257 }
// NOTE(review): embedded line 258 is missing from this listing.
259
// The report carries the same duration, converted to milliseconds.
260 fetchReport.elapsed = duration_cast<milliseconds>(dur);
261 scheduler_.onFetch(fetchReport);
262 return nodeObject;
263}
264
// Writes read-queue, thread and I/O counters into the JSON object `obj`.
// NOTE(review): embedded line 266 with the signature is missing from this
// listing.
265void
267{
268 XRPL_ASSERT(
269 obj.isObject(),
270 "ripple::NodeStore::Database::getCountsJson : valid input type");
271
272 {
// NOTE(review): embedded line 273 is missing; presumably a lock guarding read_
// is taken for this scope - confirm against the full file.
274 obj["read_queue"] = static_cast<Json::UInt>(read_.size());
275 }
276
// Thread counts and bundle size are stored as numbers.
277 obj["read_threads_total"] = readThreads_.load();
278 obj["read_threads_running"] = runningThreads_.load();
279 obj["read_request_bundle"] = requestBundle_;
280
// The remaining counters are stored as strings via std::to_string.
281 obj[jss::node_writes] = std::to_string(storeCount_);
282 obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
283 obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
284 obj[jss::node_written_bytes] = std::to_string(storeSz_);
285 obj[jss::node_read_bytes] = std::to_string(fetchSz_);
286 obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
287}
288
289} // namespace NodeStore
290} // namespace ripple
Represents a JSON value.
Definition: json_value.h:147
bool isObject() const
A generic endpoint for log messages.
Definition: Journal.h:59
Stream error() const
Definition: Journal.h:335
Stream debug() const
Definition: Journal.h:317
A backend used for the NodeStore.
Definition: Backend.h:40
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
Persistency layer for NodeObject.
Definition: Database.h:50
void getCountsJson(Json::Value &obj)
Definition: Database.cpp:266
std::atomic< std::uint32_t > fetchSz_
Definition: Database.h:231
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition: Database.h:247
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition: Database.cpp:183
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database. This is usually called during import.
virtual ~Database()
Destroy the node store.
Definition: Database.cpp:130
std::condition_variable readCondVar_
Definition: Database.h:276
std::atomic< std::uint64_t > storeCount_
Definition: Database.h:269
std::uint32_t const earliestLedgerSeq_
Definition: Database.h:239
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition: Database.h:284
std::atomic< std::uint64_t > storeSz_
Definition: Database.h:270
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition: Database.cpp:239
std::atomic< bool > readStopping_
Definition: Database.h:286
std::atomic< std::uint32_t > fetchHitCount_
Definition: Database.h:230
beast::Journal const j_
Definition: Database.h:226
std::atomic< std::uint64_t > fetchDurationUs_
Definition: Database.h:272
std::atomic< int > runningThreads_
Definition: Database.h:288
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
std::atomic< std::uint64_t > fetchTotalCount_
Definition: Database.h:271
std::atomic< int > readThreads_
Definition: Database.h:287
void importInternal(Backend &dstBackend, Database &srcDB)
Definition: Database.cpp:198
Scheduling for asynchronous backend activity.
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch. Allows the scheduler to monitor the node store's performance.
Holds a collection of configuration values.
Definition: BasicConfig.h:46
T detach(T... args)
T emplace_back(T... args)
T empty(T... args)
T exchange(T... args)
T load(T... args)
unsigned int UInt
Definition: json_forwards.h:27
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
void read(nudb::detail::istream &is, std::size_t &u)
Definition: varint.h:120
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition: algorithm.h:26
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
Definition: BasicConfig.h:356
STL namespace.
T reserve(T... args)
T size(T... args)
Contains information about a fetch operation.
T to_string(T... args)
T what(T... args)
T yield(T... args)