import_test.cpp
//------------------------------------------------------------------------------
/*
    This file is part of rippled: https://github.com/ripple/rippled
    Copyright (c) 2012, 2013 Ripple Labs Inc.

    Permission to use, copy, modify, and/or distribute this software for any
    purpose with or without fee is hereby granted, provided that the above
    copyright notice and this permission notice appear in all copies.

    THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
    WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
    MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
    ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
    WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
    ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
    OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
//==============================================================================
19
20#include <xrpld/nodestore/detail/codec.h>
21#include <xrpl/basics/contract.h>
22#include <xrpl/beast/clock/basic_seconds_clock.h>
23#include <xrpl/beast/core/LexicalCast.h>
24#include <xrpl/beast/rfc2616.h>
25#include <xrpl/beast/unit_test.h>
26#include <boost/beast/core/string.hpp>
27#include <boost/regex.hpp>
28#include <algorithm>
29#include <chrono>
30#include <iomanip>
31#include <map>
32#include <nudb/create.hpp>
33#include <nudb/detail/format.hpp>
34#include <nudb/xxhasher.hpp>
35#include <sstream>
36
37#include <xrpld/unity/rocksdb.h>
38

/*

Math:

1000 gb dat file
170 gb key file
capacity 113 keys/bucket

normal:
1,000gb data file read
19,210gb key file read (113 * 170)
19,210gb key file write

multi(32gb):
6 passes (170/32)
6,000gb data file read
170gb key file write

*/
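
// Worked reading of the note above (a sketch; 32 GB is just the example
// buffer size, not a requirement):
//
//   passes          = ceil(key file / buffer) = ceil(170 / 32)       = 6
//   data file reads = passes * dat file size  = 6 * 1,000 GB         = 6,000 GB
//   key file writes = key file size (each bucket range written once) = 170 GB
//
// The single-pass ("normal") build instead touches roughly
// capacity * key file size = 113 * 170 GB ~= 19,210 GB of key file I/O in
// each direction, which is why the multi-pass import trades extra sequential
// data file reads for far less random key file traffic.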

namespace ripple {
namespace NodeStore {

namespace detail {

class save_stream_state
{
    std::ostream& os_;
    std::streamsize precision_;
    std::ios::fmtflags flags_;
    std::ios::char_type fill_;

public:
    ~save_stream_state()
    {
        os_.precision(precision_);
        os_.flags(flags_);
        os_.fill(fill_);
    }
    save_stream_state(save_stream_state const&) = delete;
    save_stream_state&
    operator=(save_stream_state const&) = delete;
    explicit save_stream_state(std::ostream& os)
        : os_(os)
        , precision_(os.precision())
        , flags_(os.flags())
        , fill_(os.fill())
    {
    }
};

template <class Rep, class Period>
std::ostream&
pretty_time(std::ostream& os, std::chrono::duration<Rep, Period> d)
{
    save_stream_state _(os);
    using namespace std::chrono;
    if (d < microseconds{1})
    {
        // use nanoseconds
        if (d < nanoseconds{100})
        {
            // use floating
            using ns = duration<float, std::nano>;
            os << std::fixed << std::setprecision(1) << ns(d).count();
        }
        else
        {
            // use integral
            os << round<nanoseconds>(d).count();
        }
        os << "ns";
    }
    else if (d < milliseconds{1})
    {
        // use microseconds
        if (d < microseconds{100})
        {
            // use floating
            using ms = duration<float, std::micro>;
            os << std::fixed << std::setprecision(1) << ms(d).count();
        }
        else
        {
            // use integral
            os << round<microseconds>(d).count();
        }
        os << "us";
    }
    else if (d < seconds{1})
    {
        // use milliseconds
        if (d < milliseconds{100})
        {
            // use floating
            using ms = duration<float, std::milli>;
            os << std::fixed << std::setprecision(1) << ms(d).count();
        }
        else
        {
            // use integral
            os << round<milliseconds>(d).count();
        }
        os << "ms";
    }
    else if (d < minutes{1})
    {
        // use seconds
        if (d < seconds{100})
        {
            // use floating
            using s = duration<float>;
            os << std::fixed << std::setprecision(1) << s(d).count();
        }
        else
        {
            // use integral
            os << round<seconds>(d).count();
        }
        os << "s";
    }
    else
    {
        // use minutes
        if (d < minutes{100})
        {
            // use floating
            using m = duration<float, std::ratio<60>>;
            os << std::fixed << std::setprecision(1) << m(d).count();
        }
        else
        {
            // use integral
            os << round<minutes>(d).count();
        }
        os << "min";
    }
    return os;
}
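
// For reference (derived from the branches above), a few sample outputs:
//
//   pretty_time(os, std::chrono::microseconds(250));   // "250us"
//   pretty_time(os, std::chrono::milliseconds(1500));  // "1.5s"
//   pretty_time(os, std::chrono::minutes(90));         // "90.0min"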

template <class Period, class Rep>
inline std::string
fmtdur(std::chrono::duration<Period, Rep> const& d)
{
    std::stringstream ss;
    pretty_time(ss, d);
    return ss.str();
}

}  // namespace detail

//------------------------------------------------------------------------------

class progress
{
private:
    using clock_type = beast::basic_seconds_clock;

    std::size_t const work_;
    clock_type::time_point start_ = clock_type::now();
    clock_type::time_point now_ = clock_type::now();
    clock_type::time_point report_ = clock_type::now();
    std::size_t prev_ = 0;
    bool estimate_ = false;

public:
    explicit progress(std::size_t work) : work_(work)
    {
    }

    template <class Log>
    void
    operator()(Log& log, std::size_t work)
    {
        using namespace std::chrono;
        auto const now = clock_type::now();
        if (now == now_)
            return;
        now_ = now;
        auto const elapsed = now - start_;
        if (!estimate_)
        {
            if (elapsed < seconds(15))
                return;
            estimate_ = true;
        }
        else if (now - report_ < std::chrono::seconds(60))
        {
            return;
        }
        auto const rate = elapsed.count() / double(work);
        clock_type::duration const remain(
            static_cast<clock_type::duration::rep>((work_ - work) * rate));
        log << "Remaining: " << detail::fmtdur(remain) << " (" << work << " of "
            << work_ << " in " << detail::fmtdur(elapsed) << ", "
            << (work - prev_) << " in " << detail::fmtdur(now - report_) << ")";
        report_ = now;
        prev_ = work;
    }

    template <class Log>
    void
    finish(Log& log)
    {
        log << "Total time: " << detail::fmtdur(clock_type::now() - start_);
    }
};
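
// Usage sketch (illustrative; `total_work` and `do_work` are hypothetical):
// the call operator rate-limits its own logging, so it is safe to invoke on
// every iteration.
//
//     progress p(total_work);
//     for (std::size_t i = 0; i < total_work; ++i)
//     {
//         do_work(i);
//         p(log, i);
//     }
//     p.finish(log);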

std::map<std::string, std::string, boost::beast::iless>
parse_args(std::string const& s)
{
    // <key> '=' <value>
    static boost::regex const re1(
        "^"                        // start of line
        "(?:\\s*)"                 // whitespace (optional)
        "([a-zA-Z][_a-zA-Z0-9]*)"  // <key>
        "(?:\\s*)"                 // whitespace (optional)
        "(?:=)"                    // '='
        "(?:\\s*)"                 // whitespace (optional)
        "(.*\\S+)"                 // <value>
        "(?:\\s*)"                 // whitespace (optional)
        ,
        boost::regex_constants::optimize);
    std::map<std::string, std::string, boost::beast::iless> map;
    auto const v = beast::rfc2616::split(s.begin(), s.end(), ',');
    for (auto const& kv : v)
    {
        boost::smatch m;
        if (!boost::regex_match(kv, m, re1))
            Throw<std::runtime_error>("invalid parameter " + kv);
        auto const result = map.emplace(m[1], m[2]);
        if (!result.second)
            Throw<std::runtime_error>("duplicate parameter " + m[1]);
    }
    return map;
}
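
// Example (illustrative input): parse_args("from=/in,to=/out,buffer=1024")
// yields {"buffer" -> "1024", "from" -> "/in", "to" -> "/out"}. Keys compare
// case-insensitively (boost::beast::iless) and a repeated key throws.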

//------------------------------------------------------------------------------

#if RIPPLE_ROCKSDB_AVAILABLE

class import_test : public beast::unit_test::suite
{
public:
    void
    run() override
    {
        testcase(beast::unit_test::abort_on_fail) << arg();

        using namespace nudb;
        using namespace nudb::detail;

        pass();
        auto const args = parse_args(arg());
        bool usage = args.empty();

        if (!usage && args.find("from") == args.end())
        {
            log << "Missing parameter: from";
            usage = true;
        }
        if (!usage && args.find("to") == args.end())
        {
            log << "Missing parameter: to";
            usage = true;
        }
        if (!usage && args.find("buffer") == args.end())
        {
            log << "Missing parameter: buffer";
            usage = true;
        }

        if (usage)
        {
            log << "Usage:\n"
                << "--unittest-arg=from=<from>,to=<to>,buffer=<buffer>\n"
                << "from: RocksDB database to import from\n"
                << "to: NuDB database to import to\n"
                << "buffer: Buffer size (bigger is faster)\n"
                << "NuDB database must not already exist.";
            return;
        }
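
        // Example invocation (paths and size are illustrative; the exact
        // flags depend on how the unit-test runner is driven):
        //
        //   --unittest=import
        //   --unittest-arg=from=/var/rocksdb,to=/var/nudb/db,buffer=34359738368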

        // This controls the size of the bucket buffer.
        // For a 1TB data file, a 32GB bucket buffer is suggested.
        // The larger the buffer, the faster the import.
        //
        std::size_t const buffer_size = std::stoull(args.at("buffer"));
        auto const from_path = args.at("from");
        auto const to_path = args.at("to");

        using hash_type = nudb::xxhasher;
        auto const bulk_size = 64 * 1024 * 1024;
        float const load_factor = 0.5;

        auto const dp = to_path + ".dat";
        auto const kp = to_path + ".key";

        auto const start = std::chrono::steady_clock::now();

        log << "from: " << from_path
            << "\n"
               "to: "
            << to_path
            << "\n"
               "buffer: "
            << buffer_size;

        std::unique_ptr<rocksdb::DB> db;
        {
            rocksdb::Options options;
            options.create_if_missing = false;
            options.max_open_files = 2000;  // 5000?
            rocksdb::DB* pdb = nullptr;
            rocksdb::Status status =
                rocksdb::DB::OpenForReadOnly(options, from_path, &pdb);
            if (!status.ok() || !pdb)
                Throw<std::runtime_error>(
                    "Can't open '" + from_path + "': " + status.ToString());
            db.reset(pdb);
        }
        // Create data file with values
        std::size_t nitems = 0;
        dat_file_header dh;
        dh.version = currentVersion;
        dh.uid = make_uid();
        dh.appnum = 1;
        dh.key_size = 32;

        native_file df;
        error_code ec;
        df.create(file_mode::append, dp, ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        bulk_writer<native_file> dw(df, 0, bulk_size);
        {
            {
                auto os = dw.prepare(dat_file_header::size, ec);
                if (ec)
                    Throw<nudb::system_error>(ec);
                write(os, dh);
            }
            rocksdb::ReadOptions options;
            options.verify_checksums = false;
            options.fill_cache = false;
            std::unique_ptr<rocksdb::Iterator> it(db->NewIterator(options));

            buffer buf;
            for (it->SeekToFirst(); it->Valid(); it->Next())
            {
                if (it->key().size() != 32)
                    Throw<std::runtime_error>(
                        "Unexpected key size " +
                        std::to_string(it->key().size()));
                void const* const key = it->key().data();
                void const* const data = it->value().data();
                auto const size = it->value().size();
                std::unique_ptr<char[]> clean(new char[size]);
                std::memcpy(clean.get(), data, size);
                filter_inner(clean.get(), size);
                auto const out = nodeobject_compress(clean.get(), size, buf);
                // Verify codec correctness
                {
                    buffer buf2;
                    auto const check =
                        nodeobject_decompress(out.first, out.second, buf2);
                    BEAST_EXPECT(check.second == size);
                    BEAST_EXPECT(
                        std::memcmp(check.first, clean.get(), size) == 0);
                }
                // Data Record
                auto os = dw.prepare(
                    field<uint48_t>::size +  // Size
                        32 +                 // Key
                        out.second,
                    ec);
                if (ec)
                    Throw<nudb::system_error>(ec);
                write<uint48_t>(os, out.second);
                std::memcpy(os.data(32), key, 32);
                std::memcpy(os.data(out.second), out.first, out.second);
                ++nitems;
            }
            dw.flush(ec);
            if (ec)
                Throw<nudb::system_error>(ec);
        }
        db.reset();
        log << "Import data: "
            << detail::fmtdur(std::chrono::steady_clock::now() - start);
        auto const df_size = df.size(ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        // Create key file
        key_file_header kh;
        kh.version = currentVersion;
        kh.uid = dh.uid;
        kh.appnum = dh.appnum;
        kh.key_size = 32;
        kh.salt = make_salt();
        kh.pepper = pepper<hash_type>(kh.salt);
        kh.block_size = block_size(kp);
        kh.load_factor = std::min<std::size_t>(65536.0 * load_factor, 65535);
        kh.buckets =
            std::ceil(nitems / (bucket_capacity(kh.block_size) * load_factor));
        kh.modulus = ceil_pow2(kh.buckets);
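        // Worked example (illustrative numbers): a 4096-byte block holds
        // about 113 entries, so with load_factor = 0.5 ten million items
        // need ceil(10'000'000 / (113 * 0.5)) = 176,992 buckets; the modulus
        // rounds that up to the next power of two, 262,144.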
        native_file kf;
        kf.create(file_mode::append, kp, ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        buffer buf(kh.block_size);
        {
            // Block 0 of the key file holds the header; buckets begin at
            // block 1.
            std::memset(buf.get(), 0, kh.block_size);
            ostream os(buf.get(), kh.block_size);
            write(os, kh);
            kf.write(0, buf.get(), kh.block_size, ec);
            if (ec)
                Throw<nudb::system_error>(ec);
        }
        // Build contiguous sequential sections of the
        // key file using multiple passes over the data.
        //
        auto const buckets =
            std::max<std::size_t>(1, buffer_size / kh.block_size);
        buf.reserve(buckets * kh.block_size);
        auto const passes = (kh.buckets + buckets - 1) / buckets;
        log << "items: " << nitems
            << "\n"
               "buckets: "
            << kh.buckets
            << "\n"
               "data: "
            << df_size
            << "\n"
               "passes: "
            << passes;
        progress p(df_size * passes);
        std::size_t npass = 0;
        for (std::size_t b0 = 0; b0 < kh.buckets; b0 += buckets)
        {
            auto const b1 = std::min(b0 + buckets, kh.buckets);
            // Buffered range is [b0, b1)
            auto const bn = b1 - b0;
            // Create empty buckets
            for (std::size_t i = 0; i < bn; ++i)
            {
                bucket b(kh.block_size, buf.get() + i * kh.block_size, empty);
            }
            // Insert all keys into buckets
            // Iterate Data File
            bulk_reader<native_file> r(
                df, dat_file_header::size, df_size, bulk_size);
            while (!r.eof())
            {
                auto const offset = r.offset();
                // Data Record or Spill Record
                std::size_t size;
                auto is = r.prepare(field<uint48_t>::size, ec);  // Size
                if (ec)
                    Throw<nudb::system_error>(ec);
                read<uint48_t>(is, size);
                if (size > 0)
                {
                    // Data Record
                    is = r.prepare(
                        dh.key_size +  // Key
                            size,
                        ec);           // Data
                    if (ec)
                        Throw<nudb::system_error>(ec);
                    std::uint8_t const* const key = is.data(dh.key_size);
                    auto const h = hash<hash_type>(key, kh.key_size, kh.salt);
                    auto const n = bucket_index(h, kh.buckets, kh.modulus);
                    p(log, npass * df_size + r.offset());
                    if (n < b0 || n >= b1)
                        continue;
                    bucket b(
                        kh.block_size, buf.get() + (n - b0) * kh.block_size);
                    maybe_spill(b, dw, ec);
                    if (ec)
                        Throw<nudb::system_error>(ec);
                    b.insert(offset, size, h);
                }
                else
                {
                    // VFALCO Should never get here
                    // Spill Record
                    is = r.prepare(field<std::uint16_t>::size, ec);
                    if (ec)
                        Throw<nudb::system_error>(ec);
                    read<std::uint16_t>(is, size);  // Size
                    r.prepare(size, ec);            // skip
                    if (ec)
                        Throw<nudb::system_error>(ec);
                }
            }
            kf.write(
                (b0 + 1) * kh.block_size, buf.get(), bn * kh.block_size, ec);
            if (ec)
                Throw<nudb::system_error>(ec);
            ++npass;
        }
        dw.flush(ec);
        if (ec)
            Throw<nudb::system_error>(ec);
        p.finish(log);
    }
};

BEAST_DEFINE_TESTSUITE_MANUAL(import, NodeStore, ripple);

#endif

//------------------------------------------------------------------------------

}  // namespace NodeStore
}  // namespace ripple