Add shard thread safety

This commit is contained in:
Miguel Portilla
2019-08-09 11:57:07 -04:00
committed by Nik Bougalis
parent 66fad62e66
commit 22c9de487a
4 changed files with 424 additions and 385 deletions

View File

@@ -75,9 +75,9 @@ DatabaseShardImp::init()
using namespace boost::beast::detail;
std::lock_guard lock(m_);
auto fail = [this](std::string const& msg)
auto fail = [j = j_](std::string const& msg)
{
JLOG(j_.error()) <<
JLOG(j.error()) <<
"[" << ConfigSection::shardDatabase() << "] " << msg;
return false;
};
@@ -293,10 +293,9 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex)
std::lock_guard lock(m_);
assert(init_);
auto fail = [this, shardIndex](std::string const& msg)
auto fail = [j = j_, shardIndex](std::string const& msg)
{
JLOG(j_.error()) <<
"shard " << shardIndex << " " << msg;
JLOG(j.error()) << "shard " << shardIndex << " " << msg;
return false;
};
@@ -421,78 +420,76 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
return true;
};
std::unique_lock<std::mutex> lock(m_);
assert(init_);
// Check shard is prepared
auto it {preShards_.find(shardIndex)};
if(it == preShards_.end())
{
std::unique_lock<std::mutex> lock(m_);
assert(init_);
// Check shard is prepared
auto it {preShards_.find(shardIndex)};
if(it == preShards_.end())
{
JLOG(j_.error()) << "shard " << shardIndex << " is an invalid index";
return false;
}
// Move source directory to the shard database directory
auto const dstDir {dir_ / std::to_string(shardIndex)};
if (!move(srcDir, dstDir))
return false;
// Create the new shard
auto shard {std::make_unique<Shard>(app_, *this, shardIndex, j_)};
auto fail = [&](std::string const& msg)
{
if (!msg.empty())
{
JLOG(j_.error()) << "shard " << shardIndex << " " << msg;
}
shard.reset();
move(dstDir, srcDir);
return false;
};
if (!shard->open(scheduler_, *ctx_))
return fail({});
if (!shard->complete())
return fail("is incomplete");
try
{
// Verify database integrity
shard->getBackend()->verify();
}
catch (std::exception const& e)
{
return fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
// Validate shard ledgers
if (validate)
{
// Shard validation requires releasing the lock
// so the database can fetch data from it
it->second = shard.get();
lock.unlock();
auto const valid {shard->validate()};
lock.lock();
if (!valid)
{
it = preShards_.find(shardIndex);
if(it != preShards_.end())
it->second = nullptr;
return fail("failed validation");
}
}
// Add the shard
complete_.emplace(shardIndex, std::move(shard));
preShards_.erase(shardIndex);
JLOG(j_.error()) << "shard " << shardIndex << " is an invalid index";
return false;
}
std::lock_guard<std::mutex> lock(m_);
setFileStats(lock);
updateStatus(lock);
// Move source directory to the shard database directory
auto const dstDir {dir_ / std::to_string(shardIndex)};
if (!move(srcDir, dstDir))
return false;
// Create the new shard
auto shard {std::make_unique<Shard>(app_, *this, shardIndex, j_)};
auto fail = [&](std::string const& msg)
{
if (!msg.empty())
{
JLOG(j_.error()) << "shard " << shardIndex << " " << msg;
}
shard.reset();
move(dstDir, srcDir);
return false;
};
if (!shard->open(scheduler_, *ctx_))
return fail({});
if (!shard->complete())
return fail("is incomplete");
try
{
// Verify database integrity
shard->getBackend()->verify();
}
catch (std::exception const& e)
{
return fail(std::string("exception ") +
e.what() + " in function " + __func__);
}
// Validate shard ledgers
if (validate)
{
// Shard validation requires releasing the lock
// so the database can fetch data from it
it->second = shard.get();
lock.unlock();
auto const valid {shard->validate()};
lock.lock();
if (!valid)
{
it = preShards_.find(shardIndex);
if(it != preShards_.end())
it->second = nullptr;
return fail("failed validation");
}
}
// Add the shard
complete_.emplace(shardIndex, std::move(shard));
preShards_.erase(shardIndex);
std::lock_guard lockg(*lock.release(), std::adopt_lock);
setFileStats(lockg);
updateStatus(lockg);
return true;
}
@@ -552,9 +549,9 @@ void
DatabaseShardImp::setStored(std::shared_ptr<Ledger const> const& ledger)
{
auto const shardIndex {seqToShardIndex(ledger->info().seq)};
auto fail = [this, shardIndex](std::string const& msg)
auto fail = [j = j_, shardIndex](std::string const& msg)
{
JLOG(j_.error()) << "shard " << shardIndex << " " << msg;
JLOG(j.error()) << "shard " << shardIndex << " " << msg;
};
if (ledger->info().hash.isZero())
@@ -622,36 +619,28 @@ DatabaseShardImp::getCompleteShards()
// NOTE(review): this listing is a diff rendering with the +/- markers
// stripped, so superseded and replacement lines appear side by side and the
// text below is not compilable as shown. Which line belongs to which version
// is inferred from context -- confirm against the actual file.
//
// Validates the shards held in complete_. The snapshot-based lines copy the
// shard pointers into a local vector while m_ is locked, then call
// validate() on each copied pointer after the locked scope has closed.
void
DatabaseShardImp::validate()
{
std::vector<std::shared_ptr<Shard>> completeShards;
{
// m_ is held only for this inner scope, while complete_ and status_ are read.
std::lock_guard lock(m_);
assert(init_);
// NOTE(review): the next two conditions look like the before/after forms of
// the same check; the second form no longer consults incomplete_.
if (complete_.empty() && !incomplete_)
if (complete_.empty())
{
JLOG(j_.error()) << "no shards found to validate";
return;
}
// NOTE(review): the lines from here through the JLOG of `s` build a
// "Found shards ..." list by hand; presumably the superseded logging, with
// the "Validating shards" line that prints status_ as its replacement.
std::string s {"Found shards "};
for (auto const& e : complete_)
s += std::to_string(e.second->index()) + ",";
if (incomplete_)
s += std::to_string(incomplete_->index());
else
s.pop_back();
JLOG(j_.debug()) << s;
JLOG(j_.debug()) << "Validating shards " << status_;
// Copy the shard pointers so they can be used after the lock is released.
completeShards.reserve(complete_.size());
for (auto const& shard : complete_)
completeShards.push_back(shard.second);
}
// NOTE(review): the next two blocks read complete_ and incomplete_ after the
// locked scope has ended and reset the shard family before every shard;
// presumably the superseded implementation -- confirm.
for (auto& e : complete_)
{
app_.shardFamily()->reset();
e.second->validate();
}
if (incomplete_)
{
app_.shardFamily()->reset();
incomplete_->validate();
}
// Verify each complete stored shard
for (auto const& shard : completeShards)
shard->validate();
// In the snapshot-based path the shard family is reset once, after the loop.
app_.shardFamily()->reset();
}
@@ -1116,29 +1105,15 @@ DatabaseShardImp::setFileStats(std::lock_guard<std::mutex>&)
// NOTE(review): this listing is a diff rendering with the +/- markers
// stripped, so superseded and replacement lines appear side by side and the
// braces below do not balance as shown. Which line belongs to which version
// is inferred from context -- confirm against the actual file.
//
// Rebuilds status_ from the contents of complete_. The lock_guard reference
// parameter is unnamed and unused in the body; presumably it exists so that
// callers must already hold the mutex -- confirm against the declaration.
void
DatabaseShardImp::updateStatus(std::lock_guard<std::mutex>&)
{
// NOTE(review): the clear/reserve pair and the iterator loop below appear to
// be the hand-rolled formatter. It writes the first key, appends ",<key>"
// when the gap to the previous key exceeds 1 (first closing any open "-"
// range with the previous key), and otherwise opens a "-" range that is
// closed with the final key when the end of the map is reached.
status_.clear();
status_.reserve(complete_.size() * 8);
for (auto it = complete_.begin(); it != complete_.end(); ++it)
// NOTE(review): this emptiness check appears to be the replacement for the
// loop header above; it pairs with the trailing else that clears status_.
if (!complete_.empty())
{
if (it == complete_.begin())
status_ = std::to_string(it->first);
else
{
if (it->first - std::prev(it)->first > 1)
{
if (status_.back() == '-')
status_ += std::to_string(std::prev(it)->first);
status_ += ',' + std::to_string(it->first);
}
else
{
if (status_.back() != '-')
status_ += '-';
if (std::next(it) == complete_.end())
status_ += std::to_string(it->first);
}
}
// Collect every shard's index() into a RangeSet and store its to_string()
// result in status_. The output format is defined by RangeSet, which is not
// visible here.
RangeSet<std::uint32_t> rs;
for (auto const& e : complete_)
rs.insert(e.second->index());
status_ = to_string(rs);
}
else
status_.clear();
}
std::pair<std::shared_ptr<PCache>, std::shared_ptr<NCache>>