20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/misc/NetworkOPs.h>
23 #include <ripple/app/rdb/backend/SQLiteDatabase.h>
24 #include <ripple/basics/ByteUtilities.h>
25 #include <ripple/basics/RangeSet.h>
26 #include <ripple/basics/chrono.h>
27 #include <ripple/basics/random.h>
28 #include <ripple/core/ConfigSections.h>
29 #include <ripple/nodestore/DummyScheduler.h>
30 #include <ripple/nodestore/impl/DatabaseShardImp.h>
31 #include <ripple/overlay/Overlay.h>
32 #include <ripple/overlay/predicates.h>
33 #include <ripple/protocol/HashPrefix.h>
34 #include <ripple/protocol/LedgerHeader.h>
35 #include <ripple/protocol/digest.h>
37 #include <boost/algorithm/string/predicate.hpp>
40 #include <sys/statvfs.h>
58 , avgShardFileSz_(ledgersPerShard_ *
kilobytes(192ull))
64 Throw<std::runtime_error>(
65 "Attempted to create DatabaseShardImp in reporting mode. Reporting "
66 "does not support shards. Remove shards info from config");
77 JLOG(
j_.
error()) <<
"already initialized";
83 JLOG(
j_.
error()) <<
"invalid configuration file settings";
89 using namespace boost::filesystem;
96 for (
auto const& path : paths)
100 if (!is_directory(path))
102 JLOG(
j_.
error()) << path <<
" must be a directory";
106 else if (!create_directories(path))
109 <<
"failed to create path: " + path.string();
121 ctx_ = std::make_unique<nudb::context>();
126 for (
auto const& path : paths)
128 for (
auto const& it : directory_iterator(path))
131 if (!is_directory(it))
135 auto const shardDir{it.path()};
136 auto dirName{shardDir.stem().string()};
138 dirName.begin(), dirName.end(), [](
auto c) {
139 return ::isdigit(static_cast<unsigned char>(c));
150 <<
"shard " << shardIndex
151 <<
" ignored, comes before earliest shard index "
160 <<
"shard " << shardIndex
161 <<
" previously failed database import, removing";
162 remove_all(shardDir);
166 auto shard{std::make_shared<Shard>(
167 app_, *
this, shardIndex, shardDir.parent_path(),
j_)};
171 shard->removeOnDestroy();
173 <<
"shard " << shardIndex <<
" removed, "
174 << (shard->isLegacy() ?
"legacy" :
"corrupted")
179 switch (shard->getState())
184 shards_.emplace(shardIndex, std::move(shard));
189 shards_.emplace(shardIndex, std::move(shard))
199 <<
"more than one shard being acquired";
203 shards_.emplace(shardIndex, std::move(shard));
209 <<
"shard " << shardIndex <<
" invalid state";
217 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
218 <<
". Error: " << e.
what();
241 return it->second->prepare();
256 JLOG(
j_.
debug()) <<
"no new shards to add";
264 auto const pathDesignation = [
this, shardIndex = *shardIndex]() {
269 if (!pathDesignation)
272 auto const needsHistoricalPath =
275 auto shard = [
this, shardIndex, needsHistoricalPath] {
277 return std::make_unique<Shard>(
288 auto const ledgerSeq{shard->prepare()};
291 shards_.emplace(*shardIndex, std::move(shard));
302 auto fail = [j =
j_, &shardIndexes](
305 auto multipleIndexPrequel = [&shardIndexes] {
308 shardIndexes.
begin(),
310 indexesAsString.
begin(),
311 [](uint32_t
const index) { return std::to_string(index); });
314 (shardIndexes.
size() > 1 ?
"s " :
" ") +
315 boost::algorithm::join(indexesAsString,
", ");
318 JLOG(j.error()) << (shardIndex ?
"shard " +
std::to_string(*shardIndex)
319 : multipleIndexPrequel())
324 if (shardIndexes.
empty())
325 return fail(
"invalid shard indexes");
331 return fail(
"cannot be stored at this time");
333 auto historicalShardsToPrepare = 0;
335 for (
auto const shardIndex : shardIndexes)
340 "comes before earliest shard index " +
351 return fail(
"invalid index", shardIndex);
358 return fail(
"invalid index", shardIndex);
362 return fail(
"is already stored", shardIndex);
366 "is already queued for import from the shard archive handler",
373 if (shard->index() == shardIndex)
375 "is being imported from the nodestore", shardIndex);
382 ++historicalShardsToPrepare;
389 return fail(
"maximum number of historical shards reached");
391 if (historicalShardsToPrepare)
396 return fail(
"insufficient storage space available");
399 if (
auto const recentShardsToPrepare =
400 shardIndexes.size() - historicalShardsToPrepare;
401 recentShardsToPrepare)
406 return fail(
"insufficient storage space available");
409 for (
auto const shardIndex : shardIndexes)
435 rs.insert(shardIndex);
447 boost::filesystem::path
const& srcDir)
451 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
459 using namespace boost::filesystem;
462 if (!is_directory(srcDir) || is_empty(srcDir))
465 "invalid source directory " + srcDir.string(),
472 std::string(
". Exception caught in function ") + __func__ +
473 ". Error: " + e.
what(),
486 return fail(
"already exists", lock);
490 return fail(
"was not prepared for import", lock);
492 auto const pathDesignation{
494 if (!pathDesignation)
495 return fail(
"failed to import", lock);
504 auto renameDir = [&, fname = __func__](path
const& src, path
const& dst) {
512 std::string(
". Exception caught in function ") + fname +
513 ". Error: " + e.
what(),
520 if (!renameDir(srcDir, dstDir))
524 auto shard{std::make_unique<Shard>(
525 app_, *
this, shardIndex, dstDir.parent_path(),
j_)};
531 renameDir(dstDir, srcDir);
535 auto const [it, inserted] = [&]() {
538 return shards_.emplace(shardIndex, std::move(shard));
544 renameDir(dstDir, srcDir);
562 auto const it{
shards_.find(shardIndex)};
569 switch (shard->getState())
574 if (shard->containsLedger(ledgerSeq))
587 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
591 auto ledger{std::make_shared<Ledger>(
596 if (ledger->info().seq != ledgerSeq)
599 "encountered invalid ledger sequence " +
std::to_string(ledgerSeq));
601 if (ledger->info().hash != hash)
604 "encountered invalid ledger hash " +
to_string(hash) +
609 if (!ledger->stateMap().fetchRoot(
610 SHAMapHash{ledger->info().accountHash},
nullptr))
613 "is missing root STATE node on hash " +
to_string(hash) +
617 if (ledger->info().txHash.isNonZero())
619 if (!ledger->txMap().fetchRoot(
623 "is missing root TXN node on hash " +
to_string(hash) +
633 auto const ledgerSeq{ledger->info().seq};
634 if (ledger->info().hash.isZero())
636 JLOG(
j_.
error()) <<
"zero ledger hash for ledger sequence "
640 if (ledger->info().accountHash.isZero())
642 JLOG(
j_.
error()) <<
"zero account hash for ledger sequence "
646 if (ledger->stateMap().getHash().isNonZero() &&
647 !ledger->stateMap().isValid())
649 JLOG(
j_.
error()) <<
"invalid state map for ledger sequence "
653 if (ledger->info().txHash.isNonZero() && !ledger->txMap().isValid())
655 JLOG(
j_.
error()) <<
"invalid transaction map for ledger sequence "
669 <<
"shard " << shardIndex <<
" is not being acquired";
673 auto const it{
shards_.find(shardIndex)};
677 <<
"shard " << shardIndex <<
" is not being acquired";
683 if (shard->containsLedger(ledgerSeq))
685 JLOG(
j_.
trace()) <<
"shard " << shardIndex <<
" ledger already stored";
708 for (
auto const& [_, shard] :
shards_)
718 for (
auto const& wptr : shards)
720 if (
auto const shard{wptr.lock()})
722 JLOG(
j_.
warn()) <<
" shard " << shard->index() <<
" unexpired";
777 JLOG(
j_.
error()) <<
"database import already in progress";
787 auto shouldHalt = [
this] {
788 bool expected =
true;
814 ledgerSeq = info->seq;
816 if (!ledger || ledgerSeq == 0)
818 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
819 " the SQLite database to import";
830 auto const earliestIndex = [&] {
837 return earliestIndex;
841 auto const latestLedgerSeq = loadLedger(
"desc");
842 if (!latestLedgerSeq)
845 auto const latestIndex = [&] {
855 if (latestIndex < earliestIndex)
857 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
858 " the SQLite database to import";
862 JLOG(
j_.
debug()) <<
"Importing ledgers for shards " << earliestIndex
863 <<
" through " << latestIndex;
870 earliestIndex, latestIndex, 0);
874 for (
std::uint32_t shardIndex = earliestIndex; shardIndex <= latestIndex;
880 auto const pathDesignation = [
this, shardIndex] {
884 auto const pathDesignation =
887 return pathDesignation;
890 if (!pathDesignation)
900 <<
"shard " << shardIndex <<
" already being acquired";
908 <<
"shard " << shardIndex <<
" already being imported";
915 JLOG(
j_.
debug()) <<
"shard " << shardIndex <<
" already stored";
926 auto const ledgerHashes{
929 if (ledgerHashes.size() !=
maxLedgers(shardIndex))
937 if (!source.fetchNodeObject(ledgerHashes.at(n).ledgerHash, n))
939 JLOG(
j_.
warn()) <<
"SQLite ledger sequence " << n
940 <<
" mismatches node store";
952 bool const needsHistoricalPath =
955 auto const path = needsHistoricalPath
960 auto shard{std::make_shared<Shard>(
app_, *
this, shardIndex, path,
j_)};
983 JLOG(
j_.
error()) <<
"shard " << shardIndex
984 <<
" failed to create temp marker file";
985 shard->removeOnDestroy();
994 while (
auto const ledgerSeq = shard->prepare())
1001 if (!ledger || ledger->info().seq != ledgerSeq)
1004 auto const result{shard->storeLedger(ledger, recentStored)};
1009 if (!shard->setLedgerStored(ledger))
1012 if (!lastLedgerHash && ledgerSeq == lastSeq)
1013 lastLedgerHash = ledger->info().hash;
1015 recentStored = std::move(ledger);
1021 using namespace boost::filesystem;
1022 bool success{
false};
1034 if (shard->storeNodeObject(nodeObject))
1042 remove_all(markerFile);
1044 JLOG(
j_.
debug()) <<
"shard " << shardIndex
1045 <<
" was successfully imported"
1046 " from the NodeStore";
1048 shards_.emplace(shardIndex, std::move(shard))
1062 JLOG(
j_.
fatal()) <<
"shard index " << shardIndex
1063 <<
". Exception caught in function "
1064 << __func__ <<
". Error: " << e.
what();
1071 JLOG(
j_.
error()) <<
"shard " << shardIndex
1072 <<
" failed to import from the NodeStore";
1075 shard->removeOnDestroy();
1099 return shard->getWriteLoad();
1116 <<
"shard " << shardIndex <<
" is not being acquired";
1120 auto const it{
shards_.find(shardIndex)};
1124 <<
"shard " << shardIndex <<
" is not being acquired";
1130 auto const nodeObject{
1132 if (shard->storeNodeObject(nodeObject))
1139 auto const ledgerSeq{srcLedger->info().seq};
1149 <<
"shard " << shardIndex <<
" is not being acquired";
1153 auto const it{
shards_.find(shardIndex)};
1157 <<
"shard " << shardIndex <<
" is not being acquired";
1163 auto const result{shard->storeLedger(srcLedger,
nullptr)};
1165 if (result.error || result.count == 0 || result.size == 0)
1187 for (
auto const& weak : shards)
1189 if (
auto const shard{weak.lock()}; shard && shard->isOpen())
1198 JLOG(
j_.
trace()) <<
"Open shards exceed configured limit of "
1209 return lhsShard->getLastUse() < rhsShard->getLastUse();
1212 for (
auto it{openFinals.
cbegin()};
1215 if ((*it)->tryClose())
1216 it = openFinals.
erase(it);
1239 currentShard[jss::storedSeqs] = shard->getStoredSeqs();
1241 ret[jss::currentShard] = currentShard;
1244 ret[jss::message] =
"Database import halt initiated...";
1262 rpcINTERNAL,
"Database import already in progress");
1270 result[jss::message] =
"Database import initiated...";
1292 result[jss::message] =
"Database import halt initiated...";
1321 get_if_exists<std::uint32_t>(section, name, shardDBValue);
1324 get_if_exists<std::uint32_t>(
1327 return shardDBValue == nodeDBValue;
1336 "ledgers_per_shard" +
"' values");
1342 "earliest_seq" +
"' values");
1345 using namespace boost::filesystem;
1346 if (!get_if_exists<path>(section,
"path",
dir_))
1347 return fail(
"'path' missing");
1352 Section const& historicalShardPaths =
1353 config.section(SECTION_HISTORICAL_SHARD_PATHS);
1355 auto values = historicalShardPaths.
values();
1357 std::sort(values.begin(), values.end());
1358 values.erase(
std::unique(values.begin(), values.end()), values.end());
1360 for (
auto const& s : values)
1362 auto const dir = path(s);
1366 "the 'path' cannot also be in the "
1367 "'historical_shard_path' section");
1377 return fail(
"'type' value unsupported");
1393 auto const it{
shards_.find(shardIndex)};
1399 return shard->fetchNodeObject(hash, fetchReport);
1408 return std::nullopt;
1410 auto const maxShardIndex{[
this, validLedgerSeq]() {
1419 if (
shards_.size() >= maxNumShards)
1420 return std::nullopt;
1422 if (maxShardIndex < 1024 ||
1423 static_cast<float>(
shards_.size()) / maxNumShards > 0.5f)
1441 return std::nullopt;
1453 for (
int i = 0; i < 40; ++i)
1464 return std::nullopt;
1470 bool const writeSQLite,
1480 auto shard{wptr.lock()};
1483 JLOG(
j_.
debug()) <<
"Shard removed before being finalized";
1487 if (!shard->finalize(writeSQLite, expectedHash))
1504 if (shard->index() < boundaryIndex)
1508 shard->getDir().parent_path() ==
dir_)
1511 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1512 <<
" is not stored at a historical path";
1518 assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
1522 if (shard->index() == boundaryIndex)
1527 if (shard->getDir().parent_path() !=
dir_)
1529 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1530 <<
" is not stored at the path";
1558 for (
auto const& weak : shards)
1560 if (
auto const shard{weak.lock()}; shard)
1562 auto const [sz, fd] = shard->getFileInfo();
1585 JLOG(
j_.
warn()) <<
"maximum number of historical shards reached";
1596 <<
"maximum shard store size exceeds available storage space";
1620 auto const availableSpace =
1621 boost::filesystem::space(path).available;
1641 if (numShards <= shardCap)
1644 numShards -= shardCap;
1649 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
1650 <<
". Error: " << e.
what();
1662 if (!shard->setLedgerStored(ledger))
1672 if (
auto const it{
shards_.find(shard->index())}; it !=
shards_.end())
1682 <<
"shard " << shard->index() <<
" is no longer being acquired";
1706 shard->removeOnDestroy();
1736 shards_.begin(),
shards_.end(), [boundaryIndex](
auto const& entry) {
1737 return entry.first < boundaryIndex;
1750 auto const latestShardIndex =
1754 auto const removeShard = [
this](
std::uint32_t const shardIndex) ->
void {
1763 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1768 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1772 auto const keepShard = [
this, &lock, removeShard, separateHistoricalPath](
1776 JLOG(
j_.
error()) <<
"maximum number of historical shards reached";
1777 removeShard(shardIndex);
1780 if (separateHistoricalPath &&
1783 JLOG(
j_.
error()) <<
"insufficient storage space available";
1784 removeShard(shardIndex);
1793 auto const moveShard = [
this,
1795 auto it{
shards_.find(shardIndex)};
1798 JLOG(
j_.
warn()) <<
"can't find shard to move to historical path";
1802 auto& shard{it->second};
1807 if (!shard->tryClose())
1809 JLOG(
j_.
warn()) <<
"can't close shard to move to historical path";
1817 boost::filesystem::rename(
1822 JLOG(
j_.
error()) <<
"shard " << shardIndex
1823 <<
" failed to move to historical storage";
1828 shard = std::make_shared<Shard>(
app_, *
this, shardIndex, dst,
j_);
1833 JLOG(
j_.
error()) <<
"shard " << shardIndex
1834 <<
" failed to open in historical storage";
1835 shard->removeOnDestroy();
1841 bool const curNotSynched =
1848 if (curNotSynched || prevNotSynched)
1853 if (keepShard(*prev) && separateHistoricalPath)
1856 prev = std::nullopt;
1862 if (cur == latestShardIndex - 1)
1869 if (keepShard(*cur) && separateHistoricalPath)
1886 auto const isHistoricalShard = shardIndex < boundaryIndex;
1895 JLOG(
j_.
error()) <<
"maximum number of historical shards reached";
1897 return std::nullopt;
1901 JLOG(
j_.
error()) <<
"insufficient storage space available";
1903 return std::nullopt;
1909 boost::filesystem::path
1917 boost::filesystem::path historicalShardPath;
1926 if (potentialPaths.
empty())
1928 JLOG(
j_.
error()) <<
"failed to select a historical shard path";
1933 potentialPaths.
begin(),
1934 potentialPaths.
end(),
1935 &historicalShardPath,
1939 return historicalShardPath;
1954 struct statvfs buffer;
1955 if (statvfs(path.c_str(), &buffer))
1958 <<
"failed to acquire stats for 'historical_shard_path': "
1963 filesystemIDs[buffer.f_fsid].push_back(path.string());
1967 for (
auto const& entry : filesystemIDs)
1970 if (entry.second.size() > 1)
1975 <<
"The following paths correspond to the same filesystem: "
1976 << boost::algorithm::join(entry.second,
", ")
1977 <<
". Each configured historical storage path should"
1978 " be on a unique device or filesystem.";
1999 uniqueCapacities[boost::filesystem::space(path).available].push_back(
2002 for (
auto const& entry : uniqueCapacities)
2005 if (entry.second.size() > 1)
2010 <<
"Each of the following paths have " << entry.first
2011 <<
" bytes free, and may be located on the same device"
2013 << boost::algorithm::join(entry.second,
", ")
2014 <<
". Each configured historical storage path should"
2015 " be on a unique device or file system.";
2026 std::function<
bool(soci::session& session)>
const& callback)
2030 JLOG(
j_.
warn()) <<
"callForLedgerSQLByLedgerSeq ledger seq too early: "
2040 const uint32_t shardIndex,
2041 std::function<
bool(soci::session& session)>
const& callback)
2045 auto const it{
shards_.find(shardIndex)};
2049 it->second->callForLedgerSQL(callback);
2055 std::function<
bool(soci::session& session)>
const& callback)
2064 std::function<
bool(soci::session& session)>
const& callback)
2068 auto const it{
shards_.find(shardIndex)};
2072 it->second->callForTransactionSQL(callback);
2087 it =
shards_.lower_bound(*minShardIndex);
2091 for (; it != eit; it++)
2095 if (!visit(*it->second))
2110 minShardIndex, [&callback](
Shard& shard) ->
bool {
2122 minShardIndex, [&callback](
Shard& shard) ->
bool {
2143 for (; it != eit; it++)
2146 (!maxShardIndex || it->first <= *maxShardIndex))
2148 if (!visit(*it->second))
2181 auto shardInfo{std::make_unique<ShardInfo>()};
2182 for (
auto const& [_, shard] :
shards_)
2185 shard->index(), shard->getState(), shard->getPercentProgress());
2209 message, protocol::mtPEER_SHARD_INFO_V2)));
2246 if (section.empty())
2249 return std::make_unique<DatabaseShardImp>(app, scheduler, readThreads, j);