From 9a0c71d4a77a40f479f2a0fab253adcdf2a2b7c1 Mon Sep 17 00:00:00 2001
From: Vinnie Falco <vinnie.falco@gmail.com>
Date: Sun, 1 Feb 2015 15:49:21 -0800
Subject: [PATCH] NuDB: limit size of mempool (RIPD-787):

Insert now blocks when the size of the memory pool exceeds a predefined
threshold. This solves the problem where sustained insertions cause the
memory pool to grow without bound.
---
 src/beast/beast/nudb/detail/win32_file.h |   1 -
 src/beast/beast/nudb/store.h             | 231 ++++++++++-------------
 2 files changed, 96 insertions(+), 136 deletions(-)

diff --git a/src/beast/beast/nudb/detail/win32_file.h b/src/beast/beast/nudb/detail/win32_file.h
index 73f020c5fe..dc1ad28193 100644
--- a/src/beast/beast/nudb/detail/win32_file.h
+++ b/src/beast/beast/nudb/detail/win32_file.h
@@ -394,7 +394,6 @@ template <class _>
 std::pair<DWORD, DWORD>
 win32_file<_>::flags (file_mode mode)
 {
-    mode = file_mode::write;
     std::pair<DWORD, DWORD> result(0, 0);
     switch (mode)
     {
diff --git a/src/beast/beast/nudb/store.h b/src/beast/beast/nudb/store.h
index d5befef302..5530787d45 100644
--- a/src/beast/beast/nudb/store.h
+++ b/src/beast/beast/nudb/store.h
@@ -20,8 +20,6 @@
 #ifndef BEAST_NUDB_STORE_H_INCLUDED
 #define BEAST_NUDB_STORE_H_INCLUDED
 
-#include
-
 #include
 #include
 #include
@@ -64,71 +62,9 @@
 #include
 #endif
 
-#ifndef BEAST_NUDB_DEBUG_CHECKS
-# ifndef NDEBUG
-#  define BEAST_NUDB_DEBUG_CHECKS 0
-# else
-#  define BEAST_NUDB_DEBUG_CHECKS 0
-# endif
-#endif
-
 namespace beast {
 namespace nudb {
 
-namespace detail {
-
-// Holds state variables of the open database.
-template <class File>
-struct state
-{
-    File df;
-    File kf;
-    File lf;
-    path_type dp;
-    path_type kp;
-    path_type lp;
-    buffers b;
-    pool p0;
-    pool p1;
-    cache c0;
-    cache c1;
-    key_file_header const kh;
-
-    // pool commit high water mark
-    std::size_t pool_thresh = 0;
-
-    state (state const&) = delete;
-    state& operator= (state const&) = delete;
-
-    state (File&& df_, File&& kf_, File&& lf_,
-        path_type const& dp_, path_type const& kp_,
-        path_type const& lp_, key_file_header const& kh_,
-        std::size_t arena_alloc_size);
-};
-
-template <class File>
-state<File>::state (
-    File&& df_, File&& kf_, File&& lf_,
-    path_type const& dp_, path_type const& kp_,
-    path_type const& lp_, key_file_header const& kh_,
-    std::size_t arena_alloc_size)
-    : df (std::move(df_))
-    , kf (std::move(kf_))
-    , lf (std::move(lf_))
-    , dp (dp_)
-    , kp (kp_)
-    , lp (lp_)
-    , b (kh_.block_size)
-    , p0 (kh_.key_size, arena_alloc_size)
-    , p1 (kh_.key_size, arena_alloc_size)
-    , c0 (kh_.key_size, kh_.block_size)
-    , c1 (kh_.key_size, kh_.block_size)
-    , kh (kh_)
-{
-}
-
-} // detail
-
 /*
 
     TODO
@@ -169,20 +105,51 @@ private:
     using clock_type = std::chrono::steady_clock;
 
+    using shared_lock_type =
+        boost::shared_lock<boost::shared_mutex>;
+
+    using unique_lock_type =
+        boost::unique_lock<boost::shared_mutex>;
+
+    using blockbuf =
+        typename detail::buffers::value_type;
+
+    struct state
+    {
+        File df;
+        File kf;
+        File lf;
+        path_type dp;
+        path_type kp;
+        path_type lp;
+        detail::buffers b;
+        detail::pool p0;
+        detail::pool p1;
+        detail::cache c0;
+        detail::cache c1;
+        detail::key_file_header const kh;
+
+        // pool commit high water mark
+        std::size_t pool_thresh = 0;
+
+        state (state const&) = delete;
+        state& operator= (state const&) = delete;
+
+        state (File&& df_, File&& kf_, File&& lf_,
+            path_type const& dp_, path_type const& kp_,
+            path_type const& lp_,
+            detail::key_file_header const& kh_,
+            std::size_t arena_alloc_size);
+    };
+
     bool open_ = false;
 
     // VFALCO Make consistency checks optional?
     //bool safe_ = true;            // Do consistency checks
+
     // VFALCO Unfortunately boost::optional doesn't support
     // move construction so we use unique_ptr instead.
-    std::unique_ptr <
-        detail::state<File>> s_;    // State of an open database
+    std::unique_ptr<state> s_;      // State of an open database
     std::size_t frac_;              // accumulates load
     std::size_t thresh_;            // split threshold
@@ -195,6 +162,13 @@ private:
     std::thread thread_;
     std::condition_variable_any cond_;
 
+    // These allow insert to block, preventing the pool
+    // from exceeding a limit. Currently the limit is
+    // baked in, and can only be reached during sustained
+    // insertions, such as while importing.
+    std::size_t commit_limit_ = 1UL * 1024 * 1024 * 1024;
+    std::condition_variable_any cond_limit_;
+
     std::atomic<bool> epb_;         // `true` when ep_ set
     std::exception_ptr ep_;
@@ -319,16 +293,10 @@ private:
         std::size_t buckets, std::size_t modulus,
         detail::bulk_writer<File>& w);
 
-    void
-    check (std::size_t n, detail::bucket& b,
-        std::size_t buckets, std::size_t modulus);
-
     detail::bucket
     load (std::size_t n, detail::cache& c1,
         detail::cache& c0, void* buf);
 
-    bool
-    check();
 
     void
     commit();
@@ -338,6 +306,30 @@
 
 //------------------------------------------------------------------------------
 
+template <class File>
+basic_store<File>::state::state (
+    File&& df_, File&& kf_, File&& lf_,
+    path_type const& dp_, path_type const& kp_,
+    path_type const& lp_,
+    detail::key_file_header const& kh_,
+    std::size_t arena_alloc_size)
+    : df (std::move(df_))
+    , kf (std::move(kf_))
+    , lf (std::move(lf_))
+    , dp (dp_)
+    , kp (kp_)
+    , lp (lp_)
+    , b (kh_.block_size)
+    , p0 (kh_.key_size, arena_alloc_size)
+    , p1 (kh_.key_size, arena_alloc_size)
+    , c0 (kh_.key_size, kh_.block_size)
+    , c1 (kh_.key_size, kh_.block_size)
+    , kh (kh_)
+{
+}
+
+//------------------------------------------------------------------------------
+
 template <class File>
 basic_store<File>::~basic_store()
 {
@@ -384,7 +376,7 @@ basic_store<File>::open (
     verify (dh);
     verify (kh);
     verify (dh, kh);
-    auto s = std::make_unique<detail::state<File>>(
+    auto s = std::make_unique<state>(
         std::move(df), std::move(kf), std::move(lf),
         dat_path, key_path, log_path, kh,
         arena_alloc_size);
@@ -576,10 +568,22 @@ basic_store<File>::insert (void const* key,
     {
         unique_lock_type m (m_);
         s_->p1.insert (h, key, data, size);
-        bool const full =
+        // Did we go over the commit limit?
+        if (commit_limit_ > 0 &&
+            s_->p1.data_size() >= commit_limit_)
+        {
+            // Yes, start a new commit
+            cond_.notify_all();
+            // Wait for pool to shrink
+            cond_limit_.wait(m,
+                [this]() { return
+                    s_->p1.data_size() <
+                        commit_limit_; });
+        }
+        bool const notify =
             s_->p1.data_size() >= s_->pool_thresh;
         m.unlock();
-        if (full)
+        if (notify)
            cond_.notify_all();
     }
     return true;
@@ -738,11 +742,6 @@ basic_store<File>::load (
     auto iter = c1.find(n);
     if (iter != c1.end())
         return iter->second;
-#if BEAST_NUDB_DEBUG_CHECKS
-    if (n >= buckets_)
-        throw std::logic_error(
-            "nudb: missing bucket in cache");
-#endif
     iter = c0.find(n);
     if (iter != c0.end())
         return c1.insert (n,
@@ -755,24 +754,6 @@ basic_store<File>::load (
     return c1.insert (n, tmp)->second;
 }
 
-template <class File>
-void
-basic_store<File>::check (
-    std::size_t n, detail::bucket& b,
-    std::size_t buckets, std::size_t modulus)
-{
-    using namespace detail;
-    for (std::size_t i = 0; i < b.size(); ++i)
-    {
-        auto const e = b[i];
-        auto const h = hash(
-            e.key, s_->kh.key_size, s_->kh.salt);
-        auto const n1 = bucket_index(
-            h, buckets, modulus);
-        assert(n1 == n);
-    }
-}
-
 // Commit the memory pool to disk, then sync.
 //
 // Preconditions:
@@ -795,6 +776,8 @@ basic_store<File>::commit()
     unique_lock_type m (m_);
     if (s_->p1.empty())
         return;
+    if (s_->p1.data_size() >= commit_limit_)
+        cond_limit_.notify_all();
     swap (s_->c1, c1);
     swap (s_->p0, s_->p1);
     s_->pool_thresh = std::max(
@@ -826,10 +809,6 @@ basic_store<File>::commit()
         // Write inserted data to the data file
         for (auto& e : s_->p0)
         {
-        #if BEAST_NUDB_DEBUG_CHECKS
-            assert (e.first.hash == hash(
-                e.first.key, s_->kh.key_size, s_->kh.salt));
-        #endif
             // VFALCO This could be UB since other
             // threads are reading other data members
             // of this object in memory
@@ -848,10 +827,6 @@ basic_store<File>::commit()
         // of original and modified buckets
         for (auto const e : s_->p0)
        {
-        #if BEAST_NUDB_DEBUG_CHECKS
-            assert (e.first.hash == hash(
-                e.first.key, s_->kh.key_size, s_->kh.salt));
-        #endif
             // VFALCO Should this be >= or > ?
             if ((frac_ += 65536) >= thresh_)
             {
@@ -862,35 +837,19 @@ basic_store<File>::commit()
                 auto const n1 = buckets - (modulus / 2);
                 auto const n2 = buckets++;
                 auto b1 = load (n1, c1, s_->c0, buf2.get());
-            #if BEAST_NUDB_DEBUG_CHECKS
-                check(n1, b1, buckets, modulus);
-            #endif
                 auto b2 = c1.create (n2);
                 // If split spills, the writer is
                 // flushed which can amplify writes.
                 split (b1, b2, tmp, n1, n2,
                     buckets, modulus, w);
-            #if BEAST_NUDB_DEBUG_CHECKS
-                check(n1, b1, buckets, modulus);
-                check(n2, b2, buckets, modulus);
-            #endif
             }
             // insert
             auto const n = bucket_index(
                 e.first.hash, buckets, modulus);
             auto b = load (n, c1, s_->c0, buf2.get());
             // This can amplify writes if it spills.
-            #if BEAST_NUDB_DEBUG_CHECKS
-            check(n, b, buckets, modulus);
-            #endif
             maybe_spill (b, w);
-            #if BEAST_NUDB_DEBUG_CHECKS
-            check(n, b, buckets, modulus);
-            #endif
             b.insert (e.second,
                 e.first.size, e.first.key);
-            #if BEAST_NUDB_DEBUG_CHECKS
-            check(n, b, buckets, modulus);
-            #endif
         }
         w.flush();
     }
@@ -950,27 +909,31 @@ template <class File>
 void
 basic_store<File>::run()
 {
+    auto const pred =
+        [this]()
+        {
+            return
+                ! open_ ||
+                s_->p1.data_size() >=
+                    s_->pool_thresh ||
+                s_->p1.data_size() >=
+                    commit_limit_;
+        };
     try
     {
         while (open_)
         {
-            auto when = clock_type::now() +
-                std::chrono::seconds(1);
             for(;;)
             {
+                using std::chrono::seconds;
                 unique_lock_type m (m_);
                 bool const timeout =
-                    cond_.wait_until (m, when) ==
-                        std::cv_status::timeout;
+                    ! cond_.wait_for (m,
+                        seconds(1), pred);
                 if (! open_)
                     break;
-                if (timeout ||
-                    s_->p1.data_size() >=
-                        s_->pool_thresh)
-                {
-                    m.unlock();
-                    commit();
-                }
+                m.unlock();
+                commit();
                 // Reclaim some memory if
                 // we get a spare moment.
                 if (timeout)
@@ -982,8 +945,6 @@ basic_store<File>::run()
                     s_->c1.shrink_to_fit();
                     s_->c0.shrink_to_fit();
                     m.unlock();
-                    when = clock_type::now() +
-                        std::chrono::seconds(1);
                 }
             }
         }
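
Note (not part of the patch): the backpressure scheme introduced above -- insert()
staging data into the pool, kicking the commit thread when a byte limit is
crossed, then sleeping on a second condition variable until commit() swaps the
pool out -- can be shown as a small standalone sketch. Everything below is an
illustrative stand-in: bounded_pool, limit_, and the std::mutex /
std::condition_variable primitives substitute for the patch's pool,
commit_limit_, and Boost lock types, and are not the actual NuDB interfaces.

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

// Bounded staging pool: insert() blocks once limit_ bytes are
// staged, until the commit thread drains the pool below the limit.
class bounded_pool
{
    std::mutex m_;
    std::condition_variable cond_;        // wakes the commit thread
    std::condition_variable cond_limit_;  // wakes blocked inserters
    std::vector<std::string> pool_;
    std::size_t size_ = 0;                // bytes currently staged
    std::size_t const limit_;

public:
    explicit bounded_pool (std::size_t limit)
        : limit_ (limit)
    {
    }

    void insert (std::string value)
    {
        std::unique_lock<std::mutex> lock (m_);
        size_ += value.size();
        pool_.push_back (std::move (value));
        if (size_ >= limit_)
        {
            // Over the limit: start a commit immediately, then
            // block until the pool shrinks back below the limit.
            cond_.notify_all();
            cond_limit_.wait (lock,
                [this] { return size_ < limit_; });
        }
    }

    // Runs on the commit thread: swap the staged data out under
    // the lock, release any blocked inserters, then perform the
    // slow write without holding the lock.
    void commit()
    {
        std::vector<std::string> work;
        {
            std::unique_lock<std::mutex> lock (m_);
            if (pool_.empty())
                return;
            work.swap (pool_);
            size_ = 0;
            cond_limit_.notify_all();
        }
        // ... write `work` to stable storage here ...
    }

    // Commit-thread loop: wake early when signaled, or at
    // least once per second.
    void run (std::atomic<bool> const& open)
    {
        while (open)
        {
            {
                std::unique_lock<std::mutex> lock (m_);
                cond_.wait_for (lock, std::chrono::seconds(1),
                    [&] { return ! open || size_ >= limit_; });
            }
            commit();
        }
    }
};

Draining outside the lock keeps inserters blocked only for the duration of the
swap rather than the disk write, which mirrors why the patch swaps p0/p1 under
the lock and performs the file I/O afterward.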