20 #include <ripple/app/ledger/InboundLedgers.h>
21 #include <ripple/app/ledger/LedgerMaster.h>
22 #include <ripple/app/misc/NetworkOPs.h>
23 #include <ripple/basics/ByteUtilities.h>
24 #include <ripple/basics/chrono.h>
25 #include <ripple/basics/random.h>
26 #include <ripple/core/ConfigSections.h>
27 #include <ripple/nodestore/DummyScheduler.h>
28 #include <ripple/nodestore/impl/DatabaseShardImp.h>
29 #include <ripple/overlay/Overlay.h>
30 #include <ripple/overlay/predicates.h>
31 #include <ripple/protocol/HashPrefix.h>
33 #include <boost/algorithm/string/predicate.hpp>
36 #include <sys/statvfs.h>
60 , avgShardFileSz_(ledgersPerShard_ *
kilobytes(192ull))
66 Throw<std::runtime_error>(
67 "Attempted to create DatabaseShardImp in reporting mode. Reporting "
68 "does not support shards. Remove shards info from config");
79 JLOG(
j_.
error()) <<
"already initialized";
85 JLOG(
j_.
error()) <<
"invalid configuration file settings";
91 using namespace boost::filesystem;
98 for (
auto const& path : paths)
102 if (!is_directory(path))
104 JLOG(
j_.
error()) << path <<
" must be a directory";
108 else if (!create_directories(path))
111 <<
"failed to create path: " + path.string();
123 ctx_ = std::make_unique<nudb::context>();
128 for (
auto const& path : paths)
130 for (
auto const& it : directory_iterator(path))
133 if (!is_directory(it))
137 auto const shardDir{it.path()};
138 auto dirName{shardDir.stem().string()};
140 dirName.begin(), dirName.end(), [](
auto c) {
141 return ::isdigit(static_cast<unsigned char>(c));
152 <<
"shard " << shardIndex
153 <<
" ignored, comes before earliest shard index "
162 <<
"shard " << shardIndex
163 <<
" previously failed import, removing";
164 remove_all(shardDir);
168 auto shard{std::make_shared<Shard>(
169 app_, *
this, shardIndex, shardDir.parent_path(),
j_)};
173 shard->removeOnDestroy();
175 <<
"shard " << shardIndex <<
" removed, "
176 << (shard->isLegacy() ?
"legacy" :
"corrupted")
181 switch (shard->getState())
186 shards_.emplace(shardIndex, std::move(shard));
191 shards_.emplace(shardIndex, std::move(shard))
201 <<
"more than one shard being acquired";
205 shards_.emplace(shardIndex, std::move(shard));
211 <<
"shard " << shardIndex <<
" invalid state";
219 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
220 <<
". Error: " << e.
what();
233 boost::optional<std::uint32_t>
236 boost::optional<std::uint32_t> shardIndex;
245 return it->second->prepare();
260 JLOG(
j_.
debug()) <<
"no new shards to add";
268 auto const pathDesignation = [
this, shardIndex = *shardIndex]() {
273 if (!pathDesignation)
276 auto const needsHistoricalPath =
279 auto shard = [
this, shardIndex, needsHistoricalPath] {
281 return std::make_unique<Shard>(
292 auto const ledgerSeq{shard->prepare()};
295 shards_.emplace(*shardIndex, std::move(shard));
304 auto fail = [j =
j_, &shardIndexes](
306 boost::optional<std::uint32_t> shardIndex = boost::none) {
307 auto multipleIndexPrequel = [&shardIndexes] {
310 shardIndexes.
begin(),
312 indexesAsString.
begin(),
313 [](uint32_t
const index) { return std::to_string(index); });
316 (shardIndexes.
size() > 1 ?
"s " :
" ") +
317 boost::algorithm::join(indexesAsString,
", ");
322 : multipleIndexPrequel();
324 JLOG(j.error()) << prequel <<
" " << msg;
332 return fail(
"cannot be stored at this time");
334 auto historicalShardsToPrepare = 0;
336 for (
auto const shardIndex : shardIndexes)
341 "comes before earliest shard index " +
352 return fail(
"invalid index", shardIndex);
359 return fail(
"invalid index", shardIndex);
363 return fail(
"is already stored", shardIndex);
366 return fail(
"is already queued for import", shardIndex);
371 ++historicalShardsToPrepare;
378 return fail(
"maximum number of historical shards reached");
380 if (historicalShardsToPrepare)
385 return fail(
"insufficient storage space available");
388 if (
auto const recentShardsToPrepare =
389 shardIndexes.size() - historicalShardsToPrepare;
390 recentShardsToPrepare)
395 return fail(
"insufficient storage space available");
398 for (
auto const shardIndex : shardIndexes)
400 auto const prepareSuccessful =
403 (void)prepareSuccessful;
404 assert(prepareSuccessful);
428 rs.insert(shardIndex);
440 boost::filesystem::path
const& srcDir)
444 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
451 using namespace boost::filesystem;
454 if (!is_directory(srcDir) || is_empty(srcDir))
457 "invalid source directory " + srcDir.string(),
464 std::string(
". Exception caught in function ") + __func__ +
465 ". Error: " + e.
what(),
478 return fail(
"already exists", lock);
482 return fail(
"was not prepared for import", lock);
484 auto const pathDesignation{
486 if (!pathDesignation)
487 return fail(
"failed to import", lock);
496 auto renameDir = [&](path
const& src, path
const& dst) {
504 std::string(
". Exception caught in function ") + __func__ +
505 ". Error: " + e.
what(),
512 if (!renameDir(srcDir, dstDir))
516 auto shard{std::make_unique<Shard>(
517 app_, *
this, shardIndex, dstDir.parent_path(),
j_)};
522 renameDir(dstDir, srcDir);
526 auto const [it, inserted] = [&]() {
529 return shards_.emplace(shardIndex, std::move(shard));
535 renameDir(dstDir, srcDir);
553 auto const it{
shards_.find(shardIndex)};
560 switch (shard->getState())
565 if (shard->containsLedger(ledgerSeq))
578 JLOG(
j_.
error()) <<
"shard " << shardIndex <<
" " << msg;
582 auto ledger{std::make_shared<Ledger>(
587 if (ledger->info().seq != ledgerSeq)
590 "encountered invalid ledger sequence " +
std::to_string(ledgerSeq));
592 if (ledger->info().hash != hash)
595 "encountered invalid ledger hash " +
to_string(hash) +
600 if (!ledger->stateMap().fetchRoot(
601 SHAMapHash{ledger->info().accountHash},
nullptr))
604 "is missing root STATE node on hash " +
to_string(hash) +
608 if (ledger->info().txHash.isNonZero())
610 if (!ledger->txMap().fetchRoot(
614 "is missing root TXN node on hash " +
to_string(hash) +
624 auto const ledgerSeq{ledger->info().seq};
625 if (ledger->info().hash.isZero())
627 JLOG(
j_.
error()) <<
"zero ledger hash for ledger sequence "
631 if (ledger->info().accountHash.isZero())
633 JLOG(
j_.
error()) <<
"zero account hash for ledger sequence "
637 if (ledger->stateMap().getHash().isNonZero() &&
638 !ledger->stateMap().isValid())
640 JLOG(
j_.
error()) <<
"invalid state map for ledger sequence "
644 if (ledger->info().txHash.isNonZero() && !ledger->txMap().isValid())
646 JLOG(
j_.
error()) <<
"invalid transaction map for ledger sequence "
660 <<
"shard " << shardIndex <<
" is not being acquired";
664 auto const it{
shards_.find(shardIndex)};
668 <<
"shard " << shardIndex <<
" is not being acquired";
674 if (shard->containsLedger(ledgerSeq))
676 JLOG(
j_.
trace()) <<
"shard " << shardIndex <<
" ledger already stored";
719 for (
auto const& e : shards)
724 if (
auto const shard{e.lock()}; shard)
727 JLOG(
j_.
warn()) <<
" shard " << shardIndex <<
" unexpired";
745 JLOG(
j_.
error()) <<
"invalid source database";
752 auto loadLedger = [&](
bool ascendSort =
753 true) -> boost::optional<std::uint32_t> {
757 "WHERE LedgerSeq >= " +
759 " order by LedgerSeq " + (ascendSort ?
"asc" :
"desc") +
763 if (!ledger || ledgerSeq == 0)
765 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
766 " the SQLite database to import";
773 auto ledgerSeq{loadLedger()};
783 ledgerSeq = loadLedger(
false);
792 if (latestIndex < earliestIndex)
794 JLOG(
j_.
error()) <<
"no suitable ledgers were found in"
795 " the SQLite database to import";
804 shardIndex <= latestIndex;
807 auto const pathDesignation =
810 if (!pathDesignation)
813 auto const needsHistoricalPath =
820 <<
"shard " << shardIndex <<
" already being acquired";
828 <<
"shard " << shardIndex <<
" already being imported";
835 JLOG(
j_.
debug()) <<
"shard " << shardIndex <<
" already stored";
844 auto const numLedgers{
848 if (ledgerHashes.size() != numLedgers)
856 JLOG(
j_.
warn()) <<
"SQLite ledger sequence " << n
857 <<
" mismatches node store";
871 std::make_unique<Shard>(
app_, *
this, shardIndex, path,
j_)};
882 JLOG(
j_.
error()) <<
"shard " << shardIndex
883 <<
" failed to create temp marker file";
884 shard->removeOnDestroy();
892 boost::optional<uint256> lastLedgerHash;
894 while (
auto const ledgerSeq = shard->prepare())
897 if (!ledger || ledger->info().seq != ledgerSeq)
900 auto const result{shard->storeLedger(ledger, recentStored)};
905 if (!shard->setLedgerStored(ledger))
908 if (!lastLedgerHash && ledgerSeq ==
lastLedgerSeq(shardIndex))
909 lastLedgerHash = ledger->info().hash;
911 recentStored = std::move(ledger);
914 using namespace boost::filesystem;
927 if (shard->storeNodeObject(nodeObject))
933 remove_all(markerFile);
935 JLOG(
j_.
debug()) <<
"shard " << shardIndex
936 <<
" was successfully imported";
938 shards_.emplace(shardIndex, std::move(shard))
949 JLOG(
j_.
fatal()) <<
"shard index " << shardIndex
950 <<
". Exception caught in function "
951 << __func__ <<
". Error: " << e.
what();
959 <<
"shard " << shardIndex <<
" failed to import";
960 shard->removeOnDestroy();
984 return shard->getWriteLoad();
1001 <<
"shard " << shardIndex <<
" is not being acquired";
1005 auto const it{
shards_.find(shardIndex)};
1009 <<
"shard " << shardIndex <<
" is not being acquired";
1015 auto const nodeObject{
1017 if (shard->storeNodeObject(nodeObject))
1024 auto const ledgerSeq{srcLedger->info().seq};
1034 <<
"shard " << shardIndex <<
" is not being acquired";
1038 auto const it{
shards_.find(shardIndex)};
1042 <<
"shard " << shardIndex <<
" is not being acquired";
1048 auto const result{shard->storeLedger(srcLedger,
nullptr)};
1050 if (result.error || result.count == 0 || result.size == 0)
1072 for (
auto const& e : shards)
1074 if (
auto const shard{e.lock()}; shard && shard->isOpen())
1085 JLOG(
j_.
trace()) <<
"Open shards exceed configured limit of "
1096 return lhsShard->getLastUse() < rhsShard->getLastUse();
1099 for (
auto it{openFinals.
cbegin()};
1102 if ((*it)->tryClose())
1103 it = openFinals.
erase(it);
1130 get_if_exists<std::uint32_t>(
1131 section,
"earliest_seq", shardDBEarliestSeq);
1134 get_if_exists<std::uint32_t>(
1139 if (shardDBEarliestSeq != nodeDBEarliestSeq)
1143 "] define different 'earliest_seq' values");
1147 using namespace boost::filesystem;
1148 if (!get_if_exists<path>(section,
"path",
dir_))
1149 return fail(
"'path' missing");
1154 Section const& historicalShardPaths =
1155 config.section(SECTION_HISTORICAL_SHARD_PATHS);
1157 auto values = historicalShardPaths.
values();
1159 std::sort(values.begin(), values.end());
1160 values.erase(
std::unique(values.begin(), values.end()), values.end());
1162 for (
auto const& s : values)
1164 auto const dir = path(s);
1168 "the 'path' cannot also be in the "
1169 "'historical_shard_path' section");
1176 if (section.exists(
"ledgers_per_shard"))
1179 if (!config.standalone())
1180 return fail(
"'ledgers_per_shard' only honored in stand alone");
1184 return fail(
"'ledgers_per_shard' must be a multiple of 256");
1191 backendName_ = get<std::string>(section,
"type",
"nudb");
1193 return fail(
"'type' value unsupported");
1208 auto const it{
shards_.find(shardIndex)};
1214 return shard->fetchNodeObject(hash, fetchReport);
1217 boost::optional<std::uint32_t>
1225 auto const maxShardIndex{[
this, validLedgerSeq]() {
1234 if (
shards_.size() >= maxNumShards)
1237 if (maxShardIndex < 1024 ||
1238 static_cast<float>(
shards_.size()) / maxNumShards > 0.5f)
1246 shardIndex <= maxShardIndex;
1269 for (
int i = 0; i < 40; ++i)
1287 boost::optional<uint256>
const& expectedHash)
1296 auto shard{wptr.lock()};
1299 JLOG(
j_.
debug()) <<
"Shard removed before being finalized";
1303 if (!shard->finalize(writeSQLite, expectedHash))
1322 if (shard->index() < boundaryIndex)
1326 shard->getDir().parent_path() ==
dir_)
1329 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1330 <<
" is not stored at a historical path";
1338 assert(!boundaryIndex || shard->index() - boundaryIndex <= 1);
1340 auto& recentShard = shard->index() == boundaryIndex
1345 recentShard = shard->index();
1347 if (shard->getDir().parent_path() !=
dir_)
1349 JLOG(
j_.
warn()) <<
"shard " << shard->index()
1350 <<
" is not stored at the path";
1361 protocol::TMPeerShardInfo message;
1363 message.set_nodepubkey(publicKey.data(), publicKey.size());
1366 message, protocol::mtPEER_SHARD_INFO)));
1388 for (
auto const& e : shards)
1390 if (
auto const shard{e.lock()}; shard)
1392 auto const [sz, fd] = shard->getFileInfo();
1415 JLOG(
j_.
warn()) <<
"maximum number of historical shards reached";
1426 <<
"maximum shard store size exceeds available storage space";
1440 rs.insert(e.second->index());
1465 auto const availableSpace =
1466 boost::filesystem::space(path).available;
1486 if (numShards <= shardCap)
1489 numShards -= shardCap;
1494 JLOG(
j_.
fatal()) <<
"Exception caught in function " << __func__
1495 <<
". Error: " << e.
what();
1507 if (!shard->setLedgerStored(ledger))
1517 if (
auto const it{
shards_.find(shard->index())}; it !=
shards_.end())
1527 <<
"shard " << shard->index() <<
" is no longer being acquired";
1550 if ((
shards_.erase(shard->index()) > 0) &&
1557 shard->removeOnDestroy();
1587 shards_.begin(),
shards_.end(), [boundaryIndex](
auto const& entry) {
1588 return entry.first < boundaryIndex;
1599 auto const latestShardIndex =
1604 auto const removeShard =
1614 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1619 JLOG(
j_.
warn()) <<
"can't find shard to remove";
1623 auto const keepShard =
1624 [
this, &lock, removeShard, separateHistoricalPath](
1629 <<
"maximum number of historical shards reached";
1631 removeShard(shardIndex);
1634 if (separateHistoricalPath &&
1637 JLOG(
j_.
error()) <<
"insufficient storage space available";
1639 removeShard(shardIndex);
1648 auto const moveShard = [
this,
1654 auto& shard{it->second};
1659 if (!shard->tryClose())
1662 <<
"can't close shard to move to historical path";
1669 boost::filesystem::rename(
1670 shard->getDir().string(),
1675 JLOG(
j_.
error()) <<
"shard " << shardIndex
1676 <<
" failed to move to historical storage";
1682 std::make_shared<Shard>(
app_, *
this, shardIndex, dst,
j_);
1687 JLOG(
j_.
error()) <<
"shard " << shardIndex
1688 <<
" failed to open in historical storage";
1689 shard->removeOnDestroy();
1696 <<
"can't find shard to move to historical path";
1701 bool const curNotSynched =
1708 if (curNotSynched || prevNotSynched)
1713 if (keepShard(*prev) && separateHistoricalPath)
1724 if (cur == latestShardIndex - 1)
1733 if (keepShard(*cur) && separateHistoricalPath)
1753 auto const isHistoricalShard = shardIndex < boundaryIndex;
1762 JLOG(
j_.
error()) <<
"maximum number of historical shards reached";
1768 JLOG(
j_.
error()) <<
"insufficient storage space available";
1776 boost::filesystem::path
1784 boost::filesystem::path historicalShardPath;
1793 if (potentialPaths.
empty())
1795 JLOG(
j_.
error()) <<
"failed to select a historical shard path";
1800 potentialPaths.
begin(),
1801 potentialPaths.
end(),
1802 &historicalShardPath,
1806 return historicalShardPath;
1821 struct statvfs buffer;
1822 if (statvfs(path.c_str(), &buffer))
1825 <<
"failed to acquire stats for 'historical_shard_path': "
1830 filesystemIDs[buffer.f_fsid].push_back(path.string());
1834 for (
auto const& entry : filesystemIDs)
1837 if (entry.second.size() > 1)
1842 <<
"The following paths correspond to the same filesystem: "
1843 << boost::algorithm::join(entry.second,
", ")
1844 <<
". Each configured historical storage path should"
1845 " be on a unique device or filesystem.";
1866 uniqueCapacities[boost::filesystem::space(path).available].push_back(
1869 for (
auto const& entry : uniqueCapacities)
1872 if (entry.second.size() > 1)
1877 <<
"Each of the following paths have " << entry.first
1878 <<
" bytes free, and may be located on the same device"
1880 << boost::algorithm::join(entry.second,
", ")
1881 <<
". Each configured historical storage path should"
1882 " be on a unique device or file system.";
1903 if (section.empty())
1906 return std::make_unique<DatabaseShardImp>(
1907 app, parent,
"ShardStore", scheduler, readThreads, j);