mirror of
https://github.com/Xahau/xahaud.git
synced 2025-11-19 18:15:50 +00:00
Add shard import support to shard database
This commit is contained in:
@@ -20,9 +20,10 @@
|
||||
|
||||
#include <ripple/nodestore/impl/DatabaseShardImp.h>
|
||||
#include <ripple/app/ledger/InboundLedgers.h>
|
||||
#include <ripple/app/ledger/Ledger.h>
|
||||
#include <ripple/app/ledger/LedgerMaster.h>
|
||||
#include <ripple/basics/chrono.h>
|
||||
#include <ripple/basics/random.h>
|
||||
#include <ripple/nodestore/DummyScheduler.h>
|
||||
#include <ripple/nodestore/Manager.h>
|
||||
#include <ripple/protocol/HashPrefix.h>
|
||||
|
||||
@@ -38,6 +39,8 @@ DatabaseShardImp::DatabaseShardImp(Application& app,
|
||||
, app_(app)
|
||||
, config_(config)
|
||||
, dir_(get<std::string>(config, "path"))
|
||||
, backendName_(Manager::instance().find(
|
||||
get<std::string>(config_, "type"))->getName())
|
||||
, maxDiskSpace_(get<std::uint64_t>(config, "max_size_gb") << 30)
|
||||
, ledgersPerShard_(get<std::uint32_t>(
|
||||
config, "ledgers_per_shard", ledgersPerShardDefault))
|
||||
@@ -58,15 +61,16 @@ DatabaseShardImp::~DatabaseShardImp()
|
||||
bool
|
||||
DatabaseShardImp::init()
|
||||
{
|
||||
using namespace boost::filesystem;
|
||||
std::lock_guard<std::mutex> l(m_);
|
||||
if (init_)
|
||||
{
|
||||
assert(false);
|
||||
JLOG(j_.error()) <<
|
||||
"Already initialized";
|
||||
return false;
|
||||
}
|
||||
|
||||
using namespace boost::filesystem;
|
||||
// Find backend type and file handle requirement
|
||||
try
|
||||
{
|
||||
@@ -93,13 +97,19 @@ DatabaseShardImp::init()
|
||||
{
|
||||
if (!is_directory(d))
|
||||
continue;
|
||||
|
||||
// Validate shard directory name is numeric
|
||||
auto dirName = d.path().stem().string();
|
||||
if (!std::all_of(dirName.begin(), dirName.end(),
|
||||
[](auto c)
|
||||
{
|
||||
return ::isdigit(static_cast<unsigned char>(c));
|
||||
}))
|
||||
if (!std::all_of(
|
||||
dirName.begin(),
|
||||
dirName.end(),
|
||||
[](auto c){
|
||||
return ::isdigit(static_cast<unsigned char>(c));
|
||||
}))
|
||||
{
|
||||
continue;
|
||||
}
|
||||
|
||||
auto const shardIndex {std::stoul(dirName)};
|
||||
if (shardIndex < earliestShardIndex())
|
||||
{
|
||||
@@ -116,13 +126,14 @@ DatabaseShardImp::init()
|
||||
JLOG(j_.warn()) <<
|
||||
"shard " << shardIndex <<
|
||||
" previously failed import, removing";
|
||||
remove_all(dir_ / std::to_string(shardIndex));
|
||||
if (!this->remove(dir_ / std::to_string(shardIndex)))
|
||||
return false;
|
||||
continue;
|
||||
}
|
||||
|
||||
auto shard = std::make_unique<Shard>(
|
||||
*this, shardIndex, cacheSz_, cacheAge_, j_);
|
||||
if (!shard->open(config_, scheduler_, dir_))
|
||||
if (!shard->open(config_, scheduler_))
|
||||
return false;
|
||||
usedDiskSpace_ += shard->fileSize();
|
||||
if (shard->complete())
|
||||
@@ -156,7 +167,7 @@ DatabaseShardImp::init()
|
||||
}
|
||||
|
||||
boost::optional<std::uint32_t>
|
||||
DatabaseShardImp::prepare(std::uint32_t validLedgerSeq)
|
||||
DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
@@ -166,7 +177,7 @@ DatabaseShardImp::prepare(std::uint32_t validLedgerSeq)
|
||||
return boost::none;
|
||||
if (backed_)
|
||||
{
|
||||
// Create a new shard to acquire
|
||||
// Check available disk space
|
||||
if (usedDiskSpace_ + avgShardSz_ > maxDiskSpace_)
|
||||
{
|
||||
JLOG(j_.debug()) <<
|
||||
@@ -176,7 +187,7 @@ DatabaseShardImp::prepare(std::uint32_t validLedgerSeq)
|
||||
}
|
||||
if (avgShardSz_ > boost::filesystem::space(dir_).free)
|
||||
{
|
||||
JLOG(j_.warn()) <<
|
||||
JLOG(j_.error()) <<
|
||||
"Insufficient disk space";
|
||||
canAdd_ = false;
|
||||
return boost::none;
|
||||
@@ -197,15 +208,213 @@ DatabaseShardImp::prepare(std::uint32_t validLedgerSeq)
|
||||
1, static_cast<int>(complete_.size() + 1)))};
|
||||
incomplete_ = std::make_unique<Shard>(
|
||||
*this, *shardIndex, sz, cacheAge_, j_);
|
||||
if (!incomplete_->open(config_, scheduler_, dir_))
|
||||
if (!incomplete_->open(config_, scheduler_))
|
||||
{
|
||||
incomplete_.reset();
|
||||
remove_all(dir_ / std::to_string(*shardIndex));
|
||||
this->remove(dir_ / std::to_string(*shardIndex));
|
||||
return boost::none;
|
||||
}
|
||||
return incomplete_->prepare();
|
||||
}
|
||||
|
||||
bool
|
||||
DatabaseShardImp::prepareShard(std::uint32_t shardIndex)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
if (!canAdd_)
|
||||
{
|
||||
JLOG(j_.error()) <<
|
||||
"Unable to add more shards to the database";
|
||||
return false;
|
||||
}
|
||||
|
||||
if (shardIndex < earliestShardIndex())
|
||||
{
|
||||
JLOG(j_.error()) <<
|
||||
"Invalid shard index " << shardIndex;
|
||||
return false;
|
||||
}
|
||||
|
||||
// If we are synced to the network, check if the shard index
|
||||
// is greater or equal to the current shard.
|
||||
auto seqCheck = [&](std::uint32_t seq)
|
||||
{
|
||||
// seq will be greater than zero if valid
|
||||
if (seq > earliestSeq() && shardIndex >= seqToShardIndex(seq))
|
||||
{
|
||||
JLOG(j_.error()) <<
|
||||
"Invalid shard index " << shardIndex;
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
if (!seqCheck(app_.getLedgerMaster().getValidLedgerIndex()) ||
|
||||
!seqCheck(app_.getLedgerMaster().getCurrentLedgerIndex()))
|
||||
{
|
||||
return false;
|
||||
}
|
||||
if (complete_.find(shardIndex) != complete_.end())
|
||||
{
|
||||
JLOG(j_.debug()) <<
|
||||
"Shard index " << shardIndex <<
|
||||
" stored";
|
||||
return false;
|
||||
}
|
||||
if (incomplete_ && incomplete_->index() == shardIndex)
|
||||
{
|
||||
JLOG(j_.debug()) <<
|
||||
"Shard index " << shardIndex <<
|
||||
" is being acquired";
|
||||
return false;
|
||||
}
|
||||
if (preShards_.find(shardIndex) != preShards_.end())
|
||||
{
|
||||
JLOG(j_.debug()) <<
|
||||
"Shard index " << shardIndex <<
|
||||
" is prepared for import";
|
||||
return false;
|
||||
}
|
||||
|
||||
// Check limit and space requirements
|
||||
if (backed_)
|
||||
{
|
||||
std::uint64_t const sz {
|
||||
(preShards_.size() + 1 + (incomplete_ ? 1 : 0)) * avgShardSz_};
|
||||
if (usedDiskSpace_ + sz > maxDiskSpace_)
|
||||
{
|
||||
JLOG(j_.debug()) <<
|
||||
"Exceeds maximum size";
|
||||
return false;
|
||||
}
|
||||
if (sz > space(dir_).free)
|
||||
{
|
||||
JLOG(j_.error()) <<
|
||||
"Insufficient disk space";
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Add to shards prepared
|
||||
preShards_.emplace(shardIndex, nullptr);
|
||||
return true;
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseShardImp::removePreShard(std::uint32_t shardIndex)
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
preShards_.erase(shardIndex);
|
||||
}
|
||||
|
||||
std::uint32_t
|
||||
DatabaseShardImp::getNumPreShard()
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
return preShards_.size();
|
||||
}
|
||||
|
||||
bool
|
||||
DatabaseShardImp::importShard(std::uint32_t shardIndex,
|
||||
boost::filesystem::path const& srcDir, bool validate)
|
||||
{
|
||||
using namespace boost::filesystem;
|
||||
if (!is_directory(srcDir) || is_empty(srcDir))
|
||||
{
|
||||
JLOG(j_.error()) <<
|
||||
"Invalid source directory " << srcDir.string();
|
||||
return false;
|
||||
}
|
||||
|
||||
auto move = [&](path const& src, path const& dst)
|
||||
{
|
||||
try
|
||||
{
|
||||
rename(src, dst);
|
||||
}
|
||||
catch (const filesystem_error& e)
|
||||
{
|
||||
JLOG(j_.error()) <<
|
||||
"rename " << src.string() <<
|
||||
" to " << dst.string() <<
|
||||
": Exception, " << e.code().message();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
};
|
||||
|
||||
std::unique_lock<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
|
||||
// Check shard is prepared
|
||||
auto it {preShards_.find(shardIndex)};
|
||||
if(it == preShards_.end())
|
||||
{
|
||||
JLOG(j_.error()) <<
|
||||
"Invalid shard index " << std::to_string(shardIndex);
|
||||
return false;
|
||||
}
|
||||
|
||||
// Move source directory to the shard database directory
|
||||
auto const dstDir {dir_ / std::to_string(shardIndex)};
|
||||
if (!move(srcDir, dstDir))
|
||||
return false;
|
||||
|
||||
// Create the new shard
|
||||
auto shard {std::make_unique<Shard>(
|
||||
*this, shardIndex, cacheSz_, cacheAge_, j_)};
|
||||
auto fail = [&](std::string msg)
|
||||
{
|
||||
if (!msg.empty())
|
||||
{
|
||||
JLOG(j_.error()) << msg;
|
||||
}
|
||||
shard.release();
|
||||
move(dstDir, srcDir);
|
||||
return false;
|
||||
};
|
||||
if (!shard->open(config_, scheduler_))
|
||||
return fail({});
|
||||
if (!shard->complete())
|
||||
return fail("Incomplete shard");
|
||||
|
||||
// Verify database integrity
|
||||
try
|
||||
{
|
||||
shard->getBackend()->verify();
|
||||
}
|
||||
catch (std::exception const& e)
|
||||
{
|
||||
return fail(std::string("Verify: Exception, ") + e.what());
|
||||
}
|
||||
|
||||
// Validate shard ledgers
|
||||
if (validate)
|
||||
{
|
||||
// Shard validation requires releasing the lock
|
||||
// so the database can fetch data from it
|
||||
it->second = shard.get();
|
||||
l.unlock();
|
||||
auto valid {shard->validate(app_)};
|
||||
l.lock();
|
||||
if (!valid)
|
||||
{
|
||||
it = preShards_.find(shardIndex);
|
||||
if(it != preShards_.end())
|
||||
it->second = nullptr;
|
||||
return fail({});
|
||||
}
|
||||
}
|
||||
|
||||
// Add the shard
|
||||
usedDiskSpace_ += shard->fileSize();
|
||||
complete_.emplace(shardIndex, std::move(shard));
|
||||
preShards_.erase(shardIndex);
|
||||
return true;
|
||||
}
|
||||
|
||||
std::shared_ptr<Ledger>
|
||||
DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t seq)
|
||||
{
|
||||
@@ -320,7 +529,7 @@ DatabaseShardImp::validate()
|
||||
assert(init_);
|
||||
if (complete_.empty() && !incomplete_)
|
||||
{
|
||||
JLOG(j_.fatal()) <<
|
||||
JLOG(j_.error()) <<
|
||||
"No shards to validate";
|
||||
return;
|
||||
}
|
||||
@@ -332,7 +541,7 @@ DatabaseShardImp::validate()
|
||||
s += std::to_string(incomplete_->index());
|
||||
else
|
||||
s.pop_back();
|
||||
JLOG(j_.fatal()) << s;
|
||||
JLOG(j_.debug()) << s;
|
||||
}
|
||||
|
||||
for (auto& e : complete_)
|
||||
@@ -349,10 +558,20 @@ DatabaseShardImp::validate()
|
||||
}
|
||||
|
||||
void
|
||||
DatabaseShardImp::importNodeStore()
|
||||
DatabaseShardImp::import(Database& source)
|
||||
{
|
||||
std::unique_lock<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
|
||||
// Only the application local node store can be imported
|
||||
if (&source != &app_.getNodeStore())
|
||||
{
|
||||
assert(false);
|
||||
JLOG(j_.error()) <<
|
||||
"Invalid source database";
|
||||
return;
|
||||
}
|
||||
|
||||
std::uint32_t earliestIndex;
|
||||
std::uint32_t latestIndex;
|
||||
{
|
||||
@@ -446,7 +665,7 @@ DatabaseShardImp::importNodeStore()
|
||||
bool valid {true};
|
||||
for (std::uint32_t n = firstSeq; n <= lastSeq; n += 256)
|
||||
{
|
||||
if (!app_.getNodeStore().fetch(ledgerHashes[n].first, n))
|
||||
if (!source.fetch(ledgerHashes[n].first, n))
|
||||
{
|
||||
JLOG(j_.warn()) <<
|
||||
"SQL DB ledger sequence " << n <<
|
||||
@@ -461,25 +680,26 @@ DatabaseShardImp::importNodeStore()
|
||||
|
||||
// Create the new shard
|
||||
app_.shardFamily()->reset();
|
||||
auto const shardDir {dir_ / std::to_string(shardIndex)};
|
||||
auto shard = std::make_unique<Shard>(
|
||||
*this, shardIndex, shardCacheSz, cacheAge_, j_);
|
||||
if (!shard->open(config_, scheduler_, dir_))
|
||||
if (!shard->open(config_, scheduler_))
|
||||
{
|
||||
shard.reset();
|
||||
remove_all(dir_ / std::to_string(shardIndex));
|
||||
this->remove(shardDir);
|
||||
continue;
|
||||
}
|
||||
|
||||
// Create a marker file to signify an import in progress
|
||||
auto f {dir_ / std::to_string(shardIndex) / importMarker_};
|
||||
std::ofstream ofs {f.string()};
|
||||
auto const markerFile {shardDir / importMarker_};
|
||||
std::ofstream ofs {markerFile.string()};
|
||||
if (!ofs.is_open())
|
||||
{
|
||||
JLOG(j_.error()) <<
|
||||
"shard " << shardIndex <<
|
||||
" unable to create temp file";
|
||||
" unable to create temp marker file";
|
||||
shard.reset();
|
||||
remove_all(dir_ / std::to_string(shardIndex));
|
||||
this->remove(shardDir);
|
||||
continue;
|
||||
}
|
||||
ofs.close();
|
||||
@@ -504,10 +724,10 @@ DatabaseShardImp::importNodeStore()
|
||||
|
||||
if (shard->complete())
|
||||
{
|
||||
remove(f);
|
||||
JLOG(j_.debug()) <<
|
||||
"shard " << shardIndex <<
|
||||
" successfully imported";
|
||||
this->remove(markerFile);
|
||||
break;
|
||||
}
|
||||
}
|
||||
@@ -518,7 +738,7 @@ DatabaseShardImp::importNodeStore()
|
||||
"shard " << shardIndex <<
|
||||
" failed to import";
|
||||
shard.reset();
|
||||
remove_all(dir_ / std::to_string(shardIndex));
|
||||
this->remove(shardDir);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -725,20 +945,31 @@ DatabaseShardImp::sweep()
|
||||
std::shared_ptr<NodeObject>
|
||||
DatabaseShardImp::fetchFrom(uint256 const& hash, std::uint32_t seq)
|
||||
{
|
||||
std::shared_ptr<Backend> backend;
|
||||
auto const shardIndex {seqToShardIndex(seq)};
|
||||
std::unique_lock<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
auto it = complete_.find(shardIndex);
|
||||
if (it != complete_.end())
|
||||
backend = it->second->getBackend();
|
||||
else if (incomplete_ && incomplete_->index() == shardIndex)
|
||||
backend = incomplete_->getBackend();
|
||||
else
|
||||
return {};
|
||||
{
|
||||
l.unlock();
|
||||
return fetchInternal(hash, *it->second->getBackend());
|
||||
}
|
||||
}
|
||||
return fetchInternal(hash, *backend);
|
||||
if (incomplete_ && incomplete_->index() == shardIndex)
|
||||
{
|
||||
l.unlock();
|
||||
return fetchInternal(hash, *incomplete_->getBackend());
|
||||
}
|
||||
|
||||
// Used to validate import shards
|
||||
auto it = preShards_.find(shardIndex);
|
||||
if (it != preShards_.end() && it->second)
|
||||
{
|
||||
l.unlock();
|
||||
return fetchInternal(hash, *it->second->getBackend());
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
boost::optional<std::uint32_t>
|
||||
@@ -763,7 +994,8 @@ DatabaseShardImp::findShardIndexToAdd(
|
||||
for (std::uint32_t i = earliestShardIndex(); i <= maxShardIndex; ++i)
|
||||
{
|
||||
if (complete_.find(i) == complete_.end() &&
|
||||
(!incomplete_ || incomplete_->index() != i))
|
||||
(!incomplete_ || incomplete_->index() != i) &&
|
||||
preShards_.find(i) == preShards_.end())
|
||||
available.push_back(i);
|
||||
}
|
||||
if (!available.empty())
|
||||
@@ -776,9 +1008,10 @@ DatabaseShardImp::findShardIndexToAdd(
|
||||
// chances of running more than 30 times is less than 1 in a billion
|
||||
for (int i = 0; i < 40; ++i)
|
||||
{
|
||||
auto const r = rand_int(earliestShardIndex(), maxShardIndex);
|
||||
auto const r {rand_int(earliestShardIndex(), maxShardIndex)};
|
||||
if (complete_.find(r) == complete_.end() &&
|
||||
(!incomplete_ || incomplete_->index() != r))
|
||||
(!incomplete_ || incomplete_->index() != r) &&
|
||||
preShards_.find(r) == preShards_.end())
|
||||
return r;
|
||||
}
|
||||
assert(0);
|
||||
@@ -850,21 +1083,48 @@ DatabaseShardImp::updateStats(std::lock_guard<std::mutex>&)
|
||||
std::pair<std::shared_ptr<PCache>, std::shared_ptr<NCache>>
|
||||
DatabaseShardImp::selectCache(std::uint32_t seq)
|
||||
{
|
||||
std::pair<std::shared_ptr<PCache>,
|
||||
std::shared_ptr<NCache>> cache;
|
||||
auto const shardIndex {seqToShardIndex(seq)};
|
||||
std::lock_guard<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
{
|
||||
std::lock_guard<std::mutex> l(m_);
|
||||
assert(init_);
|
||||
auto it = complete_.find(shardIndex);
|
||||
if (it != complete_.end())
|
||||
cache = std::make_pair(it->second->pCache(),
|
||||
{
|
||||
return std::make_pair(it->second->pCache(),
|
||||
it->second->nCache());
|
||||
else if (incomplete_ && incomplete_->index() == shardIndex)
|
||||
cache = std::make_pair(incomplete_->pCache(),
|
||||
incomplete_->nCache());
|
||||
}
|
||||
}
|
||||
return cache;
|
||||
if (incomplete_ && incomplete_->index() == shardIndex)
|
||||
{
|
||||
return std::make_pair(incomplete_->pCache(),
|
||||
incomplete_->nCache());
|
||||
}
|
||||
|
||||
// Used to validate import shards
|
||||
auto it = preShards_.find(shardIndex);
|
||||
if (it != preShards_.end() && it->second)
|
||||
{
|
||||
return std::make_pair(it->second->pCache(),
|
||||
it->second->nCache());
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
||||
bool
|
||||
DatabaseShardImp::remove(boost::filesystem::path const& path)
|
||||
{
|
||||
try
|
||||
{
|
||||
boost::filesystem::remove_all(path);
|
||||
}
|
||||
catch (const boost::filesystem::filesystem_error& e)
|
||||
{
|
||||
JLOG(j_.error()) <<
|
||||
"remove_all " << path.string() <<
|
||||
": Exception, " << e.code().message();
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
} // NodeStore
|
||||
|
||||
Reference in New Issue
Block a user