Add command to import the node store into shards

This commit is contained in:
Miguel Portilla
2018-03-15 15:19:22 -04:00
committed by Nikolaos D. Bougalis
parent c4a9b73a66
commit 859d18adb0
16 changed files with 448 additions and 302 deletions

View File

@@ -216,6 +216,7 @@ Ledger::Ledger (
Ledger::Ledger (
LedgerInfo const& info,
bool& loaded,
bool acquire,
Config const& config,
Family& family,
beast::Journal j)
@@ -254,7 +255,8 @@ Ledger::Ledger (
if (! loaded)
{
info_.hash = calculateLedgerHash(info_);
family.missing_node (info_.hash, info_.seq);
if (acquire)
family.missing_node (info_.hash, info_.seq);
}
}
@@ -1083,10 +1085,12 @@ Ledger::invariants() const
*
* @param sqlSuffix: Additional string to append to the sql query.
* (typically a where clause).
* @param acquire: Acquire the ledger if not found locally.
* @return The ledger, ledger sequence, and ledger hash.
*/
std::tuple<std::shared_ptr<Ledger>, std::uint32_t, uint256>
loadLedgerHelper(std::string const& sqlSuffix, Application& app)
loadLedgerHelper(std::string const& sqlSuffix,
Application& app, bool acquire)
{
uint256 ledgerHash{};
std::uint32_t ledgerSeq{0};
@@ -1155,11 +1159,11 @@ loadLedgerHelper(std::string const& sqlSuffix, Application& app)
info.closeTimeResolution = duration{closeResolution.value_or(0)};
info.seq = ledgerSeq;
bool loaded = false;
bool loaded;
auto ledger = std::make_shared<Ledger>(
info,
loaded,
acquire,
app.config(),
app.family(),
app.journal("Ledger"));
@@ -1188,14 +1192,15 @@ void finishLoadByIndexOrHash(
}
std::shared_ptr<Ledger>
loadByIndex (std::uint32_t ledgerIndex, Application& app)
loadByIndex (std::uint32_t ledgerIndex,
Application& app, bool acquire)
{
std::shared_ptr<Ledger> ledger;
{
std::ostringstream s;
s << "WHERE LedgerSeq = " << ledgerIndex;
std::tie (ledger, std::ignore, std::ignore) =
loadLedgerHelper (s.str (), app);
loadLedgerHelper (s.str (), app, acquire);
}
finishLoadByIndexOrHash (ledger, app.config(),
@@ -1204,14 +1209,15 @@ loadByIndex (std::uint32_t ledgerIndex, Application& app)
}
std::shared_ptr<Ledger>
loadByHash (uint256 const& ledgerHash, Application& app)
loadByHash (uint256 const& ledgerHash,
Application& app, bool acquire)
{
std::shared_ptr<Ledger> ledger;
{
std::ostringstream s;
s << "WHERE LedgerHash = '" << ledgerHash << "'";
std::tie (ledger, std::ignore, std::ignore) =
loadLedgerHelper (s.str (), app);
loadLedgerHelper (s.str (), app, acquire);
}
finishLoadByIndexOrHash (ledger, app.config(),

View File

@@ -111,10 +111,14 @@ public:
Config const& config,
Family& family);
// Used for ledgers loaded from JSON files
/** Used for ledgers loaded from JSON files
@param acquire If true, acquires the ledger if not found locally
*/
Ledger (
LedgerInfo const& info,
bool& loaded,
bool acquire,
Config const& config,
Family& family,
beast::Journal j);
@@ -356,16 +360,17 @@ pendSaveValidated(
extern
std::shared_ptr<Ledger>
loadByIndex (std::uint32_t ledgerIndex,
Application& app);
Application& app, bool acquire = true);
extern
std::tuple<std::shared_ptr<Ledger>, std::uint32_t, uint256>
loadLedgerHelper(std::string const& sqlSuffix,
Application& app);
Application& app, bool acquire = true);
extern
std::shared_ptr<Ledger>
loadByHash (uint256 const& ledgerHash, Application& app);
loadByHash (uint256 const& ledgerHash,
Application& app, bool acquire = true);
extern
uint256

View File

@@ -1074,6 +1074,7 @@ private:
void addTxnSeqField();
void addValidationSeqFields();
bool updateTables ();
bool nodeToShards ();
bool validateShards ();
void startGenesisLedger ();
@@ -1281,8 +1282,15 @@ bool ApplicationImp::setup()
*config_);
add (*m_overlay); // add to PropertyStream
if (config_->valShards && !validateShards())
return false;
if (!config_->standalone())
{
// validation and node import require the sqlite db
if (config_->nodeToShard && !nodeToShards())
return false;
if (config_->validateShards && !validateShards())
return false;
}
validatorSites_->start ();
@@ -2076,16 +2084,32 @@ bool ApplicationImp::updateTables ()
return true;
}
bool ApplicationImp::validateShards()
bool ApplicationImp::nodeToShards()
{
if (!m_overlay)
Throw<std::runtime_error>("no overlay");
if(config_->standalone())
assert(m_overlay);
assert(!config_->standalone());
if (config_->section(ConfigSection::shardDatabase()).empty())
{
JLOG(m_journal.fatal()) <<
"Shard validation cannot be run in standalone";
JLOG (m_journal.fatal()) <<
"The [shard_db] configuration setting must be set";
return false;
}
if (!shardStore_)
{
JLOG(m_journal.fatal()) <<
"Invalid [shard_db] configuration";
return false;
}
shardStore_->importNodeStore();
return true;
}
bool ApplicationImp::validateShards()
{
assert(m_overlay);
assert(!config_->standalone());
if (config_->section(ConfigSection::shardDatabase()).empty())
{
JLOG (m_journal.fatal()) <<

View File

@@ -320,7 +320,8 @@ int run (int argc, char** argv)
("debug", "Enable normally suppressed debug logging")
("fg", "Run in the foreground.")
("import", importText.c_str ())
("shards", shardsText.c_str ())
("nodetoshard", "Import node store into shards")
("validateShards", shardsText.c_str ())
("version", "Display the build version.")
;
@@ -420,8 +421,11 @@ int run (int argc, char** argv)
if (vm.count ("import"))
config->doImport = true;
if (vm.count ("shards"))
config->valShards = true;
if (vm.count("nodetoshard"))
config->nodeToShard = true;
if (vm.count ("validateShards"))
config->validateShards = true;
if (vm.count ("ledger"))
{

View File

@@ -112,7 +112,8 @@ private:
public:
bool doImport = false;
bool valShards = false;
bool nodeToShard = false;
bool validateShards = false;
bool ELB_SUPPORT = false;
std::vector<std::string> IPS; // Peer IPs from rippled.cfg.

View File

@@ -239,15 +239,21 @@ protected:
std::shared_ptr<KeyCache<uint256>> const& nCache);
std::shared_ptr<NodeObject>
fetchInternal(uint256 const& hash, Backend& backend);
fetchInternal(uint256 const& hash, Backend& srcBackend);
void
importInternal(Database& source, Backend& dest);
importInternal(Backend& dstBackend, Database& srcDB);
std::shared_ptr<NodeObject>
doFetch(uint256 const& hash, std::uint32_t seq,
TaggedCache<uint256, NodeObject>& pCache,
KeyCache<uint256>& nCache, bool isAsync);
bool
copyLedger(Backend& dstBackend, Ledger const& srcLedger,
std::shared_ptr<TaggedCache<uint256, NodeObject>> const& pCache,
std::shared_ptr<KeyCache<uint256>> const& nCache, bool isAsync);
std::shared_ptr<KeyCache<uint256>> const& nCache,
std::shared_ptr<Ledger const> const& srcNext);
private:
std::atomic<std::uint32_t> storeCount_ {0};

View File

@@ -123,6 +123,12 @@ public:
void
validate() = 0;
/** Import the node store into the shard store.
*/
virtual
void
importNodeStore() = 0;
/** @return The maximum number of ledgers stored in a shard
*/
virtual

View File

@@ -32,7 +32,7 @@ enum
// This is only used to pre-allocate the array for
// batch objects and does not affect the amount written.
//
batchWritePreallocationSize = 128
batchWritePreallocationSize = 256
};
/** Return codes from Backend operations. */

View File

@@ -18,6 +18,7 @@
//==============================================================================
#include <ripple/nodestore/Database.h>
#include <ripple/app/ledger/Ledger.h>
#include <ripple/basics/chrono.h>
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/protocol/HashPrefix.h>
@@ -115,13 +116,13 @@ Database::asyncFetch(uint256 const& hash, std::uint32_t seq,
}
std::shared_ptr<NodeObject>
Database::fetchInternal(uint256 const& hash, Backend& backend)
Database::fetchInternal(uint256 const& hash, Backend& srcBackend)
{
std::shared_ptr<NodeObject> nObj;
Status status;
try
{
status = backend.fetch(hash.begin(), &nObj);
status = srcBackend.fetch(hash.begin(), &nObj);
}
catch (std::exception const& e)
{
@@ -153,11 +154,11 @@ Database::fetchInternal(uint256 const& hash, Backend& backend)
}
void
Database::importInternal(Database& source, Backend& dest)
Database::importInternal(Backend& dstBackend, Database& srcDB)
{
Batch b;
b.reserve(batchWritePreallocationSize);
source.for_each(
srcDB.for_each(
[&](std::shared_ptr<NodeObject> nObj)
{
assert(nObj);
@@ -170,20 +171,20 @@ Database::importInternal(Database& source, Backend& dest)
b.push_back(nObj);
if (b.size() >= batchWritePreallocationSize)
{
dest.storeBatch(b);
dstBackend.storeBatch(b);
b.clear();
b.reserve(batchWritePreallocationSize);
}
});
if (! b.empty())
dest.storeBatch(b);
dstBackend.storeBatch(b);
}
// Perform a fetch and report the time it took
std::shared_ptr<NodeObject>
Database::doFetch(uint256 const& hash, std::uint32_t seq,
std::shared_ptr<TaggedCache<uint256, NodeObject>> const& pCache,
std::shared_ptr<KeyCache<uint256>> const& nCache, bool isAsync)
TaggedCache<uint256, NodeObject>& pCache,
KeyCache<uint256>& nCache, bool isAsync)
{
FetchReport report;
report.isAsync = isAsync;
@@ -193,8 +194,8 @@ Database::doFetch(uint256 const& hash, std::uint32_t seq,
auto const before = steady_clock::now();
// See if the object already exists in the cache
auto nObj = pCache->fetch(hash);
if (! nObj && ! nCache->touch_if_exists(hash))
auto nObj = pCache.fetch(hash);
if (! nObj && ! nCache.touch_if_exists(hash))
{
// Try the database(s)
report.wentToDisk = true;
@@ -203,15 +204,15 @@ Database::doFetch(uint256 const& hash, std::uint32_t seq,
if (! nObj)
{
// Just in case a write occurred
nObj = pCache->fetch(hash);
nObj = pCache.fetch(hash);
if (! nObj)
// We give up
nCache->insert(hash);
nCache.insert(hash);
}
else
{
// Ensure all threads get the same object
pCache->canonicalize(hash, nObj);
pCache.canonicalize(hash, nObj);
// Since this was a 'hard' fetch, we will log it.
JLOG(j_.trace()) <<
@@ -225,6 +226,126 @@ Database::doFetch(uint256 const& hash, std::uint32_t seq,
return nObj;
}
/** Copy a ledger (header, state map, and transaction map) into a backend.

    Nodes are fetched from the database family the source ledger was built
    with and written to dstBackend in batches of batchWritePreallocationSize.

    @param dstBackend Backend that receives the copied node objects.
    @param srcLedger Ledger to copy; must have non-zero ledger and account
           hashes and must not belong to this database.
    @param pCache Positive cache to update as objects are stored; may be null.
    @param nCache Negative cache to purge as objects are stored; may be null.
           pCache and nCache must be both set or both null.
    @param srcNext The ledger following srcLedger, if available. When its
           parent hash matches srcLedger's hash, only the state-map
           differences between the two ledgers are copied instead of the
           full state map.
    @return true if every node was fetched and stored, false otherwise.
*/
bool
Database::copyLedger(Backend& dstBackend, Ledger const& srcLedger,
std::shared_ptr<TaggedCache<uint256, NodeObject>> const& pCache,
std::shared_ptr<KeyCache<uint256>> const& nCache,
std::shared_ptr<Ledger const> const& srcNext)
{
// Caches must be supplied together, or not at all
assert(static_cast<bool>(pCache) == static_cast<bool>(nCache));
if (srcLedger.info().hash.isZero() ||
srcLedger.info().accountHash.isZero())
{
assert(false);
JLOG(j_.error()) <<
"source ledger seq " << srcLedger.info().seq <<
" is invalid";
return false;
}
// The source database is the family the ledger's state map belongs to
auto& srcDB = const_cast<Database&>(
srcLedger.stateMap().family().db());
if (&srcDB == this)
{
assert(false);
JLOG(j_.error()) <<
"source and destination databases are the same";
return false;
}
Batch batch;
batch.reserve(batchWritePreallocationSize);
// Flush the accumulated batch to the destination backend, updating the
// caches (when provided) so readers see the stored objects
auto storeBatch = [&]() {
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
// Verify each object's key matches the hash of its data
for (auto& nObj : batch)
{
assert(nObj->getHash() ==
sha512Hash(makeSlice(nObj->getData())));
if (pCache && nCache)
{
pCache->canonicalize(nObj->getHash(), nObj, true);
nCache->erase(nObj->getHash());
storeStats(nObj->getData().size());
}
}
#else
if (pCache && nCache)
for (auto& nObj : batch)
{
pCache->canonicalize(nObj->getHash(), nObj, true);
nCache->erase(nObj->getHash());
storeStats(nObj->getData().size());
}
#endif
dstBackend.storeBatch(batch);
batch.clear();
batch.reserve(batchWritePreallocationSize);
};
bool error = false;
// SHAMap visitor: fetch each node from the source and queue it for
// storage, flushing whenever the batch fills. Returning false stops
// the traversal on the first failed fetch.
auto f = [&](SHAMapAbstractNode& node) {
if (auto nObj = srcDB.fetch(
node.getNodeHash().as_uint256(), srcLedger.info().seq))
{
batch.emplace_back(std::move(nObj));
if (batch.size() >= batchWritePreallocationSize)
storeBatch();
}
else
error = true;
return !error;
};
// Store ledger header
{
Serializer s(1024);
s.add32(HashPrefix::ledgerMaster);
addRaw(srcLedger.info(), s);
auto nObj = NodeObject::createObject(hotLEDGER,
std::move(s.modData()), srcLedger.info().hash);
batch.emplace_back(std::move(nObj));
}
// Store the state map
if (srcLedger.stateMap().getHash().isNonZero())
{
if (!srcLedger.stateMap().isValid())
{
JLOG(j_.error()) <<
"source ledger seq " << srcLedger.info().seq <<
" state map invalid";
return false;
}
// If srcNext is srcLedger's direct child, the two ledgers share most
// of their state map: copy only the differing nodes
if (srcNext && srcNext->info().parentHash == srcLedger.info().hash)
{
auto have = srcNext->stateMap().snapShot(false);
srcLedger.stateMap().snapShot(
false)->visitDifferences(&(*have), f);
}
else
srcLedger.stateMap().snapShot(false)->visitNodes(f);
if (error)
return false;
}
// Store the transaction map
if (srcLedger.info().txHash.isNonZero())
{
if (!srcLedger.txMap().isValid())
{
JLOG(j_.error()) <<
"source ledger seq " << srcLedger.info().seq <<
" transaction map invalid";
return false;
}
srcLedger.txMap().snapShot(false)->visitNodes(f);
if (error)
return false;
}
// Flush any remaining objects
if (!batch.empty())
storeBatch();
return true;
}
// Entry point for async read threads
void
Database::threadEntry()
@@ -266,7 +387,7 @@ Database::threadEntry()
// Perform the read
if (lastPcache && lastNcache)
doFetch(lastHash, lastSeq, lastPcache, lastNcache, true);
doFetch(lastHash, lastSeq, *lastPcache, *lastNcache, true);
}
}

View File

@@ -51,85 +51,6 @@ DatabaseNodeImp::asyncFetch(uint256 const& hash,
return false;
}
/** Copy a ledger's header, state map, and transaction map into this
    node database.

    All node objects are collected into a single batch first; nothing is
    written to the backend unless every fetch from the source succeeds.

    @param ledger Ledger to copy; must have non-zero ledger and account
           hashes and must not already belong to this database.
    @return true on success; false if the ledger or one of its maps is
            invalid, or if any node could not be fetched from the source.
*/
bool
DatabaseNodeImp::copyLedger(
std::shared_ptr<Ledger const> const& ledger)
{
if (ledger->info().hash.isZero() ||
ledger->info().accountHash.isZero())
{
assert(false);
JLOG(j_.error()) <<
"Invalid ledger";
return false;
}
// The source is the database family the ledger was created with
auto& srcDB = const_cast<Database&>(
ledger->stateMap().family().db());
if (&srcDB == this)
{
assert(false);
JLOG(j_.error()) <<
"Source and destination are the same";
return false;
}
Batch batch;
bool error = false;
// SHAMap visitor: fetch each node from the source and queue it;
// returning false aborts the traversal on the first failed fetch
auto f = [&](SHAMapAbstractNode& node) {
if (auto nObj = srcDB.fetch(
node.getNodeHash().as_uint256(), ledger->info().seq))
batch.emplace_back(std::move(nObj));
else
error = true;
return !error;
};
// Batch the ledger header
{
Serializer s(1024);
s.add32(HashPrefix::ledgerMaster);
addRaw(ledger->info(), s);
batch.emplace_back(NodeObject::createObject(hotLEDGER,
std::move(s.modData()), ledger->info().hash));
}
// Batch the state map
if (ledger->stateMap().getHash().isNonZero())
{
if (! ledger->stateMap().isValid())
{
JLOG(j_.error()) <<
"invalid state map";
return false;
}
ledger->stateMap().snapShot(false)->visitNodes(f);
if (error)
return false;
}
// Batch the transaction map
if (ledger->info().txHash.isNonZero())
{
if (! ledger->txMap().isValid())
{
JLOG(j_.error()) <<
"invalid transaction map";
return false;
}
ledger->txMap().snapShot(false)->visitNodes(f);
if (error)
return false;
}
// Store batch, updating the positive/negative caches as we go
for (auto& nObj : batch)
{
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
assert(nObj->getHash() == sha512Hash(makeSlice(nObj->getData())));
#endif
pCache_->canonicalize(nObj->getHash(), nObj, true);
nCache_->erase(nObj->getHash());
storeStats(nObj->getData().size());
}
backend_->storeBatch(batch);
return true;
}
void
DatabaseNodeImp::tune(int size, int age)
{

View File

@@ -72,7 +72,7 @@ public:
void
import(Database& source) override
{
importInternal(source, *backend_.get());
importInternal(*backend_.get(), source);
}
void
@@ -82,7 +82,7 @@ public:
std::shared_ptr<NodeObject>
fetch(uint256 const& hash, std::uint32_t seq) override
{
return doFetch(hash, seq, pCache_, nCache_, false);
return doFetch(hash, seq, *pCache_, *nCache_, false);
}
bool
@@ -90,7 +90,11 @@ public:
std::shared_ptr<NodeObject>& object) override;
bool
copyLedger(std::shared_ptr<Ledger const> const& ledger) override;
copyLedger(std::shared_ptr<Ledger const> const& ledger) override
{
return Database::copyLedger(
*backend_, *ledger, pCache_, nCache_, nullptr);
}
int
getDesiredAsyncReadCount(std::uint32_t seq) override

View File

@@ -85,85 +85,6 @@ DatabaseRotatingImp::asyncFetch(uint256 const& hash,
return false;
}
/** Copy a ledger's header, state map, and transaction map into the
    currently writable backend of this rotating database.

    All node objects are collected into a single batch first; nothing is
    written unless every fetch from the source succeeds.

    @param ledger Ledger to copy; must have non-zero ledger and account
           hashes and must not already belong to this database.
    @return true on success; false if the ledger or one of its maps is
            invalid, or if any node could not be fetched from the source.
*/
bool
DatabaseRotatingImp::copyLedger(
std::shared_ptr<Ledger const> const& ledger)
{
if (ledger->info().hash.isZero() ||
ledger->info().accountHash.isZero())
{
assert(false);
JLOG(j_.error()) <<
"Invalid ledger";
return false;
}
// The source is the database family the ledger was created with
auto& srcDB = const_cast<Database&>(
ledger->stateMap().family().db());
if (&srcDB == this)
{
assert(false);
JLOG(j_.error()) <<
"Source and destination are the same";
return false;
}
Batch batch;
bool error = false;
// SHAMap visitor: fetch each node from the source and queue it;
// returning false aborts the traversal on the first failed fetch
auto f = [&](SHAMapAbstractNode& node) {
if (auto nObj = srcDB.fetch(
node.getNodeHash().as_uint256(), ledger->info().seq))
batch.emplace_back(std::move(nObj));
else
error = true;
return !error;
};
// Batch the ledger header
{
Serializer s(1024);
s.add32(HashPrefix::ledgerMaster);
addRaw(ledger->info(), s);
batch.emplace_back(NodeObject::createObject(hotLEDGER,
std::move(s.modData()), ledger->info().hash));
}
// Batch the state map
if (ledger->stateMap().getHash().isNonZero())
{
if (! ledger->stateMap().isValid())
{
JLOG(j_.error()) <<
"invalid state map";
return false;
}
ledger->stateMap().snapShot(false)->visitNodes(f);
if (error)
return false;
}
// Batch the transaction map
if (ledger->info().txHash.isNonZero())
{
if (! ledger->txMap().isValid())
{
JLOG(j_.error()) <<
"invalid transaction map";
return false;
}
ledger->txMap().snapShot(false)->visitNodes(f);
if (error)
return false;
}
// Store batch, updating the positive/negative caches as we go
for (auto& nObj : batch)
{
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
assert(nObj->getHash() == sha512Hash(makeSlice(nObj->getData())));
#endif
pCache_->canonicalize(nObj->getHash(), nObj, true);
nCache_->erase(nObj->getHash());
storeStats(nObj->getData().size());
}
// Write through whichever backend is currently writable
getWritableBackend()->storeBatch(batch);
return true;
}
void
DatabaseRotatingImp::tune(int size, int age)
{

View File

@@ -75,7 +75,7 @@ public:
void import (Database& source) override
{
importInternal (source, *getWritableBackend());
importInternal (*getWritableBackend(), source);
}
void store(NodeObjectType type, Blob&& data,
@@ -84,7 +84,7 @@ public:
std::shared_ptr<NodeObject>
fetch(uint256 const& hash, std::uint32_t seq) override
{
return doFetch(hash, seq, pCache_, nCache_, false);
return doFetch(hash, seq, *pCache_, *nCache_, false);
}
bool
@@ -92,7 +92,11 @@ public:
std::shared_ptr<NodeObject>& object) override;
bool
copyLedger(std::shared_ptr<Ledger const> const& ledger) override;
copyLedger(std::shared_ptr<Ledger const> const& ledger) override
{
return Database::copyLedger(
*getWritableBackend(), *ledger, pCache_, nCache_, nullptr);
}
int
getDesiredAsyncReadCount(std::uint32_t seq) override

View File

@@ -42,7 +42,7 @@ DatabaseShardImp::DatabaseShardImp(Application& app,
, ledgersPerShard_(get<std::uint32_t>(
config, "ledgers_per_shard", ledgersPerShardDefault))
, earliestShardIndex_(seqToShardIndex(earliestSeq()))
, avgShardSz_(ledgersPerShard() * (192 * 1024))
, avgShardSz_(ledgersPerShard_ * (192 * 1024))
{
if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0)
Throw<std::runtime_error>(
@@ -108,6 +108,18 @@ DatabaseShardImp::init()
". Earliest shard index " << earliestShardIndex();
return false;
}
// Check if a previous import failed
if (is_regular_file(dir_ / std::to_string(shardIndex) /
importMarker_))
{
JLOG(j_.warn()) <<
"shard " << shardIndex <<
" previously failed import, removing";
remove_all(dir_ / std::to_string(shardIndex));
continue;
}
auto shard = std::make_unique<Shard>(
*this, shardIndex, cacheSz_, cacheAge_, j_);
if (!shard->open(config_, scheduler_, dir_))
@@ -336,6 +348,191 @@ DatabaseShardImp::validate()
app_.shardFamily()->reset();
}
/** Import complete shards from the node store into the shard store.

    Determines the range of fully-covered shard indexes from the ledger
    sequences present in the sqlite database, verifies (by sampling every
    256th ledger) that those ledgers exist in the node store, then copies
    each shard's ledgers into a newly created shard. A marker file flags a
    shard whose import is in progress so a failed import can be detected
    and removed at the next startup. The shard store is re-initialized
    from disk when the import finishes.

    Throws std::runtime_error if re-initialization fails.
*/
void
DatabaseShardImp::importNodeStore()
{
std::unique_lock<std::mutex> l(m_);
assert(init_);
std::uint32_t earliestIndex;
std::uint32_t latestIndex;
{
// Load the ledger with the lowest (ascendSort) or highest sequence
// at or above earliestSeq(), without acquiring missing ledgers
auto loadLedger = [&](bool ascendSort = true) ->
boost::optional<std::uint32_t>
{
std::shared_ptr<Ledger> ledger;
std::uint32_t seq;
std::tie(ledger, seq, std::ignore) = loadLedgerHelper(
"WHERE LedgerSeq >= " + std::to_string(earliestSeq()) +
" order by LedgerSeq " + (ascendSort ? "asc" : "desc") +
" limit 1", app_, false);
if (!ledger || seq == 0)
{
JLOG(j_.error()) <<
"No suitable ledgers were found in" <<
" the sqlite database to import";
return boost::none;
}
return seq;
};
// Find earliest ledger sequence stored
auto seq {loadLedger()};
if (!seq)
return;
earliestIndex = seqToShardIndex(*seq);
// Consider only complete shards
if (seq != firstLedgerSeq(earliestIndex))
++earliestIndex;
// Find last ledger sequence stored
seq = loadLedger(false);
if (!seq)
return;
latestIndex = seqToShardIndex(*seq);
// Consider only complete shards
if (seq != lastLedgerSeq(latestIndex))
--latestIndex;
if (latestIndex < earliestIndex)
{
JLOG(j_.error()) <<
"No suitable ledgers were found in" <<
" the sqlite database to import";
return;
}
}
// Import the shards
for (std::uint32_t shardIndex = earliestIndex;
shardIndex <= latestIndex; ++shardIndex)
{
// Stop importing once the configured storage budget would be exceeded
if (usedDiskSpace_ + avgShardSz_ > maxDiskSpace_)
{
JLOG(j_.error()) <<
"Maximum size reached";
canAdd_ = false;
break;
}
if (avgShardSz_ > boost::filesystem::space(dir_).free)
{
JLOG(j_.error()) <<
"Insufficient disk space";
canAdd_ = false;
break;
}
// Skip if already stored
if (complete_.find(shardIndex) != complete_.end() ||
(incomplete_ && incomplete_->index() == shardIndex))
{
JLOG(j_.debug()) <<
"shard " << shardIndex <<
" already exists";
continue;
}
// Verify sqlite ledgers are in the node store
{
auto const firstSeq {firstLedgerSeq(shardIndex)};
auto const lastSeq {std::max(firstSeq, lastLedgerSeq(shardIndex))};
auto const numLedgers {shardIndex == earliestShardIndex()
? lastSeq - firstSeq + 1 : ledgersPerShard_};
auto ledgerHashes{getHashesByIndex(firstSeq, lastSeq, app_)};
if (ledgerHashes.size() != numLedgers)
continue;
bool valid {true};
// Sample every 256th ledger rather than fetching all of them.
// NOTE(review): assumes ledgerHashes is keyed by sequence number
// with .first being the ledger hash — confirm against
// getHashesByIndex
for (std::uint32_t n = firstSeq; n <= lastSeq; n += 256)
{
if (!app_.getNodeStore().fetch(ledgerHashes[n].first, n))
{
JLOG(j_.warn()) <<
"SQL DB ledger sequence " << n <<
" mismatches node store";
valid = false;
break;
}
}
if (!valid)
continue;
}
// Create the new shard
app_.shardFamily()->reset();
auto shard = std::make_unique<Shard>(
*this, shardIndex, shardCacheSz, cacheAge_, j_);
if (!shard->open(config_, scheduler_, dir_))
{
shard.reset();
remove_all(dir_ / std::to_string(shardIndex));
continue;
}
// Create a marker file to signify an import in progress
auto f {dir_ / std::to_string(shardIndex) / importMarker_};
std::ofstream ofs {f.string()};
if (!ofs.is_open())
{
JLOG(j_.error()) <<
"shard " << shardIndex <<
" unable to create temp file";
shard.reset();
remove_all(dir_ / std::to_string(shardIndex));
continue;
}
ofs.close();
// Copy the ledgers from node store
// (prepare() presumably yields the next ledger sequence the shard
// still needs — none when the shard is complete)
while (auto seq = shard->prepare())
{
auto ledger = loadByIndex(*seq, app_, false);
if (!ledger || ledger->info().seq != seq ||
!Database::copyLedger(*shard->getBackend(), *ledger,
nullptr, nullptr, shard->lastStored()))
break;
// Track disk usage by the change in shard file size
auto const before {shard->fileSize()};
if (!shard->setStored(ledger))
break;
auto const after {shard->fileSize()};
if (after > before)
usedDiskSpace_ += (after - before);
else if(after < before)
usedDiskSpace_ -= std::min(before - after, usedDiskSpace_);
if (shard->complete())
{
// Import finished cleanly; remove the in-progress marker
remove(f);
JLOG(j_.debug()) <<
"shard " << shardIndex <<
" successfully imported";
break;
}
}
if (!shard->complete())
{
JLOG(j_.error()) <<
"shard " << shardIndex <<
" failed to import";
shard.reset();
remove_all(dir_ / std::to_string(shardIndex));
}
}
// Re initialize the shard store
init_ = false;
complete_.clear();
incomplete_.reset();
usedDiskSpace_ = 0;
l.unlock();
if (!init())
Throw<std::runtime_error>("Failed to initialize");
}
std::int32_t
DatabaseShardImp::getWriteLoad() const
{
@@ -384,7 +581,7 @@ DatabaseShardImp::fetch(uint256 const& hash, std::uint32_t seq)
{
auto cache {selectCache(seq)};
if (cache.first)
return doFetch(hash, seq, cache.first, cache.second, false);
return doFetch(hash, seq, *cache.first, *cache.second, false);
return {};
}
@@ -408,24 +605,6 @@ DatabaseShardImp::asyncFetch(uint256 const& hash,
bool
DatabaseShardImp::copyLedger(std::shared_ptr<Ledger const> const& ledger)
{
if (ledger->info().hash.isZero() ||
ledger->info().accountHash.isZero())
{
assert(false);
JLOG(j_.error()) <<
"source ledger seq " << ledger->info().seq <<
" is invalid";
return false;
}
auto& srcDB = const_cast<Database&>(
ledger->stateMap().family().db());
if (&srcDB == this)
{
assert(false);
JLOG(j_.error()) <<
"same source and destination databases";
return false;
}
auto const shardIndex {seqToShardIndex(ledger->info().seq)};
std::lock_guard<std::mutex> l(m_);
assert(init_);
@@ -437,74 +616,11 @@ DatabaseShardImp::copyLedger(std::shared_ptr<Ledger const> const& ledger)
return false;
}
// Store the ledger header
if (!Database::copyLedger(*incomplete_->getBackend(), *ledger,
incomplete_->pCache(), incomplete_->nCache(),
incomplete_->lastStored()))
{
Serializer s(1024);
s.add32(HashPrefix::ledgerMaster);
addRaw(ledger->info(), s);
auto nObj = NodeObject::createObject(hotLEDGER,
std::move(s.modData()), ledger->info().hash);
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
assert(nObj->getHash() == sha512Hash(makeSlice(nObj->getData())));
#endif
incomplete_->pCache()->canonicalize(
nObj->getHash(), nObj, true);
incomplete_->getBackend()->store(nObj);
incomplete_->nCache()->erase(nObj->getHash());
storeStats(nObj->getData().size());
}
auto next = incomplete_->lastStored();
bool error = false;
auto f = [&](SHAMapAbstractNode& node) {
if (auto nObj = srcDB.fetch(
node.getNodeHash().as_uint256(), ledger->info().seq))
{
#if RIPPLE_VERIFY_NODEOBJECT_KEYS
assert(nObj->getHash() == sha512Hash(makeSlice(nObj->getData())));
#endif
incomplete_->pCache()->canonicalize(
nObj->getHash(), nObj, true);
incomplete_->getBackend()->store(nObj);
incomplete_->nCache()->erase(nObj->getHash());
storeStats(nObj->getData().size());
}
else
error = true;
return !error;
};
// Store the state map
if (ledger->stateMap().getHash().isNonZero())
{
if (!ledger->stateMap().isValid())
{
JLOG(j_.error()) <<
"source ledger seq " << ledger->info().seq <<
" state map invalid";
return false;
}
if (next && next->info().parentHash == ledger->info().hash)
{
auto have = next->stateMap().snapShot(false);
ledger->stateMap().snapShot(false)->visitDifferences(&(*have), f);
}
else
ledger->stateMap().snapShot(false)->visitNodes(f);
if (error)
return false;
}
// Store the transaction map
if (ledger->info().txHash.isNonZero())
{
if (!ledger->txMap().isValid())
{
JLOG(j_.error()) <<
"source ledger seq " << ledger->info().seq <<
" transaction map invalid";
return false;
}
ledger->txMap().snapShot(false)->visitNodes(f);
if (error)
return false;
return false;
}
auto const before {incomplete_->fileSize()};
@@ -612,7 +728,7 @@ DatabaseShardImp::fetchFrom(uint256 const& hash, std::uint32_t seq)
std::shared_ptr<Backend> backend;
auto const shardIndex {seqToShardIndex(seq)};
{
std::unique_lock<std::mutex> l(m_);
std::lock_guard<std::mutex> l(m_);
assert(init_);
auto it = complete_.find(shardIndex);
if (it != complete_.end())
@@ -751,6 +867,5 @@ DatabaseShardImp::selectCache(std::uint32_t seq)
return cache;
}
} // NodeStore
} // ripple

View File

@@ -60,6 +60,9 @@ public:
void
validate() override;
void
importNodeStore() override;
std::uint32_t
ledgersPerShard() const override
{
@@ -175,6 +178,9 @@ private:
int cacheSz_ {shardCacheSz};
PCache::clock_type::rep cacheAge_ {shardCacheSeconds};
// File name used to mark shards being imported from node store
static constexpr auto importMarker_ = "import";
std::shared_ptr<NodeObject>
fetchFrom(uint256 const& hash, std::uint32_t seq) override;

View File

@@ -140,7 +140,9 @@ Shard::setStored(std::shared_ptr<Ledger const> const& l)
storedSeqs_.clear();
JLOG(j_.debug()) <<
"shard " << index_ << " complete";
"shard " << index_ <<
" ledger seq " << l->info().seq <<
" stored. Shard complete";
}
else
{
@@ -148,12 +150,12 @@ Shard::setStored(std::shared_ptr<Ledger const> const& l)
lastStored_ = l;
if (backend_->fdlimit() != 0 && !saveControl())
return false;
}
JLOG(j_.debug()) <<
"shard " << index_ <<
" ledger seq " << l->info().seq <<
" stored";
JLOG(j_.debug()) <<
"shard " << index_ <<
" ledger seq " << l->info().seq <<
" stored";
}
return true;
}
@@ -186,7 +188,7 @@ Shard::validate(Application& app)
{
std::tie(l, seq, hash) = loadLedgerHelper(
"WHERE LedgerSeq >= " + std::to_string(lastSeq_) +
" order by LedgerSeq desc limit 1", app);
" order by LedgerSeq desc limit 1", app, false);
if (!l)
{
JLOG(j_.fatal()) <<