rippled
Loading...
Searching...
No Matches
Database.cpp
1//------------------------------------------------------------------------------
2/*
3 This file is part of rippled: https://github.com/ripple/rippled
4 Copyright (c) 2012, 2013 Ripple Labs Inc.
5
6 Permission to use, copy, modify, and/or distribute this software for any
7 purpose with or without fee is hereby granted, provided that the above
8 copyright notice and this permission notice appear in all copies.
9
10 THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
11 WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12 MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
13 ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14 WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15 ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
16 OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17*/
18//==============================================================================
19
20#include <xrpl/basics/chrono.h>
21#include <xrpl/beast/core/CurrentThreadName.h>
22#include <xrpl/json/json_value.h>
23#include <xrpl/nodestore/Database.h>
24#include <xrpl/protocol/HashPrefix.h>
25#include <xrpl/protocol/jss.h>
26
27#include <chrono>
28
29namespace ripple {
30namespace NodeStore {
31
// Constructor of ripple::NodeStore::Database (name taken from the assertion
// messages below).
// NOTE(review): the signature line (listing line 32) is missing from this
// listing; only the parameter list survives.
//
// Reads "earliest_seq" (default XRP_LEDGER_EARLIEST_SEQ) and "rq_bundle"
// (default 4) from `config`, clamps the read-thread count to at least 1,
// validates both settings, and then runs the prefetch worker loop below once
// per read thread; each loop iteration ends with t.detach().
//
// Throws std::runtime_error (via Throw<>) when earliest_seq < 1 or when
// rq_bundle is outside [1, 64].
33 Scheduler& scheduler,
34 int readThreads,
35 Section const& config,
36 beast::Journal journal)
37 : j_(journal)
38 , scheduler_(scheduler)
39 , earliestLedgerSeq_(
40 get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
41 , requestBundle_(get<int>(config, "rq_bundle", 4))
42 , readThreads_(std::max(1, readThreads))
43{
// The assertion checks the raw argument; readThreads_ itself has already been
// clamped to at least 1 by std::max in the initializer list.
44 XRPL_ASSERT(
45 readThreads,
46 "ripple::NodeStore::Database::Database : nonzero threads input");
47
48 if (earliestLedgerSeq_ < 1)
49 Throw<std::runtime_error>("Invalid earliest_seq");
50
// rq_bundle must lie in the inclusive range [1, 64].
51 if (requestBundle_ < 1 || requestBundle_ > 64)
52 Throw<std::runtime_error>("Invalid rq_bundle");
53
// One iteration per read thread; `i` counts down from readThreads_ to 1 and is
// passed to the lambda as the worker index.
54 for (int i = readThreads_.load(); i != 0; --i)
55 {
// NOTE(review): listing lines 56, 58 and 60 are missing here. Line 61 is the
// tail of a call that builds the name "db prefetch #<i>"; presumably the
// thread `t` is created on line 56 and named via setCurrentThreadName — confirm
// against the original file.
57 [this](int i) {
59
61 "db prefetch #" + std::to_string(i));
62
// Local container of the same type as read_; receives the requests taken from
// the shared queue on each pass.
63 decltype(read_) read;
64
65 while (true)
66 {
67 {
// NOTE(review): listing line 68 is missing; `lock`, used by the wait below, is
// presumably declared there — confirm.
69
70 if (isStopping())
71 break;
72
// Nothing queued: block on the condition variable.
// NOTE(review): listing lines 75 and 77 around the wait are missing.
73 if (read_.empty())
74 {
76 readCondVar_.wait(lock);
78 }
79
// Re-check the stop flag after a possible wait.
80 if (isStopping())
81 break;
82
83 // extract multiple objects at a time to minimize the
84 // overhead of acquiring the mutex.
85 for (int cnt = 0;
86 !read_.empty() && cnt != requestBundle_;
87 ++cnt)
88 read.insert(read_.extract(read_.begin()));
89 }
90
// Service every hash taken in this pass. Each entry maps a hash to its list of
// (ledger sequence, callback) requests.
91 for (auto it = read.begin(); it != read.end(); ++it)
92 {
93 XRPL_ASSERT(
94 !it->second.empty(),
95 "ripple::NodeStore::Database::Database : non-empty "
96 "data");
97
98 auto const& hash = it->first;
99 auto const& data = it->second;
// Ledger sequence of the first request recorded for this hash.
100 auto const seqn = data[0].first;
101
// NOTE(review): the initializer of `obj` (listing line 103) is missing from
// this listing.
102 auto obj =
104
105 // This could be further optimized: if there are
106 // multiple requests for sequence numbers mapping to
107 // multiple databases by sorting requests such that all
108 // indices mapping to the same database are grouped
109 // together and serviced by a single read.
110 for (auto const& req : data)
111 {
// Invoke the callback with `obj` when the request's sequence equals seqn or
// isSameDB() reports the same database; otherwise with the result of a call
// taking (hash, req.first, FetchType::async).
// NOTE(review): the callee name on listing line 115 is missing.
112 req.second(
113 (seqn == req.first) || isSameDB(req.first, seqn)
114 ? obj
116 hash, req.first, FetchType::async));
117 }
118 }
119
120 read.clear();
121 }
122
// Reached only after a break on isStopping(). stop() spins until readThreads_
// is zero, so this decrement is what lets it return.
// NOTE(review): listing line 123 is missing.
124 --readThreads_;
125 },
126 i);
127 t.detach();
128 }
129}
130
// Body of the Database destructor (the cross-reference list places
// `virtual ~Database()` at Database.cpp:131).
// NOTE(review): the signature line (listing line 131) is missing from this
// listing.
132{
133 // NOTE!
134 // Any derived class should call the stop() method in its
135 // destructor. Otherwise, occasionally, the derived class may
136 // crash during shutdown when its members are accessed by one of
137 // these threads after the derived class is destroyed but before
138 // this base class is destroyed.
139 stop();
140}
141
142bool
147
// Database::stop (name taken from the assertion message below).
// Clears the pending read queue, then waits until readThreads_ drops to zero
// and logs how long the wait took.
// NOTE(review): listing lines 149, 152, 154, 158 and 173 are missing from this
// listing (signature, the first statement of the outer scope, the condition of
// the inner block, a statement after read_.clear(), and the loop body after the
// assertion). They are left as found rather than reconstructed.
148void
150{
151 {
153
155 {
156 JLOG(j_.debug()) << "Clearing read queue because of stop request";
157 read_.clear();
159 }
160 }
161
162 JLOG(j_.debug()) << "Waiting for stop request to complete...";
163
164 using namespace std::chrono;
165
166 auto const start = steady_clock::now();
167
// Each worker decrements readThreads_ as it exits; wait for all of them.
168 while (readThreads_.load() != 0)
169 {
// Flags a shutdown that has been waiting 30 seconds or longer.
170 XRPL_ASSERT(
171 steady_clock::now() - start < 30s,
172 "ripple::NodeStore::Database::stop : maximum stop duration");
174 }
175
// Log message previously misspelled the unit as "millseconds".
176 JLOG(j_.debug()) << "Stop request completed in "
177 << duration_cast<std::chrono::milliseconds>(
178 steady_clock::now() - start)
179 .count()
180 << " milliseconds";
181}
182
// Database::asyncFetch (the cross-reference list places it at
// Database.cpp:184). Queues a (ledgerSeq, callback) request under `hash` in
// read_, unless the database is stopping.
// NOTE(review): listing lines 184 and 187 (the function name and the callback
// parameter, referred to as `cb` below) are missing from this listing.
183void
185 uint256 const& hash,
186 std::uint32_t ledgerSeq,
188{
// NOTE(review): listing line 189 is missing; presumably it acquires the lock
// protecting read_ — confirm.
190
// Requests arriving after a stop request are dropped without calling back.
191 if (!isStopping())
192 {
// Requests for the same hash are appended to that hash's existing entry.
193 read_[hash].emplace_back(ledgerSeq, std::move(cb));
// NOTE(review): listing line 194 is missing; presumably it wakes a waiting
// reader through readCondVar_ — confirm.
195 }
196}
197
// Database::importInternal (name taken from the assertion message below; the
// cross-reference list gives the parameters as Backend& dstBackend and
// Database& srcDB). Visits every node object in srcDB and stores the objects
// into dstBackend in batches.
// NOTE(review): the signature line (listing line 199) is missing from this
// listing.
198void
200{
201 Batch batch;
// NOTE(review): listing line 202 is missing; the cross-reference list mentions
// reserve(), so it presumably pre-sizes `batch` — confirm.
// Helper that writes the current batch to dstBackend. On success it records
// the object count and total payload bytes via storeStats() and empties the
// batch. On exception it logs the error and returns early, leaving the batch
// uncleared and the stats untouched.
203 auto storeBatch = [&, fname = __func__]() {
204 try
205 {
206 dstBackend.storeBatch(batch);
207 }
208 catch (std::exception const& e)
209 {
210 JLOG(j_.error()) << "Exception caught in function " << fname
211 << ". Error: " << e.what();
212 return;
213 }
214
215 std::uint64_t sz{0};
216 for (auto const& nodeObject : batch)
217 sz += nodeObject->getData().size();
218 storeStats(batch.size(), sz);
219 batch.clear();
220 };
221
222 srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
223 XRPL_ASSERT(
224 nodeObject,
225 "ripple::NodeStore::Database::importInternal : non-null node");
226 if (!nodeObject) // This should never happen
227 return;
228
229 batch.emplace_back(std::move(nodeObject));
// NOTE(review): listing line 230 is missing between the emplace_back above and
// the storeBatch() call below; presumably it is a condition that limits how
// often the batch is flushed — confirm against the original file.
231 storeBatch();
232 });
233
// Flush whatever is still pending after the traversal.
234 if (!batch.empty())
235 storeBatch();
236}
237
238// Perform a fetch and report the time it took
// NOTE(review): the return type and function name (listing lines 239-240) are
// missing from this listing. The cross-reference list places
// `std::shared_ptr<NodeObject> fetchNodeObject(...)` at Database.cpp:240, with
// the four parameters below.
241 uint256 const& hash,
242 std::uint32_t ledgerSeq,
243 FetchType fetchType,
244 bool duplicate)
245{
246 FetchReport fetchReport(fetchType);
247
248 using namespace std::chrono;
249 auto const begin{steady_clock::now()};
250
// Delegates to the fetchNodeObject overload that takes a FetchReport and
// measures it with the steady clock.
251 auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
252 auto dur = steady_clock::now() - begin;
// Accumulate the elapsed time in microseconds.
253 fetchDurationUs_ += duration_cast<microseconds>(dur).count();
254 if (nodeObject)
255 {
// NOTE(review): listing line 256 is missing; presumably it updates a hit
// counter — confirm.
// On a successful fetch, add the payload size to the running byte total.
257 fetchSz_ += nodeObject->getData().size();
258 }
// NOTE(review): listing line 259 is missing; presumably it updates a total
// counter — confirm.
260
// The scheduler receives the elapsed time in milliseconds.
261 fetchReport.elapsed = duration_cast<milliseconds>(dur);
262 scheduler_.onFetch(fetchReport);
263 return nodeObject;
264}
265
// Database::getCountsJson (name taken from the assertion message below; the
// cross-reference list gives the parameter as Json::Value& obj). Writes the
// read-queue size, thread counts, request-bundle size and the fetch/store
// counters into `obj`.
// NOTE(review): the signature line (listing line 267) is missing from this
// listing.
266void
268{
// `obj` is asserted to be a JSON object before anything is written to it.
269 XRPL_ASSERT(
270 obj.isObject(),
271 "ripple::NodeStore::Database::getCountsJson : valid input type");
272
273 {
// NOTE(review): listing line 274 is missing; presumably it takes the lock
// guarding read_ for the duration of this scope — confirm.
275 obj["read_queue"] = static_cast<Json::UInt>(read_.size());
276 }
277
278 obj["read_threads_total"] = readThreads_.load();
279 obj["read_threads_running"] = runningThreads_.load();
280 obj["read_request_bundle"] = requestBundle_;
281
// The remaining counters are emitted as decimal strings via std::to_string.
282 obj[jss::node_writes] = std::to_string(storeCount_);
283 obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
284 obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
285 obj[jss::node_written_bytes] = std::to_string(storeSz_);
286 obj[jss::node_read_bytes] = std::to_string(fetchSz_);
287 obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
288}
289
290} // namespace NodeStore
291} // namespace ripple
Represents a JSON value.
Definition json_value.h:149
bool isObject() const
A generic endpoint for log messages.
Definition Journal.h:60
Stream error() const
Definition Journal.h:346
Stream debug() const
Definition Journal.h:328
A backend used for the NodeStore.
Definition Backend.h:40
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
Persistency layer for NodeObject.
Definition Database.h:51
void getCountsJson(Json::Value &obj)
Definition Database.cpp:267
std::atomic< std::uint32_t > fetchSz_
Definition Database.h:232
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition Database.h:248
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition Database.cpp:184
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database. This is usually called during import.
virtual ~Database()
Destroy the node store.
Definition Database.cpp:131
std::condition_variable readCondVar_
Definition Database.h:277
std::atomic< std::uint64_t > storeCount_
Definition Database.h:270
std::uint32_t const earliestLedgerSeq_
Definition Database.h:240
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition Database.h:285
std::atomic< std::uint64_t > storeSz_
Definition Database.h:271
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:240
std::atomic< bool > readStopping_
Definition Database.h:287
std::atomic< std::uint32_t > fetchHitCount_
Definition Database.h:231
beast::Journal const j_
Definition Database.h:227
std::atomic< std::uint64_t > fetchDurationUs_
Definition Database.h:273
std::atomic< int > runningThreads_
Definition Database.h:289
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
std::atomic< std::uint64_t > fetchTotalCount_
Definition Database.h:272
std::atomic< int > readThreads_
Definition Database.h:288
void importInternal(Backend &dstBackend, Database &srcDB)
Definition Database.cpp:199
Scheduling for asynchronous backend activity.
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch. Allows the scheduler to monitor the node store's performance.
Holds a collection of configuration values.
Definition BasicConfig.h:45
T detach(T... args)
T exchange(T... args)
T is_same_v
T load(T... args)
unsigned int UInt
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
void read(nudb::detail::istream &is, std::size_t &u)
Definition varint.h:121
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:25
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
STL namespace.
T reserve(T... args)
Contains information about a fetch operation.
T to_string(T... args)
T what(T... args)
T yield(T... args)