#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #ifndef NODESTORE_TIMING_DO_VERIFY #define NODESTORE_TIMING_DO_VERIFY 0 #endif namespace xrpl { namespace NodeStore { std::unique_ptr make_Backend(Section const& config, Scheduler& scheduler, beast::Journal journal) { return Manager::instance().make_Backend(config, megabytes(4), scheduler, journal); } // Fill memory with random bits template static void rngcpy(void* buffer, std::size_t bytes, Generator& g) { using result_type = typename Generator::result_type; while (bytes >= sizeof(result_type)) { auto const v = g(); memcpy(buffer, &v, sizeof(v)); buffer = reinterpret_cast(buffer) + sizeof(v); bytes -= sizeof(v); } if (bytes > 0) { auto const v = g(); memcpy(buffer, &v, bytes); } } // Instance of node factory produces a deterministic sequence // of random NodeObjects within the given class Sequence { private: enum { minLedger = 1, maxLedger = 1000000, minSize = 250, maxSize = 1250 }; beast::xor_shift_engine gen_; std::uint8_t prefix_; std::discrete_distribution d_type_; std::uniform_int_distribution d_size_; public: explicit Sequence(std::uint8_t prefix) : prefix_(prefix) // uniform distribution over hotLEDGER - hotTRANSACTION_NODE // but exclude hotTRANSACTION = 2 (removed) , d_type_({1, 1, 0, 1, 1}) , d_size_(minSize, maxSize) { } // Returns the n-th key uint256 key(std::size_t n) { gen_.seed(n + 1); uint256 result; rngcpy(&*result.begin(), result.size(), gen_); return result; } // Returns the n-th complete NodeObject std::shared_ptr obj(std::size_t n) { gen_.seed(n + 1); uint256 key; auto const data = static_cast(&*key.begin()); *data = prefix_; rngcpy(data + 1, key.size() - 1, gen_); Blob value(d_size_(gen_)); rngcpy(&value[0], value.size(), gen_); return NodeObject::createObject( safe_cast(d_type_(gen_)), std::move(value), key); } // returns a batch of NodeObjects starting at n void batch(std::size_t n, Batch& b, std::size_t size) { b.clear(); b.reserve(size); while (size--) b.emplace_back(obj(n++)); } }; //---------------------------------------------------------------------------------- class Timing_test : public beast::unit_test::suite { public: enum { // percent of fetches for missing nodes missingNodePercent = 20 }; std::size_t const default_repeat = 3; #ifndef NDEBUG std::size_t const default_items = 10000; #else std::size_t const default_items = 100000; // release #endif using clock_type = std::chrono::steady_clock; using duration_type = std::chrono::milliseconds; struct Params { std::size_t items; std::size_t threads; }; static std::string to_string(Section const& config) { std::string s; for (auto iter = config.begin(); iter != config.end(); ++iter) s += (iter != config.begin() ? "," : "") + iter->first + "=" + iter->second; return s; } static std::string to_string(duration_type const& d) { std::stringstream ss; ss << std::fixed << std::setprecision(3) << (d.count() / 1000.) << "s"; return ss.str(); } static Section parse(std::string s) { Section section; std::vector v; boost::split(v, s, boost::algorithm::is_any_of(",")); section.append(v); return section; } //-------------------------------------------------------------------------- // Workaround for GCC's parameter pack expansion in lambdas // https://gcc.gnu.org/bugzilla/show_bug.cgi?id=47226 template class parallel_for_lambda { private: std::size_t const n_; std::atomic& c_; public: parallel_for_lambda(std::size_t n, std::atomic& c) : n_(n), c_(c) { } template void operator()(Args&&... args) { Body body(args...); for (;;) { auto const i = c_++; if (i >= n_) break; body(i); } } }; /* Execute parallel-for loop. Constructs `number_of_threads` instances of `Body` with `args...` parameters and runs them on individual threads with unique loop indexes in the range [0, n). */ template void parallel_for(std::size_t const n, std::size_t number_of_threads, Args const&... args) { std::atomic c(0); std::vector t; t.reserve(number_of_threads); for (std::size_t id = 0; id < number_of_threads; ++id) t.emplace_back(*this, parallel_for_lambda(n, c), args...); for (auto& _ : t) _.join(); } template void parallel_for_id(std::size_t const n, std::size_t number_of_threads, Args const&... args) { std::atomic c(0); std::vector t; t.reserve(number_of_threads); for (std::size_t id = 0; id < number_of_threads; ++id) t.emplace_back(*this, parallel_for_lambda(n, c), id, args...); for (auto& _ : t) _.join(); } //-------------------------------------------------------------------------- // Insert only void do_insert(Section const& config, Params const& params, beast::Journal journal) { DummyScheduler scheduler; auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->open(); class Body { private: suite& suite_; Backend& backend_; Sequence seq_; public: explicit Body(suite& s, Backend& backend) : suite_(s), backend_(backend), seq_(1) { } void operator()(std::size_t i) { try { backend_.store(seq_.obj(i)); } catch (std::exception const& e) { suite_.fail(e.what()); } } }; try { parallel_for(params.items, params.threads, std::ref(*this), std::ref(*backend)); } catch (std::exception const&) { #if NODESTORE_TIMING_DO_VERIFY backend->verify(); #endif Rethrow(); } backend->close(); } // Fetch existing keys void do_fetch(Section const& config, Params const& params, beast::Journal journal) { DummyScheduler scheduler; auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->open(); class Body { private: suite& suite_; Backend& backend_; Sequence seq1_; beast::xor_shift_engine gen_; std::uniform_int_distribution dist_; public: Body(std::size_t id, suite& s, Params const& params, Backend& backend) : suite_(s), backend_(backend), seq1_(1), gen_(id + 1), dist_(0, params.items - 1) { } void operator()(std::size_t i) { try { std::shared_ptr obj; std::shared_ptr result; obj = seq1_.obj(dist_(gen_)); backend_.fetch(obj->getHash(), &result); suite_.expect(result && isSame(result, obj)); } catch (std::exception const& e) { suite_.fail(e.what()); } } }; try { parallel_for_id( params.items, params.threads, std::ref(*this), std::ref(params), std::ref(*backend)); } catch (std::exception const&) { #if NODESTORE_TIMING_DO_VERIFY backend->verify(); #endif Rethrow(); } backend->close(); } // Perform lookups of non-existent keys void do_missing(Section const& config, Params const& params, beast::Journal journal) { DummyScheduler scheduler; auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->open(); class Body { private: suite& suite_; // Params const& params_; Backend& backend_; Sequence seq2_; beast::xor_shift_engine gen_; std::uniform_int_distribution dist_; public: Body(std::size_t id, suite& s, Params const& params, Backend& backend) : suite_(s) //, params_ (params) , backend_(backend) , seq2_(2) , gen_(id + 1) , dist_(0, params.items - 1) { } void operator()(std::size_t i) { try { auto const hash = seq2_.key(i); std::shared_ptr result; backend_.fetch(hash, &result); suite_.expect(!result); } catch (std::exception const& e) { suite_.fail(e.what()); } } }; try { parallel_for_id( params.items, params.threads, std::ref(*this), std::ref(params), std::ref(*backend)); } catch (std::exception const&) { #if NODESTORE_TIMING_DO_VERIFY backend->verify(); #endif Rethrow(); } backend->close(); } // Fetch with present and missing keys void do_mixed(Section const& config, Params const& params, beast::Journal journal) { DummyScheduler scheduler; auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->open(); class Body { private: suite& suite_; // Params const& params_; Backend& backend_; Sequence seq1_; Sequence seq2_; beast::xor_shift_engine gen_; std::uniform_int_distribution rand_; std::uniform_int_distribution dist_; public: Body(std::size_t id, suite& s, Params const& params, Backend& backend) : suite_(s) //, params_ (params) , backend_(backend) , seq1_(1) , seq2_(2) , gen_(id + 1) , rand_(0, 99) , dist_(0, params.items - 1) { } void operator()(std::size_t i) { try { if (rand_(gen_) < missingNodePercent) { auto const hash = seq2_.key(dist_(gen_)); std::shared_ptr result; backend_.fetch(hash, &result); suite_.expect(!result); } else { std::shared_ptr obj; std::shared_ptr result; obj = seq1_.obj(dist_(gen_)); backend_.fetch(obj->getHash(), &result); suite_.expect(result && isSame(result, obj)); } } catch (std::exception const& e) { suite_.fail(e.what()); } } }; try { parallel_for_id( params.items, params.threads, std::ref(*this), std::ref(params), std::ref(*backend)); } catch (std::exception const&) { #if NODESTORE_TIMING_DO_VERIFY backend->verify(); #endif Rethrow(); } backend->close(); } // Simulate a rippled workload: // Each thread randomly: // inserts a new key // fetches an old key // fetches recent, possibly non existent data void do_work(Section const& config, Params const& params, beast::Journal journal) { DummyScheduler scheduler; auto backend = make_Backend(config, scheduler, journal); BEAST_EXPECT(backend != nullptr); backend->setDeletePath(); backend->open(); class Body { private: suite& suite_; Params const& params_; Backend& backend_; Sequence seq1_; beast::xor_shift_engine gen_; std::uniform_int_distribution rand_; std::uniform_int_distribution recent_; std::uniform_int_distribution older_; public: Body(std::size_t id, suite& s, Params const& params, Backend& backend) : suite_(s) , params_(params) , backend_(backend) , seq1_(1) , gen_(id + 1) , rand_(0, 99) , recent_(params.items, params.items * 2 - 1) , older_(0, params.items - 1) { } void operator()(std::size_t i) { try { if (rand_(gen_) < 200) { // historical lookup std::shared_ptr obj; std::shared_ptr result; auto const j = older_(gen_); obj = seq1_.obj(j); backend_.fetch(obj->getHash(), &result); suite_.expect(result != nullptr); suite_.expect(isSame(result, obj)); } char p[2]; p[0] = rand_(gen_) < 50 ? 0 : 1; p[1] = 1 - p[0]; for (int q = 0; q < 2; ++q) { switch (p[q]) { case 0: { // fetch recent std::shared_ptr obj; std::shared_ptr result; auto const j = recent_(gen_); obj = seq1_.obj(j); backend_.fetch(obj->getHash(), &result); suite_.expect(!result || isSame(result, obj)); break; } case 1: { // insert new auto const j = i + params_.items; backend_.store(seq1_.obj(j)); break; } } } } catch (std::exception const& e) { suite_.fail(e.what()); } } }; try { parallel_for_id( params.items, params.threads, std::ref(*this), std::ref(params), std::ref(*backend)); } catch (std::exception const&) { #if NODESTORE_TIMING_DO_VERIFY backend->verify(); #endif Rethrow(); } backend->close(); } //-------------------------------------------------------------------------- using test_func = void (Timing_test::*)(Section const&, Params const&, beast::Journal); using test_list = std::vector>; duration_type do_test(test_func f, Section const& config, Params const& params, beast::Journal journal) { auto const start = clock_type::now(); (this->*f)(config, params, journal); return std::chrono::duration_cast(clock_type::now() - start); } void do_tests( std::size_t threads, test_list const& tests, std::vector const& config_strings) { using std::setw; int w = 8; for (auto const& test : tests) if (w < test.first.size()) w = test.first.size(); log << threads << " Thread" << (threads > 1 ? "s" : "") << ", " << default_items << " Objects" << std::endl; { std::stringstream ss; ss << std::left << setw(10) << "Backend" << std::right; for (auto const& test : tests) ss << " " << setw(w) << test.first; log << ss.str() << std::endl; } using namespace beast::severities; test::SuiteJournal journal("Timing_test", *this); for (auto const& config_string : config_strings) { Params params; params.items = default_items; params.threads = threads; for (auto i = default_repeat; i--;) { beast::temp_dir tempDir; Section config = parse(config_string); config.set("path", tempDir.path()); std::stringstream ss; ss << std::left << setw(10) << get(config, "type", std::string()) << std::right; for (auto const& test : tests) ss << " " << setw(w) << to_string(do_test(test.second, config, params, journal)); ss << " " << to_string(config); log << ss.str() << std::endl; } } } void run() override { testcase("Timing", beast::unit_test::abort_on_fail); /* Parameters: repeat Number of times to repeat each test items Number of objects to create in the database */ std::string default_args = "type=nudb" #if XRPL_ROCKSDB_AVAILABLE ";type=rocksdb,open_files=2000,filter_bits=12,cache_mb=256," "file_size_mb=8,file_size_mult=2" #endif #if 0 ";type=memory|path=NodeStore" #endif ; test_list const tests = { {"Insert", &Timing_test::do_insert}, {"Fetch", &Timing_test::do_fetch}, {"Missing", &Timing_test::do_missing}, {"Mixed", &Timing_test::do_mixed}, {"Work", &Timing_test::do_work}}; auto args = arg().empty() ? default_args : arg(); std::vector config_strings; boost::split(config_strings, args, boost::algorithm::is_any_of(";")); for (auto iter = config_strings.begin(); iter != config_strings.end();) if (iter->empty()) iter = config_strings.erase(iter); else ++iter; do_tests(1, tests, config_strings); do_tests(4, tests, config_strings); do_tests(8, tests, config_strings); // do_tests (16, tests, config_strings); } }; BEAST_DEFINE_TESTSUITE_MANUAL_PRIO(Timing, nodestore, xrpl, 1); } // namespace NodeStore } // namespace xrpl