NuDB: limit size of mempool (RIPD-787)

Insert now blocks when the size of the memory pool exceeds a predefined
threshold. This fixes the problem where sustained insertions (such as a
bulk import) could cause the memory pool to grow without bound.
This commit is contained in:
Vinnie Falco
2015-02-01 15:49:21 -08:00
parent 0f1b831de7
commit 9a0c71d4a7
2 changed files with 96 additions and 136 deletions

View File

@@ -394,7 +394,6 @@ template <class _>
std::pair<DWORD, DWORD> std::pair<DWORD, DWORD>
win32_file<_>::flags (file_mode mode) win32_file<_>::flags (file_mode mode)
{ {
mode = file_mode::write;
std::pair<DWORD, DWORD> result(0, 0); std::pair<DWORD, DWORD> result(0, 0);
switch (mode) switch (mode)
{ {

View File

@@ -20,8 +20,6 @@
#ifndef BEAST_NUDB_STORE_H_INCLUDED #ifndef BEAST_NUDB_STORE_H_INCLUDED
#define BEAST_NUDB_STORE_H_INCLUDED #define BEAST_NUDB_STORE_H_INCLUDED
#include <beast/streams/debug_ostream.h>
#include <beast/nudb/error.h> #include <beast/nudb/error.h>
#include <beast/nudb/file.h> #include <beast/nudb/file.h>
#include <beast/nudb/mode.h> #include <beast/nudb/mode.h>
@@ -64,71 +62,9 @@
#include <beast/nudb/README.md> #include <beast/nudb/README.md>
#endif #endif
#ifndef BEAST_NUDB_DEBUG_CHECKS
# ifndef NDEBUG
# define BEAST_NUDB_DEBUG_CHECKS 0
# else
# define BEAST_NUDB_DEBUG_CHECKS 0
# endif
#endif
namespace beast { namespace beast {
namespace nudb { namespace nudb {
namespace detail {
// Holds state variables of the open database.
template <class File>
struct state
{
File df;
File kf;
File lf;
path_type dp;
path_type kp;
path_type lp;
buffers b;
pool p0;
pool p1;
cache c0;
cache c1;
key_file_header const kh;
// pool commit high water mark
std::size_t pool_thresh = 0;
state (state const&) = delete;
state& operator= (state const&) = delete;
state (File&& df_, File&& kf_, File&& lf_,
path_type const& dp_, path_type const& kp_,
path_type const& lp_, key_file_header const& kh_,
std::size_t arena_alloc_size);
};
template <class File>
state<File>::state (
File&& df_, File&& kf_, File&& lf_,
path_type const& dp_, path_type const& kp_,
path_type const& lp_, key_file_header const& kh_,
std::size_t arena_alloc_size)
: df (std::move(df_))
, kf (std::move(kf_))
, lf (std::move(lf_))
, dp (dp_)
, kp (kp_)
, lp (lp_)
, b (kh_.block_size)
, p0 (kh_.key_size, arena_alloc_size)
, p1 (kh_.key_size, arena_alloc_size)
, c0 (kh_.key_size, kh_.block_size)
, c1 (kh_.key_size, kh_.block_size)
, kh (kh_)
{
}
} // detail
/* /*
TODO TODO
@@ -169,20 +105,51 @@ private:
using clock_type = using clock_type =
std::chrono::steady_clock; std::chrono::steady_clock;
using shared_lock_type = using shared_lock_type =
boost::shared_lock<boost::shared_mutex>; boost::shared_lock<boost::shared_mutex>;
using unique_lock_type = using unique_lock_type =
boost::unique_lock<boost::shared_mutex>; boost::unique_lock<boost::shared_mutex>;
using blockbuf = using blockbuf =
typename detail::buffers::value_type; typename detail::buffers::value_type;
struct state
{
File df;
File kf;
File lf;
path_type dp;
path_type kp;
path_type lp;
detail::buffers b;
detail::pool p0;
detail::pool p1;
detail::cache c0;
detail::cache c1;
detail::key_file_header const kh;
// pool commit high water mark
std::size_t pool_thresh = 0;
state (state const&) = delete;
state& operator= (state const&) = delete;
state (File&& df_, File&& kf_, File&& lf_,
path_type const& dp_, path_type const& kp_,
path_type const& lp_,
detail::key_file_header const& kh_,
std::size_t arena_alloc_size);
};
bool open_ = false; bool open_ = false;
// VFALCO Make consistency checks optional? // VFALCO Make consistency checks optional?
//bool safe_ = true; // Do consistency checks //bool safe_ = true; // Do consistency checks
// VFALCO Unfortunately boost::optional doesn't support // VFALCO Unfortunately boost::optional doesn't support
// move construction so we use unique_ptr instead. // move construction so we use unique_ptr instead.
std::unique_ptr < std::unique_ptr <state> s_; // State of an open database
detail::state<File>> s_; // State of an open database
std::size_t frac_; // accumulates load std::size_t frac_; // accumulates load
std::size_t thresh_; // split threshold std::size_t thresh_; // split threshold
@@ -195,6 +162,13 @@ private:
std::thread thread_; std::thread thread_;
std::condition_variable_any cond_; std::condition_variable_any cond_;
// These allow insert to block, preventing the pool
// from exceeding a limit. Currently the limit is
// baked in, and can only be reached during sustained
// insertions, such as while importing.
std::size_t commit_limit_ = 1UL * 1024 * 1024 * 1024;
std::condition_variable_any cond_limit_;
std::atomic<bool> epb_; // `true` when ep_ set std::atomic<bool> epb_; // `true` when ep_ set
std::exception_ptr ep_; std::exception_ptr ep_;
@@ -319,16 +293,10 @@ private:
std::size_t buckets, std::size_t modulus, std::size_t buckets, std::size_t modulus,
detail::bulk_writer<File>& w); detail::bulk_writer<File>& w);
void
check (std::size_t n, detail::bucket& b,
std::size_t buckets, std::size_t modulus);
detail::bucket detail::bucket
load (std::size_t n, detail::cache& c1, load (std::size_t n, detail::cache& c1,
detail::cache& c0, void* buf); detail::cache& c0, void* buf);
bool check();
void void
commit(); commit();
@@ -338,6 +306,30 @@ private:
//------------------------------------------------------------------------------ //------------------------------------------------------------------------------
template <class Hasher, class File>
basic_store<Hasher, File>::state::state (
File&& df_, File&& kf_, File&& lf_,
path_type const& dp_, path_type const& kp_,
path_type const& lp_,
detail::key_file_header const& kh_,
std::size_t arena_alloc_size)
: df (std::move(df_))
, kf (std::move(kf_))
, lf (std::move(lf_))
, dp (dp_)
, kp (kp_)
, lp (lp_)
, b (kh_.block_size)
, p0 (kh_.key_size, arena_alloc_size)
, p1 (kh_.key_size, arena_alloc_size)
, c0 (kh_.key_size, kh_.block_size)
, c1 (kh_.key_size, kh_.block_size)
, kh (kh_)
{
}
//------------------------------------------------------------------------------
template <class Hasher, class File> template <class Hasher, class File>
basic_store<Hasher, File>::~basic_store() basic_store<Hasher, File>::~basic_store()
{ {
@@ -384,7 +376,7 @@ basic_store<Hasher, File>::open (
verify (dh); verify (dh);
verify<Hasher> (kh); verify<Hasher> (kh);
verify<Hasher> (dh, kh); verify<Hasher> (dh, kh);
auto s = std::make_unique<state<File>>( auto s = std::make_unique<state>(
std::move(df), std::move(kf), std::move(lf), std::move(df), std::move(kf), std::move(lf),
dat_path, key_path, log_path, kh, dat_path, key_path, log_path, kh,
arena_alloc_size); arena_alloc_size);
@@ -576,10 +568,22 @@ basic_store<Hasher, File>::insert (void const* key,
{ {
unique_lock_type m (m_); unique_lock_type m (m_);
s_->p1.insert (h, key, data, size); s_->p1.insert (h, key, data, size);
bool const full = // Did we go over the commit limit?
if (commit_limit_ > 0 &&
s_->p1.data_size() >= commit_limit_)
{
// Yes, start a new commit
cond_.notify_all();
// Wait for pool to shrink
cond_limit_.wait(m,
[this]() { return
s_->p1.data_size() <
commit_limit_; });
}
bool const notify =
s_->p1.data_size() >= s_->pool_thresh; s_->p1.data_size() >= s_->pool_thresh;
m.unlock(); m.unlock();
if (full) if (notify)
cond_.notify_all(); cond_.notify_all();
} }
return true; return true;
@@ -738,11 +742,6 @@ basic_store<Hasher, File>::load (
auto iter = c1.find(n); auto iter = c1.find(n);
if (iter != c1.end()) if (iter != c1.end())
return iter->second; return iter->second;
#if BEAST_NUDB_DEBUG_CHECKS
if (n >= buckets_)
throw std::logic_error(
"nudb: missing bucket in cache");
#endif
iter = c0.find(n); iter = c0.find(n);
if (iter != c0.end()) if (iter != c0.end())
return c1.insert (n, return c1.insert (n,
@@ -755,24 +754,6 @@ basic_store<Hasher, File>::load (
return c1.insert (n, tmp)->second; return c1.insert (n, tmp)->second;
} }
template <class Hasher, class File>
void
basic_store<Hasher, File>::check (
std::size_t n, detail::bucket& b,
std::size_t buckets, std::size_t modulus)
{
using namespace detail;
for (std::size_t i = 0; i < b.size(); ++i)
{
auto const e = b[i];
auto const h = hash<Hasher>(
e.key, s_->kh.key_size, s_->kh.salt);
auto const n1 = bucket_index(
h, buckets, modulus);
assert(n1 == n);
}
}
// Commit the memory pool to disk, then sync. // Commit the memory pool to disk, then sync.
// //
// Preconditions: // Preconditions:
@@ -795,6 +776,8 @@ basic_store<Hasher, File>::commit()
unique_lock_type m (m_); unique_lock_type m (m_);
if (s_->p1.empty()) if (s_->p1.empty())
return; return;
if (s_->p1.data_size() >= commit_limit_)
cond_limit_.notify_all();
swap (s_->c1, c1); swap (s_->c1, c1);
swap (s_->p0, s_->p1); swap (s_->p0, s_->p1);
s_->pool_thresh = std::max( s_->pool_thresh = std::max(
@@ -826,10 +809,6 @@ basic_store<Hasher, File>::commit()
// Write inserted data to the data file // Write inserted data to the data file
for (auto& e : s_->p0) for (auto& e : s_->p0)
{ {
#if BEAST_NUDB_DEBUG_CHECKS
assert (e.first.hash == hash<Hasher>(
e.first.key, s_->kh.key_size, s_->kh.salt));
#endif
// VFALCO This could be UB since other // VFALCO This could be UB since other
// threads are reading other data members // threads are reading other data members
// of this object in memory // of this object in memory
@@ -848,10 +827,6 @@ basic_store<Hasher, File>::commit()
// of original and modified buckets // of original and modified buckets
for (auto const e : s_->p0) for (auto const e : s_->p0)
{ {
#if BEAST_NUDB_DEBUG_CHECKS
assert (e.first.hash == hash<Hasher>(
e.first.key, s_->kh.key_size, s_->kh.salt));
#endif
// VFALCO Should this be >= or > ? // VFALCO Should this be >= or > ?
if ((frac_ += 65536) >= thresh_) if ((frac_ += 65536) >= thresh_)
{ {
@@ -862,35 +837,19 @@ basic_store<Hasher, File>::commit()
auto const n1 = buckets - (modulus / 2); auto const n1 = buckets - (modulus / 2);
auto const n2 = buckets++; auto const n2 = buckets++;
auto b1 = load (n1, c1, s_->c0, buf2.get()); auto b1 = load (n1, c1, s_->c0, buf2.get());
#if BEAST_NUDB_DEBUG_CHECKS
check(n1, b1, buckets, modulus);
#endif
auto b2 = c1.create (n2); auto b2 = c1.create (n2);
// If split spills, the writer is // If split spills, the writer is
// flushed which can amplify writes. // flushed which can amplify writes.
split (b1, b2, tmp, n1, n2, split (b1, b2, tmp, n1, n2,
buckets, modulus, w); buckets, modulus, w);
#if BEAST_NUDB_DEBUG_CHECKS
check(n1, b1, buckets, modulus);
check(n2, b2, buckets, modulus);
#endif
} }
// insert // insert
auto const n = bucket_index( auto const n = bucket_index(
e.first.hash, buckets, modulus); e.first.hash, buckets, modulus);
auto b = load (n, c1, s_->c0, buf2.get()); auto b = load (n, c1, s_->c0, buf2.get());
// This can amplify writes if it spills. // This can amplify writes if it spills.
#if BEAST_NUDB_DEBUG_CHECKS
check(n, b, buckets, modulus);
#endif
maybe_spill (b, w); maybe_spill (b, w);
#if BEAST_NUDB_DEBUG_CHECKS
check(n, b, buckets, modulus);
#endif
b.insert (e.second, e.first.size, e.first.key); b.insert (e.second, e.first.size, e.first.key);
#if BEAST_NUDB_DEBUG_CHECKS
check(n, b, buckets, modulus);
#endif
} }
w.flush(); w.flush();
} }
@@ -950,27 +909,31 @@ template <class Hasher, class File>
void void
basic_store<Hasher, File>::run() basic_store<Hasher, File>::run()
{ {
auto const pred =
[this]()
{
return
! open_ ||
s_->p1.data_size() >=
s_->pool_thresh ||
s_->p1.data_size() >=
commit_limit_;
};
try try
{ {
while (open_) while (open_)
{ {
auto when = clock_type::now() +
std::chrono::seconds(1);
for(;;) for(;;)
{ {
using std::chrono::seconds;
unique_lock_type m (m_); unique_lock_type m (m_);
bool const timeout = bool const timeout =
cond_.wait_until (m, when) == ! cond_.wait_for (m,
std::cv_status::timeout; seconds(1), pred);
if (! open_) if (! open_)
break; break;
if (timeout ||
s_->p1.data_size() >=
s_->pool_thresh)
{
m.unlock(); m.unlock();
commit(); commit();
}
// Reclaim some memory if // Reclaim some memory if
// we get a spare moment. // we get a spare moment.
if (timeout) if (timeout)
@@ -982,8 +945,6 @@ basic_store<Hasher, File>::run()
s_->c1.shrink_to_fit(); s_->c1.shrink_to_fit();
s_->c0.shrink_to_fit(); s_->c0.shrink_to_fit();
m.unlock(); m.unlock();
when = clock_type::now() +
std::chrono::seconds(1);
} }
} }
} }