Improve shards file exception handling

This commit is contained in:
Miguel Portilla
2018-07-16 12:54:52 -04:00
committed by seelabs
parent a73372cb9d
commit a7ed5bfbee
4 changed files with 222 additions and 158 deletions

View File

@@ -92,67 +92,77 @@ DatabaseShardImp::init()
return true; return true;
} }
// Find shards try
for (auto const& d : directory_iterator(dir_))
{ {
if (!is_directory(d)) // Find shards
continue; for (auto const& d : directory_iterator(dir_))
{
if (!is_directory(d))
continue;
// Validate shard directory name is numeric // Validate shard directory name is numeric
auto dirName = d.path().stem().string(); auto dirName = d.path().stem().string();
if (!std::all_of( if (!std::all_of(
dirName.begin(), dirName.begin(),
dirName.end(), dirName.end(),
[](auto c){ [](auto c) {
return ::isdigit(static_cast<unsigned char>(c)); return ::isdigit(static_cast<unsigned char>(c));
})) }))
{ {
continue; continue;
} }
auto const shardIndex {std::stoul(dirName)}; auto const shardIndex {std::stoul(dirName)};
if (shardIndex < earliestShardIndex()) if (shardIndex < earliestShardIndex())
{
JLOG(j_.fatal()) <<
"Invalid shard index " << shardIndex <<
". Earliest shard index " << earliestShardIndex();
return false;
}
// Check if a previous import failed
if (is_regular_file(dir_ / std::to_string(shardIndex) /
importMarker_))
{
JLOG(j_.warn()) <<
"shard " << shardIndex <<
" previously failed import, removing";
if (!this->remove(dir_ / std::to_string(shardIndex)))
return false;
continue;
}
auto shard = std::make_unique<Shard>(
*this, shardIndex, cacheSz_, cacheAge_, j_);
if (!shard->open(config_, scheduler_))
return false;
usedDiskSpace_ += shard->fileSize();
if (shard->complete())
complete_.emplace(shard->index(), std::move(shard));
else
{
if (incomplete_)
{ {
JLOG(j_.fatal()) << JLOG(j_.fatal()) <<
"More than one control file found"; "Invalid shard index " << shardIndex <<
". Earliest shard index " << earliestShardIndex();
return false; return false;
} }
incomplete_ = std::move(shard);
// Check if a previous import failed
if (is_regular_file(
dir_ / std::to_string(shardIndex) / importMarker_))
{
JLOG(j_.warn()) <<
"shard " << shardIndex <<
" previously failed import, removing";
remove_all(dir_ / std::to_string(shardIndex));
continue;
}
auto shard {std::make_unique<Shard>(
*this, shardIndex, cacheSz_, cacheAge_, j_)};
if (!shard->open(config_, scheduler_))
return false;
usedDiskSpace_ += shard->fileSize();
if (shard->complete())
complete_.emplace(shard->index(), std::move(shard));
else
{
if (incomplete_)
{
JLOG(j_.fatal()) <<
"More than one control file found";
return false;
}
incomplete_ = std::move(shard);
}
} }
} }
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
return false;
}
if (!incomplete_ && complete_.empty()) if (!incomplete_ && complete_.empty())
{ {
// New Shard Store, calculate file descriptor requirements // New Shard Store, calculate file descriptor requirements
if (maxDiskSpace_ > space(dir_).free) if (maxDiskSpace_ > available())
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"Insufficient disk space"; "Insufficient disk space";
@@ -185,7 +195,7 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
canAdd_ = false; canAdd_ = false;
return boost::none; return boost::none;
} }
if (avgShardSz_ > boost::filesystem::space(dir_).free) if (avgShardSz_ > available())
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"Insufficient disk space"; "Insufficient disk space";
@@ -211,9 +221,9 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
if (!incomplete_->open(config_, scheduler_)) if (!incomplete_->open(config_, scheduler_))
{ {
incomplete_.reset(); incomplete_.reset();
this->remove(dir_ / std::to_string(*shardIndex));
return boost::none; return boost::none;
} }
return incomplete_->prepare(); return incomplete_->prepare();
} }
@@ -254,6 +264,7 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex)
{ {
return false; return false;
} }
if (complete_.find(shardIndex) != complete_.end()) if (complete_.find(shardIndex) != complete_.end())
{ {
JLOG(j_.debug()) << JLOG(j_.debug()) <<
@@ -287,7 +298,7 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex)
"Exceeds maximum size"; "Exceeds maximum size";
return false; return false;
} }
if (sz > space(dir_).free) if (sz > available())
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"Insufficient disk space"; "Insufficient disk space";
@@ -321,10 +332,19 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
boost::filesystem::path const& srcDir, bool validate) boost::filesystem::path const& srcDir, bool validate)
{ {
using namespace boost::filesystem; using namespace boost::filesystem;
if (!is_directory(srcDir) || is_empty(srcDir)) try
{
if (!is_directory(srcDir) || is_empty(srcDir))
{
JLOG(j_.error()) <<
"Invalid source directory " << srcDir.string();
return false;
}
}
catch (std::exception const& e)
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"Invalid source directory " << srcDir.string(); "exception: " << e.what();
return false; return false;
} }
@@ -334,12 +354,10 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
{ {
rename(src, dst); rename(src, dst);
} }
catch (const filesystem_error& e) catch (std::exception const& e)
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"rename " << src.string() << "exception: " << e.what();
" to " << dst.string() <<
": Exception, " << e.code().message();
return false; return false;
} }
return true; return true;
@@ -367,27 +385,25 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
*this, shardIndex, cacheSz_, cacheAge_, j_)}; *this, shardIndex, cacheSz_, cacheAge_, j_)};
auto fail = [&](std::string msg) auto fail = [&](std::string msg)
{ {
if (!msg.empty()) JLOG(j_.error()) << msg;
{
JLOG(j_.error()) << msg;
}
shard.release(); shard.release();
move(dstDir, srcDir); move(dstDir, srcDir);
return false; return false;
}; };
if (!shard->open(config_, scheduler_)) if (!shard->open(config_, scheduler_))
return fail({}); return fail("Failure");
if (!shard->complete()) if (!shard->complete())
return fail("Incomplete shard"); return fail("Incomplete shard");
// Verify database integrity
try try
{ {
// Verify database integrity
shard->getBackend()->verify(); shard->getBackend()->verify();
} }
catch (std::exception const& e) catch (std::exception const& e)
{ {
return fail(std::string("Verify: Exception, ") + e.what()); return fail(std::string("exception: ") + e.what());
} }
// Validate shard ledgers // Validate shard ledgers
@@ -397,14 +413,14 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex,
// so the database can fetch data from it // so the database can fetch data from it
it->second = shard.get(); it->second = shard.get();
l.unlock(); l.unlock();
auto valid {shard->validate(app_)}; auto const valid {shard->validate(app_)};
l.lock(); l.lock();
if (!valid) if (!valid)
{ {
it = preShards_.find(shardIndex); it = preShards_.find(shardIndex);
if(it != preShards_.end()) if(it != preShards_.end())
it->second = nullptr; it->second = nullptr;
return fail({}); return fail("failed validation");
} }
} }
@@ -634,7 +650,7 @@ DatabaseShardImp::import(Database& source)
canAdd_ = false; canAdd_ = false;
break; break;
} }
if (avgShardSz_ > boost::filesystem::space(dir_).free) if (avgShardSz_ > available())
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"Insufficient disk space"; "Insufficient disk space";
@@ -686,7 +702,6 @@ DatabaseShardImp::import(Database& source)
if (!shard->open(config_, scheduler_)) if (!shard->open(config_, scheduler_))
{ {
shard.reset(); shard.reset();
this->remove(shardDir);
continue; continue;
} }
@@ -699,7 +714,7 @@ DatabaseShardImp::import(Database& source)
"shard " << shardIndex << "shard " << shardIndex <<
" unable to create temp marker file"; " unable to create temp marker file";
shard.reset(); shard.reset();
this->remove(shardDir); removeAll(shardDir, j_);
continue; continue;
} }
ofs.close(); ofs.close();
@@ -727,7 +742,7 @@ DatabaseShardImp::import(Database& source)
JLOG(j_.debug()) << JLOG(j_.debug()) <<
"shard " << shardIndex << "shard " << shardIndex <<
" successfully imported"; " successfully imported";
this->remove(markerFile); removeAll(markerFile, j_);
break; break;
} }
} }
@@ -738,7 +753,7 @@ DatabaseShardImp::import(Database& source)
"shard " << shardIndex << "shard " << shardIndex <<
" failed to import"; " failed to import";
shard.reset(); shard.reset();
this->remove(shardDir); removeAll(shardDir, j_);
} }
} }
@@ -1070,7 +1085,7 @@ DatabaseShardImp::updateStats(std::lock_guard<std::mutex>&)
else else
{ {
auto const sz = maxDiskSpace_ - usedDiskSpace_; auto const sz = maxDiskSpace_ - usedDiskSpace_;
if (sz > space(dir_).free) if (sz > available())
{ {
JLOG(j_.warn()) << JLOG(j_.warn()) <<
"Max Shard Store size exceeds " "Max Shard Store size exceeds "
@@ -1110,21 +1125,19 @@ DatabaseShardImp::selectCache(std::uint32_t seq)
return {}; return {};
} }
bool std::uint64_t
DatabaseShardImp::remove(boost::filesystem::path const& path) DatabaseShardImp::available() const
{ {
try try
{ {
boost::filesystem::remove_all(path); return boost::filesystem::space(dir_).available;
} }
catch (const boost::filesystem::filesystem_error& e) catch (std::exception const& e)
{ {
JLOG(j_.error()) << JLOG(j_.error()) <<
"remove_all " << path.string() << "exception: " << e.what();
": Exception, " << e.code().message(); return 0;
return false;
} }
return true;
} }
} // NodeStore } // NodeStore

View File

@@ -241,8 +241,9 @@ private:
1, static_cast<int>(complete_.size() + (incomplete_ ? 1 : 0)))); 1, static_cast<int>(complete_.size() + (incomplete_ ? 1 : 0))));
} }
bool // Returns available storage space
remove(boost::filesystem::path const& path); std::uint64_t
available() const;
}; };
} // NodeStore } // NodeStore

View File

@@ -53,15 +53,47 @@ Shard::open(Section config, Scheduler& scheduler)
{ {
assert(!backend_); assert(!backend_);
using namespace boost::filesystem; using namespace boost::filesystem;
auto const newShard {!is_directory(dir_) || is_empty(dir_)};
bool dirPreexist;
bool dirEmpty;
try
{
if (!exists(dir_))
{
dirPreexist = false;
dirEmpty = true;
}
else if (is_directory(dir_))
{
dirPreexist = true;
dirEmpty = is_empty(dir_);
}
else
{
JLOG(j_.error()) <<
"path exists as file: " << dir_.string();
return false;
}
}
catch (std::exception const& e)
{
JLOG(j_.error()) <<
"shard " + std::to_string(index_) + " exception: " + e.what();
return false;
}
auto fail = [&](std::string msg) auto fail = [&](std::string msg)
{ {
if (!msg.empty()) JLOG(j_.error()) <<
"shard " << std::to_string(index_) << " error: " << msg;
if (!dirPreexist)
removeAll(dir_, j_);
else if (dirEmpty)
{ {
JLOG(j_.error()) << msg; for (auto const& p : recursive_directory_iterator(dir_))
removeAll(p.path(), j_);
} }
if (newShard)
this->remove(dir_);
return false; return false;
}; };
@@ -70,50 +102,64 @@ Shard::open(Section config, Scheduler& scheduler)
{ {
backend_ = Manager::instance().make_Backend( backend_ = Manager::instance().make_Backend(
config, scheduler, j_); config, scheduler, j_);
backend_->open(newShard); backend_->open(!dirPreexist || dirEmpty);
if (backend_->fdlimit() == 0)
return true;
if (!dirPreexist || dirEmpty)
{
// New shard, create a control file
if (!saveControl())
return fail("failure");
}
else if (is_regular_file(control_))
{
// Incomplete shard, inspect control file
std::ifstream ifs(control_.string());
if (!ifs.is_open())
{
return fail("shard " + std::to_string(index_) +
", unable to open control file");
}
boost::archive::text_iarchive ar(ifs);
ar & storedSeqs_;
if (!storedSeqs_.empty())
{
if (boost::icl::first(storedSeqs_) < firstSeq_ ||
boost::icl::last(storedSeqs_) > lastSeq_)
{
return fail("shard " + std::to_string(index_) +
": Invalid control file");
}
if (boost::icl::length(storedSeqs_) >= maxLedgers_)
{
JLOG(j_.error()) <<
"shard " << index_ <<
" found control file for complete shard";
storedSeqs_.clear();
complete_ = true;
remove_all(control_);
}
}
}
else
complete_ = true;
// Calculate file foot print of backend files
for (auto const& p : recursive_directory_iterator(dir_))
if (!is_directory(p))
fileSize_ += file_size(p);
} }
catch (std::exception const& e) catch (std::exception const& e)
{ {
return fail("shard " + std::to_string(index_) + JLOG(j_.error()) <<
": Exception, " + e.what()); "shard " << std::to_string(index_) << " error: " << e.what();
return false;
} }
if (backend_->fdlimit() == 0)
return true;
if (newShard)
{
if (!saveControl())
return fail({});
}
else if (is_regular_file(control_))
{
std::ifstream ifs(control_.string());
if (!ifs.is_open())
return fail("shard " + std::to_string(index_) +
": Unable to open control file");
boost::archive::text_iarchive ar(ifs);
ar & storedSeqs_;
if (!storedSeqs_.empty())
{
if (boost::icl::first(storedSeqs_) < firstSeq_ ||
boost::icl::last(storedSeqs_) > lastSeq_)
return fail("shard " + std::to_string(index_) +
": Invalid control file");
if (boost::icl::length(storedSeqs_) >= maxLedgers_)
{
JLOG(j_.error()) <<
"shard " << index_ <<
" found control file for complete shard";
storedSeqs_.clear();
this->remove(control_);
complete_ = true;
}
}
}
else
complete_ = true;
updateFileSize();
return true; return true;
} }
@@ -133,9 +179,26 @@ Shard::setStored(std::shared_ptr<Ledger const> const& l)
{ {
if (backend_->fdlimit() != 0) if (backend_->fdlimit() != 0)
{ {
if (!this->remove(control_)) if (!removeAll(control_, j_))
return false; return false;
updateFileSize();
// Update file foot print of backend files
using namespace boost::filesystem;
std::uint64_t sz {0};
try
{
for (auto const& p : recursive_directory_iterator(dir_))
if (!is_directory(p))
sz += file_size(p);
}
catch (const filesystem_error& e)
{
JLOG(j_.error()) <<
"exception: " << e.what();
fileSize_ = std::max(fileSize_, sz);
return false;
}
fileSize_ = sz;
} }
complete_ = true; complete_ = true;
storedSeqs_.clear(); storedSeqs_.clear();
@@ -406,16 +469,6 @@ Shard::valFetch(uint256 const& hash)
return nObj; return nObj;
} }
void
Shard::updateFileSize()
{
fileSize_ = 0;
using namespace boost::filesystem;
for (auto const& d : directory_iterator(dir_))
if (is_regular_file(d))
fileSize_ += file_size(d);
}
bool bool
Shard::saveControl() Shard::saveControl()
{ {
@@ -432,22 +485,5 @@ Shard::saveControl()
return true; return true;
} }
// Delete `path` and everything beneath it.
// Returns true when remove_all finishes without throwing. A
// boost::filesystem::filesystem_error is logged to the error journal,
// using the message of its error code, and reported as false.
bool
Shard::remove(boost::filesystem::path const& path)
{
    namespace fs = boost::filesystem;
    try
    {
        fs::remove_all(path);
        return true;
    }
    catch (fs::filesystem_error const& ex)
    {
        JLOG(j_.error()) <<
            "remove_all " << path.string() <<
            ": Exception, " << ex.code().message();
    }
    return false;
}
} // NodeStore } // NodeStore
} // ripple } // ripple

View File

@@ -34,6 +34,24 @@
namespace ripple { namespace ripple {
namespace NodeStore { namespace NodeStore {
/** Remove a path in its entirety, logging instead of throwing.

    Calls boost::filesystem::remove_all on `path`. Any std::exception
    raised by that call is caught here and written to the journal's
    error stream, so callers only need to check the return value.

    @param path File or directory to remove.
    @param j Journal that receives the error message on failure. Taken by
             const reference because it is only used for logging, which
             lets const journals and temporaries be passed.
    @return `true` if remove_all returned without throwing, `false` if an
            exception was caught and logged.
*/
// Plain `inline` (no `static`): a header-defined helper should be a single
// shared entity rather than an internal-linkage copy per translation unit.
inline
bool
removeAll(boost::filesystem::path const& path, beast::Journal const& j)
{
    try
    {
        boost::filesystem::remove_all(path);
    }
    catch (std::exception const& e)
    {
        JLOG(j.error()) <<
            "exception: " << e.what();
        return false;
    }
    return true;
}
using PCache = TaggedCache<uint256, NodeObject>; using PCache = TaggedCache<uint256, NodeObject>;
using NCache = KeyCache<uint256>; using NCache = KeyCache<uint256>;
class DatabaseShard; class DatabaseShard;
@@ -164,10 +182,6 @@ private:
// Save the control file for an incomplete shard // Save the control file for an incomplete shard
bool bool
saveControl(); saveControl();
// Remove directory or file
bool
remove(boost::filesystem::path const& path);
}; };
} // NodeStore } // NodeStore