20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/misc/NetworkOPs.h>
23 #include <ripple/basics/ByteUtilities.h>
24 #include <ripple/basics/chrono.h>
25 #include <ripple/basics/random.h>
26 #include <ripple/core/ConfigSections.h>
27 #include <ripple/nodestore/DummyScheduler.h>
28 #include <ripple/nodestore/impl/DatabaseShardImp.h>
29 #include <ripple/overlay/Overlay.h>
30 #include <ripple/overlay/predicates.h>
31 #include <ripple/protocol/HashPrefix.h>
33 #include <boost/algorithm/string/predicate.hpp>
36 #include <sys/statvfs.h>
60 , avgShardFileSz_(ledgersPerShard_ *
kilobytes(192))
76 JLOG(
j_.
error()) <<
"already initialized";
82 JLOG(
j_.
error()) <<
"invalid configuration file settings";
88 using namespace boost::filesystem;
96 for (
auto const& path : paths)
102 if (!is_directory(path))
104 JLOG(
j_.
error()) << path <<
" must be a directory";
108 else if (!create_directories(path))
111 <<
"failed to create path: " + path.string();
118 <<
"failed to create path: " + path.string();
131 ctx_ = std::make_unique<nudb::context>();
135 for (
auto const& path : paths)
137 for (
auto const& d : directory_iterator(path))
139 if (!is_directory(d))
142 auto const shardDir = d.path();
145 auto dirName = shardDir.stem().string();
147 dirName.begin(), dirName.end(), [](
auto c) {
148 return ::isdigit(static_cast<unsigned char>(c));
158 <<
"shard " << shardIndex
159 <<
" comes before earliest shard index "
168 <<
"shard " << shardIndex
169 <<
" previously failed import, removing";
170 remove_all(shardDir);
174 auto shard{std::make_unique<Shard>(
175 app_, *
this, shardIndex, shardDir.parent_path(),
j_)};
179 shard->removeOnDestroy();
181 <<
"shard " << shardIndex <<
" removed, "
182 << (shard->isLegacy() ?
"legacy" :
"corrupted")
187 if (shard->isFinal())
194 else if (shard->isBackendComplete())
196 auto const result{
shards_.emplace(
201 result.first->second,
true, lock, boost::none);
208 <<
"more than one shard being acquired";
224 <<
"exception " << e.
what() <<
" in function " << __func__;
237 boost::optional<std::uint32_t>
240 boost::optional<std::uint32_t> shardIndex;
249 return it->second.shard->prepare();
262 JLOG(
j_.
debug()) <<
"no new shards to add";
270 auto const pathDesignation = [
this, shardIndex = *shardIndex]() {
275 if (!pathDesignation)
278 auto const needsHistoricalPath =
281 auto shard = [
this, shardIndex, needsHistoricalPath] {
283 return std::make_unique<Shard>(
294 auto const seq{shard->prepare()};
309 JLOG(j.error()) <<
"shard " << shardIndex <<
" " << msg;
316 return fail(
"cannot be stored at this time");
321 "comes before earliest shard index " +
330 return fail(
"has an invalid index");
341 JLOG(
j_.
debug()) <<
"shard " << shardIndex
342 <<
" is already stored or queued for import";
353 return fail(
"maximum number of historical shards reached");
359 return fail(
"insufficient storage space available");
371 if (
auto const it{
shards_.find(shardIndex)};
400 boost::filesystem::path
const& srcDir)
402 using namespace boost::filesystem;
405 if (!is_directory(srcDir) || is_empty(srcDir))
407 JLOG(
j_.
error()) <<
"invalid source directory " << srcDir.string();
413 JLOG(
j_.
error()) <<
"exception " << e.
what() <<
" in function "
423 JLOG(
j_.
error()) <<
"shard " << shardIndex
424 <<
" expected hash not found";
428 auto renameDir = [&](path
const& src, path
const& dst) {
436 <<
"exception " << e.
what() <<
" in function " << __func__;
448 if (
auto const it{
shards_.find(shardIndex)}; it ==
shards_.end() ||
451 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" failed to import";
455 auto const pathDesignation =
458 if (!pathDesignation)
460 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" failed to import";
464 auto const needsHistoricalPath =
472 if (!renameDir(srcDir, dstDir))
476 auto shard{std::make_unique<Shard>(
477 app_, *
this, shardIndex, dstDir.parent_path(),
j_)};
479 if (!shard->open(
scheduler_, *
ctx_) || !shard->isBackendComplete())
481 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" failed to import";
483 renameDir(dstDir, srcDir);
489 auto const it{
shards_.find(shardIndex)};
490 if (it ==
shards_.end() || it->second.shard ||
493 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" failed to import";
495 renameDir(dstDir, srcDir);
499 it->second.shard = std::move(shard);
519 shard = it->second.shard;
520 state = it->second.state;
533 if (shard->containsLedger(seq))
541 auto nObj{
fetch(hash, seq)};
550 auto ledger{std::make_shared<Ledger>(
555 if (ledger->info().seq != seq)
560 if (ledger->info().hash != hash)
563 "encountered invalid ledger hash " +
to_string(hash) +
568 if (!ledger->stateMap().fetchRoot(
569 SHAMapHash{ledger->info().accountHash},
nullptr))
572 "is missing root STATE node on hash " +
to_string(hash) +
576 if (ledger->info().txHash.isNonZero())
578 if (!ledger->txMap().fetchRoot(
582 "is missing root TXN node on hash " +
to_string(hash) +
592 if (ledger->info().hash.isZero())
594 JLOG(
j_.
error()) <<
"zero ledger hash for ledger sequence "
595 << ledger->info().seq;
598 if (ledger->info().accountHash.isZero())
600 JLOG(
j_.
error()) <<
"zero account hash for ledger sequence "
601 << ledger->info().seq;
604 if (ledger->stateMap().getHash().isNonZero() &&
605 !ledger->stateMap().isValid())
607 JLOG(
j_.
error()) <<
"invalid state map for ledger sequence "
608 << ledger->info().seq;
611 if (ledger->info().txHash.isNonZero() && !ledger->txMap().isValid())
613 JLOG(
j_.
error()) <<
"invalid transaction map for ledger sequence "
614 << ledger->info().seq;
627 <<
"shard " << shardIndex <<
" is not being acquired";
632 shard = it->second.shard;
636 <<
"shard " << shardIndex <<
" is not being acquired";
672 for (
auto const& e : shards)
674 if (
auto shard{e.lock()}; shard)
675 shard->finalize(
true, boost::none);
695 e.second.shard->stop();
711 JLOG(
j_.
error()) <<
"invalid source database";
718 auto loadLedger = [&](
bool ascendSort =
719 true) -> boost::optional<std::uint32_t> {
723 "WHERE LedgerSeq >= " +
725 " order by LedgerSeq " + (ascendSort ?
"asc" :
"desc") +
729 if (!ledger || seq == 0)
731 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
732 " the SQLite database to import";
739 auto seq{loadLedger()};
749 seq = loadLedger(
false);
758 if (latestIndex < earliestIndex)
760 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
761 " the SQLite database to import";
770 shardIndex <= latestIndex;
773 auto const pathDesignation =
776 if (!pathDesignation)
779 auto const needsHistoricalPath =
786 JLOG(
j_.
debug()) <<
"shard " << shardIndex <<
" already exists";
795 auto const numLedgers{
799 if (ledgerHashes.size() != numLedgers)
805 if (!source.
fetch(ledgerHashes[n].first, n))
807 JLOG(
j_.
warn()) <<
"SQLite ledger sequence " << n
808 <<
" mismatches node store";
822 std::make_unique<Shard>(
app_, *
this, shardIndex, path,
j_);
834 JLOG(
j_.
error()) <<
"shard " << shardIndex
835 <<
" failed to create temp marker file";
836 shard->removeOnDestroy();
844 boost::optional<uint256> lastLedgerHash;
846 while (
auto seq = shard->prepare())
849 if (!ledger || ledger->info().seq != seq)
862 if (!shard->store(ledger))
866 lastLedgerHash = ledger->info().hash;
868 recentStored = ledger;
871 using namespace boost::filesystem;
872 if (lastLedgerHash && shard->isBackendComplete())
885 shard->getBackend()->store(nObj);
889 remove_all(markerFile);
891 JLOG(
j_.
debug()) <<
"shard " << shardIndex
892 <<
" was successfully imported";
894 auto const result{
shards_.emplace(
898 result.first->second,
true, lock, boost::none);
906 <<
" in function " << __func__;
907 shard->removeOnDestroy();
913 <<
"shard " << shardIndex <<
" failed to import";
914 shard->removeOnDestroy();
933 shard = it->second.shard;
938 return shard->getBackend()->getWriteLoad();
957 <<
"shard " << shardIndex <<
" is not being acquired";
962 shard = it->second.shard;
966 <<
"shard " << shardIndex <<
" is not being acquired";
971 auto [backend, pCache, nCache] = shard->getBackendAll();
974 pCache->canonicalize_replace_cache(hash, nObj);
975 backend->store(nObj);
986 return doFetch(hash, seq, *cache.first, *cache.second,
false);
1000 object = cache.first->fetch(hash);
1001 if (
object || cache.second->touch_if_exists(hash))
1012 auto const seq{srcLedger->info().seq};
1022 <<
"shard " << shardIndex <<
" is not being acquired";
1026 if (
auto const it{
shards_.find(shardIndex)}; it !=
shards_.end())
1027 shard = it->second.shard;
1031 <<
"shard " << shardIndex <<
" is not being acquired";
1036 if (shard->containsLedger(seq))
1038 JLOG(
j_.
trace()) <<
"shard " << shardIndex <<
" ledger already stored";
1043 auto [backend, pCache, nCache] = shard->getBackendAll();
1045 *srcLedger, backend, pCache, nCache,
nullptr))
1063 if (
auto const it{
shards_.find(shardIndex)}; it !=
shards_.end() &&
1067 shard = it->second.shard;
1073 return shard->pCache()->getTargetSize() /
asyncDivider;
1085 shard = it->second.shard;
1090 return shard->pCache()->getHitRate();
1109 for (
auto const& e : shards)
1111 if (
auto shard{e.lock()}; shard)
1136 get_if_exists<std::uint32_t>(
1137 section,
"earliest_seq", shardDBEarliestSeq);
1140 get_if_exists<std::uint32_t>(
1145 if (shardDBEarliestSeq != nodeDBEarliestSeq)
1149 "] define different 'earliest_seq' values");
1153 using namespace boost::filesystem;
1154 if (!get_if_exists<path>(section,
"path",
dir_))
1155 return fail(
"'path' missing");
1160 Section const& historicalShardPaths =
1161 config.section(SECTION_HISTORICAL_SHARD_PATHS);
1163 auto values = historicalShardPaths.
values();
1165 std::sort(values.begin(), values.end());
1166 values.erase(
std::unique(values.begin(), values.end()), values.end());
1168 for (
auto const& s : values)
1170 auto const dir = path(s);
1174 "the 'path' cannot also be in the "
1175 "'historical_shard_path' section");
1182 if (section.exists(
"ledgers_per_shard"))
1185 if (!config.standalone())
1186 return fail(
"'ledgers_per_shard' only honored in stand alone");
1190 return fail(
"'ledgers_per_shard' must be a multiple of 256");
1197 backendName_ = get<std::string>(section,
"type",
"nudb");
1199 return fail(
"'type' value unsupported");
1213 if (
auto const it{
shards_.find(shardIndex)};
1214 it !=
shards_.end() && it->second.shard)
1216 shard = it->second.shard;
1225 boost::optional<std::uint32_t>
1233 auto const maxShardIndex{[
this, validLedgerSeq]() {
1242 if (
shards_.size() >= maxNumShards)
1245 if (maxShardIndex < 1024 ||
1246 static_cast<float>(
shards_.size()) / maxNumShards > 0.5f)
1254 shardIndex <= maxShardIndex;
1261 if (available.
empty())
1264 if (available.
size() == 1)
1265 return available.
front();
1274 for (
int i = 0; i < 40; ++i)
1290 boost::optional<uint256>
const& expectedHash)
1292 assert(shardInfo.
shard);
1294 assert(shardInfo.
shard->isBackendComplete());
1297 auto const shardIndex{shardInfo.
shard->index()};
1300 taskQueue_->addTask([
this, shardIndex, writeSQLite, expectedHash]() {
1307 if (
auto const it{
shards_.find(shardIndex)}; it !=
shards_.end())
1309 shard = it->second.shard;
1313 JLOG(
j_.
error()) <<
"Unable to finalize shard " << shardIndex;
1318 if (!shard->finalize(writeSQLite, expectedHash))
1333 auto const it{
shards_.find(shardIndex)};
1340 auto const isHistoricalShard = shardIndex < boundaryIndex;
1342 if (isHistoricalShard)
1345 shard->getDir().parent_path() ==
dir_)
1349 JLOG(
j_.
warn()) <<
"shard " << shardIndex
1350 <<
" is not stored at a historical path";
1359 assert(!boundaryIndex || shardIndex - boundaryIndex <= 1);
1361 auto& recentShard = shardIndex == boundaryIndex
1367 recentShard = shardIndex;
1369 if (shard->getDir().parent_path() !=
dir_)
1371 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1372 <<
" is not stored at the path";
1383 protocol::TMPeerShardInfo message;
1385 message.set_nodepubkey(publicKey.data(), publicKey.size());
1388 message, protocol::mtPEER_SHARD_INFO)));
1412 for (
auto const& e : shards)
1414 if (
auto shard{e.lock()}; shard)
1416 auto [sz, fd] = shard->fileInfo();
1431 JLOG(
j_.
warn()) <<
"maximum number of historical shards reached";
1440 <<
"maximum shard store size exceeds available storage space";
1452 rs.insert(e.second.shard->index());
1468 if (
auto const it{
shards_.find(shardIndex)};
1469 it !=
shards_.end() && it->second.shard)
1471 shard = it->second.shard;
1479 std::tie(std::ignore, pCache, nCache) = shard->getBackendAll();
1502 auto const availableSpace =
1503 boost::filesystem::space(path).available;
1511 capacities.
push_back(boost::filesystem::space(
dir_).available);
1523 if (numShards <= shardCap)
1526 numShards -= shardCap;
1531 JLOG(
j_.
error()) <<
"exception " << e.
what() <<
" in function "
1546 if (!shard->store(ledger))
1552 else if (shard->isBackendComplete())
1556 if (
auto const it{
shards_.find(shard->index())}; it !=
shards_.end())
1567 <<
"shard " << shard->index() <<
" is no longer being acquired";
1590 if ((
shards_.erase(shard->index()) > 0) && shard->isFinal())
1594 shard->removeOnDestroy();
1624 [recentShardBoundaryIndex](
auto const& entry) {
1625 return entry.first < recentShardBoundaryIndex;
1641 auto const removeShard =
1647 if (it->second.shard)
1651 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1656 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1660 auto const keepShard =
1661 [
this, &lock, removeShard, separateHistoricalPath](
1666 <<
"maximum number of historical shards reached";
1668 removeShard(shardIndex);
1671 if (separateHistoricalPath &&
1674 JLOG(
j_.
error()) <<
"insufficient storage space available";
1676 removeShard(shardIndex);
1685 auto const moveShard = [
this,
1691 if (
auto& shard = it->second.shard)
1702 boost::filesystem::rename(
1703 shard->getDir().string(),
1709 <<
"shard " << shardIndex
1710 <<
" failed to move to historical storage";
1716 shard = std::make_unique<Shard>(
1717 app_, *
this, shardIndex, dst,
j_);
1723 <<
"shard " << shardIndex
1724 <<
" failed to open in historical storage";
1726 shard->removeOnDestroy();
1733 <<
"can't find shard to move to historical path";
1739 <<
"can't find shard to move to historical path";
1745 bool const curNotSynched =
1752 if (curNotSynched || prevNotSynched)
1758 if (keepShard(*prev) && separateHistoricalPath)
1770 if (cur == latestShardIndex - 1)
1781 if (keepShard(*cur) && separateHistoricalPath)
1802 auto const isHistoricalShard = shardIndex < boundaryIndex;
1811 JLOG(
j_.
error()) <<
"maximum number of historical shards reached";
1817 JLOG(
j_.
error()) <<
"insufficient storage space available";
1825 boost::filesystem::path
1833 boost::filesystem::path historicalShardPath;
1842 if (potentialPaths.
empty())
1844 JLOG(
j_.
error()) <<
"failed to select a historical shard path";
1849 potentialPaths.
begin(),
1850 potentialPaths.
end(),
1851 &historicalShardPath,
1855 return historicalShardPath;
1872 struct statvfs buffer;
1873 if (statvfs(path.c_str(), &buffer))
1876 <<
"failed to acquire stats for 'historical_shard_path': "
1881 filesystemIDs[buffer.f_fsid].push_back(path.string());
1885 for (
auto const& entry : filesystemIDs)
1889 if (entry.second.size() > 1)
1894 <<
"The following paths correspond to the same filesystem: "
1895 << boost::algorithm::join(entry.second,
", ")
1896 <<
". Each configured historical storage path should"
1897 " be on a unique device or filesystem.";
1919 uniqueCapacities[boost::filesystem::space(path).available].push_back(
1922 for (
auto const& entry : uniqueCapacities)
1926 if (entry.second.size() > 1)
1932 <<
"Each of the following paths have " << entry.first
1933 <<
" bytes free, and may be located on the same device"
1935 << boost::algorithm::join(entry.second,
", ")
1936 <<
". Each configured historical storage path should"
1937 " be on a unique device or filesystem.";
1958 if (section.empty())
1961 return std::make_unique<DatabaseShardImp>(
1962 app, parent,
"ShardStore", scheduler, readThreads, j);