20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/misc/NetworkOPs.h>
23 #include <ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h>
24 #include <ripple/basics/ByteUtilities.h>
25 #include <ripple/basics/chrono.h>
26 #include <ripple/basics/random.h>
27 #include <ripple/core/ConfigSections.h>
28 #include <ripple/nodestore/DummyScheduler.h>
29 #include <ripple/nodestore/impl/DatabaseShardImp.h>
30 #include <ripple/overlay/Overlay.h>
31 #include <ripple/overlay/predicates.h>
32 #include <ripple/protocol/HashPrefix.h>
34 #include <boost/algorithm/string/predicate.hpp>
37 #include <sys/statvfs.h>
62 , avgShardFileSz_(ledgersPerShard_ *
kilobytes(192ull))
68 Throw<std::runtime_error>(
69 "Attempted to create DatabaseShardImp in reporting mode. Reporting "
70 "does not support shards. Remove shards info from config");
81 JLOG(
j_.
error()) <<
"already initialized";
87 JLOG(
j_.
error()) <<
"invalid configuration file settings";
93 using namespace boost::filesystem;
100 for (
auto const& path : paths)
104 if (!is_directory(path))
106 JLOG(
j_.
error()) << path <<
" must be a directory";
110 else if (!create_directories(path))
113 <<
"failed to create path: " + path.string();
125 ctx_ = std::make_unique<nudb::context>();
130 for (
auto const& path : paths)
132 for (
auto const& it : directory_iterator(path))
135 if (!is_directory(it))
139 auto const shardDir{it.path()};
140 auto dirName{shardDir.stem().string()};
142 dirName.begin(), dirName.end(), [](
auto c) {
143 return ::isdigit(static_cast<unsigned char>(c));
154 <<
"shard " << shardIndex
155 <<
" ignored, comes before earliest shard index "
164 <<
"shard " << shardIndex
165 <<
" previously failed import, removing";
166 remove_all(shardDir);
170 auto shard{std::make_shared<Shard>(
171 app_, *
this, shardIndex, shardDir.parent_path(),
j_)};
175 shard->removeOnDestroy();
177 <<
"shard " << shardIndex <<
" removed, "
178 << (shard->isLegacy() ?
"legacy" :
"corrupted")
183 switch (shard->getState())
188 shards_.emplace(shardIndex, std::move(shard));
193 shards_.emplace(shardIndex, std::move(shard))
203 <<
"more than one shard being acquired";
207 shards_.emplace(shardIndex, std::move(shard));
213 <<
"shard " << shardIndex <<
" invalid state";
221 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
222 <<
". Error: " << e.
what();
247 return it->second->prepare();
262 JLOG(
j_.
debug()) <<
"no new shards to add";
270 auto const pathDesignation = [
this, shardIndex = *shardIndex]() {
275 if (!pathDesignation)
278 auto const needsHistoricalPath =
281 auto shard = [
this, shardIndex, needsHistoricalPath] {
283 return std::make_unique<Shard>(
294 auto const ledgerSeq{shard->prepare()};
297 shards_.emplace(*shardIndex, std::move(shard));
306 auto fail = [j =
j_, &shardIndexes](
309 auto multipleIndexPrequel = [&shardIndexes] {
312 shardIndexes.
begin(),
314 indexesAsString.
begin(),
315 [](uint32_t
const index) { return std::to_string(index); });
318 (shardIndexes.
size() > 1 ?
"s " :
" ") +
319 boost::algorithm::join(indexesAsString,
", ");
324 : multipleIndexPrequel();
326 JLOG(j.error()) << prequel <<
" " << msg;
334 return fail(
"cannot be stored at this time");
336 auto historicalShardsToPrepare = 0;
338 for (
auto const shardIndex : shardIndexes)
343 "comes before earliest shard index " +
354 return fail(
"invalid index", shardIndex);
361 return fail(
"invalid index", shardIndex);
365 return fail(
"is already stored", shardIndex);
368 return fail(
"is already queued for import", shardIndex);
373 ++historicalShardsToPrepare;
380 return fail(
"maximum number of historical shards reached");
382 if (historicalShardsToPrepare)
387 return fail(
"insufficient storage space available");
390 if (
auto const recentShardsToPrepare =
391 shardIndexes.size() - historicalShardsToPrepare;
392 recentShardsToPrepare)
397 return fail(
"insufficient storage space available");
400 for (
auto const shardIndex : shardIndexes)
402 auto const prepareSuccessful =
405 (void)prepareSuccessful;
406 assert(prepareSuccessful);
430 rs.insert(shardIndex);
442 boost::filesystem::path
const& srcDir)
446 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
453 using namespace boost::filesystem;
456 if (!is_directory(srcDir) || is_empty(srcDir))
459 "invalid source directory " + srcDir.string(),
466 std::string(
". Exception caught in function ") + __func__ +
467 ". Error: " + e.
what(),
480 return fail(
"already exists", lock);
484 return fail(
"was not prepared for import", lock);
486 auto const pathDesignation{
488 if (!pathDesignation)
489 return fail(
"failed to import", lock);
498 auto renameDir = [&](path
const& src, path
const& dst) {
506 std::string(
". Exception caught in function ") + __func__ +
507 ". Error: " + e.
what(),
514 if (!renameDir(srcDir, dstDir))
518 auto shard{std::make_unique<Shard>(
519 app_, *
this, shardIndex, dstDir.parent_path(),
j_)};
524 renameDir(dstDir, srcDir);
528 auto const [it, inserted] = [&]() {
531 return shards_.emplace(shardIndex, std::move(shard));
537 renameDir(dstDir, srcDir);
555 auto const it{
shards_.find(shardIndex)};
562 switch (shard->getState())
567 if (shard->containsLedger(ledgerSeq))
580 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
584 auto ledger{std::make_shared<Ledger>(
589 if (ledger->info().seq != ledgerSeq)
592 "encountered invalid ledger sequence " +
std::to_string(ledgerSeq));
594 if (ledger->info().hash != hash)
597 "encountered invalid ledger hash " +
to_string(hash) +
602 if (!ledger->stateMap().fetchRoot(
603 SHAMapHash{ledger->info().accountHash},
nullptr))
606 "is missing root STATE node on hash " +
to_string(hash) +
610 if (ledger->info().txHash.isNonZero())
612 if (!ledger->txMap().fetchRoot(
616 "is missing root TXN node on hash " +
to_string(hash) +
626 auto const ledgerSeq{ledger->info().seq};
627 if (ledger->info().hash.isZero())
629 JLOG(
j_.
error()) <<
"zero ledger hash for ledger sequence "
633 if (ledger->info().accountHash.isZero())
635 JLOG(
j_.
error()) <<
"zero account hash for ledger sequence "
639 if (ledger->stateMap().getHash().isNonZero() &&
640 !ledger->stateMap().isValid())
642 JLOG(
j_.
error()) <<
"invalid state map for ledger sequence "
646 if (ledger->info().txHash.isNonZero() && !ledger->txMap().isValid())
648 JLOG(
j_.
error()) <<
"invalid transaction map for ledger sequence "
662 <<
"shard " << shardIndex <<
" is not being acquired";
666 auto const it{
shards_.find(shardIndex)};
670 <<
"shard " << shardIndex <<
" is not being acquired";
676 if (shard->containsLedger(ledgerSeq))
678 JLOG(
j_.
trace()) <<
"shard " << shardIndex <<
" ledger already stored";
721 for (
auto const& e : shards)
726 if (
auto const shard{e.lock()}; shard)
729 JLOG(
j_.
warn()) <<
" shard " << shardIndex <<
" unexpired";
747 JLOG(
j_.
error()) <<
"invalid source database";
754 auto loadLedger = [&](
bool ascendSort =
776 ledgerSeq = info->seq;
778 if (!ledger || ledgerSeq == 0)
780 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
781 " the SQLite database to import";
788 auto ledgerSeq{loadLedger()};
798 ledgerSeq = loadLedger(
false);
807 if (latestIndex < earliestIndex)
809 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
810 " the SQLite database to import";
819 shardIndex <= latestIndex;
822 auto const pathDesignation =
825 if (!pathDesignation)
828 auto const needsHistoricalPath =
835 <<
"shard " << shardIndex <<
" already being acquired";
843 <<
"shard " << shardIndex <<
" already being imported";
850 JLOG(
j_.
debug()) <<
"shard " << shardIndex <<
" already stored";
859 auto const numLedgers{
865 if (ledgerHashes.size() != numLedgers)
873 JLOG(
j_.
warn()) <<
"SQLite ledger sequence " << n
874 <<
" mismatches node store";
888 std::make_unique<Shard>(
app_, *
this, shardIndex, path,
j_)};
899 JLOG(
j_.
error()) <<
"shard " << shardIndex
900 <<
" failed to create temp marker file";
901 shard->removeOnDestroy();
911 while (
auto const ledgerSeq = shard->prepare())
914 if (!ledger || ledger->info().seq != ledgerSeq)
917 auto const result{shard->storeLedger(ledger, recentStored)};
922 if (!shard->setLedgerStored(ledger))
925 if (!lastLedgerHash && ledgerSeq ==
lastLedgerSeq(shardIndex))
926 lastLedgerHash = ledger->info().hash;
928 recentStored = std::move(ledger);
931 using namespace boost::filesystem;
944 if (shard->storeNodeObject(nodeObject))
950 remove_all(markerFile);
952 JLOG(
j_.
debug()) <<
"shard " << shardIndex
953 <<
" was successfully imported";
955 shards_.emplace(shardIndex, std::move(shard))
966 JLOG(
j_.
fatal()) <<
"shard index " << shardIndex
967 <<
". Exception caught in function "
968 << __func__ <<
". Error: " << e.
what();
976 <<
"shard " << shardIndex <<
" failed to import";
977 shard->removeOnDestroy();
1001 return shard->getWriteLoad();
1018 <<
"shard " << shardIndex <<
" is not being acquired";
1022 auto const it{
shards_.find(shardIndex)};
1026 <<
"shard " << shardIndex <<
" is not being acquired";
1032 auto const nodeObject{
1034 if (shard->storeNodeObject(nodeObject))
1041 auto const ledgerSeq{srcLedger->info().seq};
1051 <<
"shard " << shardIndex <<
" is not being acquired";
1055 auto const it{
shards_.find(shardIndex)};
1059 <<
"shard " << shardIndex <<
" is not being acquired";
1065 auto const result{shard->storeLedger(srcLedger,
nullptr)};
1067 if (result.error || result.count == 0 || result.size == 0)
1089 for (
auto const& e : shards)
1091 if (
auto const shard{e.lock()}; shard && shard->isOpen())
1102 JLOG(
j_.
trace()) <<
"Open shards exceed configured limit of "
1113 return lhsShard->getLastUse() < rhsShard->getLastUse();
1116 for (
auto it{openFinals.
cbegin()};
1119 if ((*it)->tryClose())
1120 it = openFinals.
erase(it);
1147 get_if_exists<std::uint32_t>(
1148 section,
"earliest_seq", shardDBEarliestSeq);
1151 get_if_exists<std::uint32_t>(
1156 if (shardDBEarliestSeq != nodeDBEarliestSeq)
1160 "] define different 'earliest_seq' values");
1164 using namespace boost::filesystem;
1165 if (!get_if_exists<path>(section,
"path",
dir_))
1166 return fail(
"'path' missing");
1171 Section const& historicalShardPaths =
1172 config.section(SECTION_HISTORICAL_SHARD_PATHS);
1174 auto values = historicalShardPaths.
values();
1176 std::sort(values.begin(), values.end());
1177 values.erase(
std::unique(values.begin(), values.end()), values.end());
1179 for (
auto const& s : values)
1181 auto const dir = path(s);
1185 "the 'path' cannot also be in the "
1186 "'historical_shard_path' section");
1193 if (section.exists(
"ledgers_per_shard"))
1196 if (!config.standalone())
1197 return fail(
"'ledgers_per_shard' only honored in stand alone");
1201 return fail(
"'ledgers_per_shard' must be a multiple of 256");
1208 backendName_ = get<std::string>(section,
"type",
"nudb");
1210 return fail(
"'type' value unsupported");
1225 auto const it{
shards_.find(shardIndex)};
1231 return shard->fetchNodeObject(hash, fetchReport);
1240 return std::nullopt;
1242 auto const maxShardIndex{[
this, validLedgerSeq]() {
1251 if (
shards_.size() >= maxNumShards)
1252 return std::nullopt;
1254 if (maxShardIndex < 1024 ||
1255 static_cast<float>(
shards_.size()) / maxNumShards > 0.5f)
1263 shardIndex <= maxShardIndex;
1274 return std::nullopt;
1286 for (
int i = 0; i < 40; ++i)
1297 return std::nullopt;
1313 auto shard{wptr.lock()};
1316 JLOG(
j_.
debug()) <<
"Shard removed before being finalized";
1320 if (!shard->finalize(writeSQLite, expectedHash))
1339 if (shard->index() < boundaryIndex)
1343 shard->getDir().parent_path() ==
dir_)
1346 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1347 <<
" is not stored at a historical path";
1355 assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
1357 auto& recentShard = shard->index() == boundaryIndex
1362 recentShard = shard->index();
1364 if (shard->getDir().parent_path() !=
dir_)
1366 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1367 <<
" is not stored at the path";
1378 protocol::TMPeerShardInfo message;
1380 message.set_nodepubkey(publicKey.data(), publicKey.size());
1383 message, protocol::mtPEER_SHARD_INFO)));
1405 for (
auto const& e : shards)
1407 if (
auto const shard{e.lock()}; shard)
1409 auto const [sz, fd] = shard->getFileInfo();
1432 JLOG(
j_.
warn()) <<
"maximum number of historical shards reached";
1443 <<
"maximum shard store size exceeds available storage space";
1457 rs.insert(e.second->index());
1482 auto const availableSpace =
1483 boost::filesystem::space(path).available;
1503 if (numShards <= shardCap)
1506 numShards -= shardCap;
1511 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
1512 <<
". Error: " << e.
what();
1524 if (!shard->setLedgerStored(ledger))
1534 if (
auto const it{
shards_.find(shard->index())}; it !=
shards_.end())
1544 <<
"shard " << shard->index() <<
" is no longer being acquired";
1567 if ((
shards_.erase(shard->index()) > 0) &&
1574 shard->removeOnDestroy();
1604 shards_.begin(),
shards_.end(), [boundaryIndex](
auto const& entry) {
1605 return entry.first < boundaryIndex;
1616 auto const latestShardIndex =
1621 auto const removeShard =
1631 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1636 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1640 auto const keepShard =
1641 [
this, &lock, removeShard, separateHistoricalPath](
1646 <<
"maximum number of historical shards reached";
1648 removeShard(shardIndex);
1651 if (separateHistoricalPath &&
1654 JLOG(
j_.
error()) <<
"insufficient storage space available";
1656 removeShard(shardIndex);
1665 auto const moveShard = [
this,
1671 auto& shard{it->second};
1676 if (!shard->tryClose())
1679 <<
"can't close shard to move to historical path";
1686 boost::filesystem::rename(
1687 shard->getDir().string(),
1692 JLOG(
j_.
error()) <<
"shard " << shardIndex
1693 <<
" failed to move to historical storage";
1699 std::make_shared<Shard>(
app_, *
this, shardIndex, dst,
j_);
1704 JLOG(
j_.
error()) <<
"shard " << shardIndex
1705 <<
" failed to open in historical storage";
1706 shard->removeOnDestroy();
1713 <<
"can't find shard to move to historical path";
1718 bool const curNotSynched =
1725 if (curNotSynched || prevNotSynched)
1730 if (keepShard(*prev) && separateHistoricalPath)
1735 prev = std::nullopt;
1741 if (cur == latestShardIndex - 1)
1750 if (keepShard(*cur) && separateHistoricalPath)
1770 auto const isHistoricalShard = shardIndex < boundaryIndex;
1779 JLOG(
j_.
error()) <<
"maximum number of historical shards reached";
1781 return std::nullopt;
1785 JLOG(
j_.
error()) <<
"insufficient storage space available";
1787 return std::nullopt;
1793 boost::filesystem::path
1801 boost::filesystem::path historicalShardPath;
1810 if (potentialPaths.
empty())
1812 JLOG(
j_.
error()) <<
"failed to select a historical shard path";
1817 potentialPaths.
begin(),
1818 potentialPaths.
end(),
1819 &historicalShardPath,
1823 return historicalShardPath;
1838 struct statvfs buffer;
1839 if (statvfs(path.c_str(), &buffer))
1842 <<
"failed to acquire stats for 'historical_shard_path': "
1847 filesystemIDs[buffer.f_fsid].push_back(path.string());
1851 for (
auto const& entry : filesystemIDs)
1854 if (entry.second.size() > 1)
1859 <<
"The following paths correspond to the same filesystem: "
1860 << boost::algorithm::join(entry.second,
", ")
1861 <<
". Each configured historical storage path should"
1862 " be on a unique device or filesystem.";
1883 uniqueCapacities[boost::filesystem::space(path).available].push_back(
1886 for (
auto const& entry : uniqueCapacities)
1889 if (entry.second.size() > 1)
1894 <<
"Each of the following paths have " << entry.first
1895 <<
" bytes free, and may be located on the same device"
1897 << boost::algorithm::join(entry.second,
", ")
1898 <<
". Each configured historical storage path should"
1899 " be on a unique device or file system.";
1916 if (
shards_.count(shardIndex) &&
1919 return shards_[shardIndex]->callForLedgerSQL(callback);
1934 if (
shards_.count(shardIndex) &&
1937 return shards_[shardIndex]->callForTransactionSQL(callback);
1955 it =
shards_.lower_bound(*minShardIndex);
1959 for (; it != eit; it++)
1963 if (!visit(*it->second))
1977 minShardIndex, [&callback](
Shard& shard) ->
bool {
1989 minShardIndex, [&callback](
Shard& shard) ->
bool {
2010 for (; it != eit; it++)
2013 (!maxShardIndex || it->first <= *maxShardIndex))
2015 if (!visit(*it->second))
2058 if (section.empty())
2061 return std::make_unique<DatabaseShardImp>(
2062 app, parent,
"ShardStore", scheduler, readThreads, j);