rippled
Loading...
Searching...
No Matches
Database.cpp
1#include <xrpl/basics/chrono.h>
2#include <xrpl/beast/core/CurrentThreadName.h>
3#include <xrpl/json/json_value.h>
4#include <xrpl/nodestore/Database.h>
5#include <xrpl/protocol/HashPrefix.h>
6#include <xrpl/protocol/jss.h>
7
8#include <chrono>
9
10namespace xrpl {
11namespace NodeStore {
12
14 Scheduler& scheduler,
15 int readThreads,
16 Section const& config,
17 beast::Journal journal)
18 : j_(journal)
19 , scheduler_(scheduler)
20 , earliestLedgerSeq_(get<std::uint32_t>(config, "earliest_seq", XRP_LEDGER_EARLIEST_SEQ))
21 , requestBundle_(get<int>(config, "rq_bundle", 4))
22 , readThreads_(std::max(1, readThreads))
23{
24 XRPL_ASSERT(readThreads, "xrpl::NodeStore::Database::Database : nonzero threads input");
25
26 if (earliestLedgerSeq_ < 1)
27 Throw<std::runtime_error>("Invalid earliest_seq");
28
29 if (requestBundle_ < 1 || requestBundle_ > 64)
30 Throw<std::runtime_error>("Invalid rq_bundle");
31
32 for (int i = readThreads_.load(); i != 0; --i)
33 {
35 [this](int i) {
37
38 beast::setCurrentThreadName("db prefetch #" + std::to_string(i));
39
40 decltype(read_) read;
41
42 while (true)
43 {
44 {
46
47 if (isStopping())
48 break;
49
50 if (read_.empty())
51 {
53 readCondVar_.wait(lock);
55 }
56
57 if (isStopping())
58 break;
59
60 // extract multiple object at a time to minimize the
61 // overhead of acquiring the mutex.
62 for (int cnt = 0; !read_.empty() && cnt != requestBundle_; ++cnt)
63 read.insert(read_.extract(read_.begin()));
64 }
65
66 for (auto it = read.begin(); it != read.end(); ++it)
67 {
68 XRPL_ASSERT(
69 !it->second.empty(),
70 "xrpl::NodeStore::Database::Database : non-empty "
71 "data");
72
73 auto const& hash = it->first;
74 auto const& data = it->second;
75 auto const seqn = data[0].first;
76
77 auto obj = fetchNodeObject(hash, seqn, FetchType::async);
78
79 // This could be further optimized: if there are
80 // multiple requests for sequence numbers mapping to
81 // multiple databases by sorting requests such that all
82 // indices mapping to the same database are grouped
83 // together and serviced by a single read.
84 for (auto const& req : data)
85 {
86 req.second(
87 (seqn == req.first) || isSameDB(req.first, seqn)
88 ? obj
89 : fetchNodeObject(hash, req.first, FetchType::async));
90 }
91 }
92
93 read.clear();
94 }
95
98 },
99 i);
100 t.detach();
101 }
102}
103
105{
106 // NOTE!
107 // Any derived class should call the stop() method in its
108 // destructor. Otherwise, occasionally, the derived class may
109 // crash during shutdown when its members are accessed by one of
110 // these threads after the derived class is destroyed but before
111 // this base class is destroyed.
112 stop();
113}
114
115bool
120
121void
123{
124 {
126
128 {
129 JLOG(j_.debug()) << "Clearing read queue because of stop request";
130 read_.clear();
132 }
133 }
134
135 JLOG(j_.debug()) << "Waiting for stop request to complete...";
136
137 using namespace std::chrono;
138
139 auto const start = steady_clock::now();
140
141 while (readThreads_.load() != 0)
142 {
143 XRPL_ASSERT(
144 steady_clock::now() - start < 30s,
145 "xrpl::NodeStore::Database::stop : maximum stop duration");
147 }
148
149 JLOG(j_.debug())
150 << "Stop request completed in "
151 << duration_cast<std::chrono::milliseconds>(steady_clock::now() - start).count()
152 << " milliseconds";
153}
154
155void
157 uint256 const& hash,
158 std::uint32_t ledgerSeq,
160{
162
163 if (!isStopping())
164 {
165 read_[hash].emplace_back(ledgerSeq, std::move(cb));
167 }
168}
169
170void
172{
173 Batch batch;
175 auto storeBatch = [&, fname = __func__]() {
176 try
177 {
178 dstBackend.storeBatch(batch);
179 }
180 catch (std::exception const& e)
181 {
182 JLOG(j_.error()) << "Exception caught in function " << fname << ". Error: " << e.what();
183 return;
184 }
185
186 std::uint64_t sz{0};
187 for (auto const& nodeObject : batch)
188 sz += nodeObject->getData().size();
189 storeStats(batch.size(), sz);
190 batch.clear();
191 };
192
193 srcDB.for_each([&](std::shared_ptr<NodeObject> nodeObject) {
194 XRPL_ASSERT(nodeObject, "xrpl::NodeStore::Database::importInternal : non-null node");
195 if (!nodeObject) // This should never happen
196 return;
197
198 batch.emplace_back(std::move(nodeObject));
200 storeBatch();
201 });
202
203 if (!batch.empty())
204 storeBatch();
205}
206
207// Perform a fetch and report the time it took
210 uint256 const& hash,
211 std::uint32_t ledgerSeq,
212 FetchType fetchType,
213 bool duplicate)
214{
215 FetchReport fetchReport(fetchType);
216
217 using namespace std::chrono;
218 auto const begin{steady_clock::now()};
219
220 auto nodeObject{fetchNodeObject(hash, ledgerSeq, fetchReport, duplicate)};
221 auto dur = steady_clock::now() - begin;
222 fetchDurationUs_ += duration_cast<microseconds>(dur).count();
223 if (nodeObject)
224 {
226 fetchSz_ += nodeObject->getData().size();
227 }
229
230 fetchReport.elapsed = duration_cast<milliseconds>(dur);
231 scheduler_.onFetch(fetchReport);
232 return nodeObject;
233}
234
235void
237{
238 XRPL_ASSERT(obj.isObject(), "xrpl::NodeStore::Database::getCountsJson : valid input type");
239
240 {
242 obj["read_queue"] = static_cast<Json::UInt>(read_.size());
243 }
244
245 obj["read_threads_total"] = readThreads_.load();
246 obj["read_threads_running"] = runningThreads_.load();
247 obj["read_request_bundle"] = requestBundle_;
248
249 obj[jss::node_writes] = std::to_string(storeCount_);
250 obj[jss::node_reads_total] = std::to_string(fetchTotalCount_);
251 obj[jss::node_reads_hit] = std::to_string(fetchHitCount_);
252 obj[jss::node_written_bytes] = std::to_string(storeSz_);
253 obj[jss::node_read_bytes] = std::to_string(fetchSz_);
254 obj[jss::node_reads_duration_us] = std::to_string(fetchDurationUs_);
255}
256
257} // namespace NodeStore
258} // namespace xrpl
Represents a JSON value.
Definition json_value.h:130
bool isObject() const
A generic endpoint for log messages.
Definition Journal.h:40
Stream error() const
Definition Journal.h:319
Stream debug() const
Definition Journal.h:301
A backend used for the NodeStore.
Definition Backend.h:20
virtual void storeBatch(Batch const &batch)=0
Store a group of objects.
Persistency layer for NodeObject.
Definition Database.h:31
void storeStats(std::uint64_t count, std::uint64_t sz)
Definition Database.h:216
std::condition_variable readCondVar_
Definition Database.h:243
std::atomic< std::uint64_t > fetchTotalCount_
Definition Database.h:238
std::atomic< int > readThreads_
Definition Database.h:253
std::atomic< int > runningThreads_
Definition Database.h:254
beast::Journal const j_
Definition Database.h:195
virtual void for_each(std::function< void(std::shared_ptr< NodeObject >)> f)=0
Visit every object in the database This is usually called during import.
std::map< uint256, std::vector< std::pair< std::uint32_t, std::function< void(std::shared_ptr< NodeObject > const &)> > > > read_
Definition Database.h:250
std::atomic< bool > readStopping_
Definition Database.h:252
virtual void asyncFetch(uint256 const &hash, std::uint32_t ledgerSeq, std::function< void(std::shared_ptr< NodeObject > const &)> &&callback)
Fetch an object without waiting.
Definition Database.cpp:156
std::atomic< std::uint64_t > fetchDurationUs_
Definition Database.h:239
virtual bool isSameDB(std::uint32_t s1, std::uint32_t s2)=0
virtual ~Database()
Destroy the node store.
Definition Database.cpp:104
std::atomic< std::uint64_t > storeSz_
Definition Database.h:237
std::uint32_t const earliestLedgerSeq_
Definition Database.h:208
std::atomic< std::uint32_t > fetchHitCount_
Definition Database.h:199
std::atomic< std::uint64_t > storeCount_
Definition Database.h:236
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, std::uint32_t ledgerSeq=0, FetchType fetchType=FetchType::synchronous, bool duplicate=false)
Fetch a node object.
Definition Database.cpp:209
void getCountsJson(Json::Value &obj)
Definition Database.cpp:236
std::atomic< std::uint32_t > fetchSz_
Definition Database.h:200
void importInternal(Backend &dstBackend, Database &srcDB)
Definition Database.cpp:171
Scheduling for asynchronous backend activity.
virtual void onFetch(FetchReport const &report)=0
Reports completion of a fetch Allows the scheduler to monitor the node store's performance.
Holds a collection of configuration values.
Definition BasicConfig.h:24
T detach(T... args)
T exchange(T... args)
T is_same_v
T load(T... args)
unsigned int UInt
void setCurrentThreadName(std::string_view newThreadName)
Changes the name of the caller thread.
STL namespace.
void read(nudb::detail::istream &is, std::size_t &u)
Definition varint.h:101
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_SEQ
The XRP ledger network's earliest allowed sequence.
T reserve(T... args)
Contains information about a fetch operation.
T to_string(T... args)
T what(T... args)
T yield(T... args)