NuDB: limit size of mempool (RIPD-787):

Insert now blocks when the size of the memory pool exceeds a predefined
threshold. This solves the problem where sustained insertions cause the
memory pool to grow without bound.
This commit is contained in:
Vinnie Falco
2015-02-01 15:49:21 -08:00
parent 96c3292210
commit 3838d222c2
2 changed files with 96 additions and 136 deletions

View File

@@ -394,7 +394,6 @@ template <class _>
std::pair<DWORD, DWORD>
win32_file<_>::flags (file_mode mode)
{
mode = file_mode::write;
std::pair<DWORD, DWORD> result(0, 0);
switch (mode)
{

View File

@@ -20,8 +20,6 @@
#ifndef BEAST_NUDB_STORE_H_INCLUDED
#define BEAST_NUDB_STORE_H_INCLUDED
#include <beast/streams/debug_ostream.h>
#include <beast/nudb/error.h>
#include <beast/nudb/file.h>
#include <beast/nudb/mode.h>
@@ -64,71 +62,9 @@
#include <beast/nudb/README.md>
#endif
#ifndef BEAST_NUDB_DEBUG_CHECKS
# ifndef NDEBUG
# define BEAST_NUDB_DEBUG_CHECKS 0
# else
# define BEAST_NUDB_DEBUG_CHECKS 0
# endif
#endif
namespace beast {
namespace nudb {
namespace detail {
// Holds state variables of the open database.
template <class File>
struct state
{
File df;
File kf;
File lf;
path_type dp;
path_type kp;
path_type lp;
buffers b;
pool p0;
pool p1;
cache c0;
cache c1;
key_file_header const kh;
// pool commit high water mark
std::size_t pool_thresh = 0;
state (state const&) = delete;
state& operator= (state const&) = delete;
state (File&& df_, File&& kf_, File&& lf_,
path_type const& dp_, path_type const& kp_,
path_type const& lp_, key_file_header const& kh_,
std::size_t arena_alloc_size);
};
template <class File>
state<File>::state (
File&& df_, File&& kf_, File&& lf_,
path_type const& dp_, path_type const& kp_,
path_type const& lp_, key_file_header const& kh_,
std::size_t arena_alloc_size)
: df (std::move(df_))
, kf (std::move(kf_))
, lf (std::move(lf_))
, dp (dp_)
, kp (kp_)
, lp (lp_)
, b (kh_.block_size)
, p0 (kh_.key_size, arena_alloc_size)
, p1 (kh_.key_size, arena_alloc_size)
, c0 (kh_.key_size, kh_.block_size)
, c1 (kh_.key_size, kh_.block_size)
, kh (kh_)
{
}
} // detail
/*
TODO
@@ -169,20 +105,51 @@ private:
using clock_type =
std::chrono::steady_clock;
using shared_lock_type =
boost::shared_lock<boost::shared_mutex>;
using unique_lock_type =
boost::unique_lock<boost::shared_mutex>;
using blockbuf =
typename detail::buffers::value_type;
struct state
{
File df;
File kf;
File lf;
path_type dp;
path_type kp;
path_type lp;
detail::buffers b;
detail::pool p0;
detail::pool p1;
detail::cache c0;
detail::cache c1;
detail::key_file_header const kh;
// pool commit high water mark
std::size_t pool_thresh = 0;
state (state const&) = delete;
state& operator= (state const&) = delete;
state (File&& df_, File&& kf_, File&& lf_,
path_type const& dp_, path_type const& kp_,
path_type const& lp_,
detail::key_file_header const& kh_,
std::size_t arena_alloc_size);
};
bool open_ = false;
// VFALCO Make consistency checks optional?
//bool safe_ = true; // Do consistency checks
// VFALCO Unfortunately boost::optional doesn't support
// move construction so we use unique_ptr instead.
std::unique_ptr <
detail::state<File>> s_; // State of an open database
std::unique_ptr <state> s_; // State of an open database
std::size_t frac_; // accumulates load
std::size_t thresh_; // split threshold
@@ -195,6 +162,13 @@ private:
std::thread thread_;
std::condition_variable_any cond_;
// These allow insert to block, preventing the pool
// from exceeding a limit. Currently the limit is
// baked in, and can only be reached during sustained
// insertions, such as while importing.
std::size_t commit_limit_ = 1UL * 1024 * 1024 * 1024;
std::condition_variable_any cond_limit_;
std::atomic<bool> epb_; // `true` when ep_ set
std::exception_ptr ep_;
@@ -319,16 +293,10 @@ private:
std::size_t buckets, std::size_t modulus,
detail::bulk_writer<File>& w);
void
check (std::size_t n, detail::bucket& b,
std::size_t buckets, std::size_t modulus);
detail::bucket
load (std::size_t n, detail::cache& c1,
detail::cache& c0, void* buf);
bool check();
void
commit();
@@ -338,6 +306,30 @@ private:
//------------------------------------------------------------------------------
template <class Hasher, class File>
basic_store<Hasher, File>::state::state (
File&& df_, File&& kf_, File&& lf_,
path_type const& dp_, path_type const& kp_,
path_type const& lp_,
detail::key_file_header const& kh_,
std::size_t arena_alloc_size)
: df (std::move(df_))
, kf (std::move(kf_))
, lf (std::move(lf_))
, dp (dp_)
, kp (kp_)
, lp (lp_)
, b (kh_.block_size)
, p0 (kh_.key_size, arena_alloc_size)
, p1 (kh_.key_size, arena_alloc_size)
, c0 (kh_.key_size, kh_.block_size)
, c1 (kh_.key_size, kh_.block_size)
, kh (kh_)
{
}
//------------------------------------------------------------------------------
template <class Hasher, class File>
basic_store<Hasher, File>::~basic_store()
{
@@ -384,7 +376,7 @@ basic_store<Hasher, File>::open (
verify (dh);
verify<Hasher> (kh);
verify<Hasher> (dh, kh);
auto s = std::make_unique<state<File>>(
auto s = std::make_unique<state>(
std::move(df), std::move(kf), std::move(lf),
dat_path, key_path, log_path, kh,
arena_alloc_size);
@@ -576,10 +568,22 @@ basic_store<Hasher, File>::insert (void const* key,
{
unique_lock_type m (m_);
s_->p1.insert (h, key, data, size);
bool const full =
// Did we go over the commit limit?
if (commit_limit_ > 0 &&
s_->p1.data_size() >= commit_limit_)
{
// Yes, start a new commit
cond_.notify_all();
// Wait for pool to shrink
cond_limit_.wait(m,
[this]() { return
s_->p1.data_size() <
commit_limit_; });
}
bool const notify =
s_->p1.data_size() >= s_->pool_thresh;
m.unlock();
if (full)
if (notify)
cond_.notify_all();
}
return true;
@@ -738,11 +742,6 @@ basic_store<Hasher, File>::load (
auto iter = c1.find(n);
if (iter != c1.end())
return iter->second;
#if BEAST_NUDB_DEBUG_CHECKS
if (n >= buckets_)
throw std::logic_error(
"nudb: missing bucket in cache");
#endif
iter = c0.find(n);
if (iter != c0.end())
return c1.insert (n,
@@ -755,24 +754,6 @@ basic_store<Hasher, File>::load (
return c1.insert (n, tmp)->second;
}
template <class Hasher, class File>
void
basic_store<Hasher, File>::check (
std::size_t n, detail::bucket& b,
std::size_t buckets, std::size_t modulus)
{
using namespace detail;
for (std::size_t i = 0; i < b.size(); ++i)
{
auto const e = b[i];
auto const h = hash<Hasher>(
e.key, s_->kh.key_size, s_->kh.salt);
auto const n1 = bucket_index(
h, buckets, modulus);
assert(n1 == n);
}
}
// Commit the memory pool to disk, then sync.
//
// Preconditions:
@@ -795,6 +776,8 @@ basic_store<Hasher, File>::commit()
unique_lock_type m (m_);
if (s_->p1.empty())
return;
if (s_->p1.data_size() >= commit_limit_)
cond_limit_.notify_all();
swap (s_->c1, c1);
swap (s_->p0, s_->p1);
s_->pool_thresh = std::max(
@@ -826,10 +809,6 @@ basic_store<Hasher, File>::commit()
// Write inserted data to the data file
for (auto& e : s_->p0)
{
#if BEAST_NUDB_DEBUG_CHECKS
assert (e.first.hash == hash<Hasher>(
e.first.key, s_->kh.key_size, s_->kh.salt));
#endif
// VFALCO This could be UB since other
// threads are reading other data members
// of this object in memory
@@ -848,10 +827,6 @@ basic_store<Hasher, File>::commit()
// of original and modified buckets
for (auto const e : s_->p0)
{
#if BEAST_NUDB_DEBUG_CHECKS
assert (e.first.hash == hash<Hasher>(
e.first.key, s_->kh.key_size, s_->kh.salt));
#endif
// VFALCO Should this be >= or > ?
if ((frac_ += 65536) >= thresh_)
{
@@ -862,35 +837,19 @@ basic_store<Hasher, File>::commit()
auto const n1 = buckets - (modulus / 2);
auto const n2 = buckets++;
auto b1 = load (n1, c1, s_->c0, buf2.get());
#if BEAST_NUDB_DEBUG_CHECKS
check(n1, b1, buckets, modulus);
#endif
auto b2 = c1.create (n2);
// If split spills, the writer is
// flushed which can amplify writes.
split (b1, b2, tmp, n1, n2,
buckets, modulus, w);
#if BEAST_NUDB_DEBUG_CHECKS
check(n1, b1, buckets, modulus);
check(n2, b2, buckets, modulus);
#endif
}
// insert
auto const n = bucket_index(
e.first.hash, buckets, modulus);
auto b = load (n, c1, s_->c0, buf2.get());
// This can amplify writes if it spills.
#if BEAST_NUDB_DEBUG_CHECKS
check(n, b, buckets, modulus);
#endif
maybe_spill (b, w);
#if BEAST_NUDB_DEBUG_CHECKS
check(n, b, buckets, modulus);
#endif
b.insert (e.second, e.first.size, e.first.key);
#if BEAST_NUDB_DEBUG_CHECKS
check(n, b, buckets, modulus);
#endif
}
w.flush();
}
@@ -950,27 +909,31 @@ template <class Hasher, class File>
void
basic_store<Hasher, File>::run()
{
auto const pred =
[this]()
{
return
! open_ ||
s_->p1.data_size() >=
s_->pool_thresh ||
s_->p1.data_size() >=
commit_limit_;
};
try
{
while (open_)
{
auto when = clock_type::now() +
std::chrono::seconds(1);
for(;;)
{
using std::chrono::seconds;
unique_lock_type m (m_);
bool const timeout =
cond_.wait_until (m, when) ==
std::cv_status::timeout;
! cond_.wait_for (m,
seconds(1), pred);
if (! open_)
break;
if (timeout ||
s_->p1.data_size() >=
s_->pool_thresh)
{
m.unlock();
commit();
}
m.unlock();
commit();
// Reclaim some memory if
// we get a spare moment.
if (timeout)
@@ -982,8 +945,6 @@ basic_store<Hasher, File>::run()
s_->c1.shrink_to_fit();
s_->c0.shrink_to_fit();
m.unlock();
when = clock_type::now() +
std::chrono::seconds(1);
}
}
}