mirror of
https://github.com/Xahau/xahaud.git
synced 2025-11-20 18:45:55 +00:00
Fix invalid shard removal
This commit is contained in:
committed by
Nik Bougalis
parent
1c3c69f8b5
commit
ce5f240551
@@ -147,10 +147,10 @@ DatabaseShardImp::init()
|
|||||||
return false;
|
return false;
|
||||||
|
|
||||||
// Remove legacy shard
|
// Remove legacy shard
|
||||||
|
shard->removeOnDestroy();
|
||||||
JLOG(j_.warn()) <<
|
JLOG(j_.warn()) <<
|
||||||
"shard " << shardIndex <<
|
"shard " << shardIndex <<
|
||||||
" incompatible legacy shard, removing";
|
" removed, legacy shard";
|
||||||
remove_all(shardDir);
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -740,12 +740,12 @@ DatabaseShardImp::import(Database& source)
|
|||||||
|
|
||||||
// Create the new shard
|
// Create the new shard
|
||||||
app_.shardFamily()->reset();
|
app_.shardFamily()->reset();
|
||||||
auto const shardDir {dir_ / std::to_string(shardIndex)};
|
|
||||||
auto shard {std::make_unique<Shard>(app_, *this, shardIndex, j_)};
|
auto shard {std::make_unique<Shard>(app_, *this, shardIndex, j_)};
|
||||||
if (!shard->open(scheduler_, *ctx_))
|
if (!shard->open(scheduler_, *ctx_))
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
// Create a marker file to signify an import in progress
|
// Create a marker file to signify an import in progress
|
||||||
|
auto const shardDir {dir_ / std::to_string(shardIndex)};
|
||||||
auto const markerFile {shardDir / importMarker_};
|
auto const markerFile {shardDir / importMarker_};
|
||||||
{
|
{
|
||||||
std::ofstream ofs {markerFile.string()};
|
std::ofstream ofs {markerFile.string()};
|
||||||
@@ -753,8 +753,8 @@ DatabaseShardImp::import(Database& source)
|
|||||||
{
|
{
|
||||||
JLOG(j_.error()) <<
|
JLOG(j_.error()) <<
|
||||||
"shard " << shardIndex <<
|
"shard " << shardIndex <<
|
||||||
" is unable to create temp marker file";
|
" failed to create temp marker file";
|
||||||
remove_all(shardDir);
|
shard->removeOnDestroy();
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
ofs.close();
|
ofs.close();
|
||||||
@@ -825,14 +825,14 @@ DatabaseShardImp::import(Database& source)
|
|||||||
JLOG(j_.error()) <<
|
JLOG(j_.error()) <<
|
||||||
"exception " << e.what() <<
|
"exception " << e.what() <<
|
||||||
" in function " << __func__;
|
" in function " << __func__;
|
||||||
remove_all(shardDir);
|
shard->removeOnDestroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
JLOG(j_.error()) <<
|
JLOG(j_.error()) <<
|
||||||
"shard " << shardIndex << " failed to import";
|
"shard " << shardIndex << " failed to import";
|
||||||
remove_all(shardDir);
|
shard->removeOnDestroy();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -1239,26 +1239,15 @@ DatabaseShardImp::finalizeShard(
|
|||||||
if (isStopping())
|
if (isStopping())
|
||||||
return;
|
return;
|
||||||
|
|
||||||
// Bad shard, remove it
|
// Invalid or corrupt shard, remove it
|
||||||
{
|
{
|
||||||
std::lock_guard lock(mutex_);
|
std::lock_guard lock(mutex_);
|
||||||
shards_.erase(shardIndex);
|
shards_.erase(shardIndex);
|
||||||
updateStatus(lock);
|
updateStatus(lock);
|
||||||
|
|
||||||
using namespace boost::filesystem;
|
|
||||||
path const dir {shard->getDir()};
|
|
||||||
shard.reset();
|
|
||||||
try
|
|
||||||
{
|
|
||||||
remove_all(dir);
|
|
||||||
}
|
|
||||||
catch (std::exception const& e)
|
|
||||||
{
|
|
||||||
JLOG(j_.error()) <<
|
|
||||||
"exception " << e.what() << " in function " << __func__;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
shard->removeOnDestroy();
|
||||||
|
shard.reset();
|
||||||
setFileStats();
|
setFileStats();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -1404,28 +1393,19 @@ DatabaseShardImp::storeLedgerInShard(
|
|||||||
|
|
||||||
if (!shard->store(ledger))
|
if (!shard->store(ledger))
|
||||||
{
|
{
|
||||||
// Shard may be corrupt, remove it
|
// Invalid or corrupt shard, remove it
|
||||||
std::lock_guard lock(mutex_);
|
{
|
||||||
|
std::lock_guard lock(mutex_);
|
||||||
|
shards_.erase(shard->index());
|
||||||
|
|
||||||
shards_.erase(shard->index());
|
if (shard->index() == acquireIndex_)
|
||||||
if (shard->index() == acquireIndex_)
|
acquireIndex_ = 0;
|
||||||
acquireIndex_ = 0;
|
|
||||||
|
|
||||||
updateStatus(lock);
|
updateStatus(lock);
|
||||||
|
}
|
||||||
|
|
||||||
using namespace boost::filesystem;
|
shard->removeOnDestroy();
|
||||||
path const dir {shard->getDir()};
|
|
||||||
shard.reset();
|
shard.reset();
|
||||||
try
|
|
||||||
{
|
|
||||||
remove_all(dir);
|
|
||||||
}
|
|
||||||
catch (std::exception const& e)
|
|
||||||
{
|
|
||||||
JLOG(j_.error()) <<
|
|
||||||
"exception " << e.what() << " in function " << __func__;
|
|
||||||
}
|
|
||||||
|
|
||||||
result = false;
|
result = false;
|
||||||
}
|
}
|
||||||
else if (shard->isBackendComplete())
|
else if (shard->isBackendComplete())
|
||||||
|
|||||||
@@ -52,6 +52,29 @@ Shard::Shard(
|
|||||||
Throw<std::runtime_error>("Shard: Invalid index");
|
Throw<std::runtime_error>("Shard: Invalid index");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Shard::~Shard()
|
||||||
|
{
|
||||||
|
if (removeOnDestroy_)
|
||||||
|
{
|
||||||
|
backend_.reset();
|
||||||
|
lgrSQLiteDB_.reset();
|
||||||
|
txSQLiteDB_.reset();
|
||||||
|
acquireInfo_.reset();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
boost::filesystem::remove_all(dir_);
|
||||||
|
}
|
||||||
|
catch (std::exception const& e)
|
||||||
|
{
|
||||||
|
JLOG(j_.error()) <<
|
||||||
|
"shard " << index_ <<
|
||||||
|
" exception " << e.what() <<
|
||||||
|
" in function " << __func__;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool
|
bool
|
||||||
Shard::open(Scheduler& scheduler, nudb::context& ctx)
|
Shard::open(Scheduler& scheduler, nudb::context& ctx)
|
||||||
{
|
{
|
||||||
|
|||||||
@@ -57,6 +57,8 @@ public:
|
|||||||
std::uint32_t index,
|
std::uint32_t index,
|
||||||
beast::Journal j);
|
beast::Journal j);
|
||||||
|
|
||||||
|
~Shard();
|
||||||
|
|
||||||
bool
|
bool
|
||||||
open(Scheduler& scheduler, nudb::context& ctx);
|
open(Scheduler& scheduler, nudb::context& ctx);
|
||||||
|
|
||||||
@@ -125,6 +127,12 @@ public:
|
|||||||
void
|
void
|
||||||
stop() {stop_ = true;}
|
stop() {stop_ = true;}
|
||||||
|
|
||||||
|
/** If called, the shard directory will be removed when
|
||||||
|
the shard is destroyed.
|
||||||
|
*/
|
||||||
|
void
|
||||||
|
removeOnDestroy() {removeOnDestroy_ = true;}
|
||||||
|
|
||||||
// Current shard version
|
// Current shard version
|
||||||
static constexpr std::uint32_t version {2};
|
static constexpr std::uint32_t version {2};
|
||||||
|
|
||||||
@@ -202,6 +210,9 @@ private:
|
|||||||
// Determines if the shard needs to stop processing for shutdown
|
// Determines if the shard needs to stop processing for shutdown
|
||||||
std::atomic<bool> stop_ {false};
|
std::atomic<bool> stop_ {false};
|
||||||
|
|
||||||
|
// Determines if the shard directory should be removed in the destructor
|
||||||
|
std::atomic<bool> removeOnDestroy_ {false};
|
||||||
|
|
||||||
// Set the backend cache
|
// Set the backend cache
|
||||||
// Lock over mutex_ required
|
// Lock over mutex_ required
|
||||||
void
|
void
|
||||||
|
|||||||
Reference in New Issue
Block a user