Use NuDB context with backends

This commit is contained in:
Miguel Portilla
2019-02-28 16:40:53 -05:00
committed by Nik Bougalis
parent 89b3bf0796
commit a988b3224f
14 changed files with 215 additions and 143 deletions

View File

@@ -34,64 +34,94 @@ namespace NodeStore {
constexpr std::uint32_t DatabaseShard::ledgersPerShardDefault;
DatabaseShardImp::DatabaseShardImp(Application& app,
std::string const& name, Stoppable& parent, Scheduler& scheduler,
int readThreads, Section const& config, beast::Journal j)
DatabaseShardImp::DatabaseShardImp(
Application& app,
std::string const& name,
Stoppable& parent,
Scheduler& scheduler,
int readThreads,
Section const& config,
beast::Journal j)
: DatabaseShard(name, parent, scheduler, readThreads, config, j)
, app_(app)
, ctx_(std::make_unique<nudb::context>())
, config_(config)
, dir_(get<std::string>(config, "path"))
, backendName_(Manager::instance().find(
get<std::string>(config_, "type"))->getName())
get<std::string>(config, "type", "nudb"))->getName())
, maxDiskSpace_(get<std::uint64_t>(config, "max_size_gb") << 30)
, ledgersPerShard_(get<std::uint32_t>(
config, "ledgers_per_shard", ledgersPerShardDefault))
, earliestShardIndex_(seqToShardIndex(earliestSeq()))
, avgShardSz_(ledgersPerShard_ * (192 * 1024))
{
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
Throw<std::runtime_error>(
"ledgers_per_shard must be a multiple of 256");
ctx_->start();
}
DatabaseShardImp::~DatabaseShardImp()
{
// Stop threads before data members are destroyed
stopThreads();
// Close backend databases before destroying the context
std::lock_guard<std::mutex> lock(m_);
complete_.clear();
if (incomplete_)
incomplete_.reset();
preShards_.clear();
ctx_.reset();
}
bool
DatabaseShardImp::init()
{
using namespace boost::filesystem;
using namespace boost::beast::detail;
std::lock_guard<std::mutex> lock(m_);
if (init_)
{
assert(false);
JLOG(j_.error()) <<
"Already initialized";
return false;
}
// Find backend type and file handle requirement
try
{
fdLimit_ = Manager::instance().make_Backend(
config_, scheduler_, j_)->fdlimit();
}
catch (std::exception const&)
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
{
JLOG(j_.error()) <<
"Invalid or missing shard store "
"type specified in [shard_db]";
"ledgers_per_shard must be a multiple of 256";
return false;
}
backed_ = static_cast<bool>(fdLimit_);
if (!backed_)
// NuDB is the default and only supported permanent storage backend
// "Memory" and "none" types are supported for tests
if (!iequals(backendName_, "NuDB") &&
!iequals(backendName_, "Memory") &&
!iequals(backendName_, "none"))
{
init_ = true;
return true;
JLOG(j_.error()) <<
"Unsupported shard store type: " << backendName_;
return false;
}
{
// Find backend file handle requirement
auto factory {Manager::instance().find(backendName_)};
if (!factory)
{
JLOG(j_.error()) <<
"Failed to create shard store type " << backendName_;
return false;
}
auto backend {factory->createInstance(NodeObject::keyBytes,
config_, scheduler_, *ctx_, j_)};
backed_ = backend->backed();
if (!backed_)
{
init_ = true;
return true;
}
fdLimit_ = backend->fdlimit();
}
try
@@ -136,7 +166,7 @@ DatabaseShardImp::init()
auto shard {std::make_unique<Shard>(
*this, shardIndex, cacheSz_, cacheAge_, j_)};
if (!shard->open(config_, scheduler_))
if (!shard->open(config_, scheduler_, *ctx_))
return false;
usedDiskSpace_ += shard->fileSize();
@@ -220,7 +250,7 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
1, static_cast<int>(complete_.size() + 1)))};
incomplete_ = std::make_unique<Shard>(
*this, *shardIndex, sz, cacheAge_, j_);
if (!incomplete_->open(config_, scheduler_))
if (!incomplete_->open(config_, scheduler_, *ctx_))
{
incomplete_.reset();
return boost::none;
@@ -380,7 +410,7 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
if(it == preShards_.end())
{
JLOG(j_.error()) <<
"Invalid shard index " << std::to_string(shardIndex);
"Invalid shard index " << shardIndex;
return false;
}
@@ -394,16 +424,20 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
*this, shardIndex, cacheSz_, cacheAge_, j_)};
auto fail = [&](std::string msg)
{
JLOG(j_.error()) << msg;
shard.release();
if (!msg.empty())
{
JLOG(j_.error()) <<
"Import shard " << shardIndex << ": " << msg;
}
shard.reset();
move(dstDir, srcDir);
return false;
};
if (!shard->open(config_, scheduler_))
return fail("Failure");
if (!shard->open(config_, scheduler_, *ctx_))
return fail({});
if (!shard->complete())
return fail("Incomplete shard");
return fail("incomplete shard");
try
{
@@ -412,7 +446,7 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
}
catch (std::exception const& e)
{
return fail(std::string("exception: ") + e.what());
return fail(e.what());
}
// Validate shard ledgers
@@ -716,7 +750,7 @@ DatabaseShardImp::import(Database& source)
auto const shardDir {dir_ / std::to_string(shardIndex)};
auto shard = std::make_unique<Shard>(
*this, shardIndex, shardCacheSz, cacheAge_, j_);
if (!shard->open(config_, scheduler_))
if (!shard->open(config_, scheduler_, *ctx_))
{
shard.reset();
continue;
@@ -1087,6 +1121,7 @@ DatabaseShardImp::updateStats(std::lock_guard<std::mutex>&)
}
else if(incomplete_)
filesPerShard = incomplete_->fdlimit();
if (!backed_)
return;