rippled
Loading...
Searching...
No Matches
NuDBFactory.cpp
1#include <xrpl/basics/contract.h>
2#include <xrpl/beast/core/LexicalCast.h>
3#include <xrpl/beast/utility/instrumentation.h>
4#include <xrpl/nodestore/Factory.h>
5#include <xrpl/nodestore/Manager.h>
6#include <xrpl/nodestore/detail/DecodedBlob.h>
7#include <xrpl/nodestore/detail/EncodedBlob.h>
8#include <xrpl/nodestore/detail/codec.h>
9
10#include <boost/filesystem.hpp>
11
12#include <nudb/nudb.hpp>
13
14#include <chrono>
15#include <cstdint>
16#include <cstdio>
17#include <exception>
18#include <memory>
19
20namespace xrpl {
21namespace NodeStore {
22
23class NuDBBackend : public Backend
24{
25public:
26 // "appnum" is an application-defined constant stored in the header of a
27 // NuDB database. We used it to identify shard databases before that code
28 // was removed. For now, its only use is a sanity check that the database
29 // was created by xrpld.
30 static constexpr std::uint64_t appnum = 1;
31
33 size_t const keyBytes_;
37 nudb::store db_;
40
42 size_t keyBytes,
43 Section const& keyValues,
45 Scheduler& scheduler,
46 beast::Journal journal)
47 : j_(journal)
48 , keyBytes_(keyBytes)
50 , name_(get(keyValues, "path"))
51 , blockSize_(parseBlockSize(name_, keyValues, journal))
52 , deletePath_(false)
53 , scheduler_(scheduler)
54 {
55 if (name_.empty())
56 Throw<std::runtime_error>("nodestore: Missing path in NuDB backend");
57 }
58
60 size_t keyBytes,
61 Section const& keyValues,
63 Scheduler& scheduler,
64 nudb::context& context,
65 beast::Journal journal)
66 : j_(journal)
67 , keyBytes_(keyBytes)
69 , name_(get(keyValues, "path"))
70 , blockSize_(parseBlockSize(name_, keyValues, journal))
71 , db_(context)
72 , deletePath_(false)
73 , scheduler_(scheduler)
74 {
75 if (name_.empty())
76 Throw<std::runtime_error>("nodestore: Missing path in NuDB backend");
77 }
78
79 ~NuDBBackend() override
80 {
81 try
82 {
83 // close can throw and we don't want the destructor to throw.
84 close();
85 }
86 catch (nudb::system_error const&)
87 {
88 // Don't allow exceptions to propagate out of destructors.
89 // close() has already logged the error.
90 }
91 }
92
94 getName() override
95 {
96 return name_;
97 }
98
100 getBlockSize() const override
101 {
102 return blockSize_;
103 }
104
105 void
106 open(bool createIfMissing, uint64_t appType, uint64_t uid, uint64_t salt) override
107 {
108 using namespace boost::filesystem;
109 if (db_.is_open())
110 {
111 // LCOV_EXCL_START
112 UNREACHABLE(
113 "xrpl::NodeStore::NuDBBackend::open : database is already "
114 "open");
115 JLOG(j_.error()) << "database is already open";
116 return;
117 // LCOV_EXCL_STOP
118 }
119 auto const folder = path(name_);
120 auto const dp = (folder / "nudb.dat").string();
121 auto const kp = (folder / "nudb.key").string();
122 auto const lp = (folder / "nudb.log").string();
123 nudb::error_code ec;
124 if (createIfMissing)
125 {
126 create_directories(folder);
127 nudb::create<nudb::xxhasher>(dp, kp, lp, appType, uid, salt, keyBytes_, blockSize_, 0.50, ec);
128 if (ec == nudb::errc::file_exists)
129 ec = {};
130 if (ec)
131 Throw<nudb::system_error>(ec);
132 }
133 db_.open(dp, kp, lp, ec);
134 if (ec)
135 Throw<nudb::system_error>(ec);
136
137 if (db_.appnum() != appnum)
138 Throw<std::runtime_error>("nodestore: unknown appnum");
139 db_.set_burst(burstSize_);
140 }
141
142 bool
143 isOpen() override
144 {
145 return db_.is_open();
146 }
147
148 void
149 open(bool createIfMissing) override
150 {
151 open(createIfMissing, appnum, nudb::make_uid(), nudb::make_salt());
152 }
153
154 void
155 close() override
156 {
157 if (db_.is_open())
158 {
159 nudb::error_code ec;
160 db_.close(ec);
161 if (ec)
162 {
163 // Log to make sure the nature of the error gets to the user.
164 JLOG(j_.fatal()) << "NuBD close() failed: " << ec.message();
165 Throw<nudb::system_error>(ec);
166 }
167
168 if (deletePath_)
169 {
170 boost::filesystem::remove_all(name_, ec);
171 if (ec)
172 {
173 JLOG(j_.fatal()) << "Filesystem remove_all of " << name_ << " failed with: " << ec.message();
174 }
175 }
176 }
177 }
178
179 Status
180 fetch(void const* key, std::shared_ptr<NodeObject>* pno) override
181 {
182 Status status;
183 pno->reset();
184 nudb::error_code ec;
185 db_.fetch(
186 key,
187 [key, pno, &status](void const* data, std::size_t size) {
188 nudb::detail::buffer bf;
189 auto const result = nodeobject_decompress(data, size, bf);
190 DecodedBlob decoded(key, result.first, result.second);
191 if (!decoded.wasOk())
192 {
193 status = dataCorrupt;
194 return;
195 }
196 *pno = decoded.createObject();
197 status = ok;
198 },
199 ec);
200 if (ec == nudb::error::key_not_found)
201 return notFound;
202 if (ec)
203 Throw<nudb::system_error>(ec);
204 return status;
205 }
206
209 {
211 results.reserve(hashes.size());
212 for (auto const& h : hashes)
213 {
215 Status status = fetch(h->begin(), &nObj);
216 if (status != ok)
217 results.push_back({});
218 else
219 results.push_back(nObj);
220 }
221
222 return {results, ok};
223 }
224
225 void
227 {
228 EncodedBlob e(no);
229 nudb::error_code ec;
230 nudb::detail::buffer bf;
231 auto const result = nodeobject_compress(e.getData(), e.getSize(), bf);
232 db_.insert(e.getKey(), result.first, result.second, ec);
233 if (ec && ec != nudb::error::key_exists)
234 Throw<nudb::system_error>(ec);
235 }
236
237 void
239 {
240 BatchWriteReport report;
241 report.writeCount = 1;
242 auto const start = std::chrono::steady_clock::now();
243 do_insert(no);
244 report.elapsed =
245 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start);
246 scheduler_.onBatchWrite(report);
247 }
248
249 void
250 storeBatch(Batch const& batch) override
251 {
252 BatchWriteReport report;
253 report.writeCount = batch.size();
254 auto const start = std::chrono::steady_clock::now();
255 for (auto const& e : batch)
256 do_insert(e);
257 report.elapsed =
258 std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now() - start);
259 scheduler_.onBatchWrite(report);
260 }
261
262 void
263 sync() override
264 {
265 }
266
267 void
269 {
270 auto const dp = db_.dat_path();
271 auto const kp = db_.key_path();
272 auto const lp = db_.log_path();
273 // auto const appnum = db_.appnum();
274 nudb::error_code ec;
275 db_.close(ec);
276 if (ec)
277 Throw<nudb::system_error>(ec);
278 nudb::visit(
279 dp,
280 [&](void const* key, std::size_t key_bytes, void const* data, std::size_t size, nudb::error_code&) {
281 nudb::detail::buffer bf;
282 auto const result = nodeobject_decompress(data, size, bf);
283 DecodedBlob decoded(key, result.first, result.second);
284 if (!decoded.wasOk())
285 {
286 ec = make_error_code(nudb::error::missing_value);
287 return;
288 }
289 f(decoded.createObject());
290 },
291 nudb::no_progress{},
292 ec);
293 if (ec)
294 Throw<nudb::system_error>(ec);
295 db_.open(dp, kp, lp, ec);
296 if (ec)
297 Throw<nudb::system_error>(ec);
298 }
299
300 int
301 getWriteLoad() override
302 {
303 return 0;
304 }
305
306 void
307 setDeletePath() override
308 {
309 deletePath_ = true;
310 }
311
312 void
313 verify() override
314 {
315 auto const dp = db_.dat_path();
316 auto const kp = db_.key_path();
317 auto const lp = db_.log_path();
318 nudb::error_code ec;
319 db_.close(ec);
320 if (ec)
321 Throw<nudb::system_error>(ec);
322 nudb::verify_info vi;
323 nudb::verify<nudb::xxhasher>(vi, dp, kp, 0, nudb::no_progress{}, ec);
324 if (ec)
325 Throw<nudb::system_error>(ec);
326 db_.open(dp, kp, lp, ec);
327 if (ec)
328 Throw<nudb::system_error>(ec);
329 }
330
331 int
332 fdRequired() const override
333 {
334 return 3;
335 }
336
337private:
338 static std::size_t
339 parseBlockSize(std::string const& name, Section const& keyValues, beast::Journal journal)
340 {
341 using namespace boost::filesystem;
342 auto const folder = path(name);
343 auto const kp = (folder / "nudb.key").string();
344
345 std::size_t const defaultSize = nudb::block_size(kp); // Default 4K from NuDB
346 std::size_t blockSize = defaultSize;
347 std::string blockSizeStr;
348
349 if (!get_if_exists(keyValues, "nudb_block_size", blockSizeStr))
350 {
351 return blockSize; // Early return with default
352 }
353
354 try
355 {
356 std::size_t const parsedBlockSize = beast::lexicalCastThrow<std::size_t>(blockSizeStr);
357
358 // Validate: must be power of 2 between 4K and 32K
359 if (parsedBlockSize < 4096 || parsedBlockSize > 32768 || (parsedBlockSize & (parsedBlockSize - 1)) != 0)
360 {
362 s << "Invalid nudb_block_size: " << parsedBlockSize << ". Must be power of 2 between 4096 and 32768.";
363 Throw<std::runtime_error>(s.str());
364 }
365
366 JLOG(journal.info()) << "Using custom NuDB block size: " << parsedBlockSize << " bytes";
367 return parsedBlockSize;
368 }
369 catch (std::exception const& e)
370 {
372 s << "Invalid nudb_block_size value: " << blockSizeStr << ". Error: " << e.what();
373 Throw<std::runtime_error>(s.str());
374 }
375 }
376};
377
378//------------------------------------------------------------------------------
379
380class NuDBFactory : public Factory
381{
382private:
384
385public:
386 explicit NuDBFactory(Manager& manager) : manager_(manager)
387 {
388 manager_.insert(*this);
389 }
390
392 getName() const override
393 {
394 return "NuDB";
395 }
396
399 size_t keyBytes,
400 Section const& keyValues,
402 Scheduler& scheduler,
403 beast::Journal journal) override
404 {
405 return std::make_unique<NuDBBackend>(keyBytes, keyValues, burstSize, scheduler, journal);
406 }
407
410 size_t keyBytes,
411 Section const& keyValues,
413 Scheduler& scheduler,
414 nudb::context& context,
415 beast::Journal journal) override
416 {
417 return std::make_unique<NuDBBackend>(keyBytes, keyValues, burstSize, scheduler, context, journal);
418 }
419};
420
421void
423{
424 static NuDBFactory instance{manager};
425}
426
427} // namespace NodeStore
428} // namespace xrpl
A generic endpoint for log messages.
Definition Journal.h:40
Stream fatal() const
Definition Journal.h:324
Stream error() const
Definition Journal.h:318
Stream info() const
Definition Journal.h:306
A backend used for the NodeStore.
Definition Backend.h:20
Parsed key/value blob into NodeObject components.
Definition DecodedBlob.h:19
std::shared_ptr< NodeObject > createObject()
Create a NodeObject from this data.
bool wasOk() const noexcept
Determine if the decoding was successful.
Definition DecodedBlob.h:26
Convert a NodeObject from in-memory to database format.
Definition EncodedBlob.h:36
void const * getKey() const noexcept
Definition EncodedBlob.h:89
std::size_t getSize() const noexcept
Definition EncodedBlob.h:95
void const * getData() const noexcept
Base class for backend factories.
Definition Factory.h:16
Singleton for managing NodeStore factories and back ends.
Definition Manager.h:12
virtual void insert(Factory &factory)=0
Add a factory.
NuDBBackend(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal)
std::size_t const blockSize_
void verify() override
Perform consistency checks on database.
void storeBatch(Batch const &batch) override
Store a group of objects.
int getWriteLoad() override
Estimate the number of write operations pending.
std::pair< std::vector< std::shared_ptr< NodeObject > >, Status > fetchBatch(std::vector< uint256 const * > const &hashes) override
Fetch a batch synchronously.
std::optional< std::size_t > getBlockSize() const override
Get the block size for backends that support it.
static std::size_t parseBlockSize(std::string const &name, Section const &keyValues, beast::Journal journal)
int fdRequired() const override
Returns the number of file descriptors the backend expects to need.
void store(std::shared_ptr< NodeObject > const &no) override
Store a single object.
beast::Journal const j_
static constexpr std::uint64_t appnum
NuDBBackend(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, nudb::context &context, beast::Journal journal)
std::string getName() override
Get the human-readable name of this backend.
Status fetch(void const *key, std::shared_ptr< NodeObject > *pno) override
Fetch a single object.
void do_insert(std::shared_ptr< NodeObject > const &no)
void open(bool createIfMissing) override
Open the backend.
void open(bool createIfMissing, uint64_t appType, uint64_t uid, uint64_t salt) override
Open the backend.
std::size_t const burstSize_
void close() override
Close the backend.
void setDeletePath() override
Remove contents on disk upon destruction.
bool isOpen() override
Returns true if the database is open.
void for_each(std::function< void(std::shared_ptr< NodeObject >)> f) override
Visit every object in the database. This is usually called during import.
std::atomic< bool > deletePath_
std::unique_ptr< Backend > createInstance(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, nudb::context &context, beast::Journal journal) override
Create an instance of this factory's backend.
NuDBFactory(Manager &manager)
std::unique_ptr< Backend > createInstance(size_t keyBytes, Section const &keyValues, std::size_t burstSize, Scheduler &scheduler, beast::Journal journal) override
Create an instance of this factory's backend.
std::string getName() const override
Retrieve the name of this factory.
Scheduling for asynchronous backend activity.
virtual void onBatchWrite(BatchWriteReport const &report)=0
Reports the completion of a batch write. Allows the scheduler to monitor the node store's performance.
Holds a collection of configuration values.
Definition BasicConfig.h:24
T empty(T... args)
T is_same_v
void registerNuDBFactory(Manager &manager)
std::pair< void const *, std::size_t > nodeobject_compress(void const *in, std::size_t in_size, BufferFactory &&bf)
Definition codec.h:187
Status
Return codes from Backend operations.
std::pair< void const *, std::size_t > nodeobject_decompress(void const *in, std::size_t in_size, BufferFactory &&bf)
Definition codec.h:86
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
Definition algorithm.h:5
std::error_code make_error_code(xrpl::TokenCodecErrc e)
T get(Section const &section, std::string const &name, T const &defaultValue=T{})
Retrieve a key/value pair from a section.
@ open
We haven't closed our ledger yet, but others might have.
bool get_if_exists(Section const &section, std::string const &name, T &v)
@ no
Definition Steps.h:25
T push_back(T... args)
T reserve(T... args)
T reset(T... args)
T size(T... args)
T str(T... args)
Contains information about a batch write operation.
T what(T... args)