20 #include <ripple/app/ledger/InboundLedger.h>
21 #include <ripple/app/main/DBInit.h>
22 #include <ripple/app/rdb/backend/detail/Shard.h>
23 #include <ripple/basics/StringUtilities.h>
24 #include <ripple/core/ConfigSections.h>
25 #include <ripple/nodestore/Manager.h>
26 #include <ripple/nodestore/impl/DeterministicShard.h>
27 #include <ripple/nodestore/impl/Shard.h>
28 #include <ripple/protocol/LedgerHeader.h>
29 #include <ripple/protocol/digest.h>
41 :
Shard(app, db, index,
"", j)
49 boost::filesystem::path
const& dir,
54 , firstSeq_(db.firstLedgerSeq(index))
55 , lastSeq_(
std::max(firstSeq_, db.lastLedgerSeq(index)))
56 , maxLedgers_(db.maxLedgers(index))
57 , dir_((dir.empty() ? db.getRootDir() : dir) /
std::
to_string(index_))
72 <<
" backend in use, unable to remove directory";
85 boost::filesystem::remove_all(
dir_);
90 <<
". Exception caught in function " << __func__
91 <<
". Error: " << e.
what();
103 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" failed to find factory for "
107 section.set(
"path",
dir_.string());
112 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" already initialized";
115 backend_ = factory->createInstance(
133 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" not initialized";
137 return backend_->isOpen();
155 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" not initialized";
158 if (!backend_->isOpen())
168 <<
". Exception caught in function " << __func__
169 <<
". Error: " << e.
what();
173 lgrSQLiteDB_.reset();
175 acquireInfo_.reset();
190 <<
" prepare called when not acquiring";
198 <<
" missing acquire SQLite database";
202 if (acquireInfo_->storedSeqs.empty())
213 if (nodeObject->getHash() !=
finalKey)
228 backend_->store(nodeObject);
233 <<
". Exception caught in function " << __func__
234 <<
". Error: " << e.
what();
255 status = backend_->fetch(hash.
data(), &nodeObject);
260 <<
". Exception caught in function " << __func__
261 <<
". Error: " << e.
what();
272 <<
"shard " <<
index_ <<
". Corrupt node object at hash "
278 <<
"shard " <<
index_ <<
". Unknown status=" << status
279 <<
" fetching node object at hash " <<
to_string(hash);
304 JLOG(
j_.
trace()) <<
"shard " <<
index_ <<
". Ledger already stored";
309 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
". Source ledger sequence "
310 << srcLedger->info().seq <<
". " << msg;
315 if (srcLedger->info().hash.isZero())
316 return fail(
"Invalid hash");
317 if (srcLedger->info().accountHash.isZero())
318 return fail(
"Invalid account hash");
320 auto& srcDB{
const_cast<Database&
>(srcLedger->stateMap().family().db())};
322 return fail(
"Source and destination databases are the same");
326 return fail(
"Failed to lock backend");
330 auto storeBatch = [&]() {
332 for (
auto const& nodeObject : batch)
333 sz += nodeObject->getData().size();
338 backend_->storeBatch(batch);
343 std::string(
". Exception caught in function ") + __func__ +
344 ". Error: " + e.
what());
348 result.
count += batch.size();
358 addRaw(srcLedger->info(), s);
368 if (
auto nodeObject = srcDB.fetchNodeObject(
369 node.getHash().as_uint256(), srcLedger->info().seq))
382 if (srcLedger->stateMap().getHash().isNonZero())
384 if (!srcLedger->stateMap().isValid())
385 return fail(
"Invalid state map");
387 if (next && next->info().parentHash == srcLedger->info().hash)
389 auto have = next->stateMap().snapShot(
false);
390 srcLedger->stateMap().snapShot(
false)->visitDifferences(
394 srcLedger->stateMap().snapShot(
false)->visitNodes(visit);
396 return fail(
"Failed to store state map");
400 if (srcLedger->info().txHash.isNonZero())
402 if (!srcLedger->txMap().isValid())
403 return fail(
"Invalid transaction map");
405 srcLedger->txMap().snapShot(
false)->visitNodes(visit);
407 return fail(
"Failed to store transaction map");
410 if (!batch.
empty() && !storeBatch())
411 return fail(
"Failed to store");
431 auto const ledgerSeq{ledger->info().seq};
432 if (ledgerSeq < firstSeq_ || ledgerSeq >
lastSeq_)
433 return fail(
"Invalid ledger sequence " +
std::to_string(ledgerSeq));
446 return fail(
"Missing acquire SQLite database");
448 if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq))
451 JLOG(
j_.
debug()) <<
"shard " <<
index_ <<
" ledger sequence "
452 << ledgerSeq <<
" already stored";
458 return fail(
"Failed to store ledger");
463 acquireInfo_->storedSeqs.insert(ledgerSeq);
467 auto session{acquireInfo_->SQLiteDB->checkoutDb()};
468 soci::blob sociBlob(*session);
473 auto const sHash{
to_string(ledger->info().hash)};
474 *session <<
"UPDATE Shard "
475 "SET LastLedgerHash = :lastLedgerHash,"
476 "StoredLedgerSeqs = :storedLedgerSeqs "
477 "WHERE ShardIndex = :shardIndex;",
478 soci::use(sHash), soci::use(sociBlob), soci::use(
index_);
482 *session <<
"UPDATE Shard "
483 "SET StoredLedgerSeqs = :storedLedgerSeqs "
484 "WHERE ShardIndex = :shardIndex;",
485 soci::use(sociBlob), soci::use(
index_);
490 acquireInfo_->storedSeqs.erase(ledgerSeq);
492 std::string(
". Exception caught in function ") + __func__ +
493 ". Error: " + e.
what());
497 progress_ = boost::icl::length(acquireInfo_->storedSeqs);
502 JLOG(
j_.
trace()) <<
"shard " <<
index_ <<
" stored ledger sequence "
510 if (ledgerSeq < firstSeq_ || ledgerSeq >
lastSeq_)
519 <<
" missing acquire SQLite database";
522 return boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq);
525 std::chrono::steady_clock::time_point
536 return {fileSz_, fdRequired_};
546 return backend_->getWriteLoad();
567 << (hash.isZero() ?
""
569 << (ledgerSeq == 0 ?
""
570 :
". Ledger sequence " +
587 backend_->fetch(
finalKey.
data(), &nodeObject) == Status::ok)
591 nodeObject->getData().data(), nodeObject->getData().size());
593 return fail(
"invalid version");
596 return fail(
"out of range ledger sequences");
599 return fail(
"invalid last ledger hash");
606 return fail(
"missing acquire SQLite database");
609 *acquireInfo_->SQLiteDB->checkoutDb(),
index_);
612 return fail(
"missing or invalid ShardIndex");
615 return fail(
"missing LastLedgerHash");
617 if (!hash.parseHex(*seqshash.hash) || hash.isZero())
618 return fail(
"invalid LastLedgerHash");
620 if (!seqshash.sequences)
621 return fail(
"missing StoredLedgerSeqs");
623 auto& storedSeqs{acquireInfo_->storedSeqs};
624 if (!
from_string(storedSeqs, *seqshash.sequences) ||
625 boost::icl::first(storedSeqs) !=
firstSeq_ ||
626 boost::icl::last(storedSeqs) !=
lastSeq_ ||
629 return fail(
"invalid StoredLedgerSeqs");
636 std::string(
". Exception caught in function ") + __func__ +
637 ". Error: " + e.
what());
642 if (referenceHash && *referenceHash != hash)
643 return fail(
"invalid last ledger hash");
649 auto const lastLedgerHash{hash};
652 auto const treeNodeCache{shardFamily.getTreeNodeCache(
lastSeq_)};
655 fullBelowCache->reset();
656 treeNodeCache->reset();
667 return fail(
"Failed to create deterministic shard");
679 return fail(
"invalid ledger");
681 ledger = std::make_shared<Ledger>(
685 if (ledger->info().seq != ledgerSeq)
686 return fail(
"invalid ledger sequence");
687 if (ledger->info().hash != hash)
688 return fail(
"invalid ledger hash");
690 ledger->stateMap().setLedgerSeq(ledgerSeq);
691 ledger->txMap().setLedgerSeq(ledgerSeq);
692 ledger->setImmutable();
693 if (!ledger->stateMap().fetchRoot(
694 SHAMapHash{ledger->info().accountHash},
nullptr))
696 return fail(
"missing root STATE node");
698 if (ledger->info().txHash.isNonZero() &&
699 !ledger->txMap().fetchRoot(
702 return fail(
"missing root TXN node");
706 return fail(
"failed to verify ledger");
708 if (!dShard->store(nodeObject))
709 return fail(
"failed to store node object");
712 return fail(
"failed storing to SQLite databases");
715 ledger->info().seq == ledgerSeq &&
719 hash = ledger->info().parentHash;
720 next = std::move(ledger);
727 fullBelowCache->reset();
728 treeNodeCache->reset();
767 auto const nodeObject{
769 if (!dShard->store(nodeObject))
770 return fail(
"failed to store node object");
777 backend_->store(nodeObject);
793 lgrSQLiteDB_.reset();
799 acquireInfo_.reset();
807 remove(
dir_ /
"nudb.key");
808 remove(
dir_ /
"nudb.dat");
809 rename(dShard->getDir() /
"nudb.key",
dir_ /
"nudb.key");
810 rename(dShard->getDir() /
"nudb.dat",
dir_ /
"nudb.dat");
814 return fail(
"failed to open");
824 std::string(
". Exception caught in function ") + __func__ +
825 ". Error: " + e.
what());
834 using namespace boost::filesystem;
836 auto preexist{
false};
839 lgrSQLiteDB_.reset();
841 acquireInfo_.reset();
855 auto createAcquireInfo = [
this, &config]() REQUIRES(
mutex_) {
857 setup.
startUp = config.standalone() ? config.LOAD : config.START_UP;
862 acquireInfo_ = std::make_unique<AcquireInfo>();
874 preexist = exists(
dir_);
875 backend_->open(!preexist);
888 acquireInfo_->SQLiteDB->getSession(),
index_);
891 return fail(
"invalid acquire SQLite database");
895 auto& storedSeqs{acquireInfo_->storedSeqs};
897 return fail(
"invalid StoredLedgerSeqs");
899 if (boost::icl::first(storedSeqs) <
firstSeq_ ||
900 boost::icl::last(storedSeqs) >
lastSeq_)
902 return fail(
"invalid StoredLedgerSeqs");
906 progress_ = boost::icl::length(storedSeqs);
915 if (backend_->fetch(
finalKey.
data(), &nodeObject) != Status::ok)
918 return fail(
"incompatible, missing backend final key");
923 nodeObject->getData().data(), nodeObject->getData().size());
925 return fail(
"invalid version");
928 return fail(
"out of range ledger sequences");
931 return fail(
"invalid last ledger hash");
947 std::string(
". Exception caught in function ") + __func__ +
948 ". Error: " + e.
what());
964 setup.
startUp = config.standalone() ? config.LOAD : config.START_UP;
974 lgrSQLiteDB_.reset();
986 lgrSQLiteDB_ = std::move(lgr);
987 lgrSQLiteDB_->getSession() << boost::str(
988 boost::format(
"PRAGMA cache_size=-%d;") %
992 txSQLiteDB_ = std::move(tx);
993 txSQLiteDB_->getSession() << boost::str(
994 boost::format(
"PRAGMA cache_size=-%d;") %
1010 lgrSQLiteDB_ = std::move(lgr);
1011 lgrSQLiteDB_->getSession() << boost::str(
1012 boost::format(
"PRAGMA cache_size=-%d;") %
1015 txSQLiteDB_ = std::move(tx);
1016 txSQLiteDB_->getSession() << boost::str(
1017 boost::format(
"PRAGMA cache_size=-%d;") %
1026 <<
". Exception caught in function " << __func__
1027 <<
". Error: " << e.
what();
1045 *txSQLiteDB_->checkoutDb(),
1046 *lgrSQLiteDB_->checkoutDb(),
1059 if (!acquireInfo_->storedSeqs.empty())
1060 s =
to_string(acquireInfo_->storedSeqs);
1063 acquireInfo_->SQLiteDB->getSession(),
1073 <<
". Exception caught in function " << __func__
1074 <<
". Error: " << e.
what();
1088 using namespace boost::filesystem;
1089 for (
auto const& d : directory_iterator(
dir_))
1091 if (is_regular_file(d))
1093 fileSz_ += file_size(d);
1101 <<
". Exception caught in function " << __func__
1102 <<
". Error: " << e.
what();
1113 JLOG(j.error()) <<
"shard " <<
index <<
". " << msg
1114 << (ledger->info().hash.isZero() ?
""
1115 :
". Ledger hash " +
1117 << (ledger->info().seq == 0 ?
""
1118 :
". Ledger sequence " +
1123 if (ledger->info().hash.isZero())
1124 return fail(
"Invalid ledger hash");
1125 if (ledger->info().accountHash.isZero())
1126 return fail(
"Invalid ledger account hash");
1129 auto visit = [
this, &error, &dShard](
SHAMapTreeNode const& node) {
1133 auto nodeObject{
verifyFetch(node.getHash().as_uint256())};
1134 if (!nodeObject || !dShard->store(nodeObject))
1141 if (ledger->stateMap().getHash().isNonZero())
1143 if (!ledger->stateMap().isValid())
1144 return fail(
"Invalid state map");
1148 if (next && next->info().parentHash == ledger->info().hash)
1149 ledger->stateMap().visitDifferences(&next->stateMap(), visit);
1151 ledger->stateMap().visitNodes(visit);
1156 std::string(
". Exception caught in function ") + __func__ +
1157 ". Error: " + e.
what());
1163 return fail(
"Invalid state map");
1167 if (ledger->info().txHash.isNonZero())
1169 if (!ledger->txMap().isValid())
1170 return fail(
"Invalid transaction map");
1174 ledger->txMap().visitNodes(visit);
1179 std::string(
". Exception caught in function ") + __func__ +
1180 ". Error: " + e.
what());
1186 return fail(
"Invalid transaction map");
1198 JLOG(j.error()) <<
"shard " <<
index <<
". " << msg
1199 <<
". Node object hash " <<
to_string(hash);
1207 switch (backend_->fetch(hash.data(), &nodeObject))
1211 if (nodeObject->getHash() !=
1213 return fail(
"Node object hash does not match payload");
1216 return fail(
"Missing node object");
1218 return fail(
"Corrupt node object");
1220 return fail(
"Unknown error");
1226 std::string(
". Exception caught in function ") + __func__ +
1227 ". Error: " + e.
what());
1240 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" not initialized";
1243 if (!backend_->isOpen())
1256 std::function<
bool(soci::session& session)>
const& callback,
1259 return callback(*db);
1268 return callback(*db,
index_);