20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/misc/NetworkOPs.h>
23 #include <ripple/basics/ByteUtilities.h>
24 #include <ripple/basics/chrono.h>
25 #include <ripple/basics/random.h>
26 #include <ripple/core/ConfigSections.h>
27 #include <ripple/nodestore/DummyScheduler.h>
28 #include <ripple/nodestore/impl/DatabaseShardImp.h>
29 #include <ripple/overlay/Overlay.h>
30 #include <ripple/overlay/predicates.h>
31 #include <ripple/protocol/HashPrefix.h>
33 #include <boost/algorithm/string/predicate.hpp>
36 #include <sys/statvfs.h>
60 , avgShardFileSz_(ledgersPerShard_ *
kilobytes(192ull))
73 JLOG(
j_.
error()) <<
"already initialized";
79 JLOG(
j_.
error()) <<
"invalid configuration file settings";
85 using namespace boost::filesystem;
92 for (
auto const& path : paths)
96 if (!is_directory(path))
98 JLOG(
j_.
error()) << path <<
" must be a directory";
102 else if (!create_directories(path))
105 <<
"failed to create path: " + path.string();
117 ctx_ = std::make_unique<nudb::context>();
122 for (
auto const& path : paths)
124 for (
auto const& it : directory_iterator(path))
127 if (!is_directory(it))
131 auto const shardDir{it.path()};
132 auto dirName{shardDir.stem().string()};
134 dirName.begin(), dirName.end(), [](
auto c) {
135 return ::isdigit(static_cast<unsigned char>(c));
146 <<
"shard " << shardIndex
147 <<
" ignored, comes before earliest shard index "
156 <<
"shard " << shardIndex
157 <<
" previously failed import, removing";
158 remove_all(shardDir);
162 auto shard{std::make_shared<Shard>(
163 app_, *
this, shardIndex, shardDir.parent_path(),
j_)};
167 shard->removeOnDestroy();
169 <<
"shard " << shardIndex <<
" removed, "
170 << (shard->isLegacy() ?
"legacy" :
"corrupted")
175 switch (shard->getState())
180 shards_.emplace(shardIndex, std::move(shard));
185 shards_.emplace(shardIndex, std::move(shard))
195 <<
"more than one shard being acquired";
199 shards_.emplace(shardIndex, std::move(shard));
205 <<
"shard " << shardIndex <<
" invalid state";
213 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
214 <<
". Error: " << e.
what();
227 boost::optional<std::uint32_t>
230 boost::optional<std::uint32_t> shardIndex;
239 return it->second->prepare();
254 JLOG(
j_.
debug()) <<
"no new shards to add";
262 auto const pathDesignation = [
this, shardIndex = *shardIndex]() {
267 if (!pathDesignation)
270 auto const needsHistoricalPath =
273 auto shard = [
this, shardIndex, needsHistoricalPath] {
275 return std::make_unique<Shard>(
286 auto const ledgerSeq{shard->prepare()};
289 shards_.emplace(*shardIndex, std::move(shard));
298 auto fail = [j =
j_, &shardIndexes](
300 boost::optional<std::uint32_t> shardIndex = boost::none) {
301 auto multipleIndexPrequel = [&shardIndexes] {
304 shardIndexes.
begin(),
306 indexesAsString.
begin(),
307 [](uint32_t
const index) { return std::to_string(index); });
310 (shardIndexes.
size() > 1 ?
"s " :
" ") +
311 boost::algorithm::join(indexesAsString,
", ");
316 : multipleIndexPrequel();
318 JLOG(j.error()) << prequel <<
" " << msg;
326 return fail(
"cannot be stored at this time");
328 auto historicalShardsToPrepare = 0;
330 for (
auto const shardIndex : shardIndexes)
335 "comes before earliest shard index " +
346 return fail(
"invalid index", shardIndex);
353 return fail(
"invalid index", shardIndex);
357 return fail(
"is already stored", shardIndex);
360 return fail(
"is already queued for import", shardIndex);
365 ++historicalShardsToPrepare;
372 return fail(
"maximum number of historical shards reached");
374 if (historicalShardsToPrepare)
379 return fail(
"insufficient storage space available");
382 if (
auto const recentShardsToPrepare =
383 shardIndexes.size() - historicalShardsToPrepare;
384 recentShardsToPrepare)
389 return fail(
"insufficient storage space available");
392 for (
auto const shardIndex : shardIndexes)
394 auto const prepareSuccessful =
397 (void)prepareSuccessful;
398 assert(prepareSuccessful);
422 rs.insert(shardIndex);
434 boost::filesystem::path
const& srcDir)
438 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
445 using namespace boost::filesystem;
448 if (!is_directory(srcDir) || is_empty(srcDir))
451 "invalid source directory " + srcDir.string(),
458 std::string(
". Exception caught in function ") + __func__ +
459 ". Error: " + e.
what(),
472 return fail(
"already exists", lock);
476 return fail(
"was not prepared for import", lock);
478 auto const pathDesignation{
480 if (!pathDesignation)
481 return fail(
"failed to import", lock);
490 auto renameDir = [&](path
const& src, path
const& dst) {
498 std::string(
". Exception caught in function ") + __func__ +
499 ". Error: " + e.
what(),
506 if (!renameDir(srcDir, dstDir))
510 auto shard{std::make_unique<Shard>(
511 app_, *
this, shardIndex, dstDir.parent_path(),
j_)};
516 renameDir(dstDir, srcDir);
520 auto const [it, inserted] = [&]() {
523 return shards_.emplace(shardIndex, std::move(shard));
529 renameDir(dstDir, srcDir);
547 auto const it{
shards_.find(shardIndex)};
554 switch (shard->getState())
559 if (shard->containsLedger(ledgerSeq))
572 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
576 auto ledger{std::make_shared<Ledger>(
581 if (ledger->info().seq != ledgerSeq)
584 "encountered invalid ledger sequence " +
std::to_string(ledgerSeq));
586 if (ledger->info().hash != hash)
589 "encountered invalid ledger hash " +
to_string(hash) +
594 if (!ledger->stateMap().fetchRoot(
595 SHAMapHash{ledger->info().accountHash},
nullptr))
598 "is missing root STATE node on hash " +
to_string(hash) +
602 if (ledger->info().txHash.isNonZero())
604 if (!ledger->txMap().fetchRoot(
608 "is missing root TXN node on hash " +
to_string(hash) +
618 auto const ledgerSeq{ledger->info().seq};
619 if (ledger->info().hash.isZero())
621 JLOG(
j_.
error()) <<
"zero ledger hash for ledger sequence "
625 if (ledger->info().accountHash.isZero())
627 JLOG(
j_.
error()) <<
"zero account hash for ledger sequence "
631 if (ledger->stateMap().getHash().isNonZero() &&
632 !ledger->stateMap().isValid())
634 JLOG(
j_.
error()) <<
"invalid state map for ledger sequence "
638 if (ledger->info().txHash.isNonZero() && !ledger->txMap().isValid())
640 JLOG(
j_.
error()) <<
"invalid transaction map for ledger sequence "
654 <<
"shard " << shardIndex <<
" is not being acquired";
658 auto const it{
shards_.find(shardIndex)};
662 <<
"shard " << shardIndex <<
" is not being acquired";
668 if (shard->containsLedger(ledgerSeq))
670 JLOG(
j_.
trace()) <<
"shard " << shardIndex <<
" ledger already stored";
713 for (
auto const& e : shards)
718 if (
auto const shard{e.lock()}; shard)
721 JLOG(
j_.
warn()) <<
" shard " << shardIndex <<
" unexpired";
729 JLOG(
j_.
warn()) <<
" Children failed to stop";
744 JLOG(
j_.
error()) <<
"invalid source database";
751 auto loadLedger = [&](
bool ascendSort =
752 true) -> boost::optional<std::uint32_t> {
756 "WHERE LedgerSeq >= " +
758 " order by LedgerSeq " + (ascendSort ?
"asc" :
"desc") +
762 if (!ledger || ledgerSeq == 0)
764 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
765 " the SQLite database to import";
772 auto ledgerSeq{loadLedger()};
782 ledgerSeq = loadLedger(
false);
791 if (latestIndex < earliestIndex)
793 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
794 " the SQLite database to import";
803 shardIndex <= latestIndex;
806 auto const pathDesignation =
809 if (!pathDesignation)
812 auto const needsHistoricalPath =
819 <<
"shard " << shardIndex <<
" already being acquired";
827 <<
"shard " << shardIndex <<
" already being imported";
834 JLOG(
j_.
debug()) <<
"shard " << shardIndex <<
" already stored";
843 auto const numLedgers{
847 if (ledgerHashes.size() != numLedgers)
855 JLOG(
j_.
warn()) <<
"SQLite ledger sequence " << n
856 <<
" mismatches node store";
870 std::make_unique<Shard>(
app_, *
this, shardIndex, path,
j_)};
881 JLOG(
j_.
error()) <<
"shard " << shardIndex
882 <<
" failed to create temp marker file";
883 shard->removeOnDestroy();
891 boost::optional<uint256> lastLedgerHash;
893 while (
auto const ledgerSeq = shard->prepare())
896 if (!ledger || ledger->info().seq != ledgerSeq)
899 auto const result{shard->storeLedger(ledger, recentStored)};
904 if (!shard->setLedgerStored(ledger))
907 if (!lastLedgerHash && ledgerSeq ==
lastLedgerSeq(shardIndex))
908 lastLedgerHash = ledger->info().hash;
910 recentStored = std::move(ledger);
913 using namespace boost::filesystem;
926 if (shard->storeNodeObject(nodeObject))
932 remove_all(markerFile);
934 JLOG(
j_.
debug()) <<
"shard " << shardIndex
935 <<
" was successfully imported";
937 shards_.emplace(shardIndex, std::move(shard))
948 JLOG(
j_.
fatal()) <<
"shard index " << shardIndex
949 <<
". Exception caught in function "
950 << __func__ <<
". Error: " << e.
what();
958 <<
"shard " << shardIndex <<
" failed to import";
959 shard->removeOnDestroy();
983 return shard->getWriteLoad();
1000 <<
"shard " << shardIndex <<
" is not being acquired";
1004 auto const it{
shards_.find(shardIndex)};
1008 <<
"shard " << shardIndex <<
" is not being acquired";
1014 auto const nodeObject{
1016 if (shard->storeNodeObject(nodeObject))
1037 if (shard->fetchNodeObjectFromCache(hash, nodeObject))
1048 auto const ledgerSeq{srcLedger->info().seq};
1058 <<
"shard " << shardIndex <<
" is not being acquired";
1062 auto const it{
shards_.find(shardIndex)};
1066 <<
"shard " << shardIndex <<
" is not being acquired";
1072 auto const result{shard->storeLedger(srcLedger,
nullptr)};
1074 if (result.error || result.count == 0 || result.size == 0)
1089 auto const it{
shards_.find(shardIndex)};
1095 return shard->getDesiredAsyncReadCount();
1112 return shard->getCacheHitRate();
1131 for (
auto const& e : shards)
1133 if (
auto const shard{e.lock()}; shard && shard->isOpen())
1144 JLOG(
j_.
trace()) <<
"Open shards exceed configured limit of "
1155 return lhsShard->getLastUse() < rhsShard->getLastUse();
1158 for (
auto it{openFinals.
cbegin()};
1161 if ((*it)->tryClose())
1162 it = openFinals.
erase(it);
1189 get_if_exists<std::uint32_t>(
1190 section,
"earliest_seq", shardDBEarliestSeq);
1193 get_if_exists<std::uint32_t>(
1198 if (shardDBEarliestSeq != nodeDBEarliestSeq)
1202 "] define different 'earliest_seq' values");
1206 using namespace boost::filesystem;
1207 if (!get_if_exists<path>(section,
"path",
dir_))
1208 return fail(
"'path' missing");
1213 Section const& historicalShardPaths =
1214 config.section(SECTION_HISTORICAL_SHARD_PATHS);
1216 auto values = historicalShardPaths.
values();
1218 std::sort(values.begin(), values.end());
1219 values.erase(
std::unique(values.begin(), values.end()), values.end());
1221 for (
auto const& s : values)
1223 auto const dir = path(s);
1227 "the 'path' cannot also be in the "
1228 "'historical_shard_path' section");
1235 if (section.exists(
"ledgers_per_shard"))
1238 if (!config.standalone())
1239 return fail(
"'ledgers_per_shard' only honored in stand alone");
1243 return fail(
"'ledgers_per_shard' must be a multiple of 256");
1250 backendName_ = get<std::string>(section,
"type",
"nudb");
1252 return fail(
"'type' value unsupported");
1267 auto const it{
shards_.find(shardIndex)};
1273 return shard->fetchNodeObject(hash, fetchReport);
1276 boost::optional<std::uint32_t>
1284 auto const maxShardIndex{[
this, validLedgerSeq]() {
1293 if (
shards_.size() >= maxNumShards)
1296 if (maxShardIndex < 1024 ||
1297 static_cast<float>(
shards_.size()) / maxNumShards > 0.5f)
1305 shardIndex <= maxShardIndex;
1315 if (available.
empty())
1318 if (available.
size() == 1)
1319 return available.
front();
1328 for (
int i = 0; i < 40; ++i)
1346 boost::optional<uint256>
const& expectedHash)
1355 auto shard{wptr.lock()};
1358 JLOG(
j_.
debug()) <<
"Shard removed before being finalized";
1362 if (!shard->finalize(writeSQLite, expectedHash))
1381 if (shard->index() < boundaryIndex)
1385 shard->getDir().parent_path() ==
dir_)
1388 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1389 <<
" is not stored at a historical path";
1397 assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
1399 auto& recentShard = shard->index() == boundaryIndex
1404 recentShard = shard->index();
1406 if (shard->getDir().parent_path() !=
dir_)
1408 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1409 <<
" is not stored at the path";
1420 protocol::TMPeerShardInfo message;
1422 message.set_nodepubkey(publicKey.data(), publicKey.size());
1425 message, protocol::mtPEER_SHARD_INFO)));
1447 for (
auto const& e : shards)
1449 if (
auto const shard{e.lock()}; shard)
1451 auto const [sz, fd] = shard->getFileInfo();
1474 JLOG(
j_.
warn()) <<
"maximum number of historical shards reached";
1485 <<
"maximum shard store size exceeds available storage space";
1499 rs.insert(e.second->index());
1524 auto const availableSpace =
1525 boost::filesystem::space(path).available;
1533 capacities.
push_back(boost::filesystem::space(
dir_).available);
1545 if (numShards <= shardCap)
1548 numShards -= shardCap;
1553 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
1554 <<
". Error: " << e.
what();
1566 if (!shard->setLedgerStored(ledger))
1576 if (
auto const it{
shards_.find(shard->index())}; it !=
shards_.end())
1586 <<
"shard " << shard->index() <<
" is no longer being acquired";
1609 if ((
shards_.erase(shard->index()) > 0) &&
1616 shard->removeOnDestroy();
1646 shards_.begin(),
shards_.end(), [boundaryIndex](
auto const& entry) {
1647 return entry.first < boundaryIndex;
1658 auto const latestShardIndex =
1663 auto const removeShard =
1673 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1678 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1682 auto const keepShard =
1683 [
this, &lock, removeShard, separateHistoricalPath](
1688 <<
"maximum number of historical shards reached";
1690 removeShard(shardIndex);
1693 if (separateHistoricalPath &&
1696 JLOG(
j_.
error()) <<
"insufficient storage space available";
1698 removeShard(shardIndex);
1707 auto const moveShard = [
this,
1713 auto& shard{it->second};
1718 if (!shard->tryClose())
1721 <<
"can't close shard to move to historical path";
1728 boost::filesystem::rename(
1729 shard->getDir().string(),
1734 JLOG(
j_.
error()) <<
"shard " << shardIndex
1735 <<
" failed to move to historical storage";
1741 std::make_shared<Shard>(
app_, *
this, shardIndex, dst,
j_);
1746 JLOG(
j_.
error()) <<
"shard " << shardIndex
1747 <<
" failed to open in historical storage";
1748 shard->removeOnDestroy();
1755 <<
"can't find shard to move to historical path";
1760 bool const curNotSynched =
1767 if (curNotSynched || prevNotSynched)
1772 if (keepShard(*prev) && separateHistoricalPath)
1783 if (cur == latestShardIndex - 1)
1792 if (keepShard(*cur) && separateHistoricalPath)
1812 auto const isHistoricalShard = shardIndex < boundaryIndex;
1821 JLOG(
j_.
error()) <<
"maximum number of historical shards reached";
1827 JLOG(
j_.
error()) <<
"insufficient storage space available";
1835 boost::filesystem::path
1843 boost::filesystem::path historicalShardPath;
1852 if (potentialPaths.
empty())
1854 JLOG(
j_.
error()) <<
"failed to select a historical shard path";
1859 potentialPaths.
begin(),
1860 potentialPaths.
end(),
1861 &historicalShardPath,
1865 return historicalShardPath;
1880 struct statvfs buffer;
1881 if (statvfs(path.c_str(), &buffer))
1884 <<
"failed to acquire stats for 'historical_shard_path': "
1889 filesystemIDs[buffer.f_fsid].push_back(path.string());
1893 for (
auto const& entry : filesystemIDs)
1896 if (entry.second.size() > 1)
1901 <<
"The following paths correspond to the same filesystem: "
1902 << boost::algorithm::join(entry.second,
", ")
1903 <<
". Each configured historical storage path should"
1904 " be on a unique device or filesystem.";
1925 uniqueCapacities[boost::filesystem::space(path).available].push_back(
1928 for (
auto const& entry : uniqueCapacities)
1931 if (entry.second.size() > 1)
1936 <<
"Each of the following paths have " << entry.first
1937 <<
" bytes free, and may be located on the same device"
1939 << boost::algorithm::join(entry.second,
", ")
1940 <<
". Each configured historical storage path should"
1941 " be on a unique device or file system.";
1962 if (section.empty())
1965 return std::make_unique<DatabaseShardImp>(
1966 app, parent,
"ShardStore", scheduler, readThreads, j);