20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/misc/NetworkOPs.h>
23 #include <ripple/basics/ByteUtilities.h>
24 #include <ripple/basics/chrono.h>
25 #include <ripple/basics/random.h>
26 #include <ripple/core/ConfigSections.h>
27 #include <ripple/nodestore/DummyScheduler.h>
28 #include <ripple/nodestore/impl/DatabaseShardImp.h>
29 #include <ripple/overlay/Overlay.h>
30 #include <ripple/overlay/predicates.h>
31 #include <ripple/protocol/HashPrefix.h>
33 #include <boost/algorithm/string/predicate.hpp>
36 #include <sys/statvfs.h>
60 , avgShardFileSz_(ledgersPerShard_ *
kilobytes(192ull))
73 JLOG(
j_.
error()) <<
"already initialized";
79 JLOG(
j_.
error()) <<
"invalid configuration file settings";
85 using namespace boost::filesystem;
92 for (
auto const& path : paths)
96 if (!is_directory(path))
98 JLOG(
j_.
error()) << path <<
" must be a directory";
102 else if (!create_directories(path))
105 <<
"failed to create path: " + path.string();
117 ctx_ = std::make_unique<nudb::context>();
122 for (
auto const& path : paths)
124 for (
auto const& it : directory_iterator(path))
127 if (!is_directory(it))
131 auto const shardDir{it.path()};
132 auto dirName{shardDir.stem().string()};
134 dirName.begin(), dirName.end(), [](
auto c) {
135 return ::isdigit(static_cast<unsigned char>(c));
146 <<
"shard " << shardIndex
147 <<
" ignored, comes before earliest shard index "
156 <<
"shard " << shardIndex
157 <<
" previously failed import, removing";
158 remove_all(shardDir);
162 auto shard{std::make_shared<Shard>(
163 app_, *
this, shardIndex, shardDir.parent_path(),
j_)};
167 shard->removeOnDestroy();
169 <<
"shard " << shardIndex <<
" removed, "
170 << (shard->isLegacy() ?
"legacy" :
"corrupted")
175 switch (shard->getState())
180 shards_.emplace(shardIndex, std::move(shard));
185 shards_.emplace(shardIndex, std::move(shard))
195 <<
"more than one shard being acquired";
199 shards_.emplace(shardIndex, std::move(shard));
205 <<
"shard " << shardIndex <<
" invalid state";
213 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
214 <<
". Error: " << e.
what();
227 boost::optional<std::uint32_t>
230 boost::optional<std::uint32_t> shardIndex;
239 return it->second->prepare();
254 JLOG(
j_.
debug()) <<
"no new shards to add";
262 auto const pathDesignation = [
this, shardIndex = *shardIndex]() {
267 if (!pathDesignation)
270 auto const needsHistoricalPath =
273 auto shard = [
this, shardIndex, needsHistoricalPath] {
275 return std::make_unique<Shard>(
286 auto const ledgerSeq{shard->prepare()};
289 shards_.emplace(*shardIndex, std::move(shard));
298 auto fail = [j =
j_, &shardIndexes](
300 boost::optional<std::uint32_t> shardIndex = boost::none) {
301 auto multipleIndexPrequel = [&shardIndexes] {
304 shardIndexes.
begin(),
306 indexesAsString.
begin(),
307 [](uint32_t
const index) { return std::to_string(index); });
310 (shardIndexes.
size() > 1 ?
"s " :
" ") +
311 boost::algorithm::join(indexesAsString,
", ");
316 : multipleIndexPrequel();
318 JLOG(j.error()) << prequel <<
" " << msg;
326 return fail(
"cannot be stored at this time");
328 auto historicalShardsToPrepare = 0;
330 for (
auto const shardIndex : shardIndexes)
335 "comes before earliest shard index " +
346 return fail(
"invalid index", shardIndex);
353 return fail(
"invalid index", shardIndex);
357 return fail(
"is already stored", shardIndex);
360 return fail(
"is already queued for import", shardIndex);
365 ++historicalShardsToPrepare;
372 return fail(
"maximum number of historical shards reached");
374 if (historicalShardsToPrepare)
379 return fail(
"insufficient storage space available");
382 if (
auto const recentShardsToPrepare =
383 shardIndexes.size() - historicalShardsToPrepare;
384 recentShardsToPrepare)
389 return fail(
"insufficient storage space available");
392 for (
auto const shardIndex : shardIndexes)
394 auto const prepareSuccessful =
397 (void)prepareSuccessful;
398 assert(prepareSuccessful);
422 rs.insert(shardIndex);
434 boost::filesystem::path
const& srcDir)
438 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
445 using namespace boost::filesystem;
448 if (!is_directory(srcDir) || is_empty(srcDir))
451 "invalid source directory " + srcDir.string(),
458 std::string(
". Exception caught in function ") + __func__ +
459 ". Error: " + e.
what(),
472 return fail(
"already exists", lock);
476 return fail(
"was not prepared for import", lock);
478 auto const pathDesignation{
480 if (!pathDesignation)
481 return fail(
"failed to import", lock);
490 auto renameDir = [&](path
const& src, path
const& dst) {
498 std::string(
". Exception caught in function ") + __func__ +
499 ". Error: " + e.
what(),
506 if (!renameDir(srcDir, dstDir))
510 auto shard{std::make_unique<Shard>(
511 app_, *
this, shardIndex, dstDir.parent_path(),
j_)};
516 renameDir(dstDir, srcDir);
520 auto const [it, inserted] = [&]() {
523 return shards_.emplace(shardIndex, std::move(shard));
529 renameDir(dstDir, srcDir);
547 auto const it{
shards_.find(shardIndex)};
554 switch (shard->getState())
559 if (shard->containsLedger(ledgerSeq))
572 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
576 auto ledger{std::make_shared<Ledger>(
581 if (ledger->info().seq != ledgerSeq)
584 "encountered invalid ledger sequence " +
std::to_string(ledgerSeq));
586 if (ledger->info().hash != hash)
589 "encountered invalid ledger hash " +
to_string(hash) +
594 if (!ledger->stateMap().fetchRoot(
595 SHAMapHash{ledger->info().accountHash},
nullptr))
598 "is missing root STATE node on hash " +
to_string(hash) +
602 if (ledger->info().txHash.isNonZero())
604 if (!ledger->txMap().fetchRoot(
608 "is missing root TXN node on hash " +
to_string(hash) +
618 auto const ledgerSeq{ledger->info().seq};
619 if (ledger->info().hash.isZero())
621 JLOG(
j_.
error()) <<
"zero ledger hash for ledger sequence "
625 if (ledger->info().accountHash.isZero())
627 JLOG(
j_.
error()) <<
"zero account hash for ledger sequence "
631 if (ledger->stateMap().getHash().isNonZero() &&
632 !ledger->stateMap().isValid())
634 JLOG(
j_.
error()) <<
"invalid state map for ledger sequence "
638 if (ledger->info().txHash.isNonZero() && !ledger->txMap().isValid())
640 JLOG(
j_.
error()) <<
"invalid transaction map for ledger sequence "
654 <<
"shard " << shardIndex <<
" is not being acquired";
658 auto const it{
shards_.find(shardIndex)};
662 <<
"shard " << shardIndex <<
" is not being acquired";
668 if (shard->containsLedger(ledgerSeq))
670 JLOG(
j_.
trace()) <<
"shard " << shardIndex <<
" ledger already stored";
713 for (
auto const& e : shards)
718 if (
auto const shard{e.lock()}; shard)
721 JLOG(
j_.
warn()) <<
" shard " << shardIndex <<
" unexpired";
729 JLOG(
j_.
warn()) <<
" Children failed to stop";
744 JLOG(
j_.
error()) <<
"invalid source database";
751 auto loadLedger = [&](
bool ascendSort =
752 true) -> boost::optional<std::uint32_t> {
756 "WHERE LedgerSeq >= " +
758 " order by LedgerSeq " + (ascendSort ?
"asc" :
"desc") +
762 if (!ledger || ledgerSeq == 0)
764 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
765 " the SQLite database to import";
772 auto ledgerSeq{loadLedger()};
782 ledgerSeq = loadLedger(
false);
791 if (latestIndex < earliestIndex)
793 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
794 " the SQLite database to import";
803 shardIndex <= latestIndex;
806 auto const pathDesignation =
809 if (!pathDesignation)
812 auto const needsHistoricalPath =
819 <<
"shard " << shardIndex <<
" already being acquired";
827 <<
"shard " << shardIndex <<
" already being imported";
834 JLOG(
j_.
debug()) <<
"shard " << shardIndex <<
" already stored";
843 auto const numLedgers{
847 if (ledgerHashes.size() != numLedgers)
855 JLOG(
j_.
warn()) <<
"SQLite ledger sequence " << n
856 <<
" mismatches node store";
870 std::make_unique<Shard>(
app_, *
this, shardIndex, path,
j_)};
881 JLOG(
j_.
error()) <<
"shard " << shardIndex
882 <<
" failed to create temp marker file";
883 shard->removeOnDestroy();
891 boost::optional<uint256> lastLedgerHash;
893 while (
auto const ledgerSeq = shard->prepare())
896 if (!ledger || ledger->info().seq != ledgerSeq)
899 auto const result{shard->storeLedger(ledger, recentStored)};
904 if (!shard->setLedgerStored(ledger))
907 if (!lastLedgerHash && ledgerSeq ==
lastLedgerSeq(shardIndex))
908 lastLedgerHash = ledger->info().hash;
910 recentStored = std::move(ledger);
913 using namespace boost::filesystem;
926 if (shard->storeNodeObject(nodeObject))
932 remove_all(markerFile);
934 JLOG(
j_.
debug()) <<
"shard " << shardIndex
935 <<
" was successfully imported";
937 shards_.emplace(shardIndex, std::move(shard))
945 JLOG(
j_.
fatal()) <<
"shard index " << shardIndex
946 <<
". Exception caught in function "
947 << __func__ <<
". Error: " << e.
what();
955 <<
"shard " << shardIndex <<
" failed to import";
956 shard->removeOnDestroy();
980 return shard->getWriteLoad();
997 <<
"shard " << shardIndex <<
" is not being acquired";
1001 auto const it{
shards_.find(shardIndex)};
1005 <<
"shard " << shardIndex <<
" is not being acquired";
1011 auto const nodeObject{
1013 if (shard->storeNodeObject(nodeObject))
1034 if (shard->fetchNodeObjectFromCache(hash, nodeObject))
1045 auto const ledgerSeq{srcLedger->info().seq};
1055 <<
"shard " << shardIndex <<
" is not being acquired";
1059 auto const it{
shards_.find(shardIndex)};
1063 <<
"shard " << shardIndex <<
" is not being acquired";
1069 auto const result{shard->storeLedger(srcLedger,
nullptr)};
1071 if (result.error || result.count == 0 || result.size == 0)
1086 auto const it{
shards_.find(shardIndex)};
1092 return shard->getDesiredAsyncReadCount();
1109 return shard->getCacheHitRate();
1128 for (
auto const& e : shards)
1130 if (
auto const shard{e.lock()}; shard && shard->isOpen())
1141 JLOG(
j_.
trace()) <<
"Open shards exceed configured limit of "
1152 return lhsShard->getLastUse() < rhsShard->getLastUse();
1155 for (
auto it{openFinals.
cbegin()};
1158 if ((*it)->tryClose())
1159 it = openFinals.
erase(it);
1186 get_if_exists<std::uint32_t>(
1187 section,
"earliest_seq", shardDBEarliestSeq);
1190 get_if_exists<std::uint32_t>(
1195 if (shardDBEarliestSeq != nodeDBEarliestSeq)
1199 "] define different 'earliest_seq' values");
1203 using namespace boost::filesystem;
1204 if (!get_if_exists<path>(section,
"path",
dir_))
1205 return fail(
"'path' missing");
1210 Section const& historicalShardPaths =
1211 config.section(SECTION_HISTORICAL_SHARD_PATHS);
1213 auto values = historicalShardPaths.
values();
1215 std::sort(values.begin(), values.end());
1216 values.erase(
std::unique(values.begin(), values.end()), values.end());
1218 for (
auto const& s : values)
1220 auto const dir = path(s);
1224 "the 'path' cannot also be in the "
1225 "'historical_shard_path' section");
1232 if (section.exists(
"ledgers_per_shard"))
1235 if (!config.standalone())
1236 return fail(
"'ledgers_per_shard' only honored in stand alone");
1240 return fail(
"'ledgers_per_shard' must be a multiple of 256");
1247 backendName_ = get<std::string>(section,
"type",
"nudb");
1249 return fail(
"'type' value unsupported");
1264 auto const it{
shards_.find(shardIndex)};
1270 return shard->fetchNodeObject(hash, fetchReport);
1273 boost::optional<std::uint32_t>
1281 auto const maxShardIndex{[
this, validLedgerSeq]() {
1290 if (
shards_.size() >= maxNumShards)
1293 if (maxShardIndex < 1024 ||
1294 static_cast<float>(
shards_.size()) / maxNumShards > 0.5f)
1302 shardIndex <= maxShardIndex;
1312 if (available.
empty())
1315 if (available.
size() == 1)
1316 return available.
front();
1325 for (
int i = 0; i < 40; ++i)
1343 boost::optional<uint256>
const& expectedHash)
1352 auto shard{wptr.lock()};
1355 JLOG(
j_.
debug()) <<
"Shard removed before being finalized";
1359 if (!shard->finalize(writeSQLite, expectedHash))
1378 if (shard->index() < boundaryIndex)
1382 shard->getDir().parent_path() ==
dir_)
1385 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1386 <<
" is not stored at a historical path";
1394 assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
1396 auto& recentShard = shard->index() == boundaryIndex
1401 recentShard = shard->index();
1403 if (shard->getDir().parent_path() !=
dir_)
1405 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1406 <<
" is not stored at the path";
1417 protocol::TMPeerShardInfo message;
1419 message.set_nodepubkey(publicKey.data(), publicKey.size());
1422 message, protocol::mtPEER_SHARD_INFO)));
1444 for (
auto const& e : shards)
1446 if (
auto const shard{e.lock()}; shard)
1448 auto const [sz, fd] = shard->getFileInfo();
1463 JLOG(
j_.
warn()) <<
"maximum number of historical shards reached";
1472 <<
"maximum shard store size exceeds available storage space";
1484 rs.insert(e.second->index());
1509 auto const availableSpace =
1510 boost::filesystem::space(path).available;
1518 capacities.
push_back(boost::filesystem::space(
dir_).available);
1530 if (numShards <= shardCap)
1533 numShards -= shardCap;
1538 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
1539 <<
". Error: " << e.
what();
1551 if (!shard->setLedgerStored(ledger))
1561 if (
auto const it{
shards_.find(shard->index())}; it !=
shards_.end())
1571 <<
"shard " << shard->index() <<
" is no longer being acquired";
1594 if ((
shards_.erase(shard->index()) > 0) &&
1601 shard->removeOnDestroy();
1628 shards_.begin(),
shards_.end(), [boundaryIndex](
auto const& entry) {
1629 return entry.first < boundaryIndex;
1640 auto const latestShardIndex =
1645 auto const removeShard =
1655 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1660 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1664 auto const keepShard =
1665 [
this, &lock, removeShard, separateHistoricalPath](
1670 <<
"maximum number of historical shards reached";
1672 removeShard(shardIndex);
1675 if (separateHistoricalPath &&
1678 JLOG(
j_.
error()) <<
"insufficient storage space available";
1680 removeShard(shardIndex);
1689 auto const moveShard = [
this,
1695 auto& shard{it->second};
1700 if (!shard->tryClose())
1703 <<
"can't close shard to move to historical path";
1710 boost::filesystem::rename(
1711 shard->getDir().string(),
1716 JLOG(
j_.
error()) <<
"shard " << shardIndex
1717 <<
" failed to move to historical storage";
1723 std::make_shared<Shard>(
app_, *
this, shardIndex, dst,
j_);
1728 JLOG(
j_.
error()) <<
"shard " << shardIndex
1729 <<
" failed to open in historical storage";
1730 shard->removeOnDestroy();
1737 <<
"can't find shard to move to historical path";
1742 bool const curNotSynched =
1749 if (curNotSynched || prevNotSynched)
1754 if (keepShard(*prev) && separateHistoricalPath)
1765 if (cur == latestShardIndex - 1)
1774 if (keepShard(*cur) && separateHistoricalPath)
1794 auto const isHistoricalShard = shardIndex < boundaryIndex;
1803 JLOG(
j_.
error()) <<
"maximum number of historical shards reached";
1809 JLOG(
j_.
error()) <<
"insufficient storage space available";
1817 boost::filesystem::path
1825 boost::filesystem::path historicalShardPath;
1834 if (potentialPaths.
empty())
1836 JLOG(
j_.
error()) <<
"failed to select a historical shard path";
1841 potentialPaths.
begin(),
1842 potentialPaths.
end(),
1843 &historicalShardPath,
1847 return historicalShardPath;
1862 struct statvfs buffer;
1863 if (statvfs(path.c_str(), &buffer))
1866 <<
"failed to acquire stats for 'historical_shard_path': "
1871 filesystemIDs[buffer.f_fsid].push_back(path.string());
1875 for (
auto const& entry : filesystemIDs)
1878 if (entry.second.size() > 1)
1883 <<
"The following paths correspond to the same filesystem: "
1884 << boost::algorithm::join(entry.second,
", ")
1885 <<
". Each configured historical storage path should"
1886 " be on a unique device or filesystem.";
1907 uniqueCapacities[boost::filesystem::space(path).available].push_back(
1910 for (
auto const& entry : uniqueCapacities)
1913 if (entry.second.size() > 1)
1918 <<
"Each of the following paths have " << entry.first
1919 <<
" bytes free, and may be located on the same device"
1921 << boost::algorithm::join(entry.second,
", ")
1922 <<
". Each configured historical storage path should"
1923 " be on a unique device or file system.";
1944 if (section.empty())
1947 return std::make_unique<DatabaseShardImp>(
1948 app, parent,
"ShardStore", scheduler, readThreads, j);