20 #include <ripple/app/ledger/InboundLedger.h>
21 #include <ripple/app/main/DBInit.h>
22 #include <ripple/app/rdb/backend/detail/Shard.h>
23 #include <ripple/basics/StringUtilities.h>
24 #include <ripple/core/ConfigSections.h>
25 #include <ripple/nodestore/Manager.h>
26 #include <ripple/nodestore/impl/DeterministicShard.h>
27 #include <ripple/nodestore/impl/Shard.h>
28 #include <ripple/protocol/digest.h>
40 :
Shard(app, db, index,
"", j)
48 boost::filesystem::path
const& dir,
53 , firstSeq_(db.firstLedgerSeq(index))
54 , lastSeq_(
std::max(firstSeq_, db.lastLedgerSeq(index)))
55 , maxLedgers_(db.maxLedgers(index))
56 , dir_((dir.empty() ? db.getRootDir() : dir) /
std::
to_string(index_))
71 <<
" backend in use, unable to remove directory";
84 boost::filesystem::remove_all(
dir_);
89 <<
". Exception caught in function " << __func__
90 <<
". Error: " << e.
what();
102 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" failed to find factory for "
106 section.set(
"path",
dir_.string());
111 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" already initialized";
114 backend_ = factory->createInstance(
132 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" not initialized";
136 return backend_->isOpen();
154 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" not initialized";
157 if (!backend_->isOpen())
167 <<
". Exception caught in function " << __func__
168 <<
". Error: " << e.
what();
172 lgrSQLiteDB_.reset();
174 acquireInfo_.reset();
189 <<
" prepare called when not acquiring";
197 <<
" missing acquire SQLite database";
201 if (acquireInfo_->storedSeqs.empty())
212 if (nodeObject->getHash() !=
finalKey)
227 backend_->store(nodeObject);
232 <<
". Exception caught in function " << __func__
233 <<
". Error: " << e.
what();
254 status = backend_->fetch(hash.
data(), &nodeObject);
259 <<
". Exception caught in function " << __func__
260 <<
". Error: " << e.
what();
271 <<
"shard " <<
index_ <<
". Corrupt node object at hash "
277 <<
"shard " <<
index_ <<
". Unknown status=" << status
278 <<
" fetching node object at hash " <<
to_string(hash);
303 JLOG(
j_.
trace()) <<
"shard " <<
index_ <<
". Ledger already stored";
308 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
". Source ledger sequence "
309 << srcLedger->info().seq <<
". " << msg;
314 if (srcLedger->info().hash.isZero())
315 return fail(
"Invalid hash");
316 if (srcLedger->info().accountHash.isZero())
317 return fail(
"Invalid account hash");
319 auto& srcDB{
const_cast<Database&
>(srcLedger->stateMap().family().db())};
321 return fail(
"Source and destination databases are the same");
325 return fail(
"Failed to lock backend");
329 auto storeBatch = [&]() {
331 for (
auto const& nodeObject : batch)
332 sz += nodeObject->getData().size();
337 backend_->storeBatch(batch);
342 std::string(
". Exception caught in function ") + __func__ +
343 ". Error: " + e.
what());
347 result.
count += batch.size();
357 addRaw(srcLedger->info(), s);
367 if (
auto nodeObject = srcDB.fetchNodeObject(
368 node.getHash().as_uint256(), srcLedger->info().seq))
381 if (srcLedger->stateMap().getHash().isNonZero())
383 if (!srcLedger->stateMap().isValid())
384 return fail(
"Invalid state map");
386 if (next && next->info().parentHash == srcLedger->info().hash)
388 auto have = next->stateMap().snapShot(
false);
389 srcLedger->stateMap().snapShot(
false)->visitDifferences(
393 srcLedger->stateMap().snapShot(
false)->visitNodes(visit);
395 return fail(
"Failed to store state map");
399 if (srcLedger->info().txHash.isNonZero())
401 if (!srcLedger->txMap().isValid())
402 return fail(
"Invalid transaction map");
404 srcLedger->txMap().snapShot(
false)->visitNodes(visit);
406 return fail(
"Failed to store transaction map");
409 if (!batch.
empty() && !storeBatch())
410 return fail(
"Failed to store");
430 auto const ledgerSeq{ledger->info().seq};
431 if (ledgerSeq < firstSeq_ || ledgerSeq >
lastSeq_)
432 return fail(
"Invalid ledger sequence " +
std::to_string(ledgerSeq));
445 return fail(
"Missing acquire SQLite database");
447 if (boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq))
450 JLOG(
j_.
debug()) <<
"shard " <<
index_ <<
" ledger sequence "
451 << ledgerSeq <<
" already stored";
457 return fail(
"Failed to store ledger");
462 acquireInfo_->storedSeqs.insert(ledgerSeq);
466 auto session{acquireInfo_->SQLiteDB->checkoutDb()};
467 soci::blob sociBlob(*session);
472 auto const sHash{
to_string(ledger->info().hash)};
473 *session <<
"UPDATE Shard "
474 "SET LastLedgerHash = :lastLedgerHash,"
475 "StoredLedgerSeqs = :storedLedgerSeqs "
476 "WHERE ShardIndex = :shardIndex;",
477 soci::use(sHash), soci::use(sociBlob), soci::use(
index_);
481 *session <<
"UPDATE Shard "
482 "SET StoredLedgerSeqs = :storedLedgerSeqs "
483 "WHERE ShardIndex = :shardIndex;",
484 soci::use(sociBlob), soci::use(
index_);
489 acquireInfo_->storedSeqs.erase(ledgerSeq);
491 std::string(
". Exception caught in function ") + __func__ +
492 ". Error: " + e.
what());
496 progress_ = boost::icl::length(acquireInfo_->storedSeqs);
501 JLOG(
j_.
trace()) <<
"shard " <<
index_ <<
" stored ledger sequence "
509 if (ledgerSeq < firstSeq_ || ledgerSeq >
lastSeq_)
518 <<
" missing acquire SQLite database";
521 return boost::icl::contains(acquireInfo_->storedSeqs, ledgerSeq);
524 std::chrono::steady_clock::time_point
535 return {fileSz_, fdRequired_};
545 return backend_->getWriteLoad();
566 << (hash.isZero() ?
""
568 << (ledgerSeq == 0 ?
""
569 :
". Ledger sequence " +
586 backend_->fetch(
finalKey.
data(), &nodeObject) == Status::ok)
590 nodeObject->getData().data(), nodeObject->getData().size());
592 return fail(
"invalid version");
595 return fail(
"out of range ledger sequences");
598 return fail(
"invalid last ledger hash");
605 return fail(
"missing acquire SQLite database");
608 *acquireInfo_->SQLiteDB->checkoutDb(),
index_);
611 return fail(
"missing or invalid ShardIndex");
614 return fail(
"missing LastLedgerHash");
616 if (!hash.parseHex(*seqshash.hash) || hash.isZero())
617 return fail(
"invalid LastLedgerHash");
619 if (!seqshash.sequences)
620 return fail(
"missing StoredLedgerSeqs");
622 auto& storedSeqs{acquireInfo_->storedSeqs};
623 if (!
from_string(storedSeqs, *seqshash.sequences) ||
624 boost::icl::first(storedSeqs) !=
firstSeq_ ||
625 boost::icl::last(storedSeqs) !=
lastSeq_ ||
628 return fail(
"invalid StoredLedgerSeqs");
635 std::string(
". Exception caught in function ") + __func__ +
636 ". Error: " + e.
what());
641 if (referenceHash && *referenceHash != hash)
642 return fail(
"invalid last ledger hash");
648 auto const lastLedgerHash{hash};
651 auto const treeNodeCache{shardFamily.getTreeNodeCache(
lastSeq_)};
654 fullBelowCache->reset();
655 treeNodeCache->reset();
666 return fail(
"Failed to create deterministic shard");
678 return fail(
"invalid ledger");
680 ledger = std::make_shared<Ledger>(
684 if (ledger->info().seq != ledgerSeq)
685 return fail(
"invalid ledger sequence");
686 if (ledger->info().hash != hash)
687 return fail(
"invalid ledger hash");
689 ledger->stateMap().setLedgerSeq(ledgerSeq);
690 ledger->txMap().setLedgerSeq(ledgerSeq);
694 ledger->setImmutable();
695 if (!ledger->stateMap().fetchRoot(
696 SHAMapHash{ledger->info().accountHash},
nullptr))
698 return fail(
"missing root STATE node");
700 if (ledger->info().txHash.isNonZero() &&
701 !ledger->txMap().fetchRoot(
704 return fail(
"missing root TXN node");
708 return fail(
"failed to verify ledger");
710 if (!dShard->store(nodeObject))
711 return fail(
"failed to store node object");
714 return fail(
"failed storing to SQLite databases");
716 hash = ledger->info().parentHash;
717 next = std::move(ledger);
724 fullBelowCache->reset();
725 treeNodeCache->reset();
764 auto const nodeObject{
766 if (!dShard->store(nodeObject))
767 return fail(
"failed to store node object");
774 backend_->store(nodeObject);
790 lgrSQLiteDB_.reset();
796 acquireInfo_.reset();
804 remove(
dir_ /
"nudb.key");
805 remove(
dir_ /
"nudb.dat");
806 rename(dShard->getDir() /
"nudb.key",
dir_ /
"nudb.key");
807 rename(dShard->getDir() /
"nudb.dat",
dir_ /
"nudb.dat");
811 return fail(
"failed to open");
821 std::string(
". Exception caught in function ") + __func__ +
822 ". Error: " + e.
what());
831 using namespace boost::filesystem;
833 auto preexist{
false};
836 lgrSQLiteDB_.reset();
838 acquireInfo_.reset();
852 auto createAcquireInfo = [
this, &config]() REQUIRES(
mutex_) {
854 setup.
startUp = config.standalone() ? config.LOAD : config.START_UP;
859 acquireInfo_ = std::make_unique<AcquireInfo>();
871 preexist = exists(
dir_);
872 backend_->open(!preexist);
885 acquireInfo_->SQLiteDB->getSession(),
index_);
888 return fail(
"invalid acquire SQLite database");
892 auto& storedSeqs{acquireInfo_->storedSeqs};
894 return fail(
"invalid StoredLedgerSeqs");
896 if (boost::icl::first(storedSeqs) <
firstSeq_ ||
897 boost::icl::last(storedSeqs) >
lastSeq_)
899 return fail(
"invalid StoredLedgerSeqs");
903 progress_ = boost::icl::length(storedSeqs);
912 if (backend_->fetch(
finalKey.
data(), &nodeObject) != Status::ok)
915 return fail(
"incompatible, missing backend final key");
920 nodeObject->getData().data(), nodeObject->getData().size());
922 return fail(
"invalid version");
925 return fail(
"out of range ledger sequences");
928 return fail(
"invalid last ledger hash");
944 std::string(
". Exception caught in function ") + __func__ +
945 ". Error: " + e.
what());
961 setup.
startUp = config.standalone() ? config.LOAD : config.START_UP;
971 lgrSQLiteDB_.reset();
983 lgrSQLiteDB_ = std::move(lgr);
984 lgrSQLiteDB_->getSession() << boost::str(
985 boost::format(
"PRAGMA cache_size=-%d;") %
989 txSQLiteDB_ = std::move(tx);
990 txSQLiteDB_->getSession() << boost::str(
991 boost::format(
"PRAGMA cache_size=-%d;") %
1007 lgrSQLiteDB_ = std::move(lgr);
1008 lgrSQLiteDB_->getSession() << boost::str(
1009 boost::format(
"PRAGMA cache_size=-%d;") %
1012 txSQLiteDB_ = std::move(tx);
1013 txSQLiteDB_->getSession() << boost::str(
1014 boost::format(
"PRAGMA cache_size=-%d;") %
1023 <<
". Exception caught in function " << __func__
1024 <<
". Error: " << e.
what();
1042 *txSQLiteDB_->checkoutDb(),
1043 *lgrSQLiteDB_->checkoutDb(),
1056 if (!acquireInfo_->storedSeqs.empty())
1057 s =
to_string(acquireInfo_->storedSeqs);
1060 acquireInfo_->SQLiteDB->getSession(),
1070 <<
". Exception caught in function " << __func__
1071 <<
". Error: " << e.
what();
1085 using namespace boost::filesystem;
1086 for (
auto const& d : directory_iterator(
dir_))
1088 if (is_regular_file(d))
1090 fileSz_ += file_size(d);
1098 <<
". Exception caught in function " << __func__
1099 <<
". Error: " << e.
what();
1110 JLOG(j.error()) <<
"shard " <<
index <<
". " << msg
1111 << (ledger->info().hash.isZero() ?
""
1112 :
". Ledger hash " +
1114 << (ledger->info().seq == 0 ?
""
1115 :
". Ledger sequence " +
1120 if (ledger->info().hash.isZero())
1121 return fail(
"Invalid ledger hash");
1122 if (ledger->info().accountHash.isZero())
1123 return fail(
"Invalid ledger account hash");
1126 auto visit = [
this, &error, &dShard](
SHAMapTreeNode const& node) {
1130 auto nodeObject{
verifyFetch(node.getHash().as_uint256())};
1131 if (!nodeObject || !dShard->store(nodeObject))
1138 if (ledger->stateMap().getHash().isNonZero())
1140 if (!ledger->stateMap().isValid())
1141 return fail(
"Invalid state map");
1145 if (next && next->info().parentHash == ledger->info().hash)
1146 ledger->stateMap().visitDifferences(&next->stateMap(), visit);
1148 ledger->stateMap().visitNodes(visit);
1153 std::string(
". Exception caught in function ") + __func__ +
1154 ". Error: " + e.
what());
1160 return fail(
"Invalid state map");
1164 if (ledger->info().txHash.isNonZero())
1166 if (!ledger->txMap().isValid())
1167 return fail(
"Invalid transaction map");
1171 ledger->txMap().visitNodes(visit);
1176 std::string(
". Exception caught in function ") + __func__ +
1177 ". Error: " + e.
what());
1183 return fail(
"Invalid transaction map");
1195 JLOG(j.error()) <<
"shard " <<
index <<
". " << msg
1196 <<
". Node object hash " <<
to_string(hash);
1204 switch (backend_->fetch(hash.data(), &nodeObject))
1208 if (nodeObject->getHash() !=
1210 return fail(
"Node object hash does not match payload");
1213 return fail(
"Missing node object");
1215 return fail(
"Corrupt node object");
1217 return fail(
"Unknown error");
1223 std::string(
". Exception caught in function ") + __func__ +
1224 ". Error: " + e.
what());
1237 JLOG(
j_.
error()) <<
"shard " <<
index_ <<
" not initialized";
1240 if (!backend_->isOpen())
1253 std::function<
bool(soci::session& session)>
const& callback,
1256 return callback(*db);
1265 return callback(*db,
index_);
Holds a collection of configuration values.
const boost::filesystem::path dir_
constexpr auto AcquireShardDBName
@ ledgerMaster
ledger master data for signing
virtual std::shared_ptr< TreeNodeCache > getTreeNodeCache(std::uint32_t ledgerSeq)=0
Return a pointer to the Family Tree Node Cache.
std::unique_ptr< DatabaseCon > makeAcquireDB(DatabaseCon::Setup const &setup, DatabaseCon::CheckpointerSetup const &checkpointerSetup)
makeAcquireDB Opens the shard acquire database and returns its descriptor.
std::enable_if_t< std::is_same< T, char >::value||std::is_same< T, unsigned char >::value, Slice > makeSlice(std::array< T, N > const &a)
Persistency layer for NodeObject.
std::atomic< bool > removeOnDestroy_
StoreLedgerResult storeLedger(std::shared_ptr< Ledger const > const &srcLedger, std::shared_ptr< Ledger const > const &next)
Stream trace() const
Severity stream access functions.
std::optional< std::uint32_t > prepare()
bool open(std::lock_guard< std::mutex > const &lock) REQUIRES(mutex_)
void convert(soci::blob &from, std::vector< std::uint8_t > &to)
void addRaw(LedgerInfo const &info, Serializer &s, bool includeHash)
std::pair< bool, AcquireShardSeqsHash > selectAcquireDBLedgerSeqsHash(soci::session &session, std::uint32_t index)
selectAcquireDBLedgerSeqsHash Returns the set of acquired ledger sequences and the last ledger hash f...
void insertAcquireDBIndex(soci::session &session, std::uint32_t index)
insertAcquireDBIndex Adds a new shard index to the shard acquire database.
Config::StartUpType startUp
static std::string shardDatabase()
Shard(Shard const &)=delete
Copy constructor (disallowed)
static std::shared_ptr< NodeObject > createObject(NodeObjectType type, Blob &&data, uint256 const &hash)
Create an object from fields.
std::shared_ptr< DeterministicShard > make_DeterministicShard(Application &app, boost::filesystem::path const &shardDir, std::uint32_t shardIndex, Serializer const &finalKey, beast::Journal j)
Creates shared pointer to deterministic shard and initializes it.
std::pair< std::uint64_t, std::uint32_t > getFileInfo() const
Returns a pair where the first item describes the storage space utilized and the second item is the n...
constexpr auto kilobytes(T value) noexcept
Contains information about a fetch operation.
bool isOpen() const
Returns true if the databases are open.
boost::filesystem::path dataDir
bool from_string(RangeSet< T > &rs, std::string const &s)
Convert the given styled string to a RangeSet.
virtual std::shared_ptr< FullBelowCache > getFullBelowCache(std::uint32_t ledgerSeq)=0
Return a pointer to the Family Full Below Cache.
static const uint256 finalKey
const std::uint32_t lastSeq_
LedgerInfo deserializePrefixedHeader(Slice data, bool hasHash)
Deserialize a ledger header (prefixed with 4 bytes) from a byte array.
virtual NodeStore::Database & db()=0
std::shared_ptr< NodeObject > verifyFetch(uint256 const &hash) const
static constexpr std::uint32_t XRP_LEDGER_EARLIEST_FEES
The XRP Ledger mainnet's earliest ledger with a FeeSettings object.
bool tryClose()
Try to close databases if not in use.
@ batchWritePreallocationSize
bool isLegacy() const
Returns true if shard is older, without final key data.
static constexpr std::uint32_t version
bool storeSQLite(std::shared_ptr< Ledger const > const &ledger)
void updateAcquireDB(soci::session &session, std::shared_ptr< Ledger const > const &ledger, std::uint32_t index, std::uint32_t lastSeq, std::optional< std::string > const &seqs)
updateAcquireDB Updates information in the acquire DB.
bool containsLedger(std::uint32_t ledgerSeq) const
int getValueFor(SizedItem item, std::optional< std::size_t > node=std::nullopt) const
Retrieve the default value for the item at the specified node size.
bool storeNodeObject(std::shared_ptr< NodeObject > const &nodeObject)
bool initSQLite(std::lock_guard< std::mutex > const &) REQUIRES(mutex_)
virtual Config & config()=0
std::shared_ptr< NodeObject > fetchNodeObject(uint256 const &hash, FetchReport &fetchReport)
constexpr auto megabytes(T value) noexcept
void setFileStats(std::lock_guard< std::mutex > const &) REQUIRES(mutex_)
A collection of historical shards.
virtual JobQueue & getJobQueue()=0
const std::uint32_t firstSeq_
const std::uint32_t maxLedgers_
A generic endpoint for log messages.
bool verifyLedger(std::shared_ptr< Ledger const > const &ledger, std::shared_ptr< Ledger const > const &next, std::shared_ptr< DeterministicShard > const &dShard) const
Scheduling for asynchronous backend activity.
std::optional< T > prevMissing(RangeSet< T > const &rs, T t, T minVal=0)
Find the largest value not in the set that is less than a given value.
std::uint32_t index() const noexcept
Status
Return codes from Backend operations.
bool updateLedgerDBs(soci::session &txsession, soci::session &lgrsession, std::shared_ptr< Ledger const > const &ledger, std::uint32_t index, std::atomic< bool > &stop, beast::Journal j)
updateLedgerDBs Saves the given ledger to shard databases.
virtual Factory * find(std::string const &name)=0
Return a pointer to the matching factory if it exists.
std::atomic< std::uint32_t > progress_
T emplace_back(T... args)
Use hash_* containers for keys that do not need a cryptographically secure hashing algorithm.
int addBitString(base_uint< Bits, Tag > const &v)
static constexpr std::size_t keyBytes
Shard::Count makeBackendCount()
virtual Family * getShardFamily()=0
sha512_half_hasher::result_type sha512Half(Args const &... args)
Returns the SHA512-Half of a series of objects.
std::atomic< ShardState > state_
bool doCallForSQL(std::function< bool(soci::session &session)> const &callback, LockedSociSession &&db)
DatabasePair makeShardIncompleteLedgerDBs(Config const &config, DatabaseCon::Setup const &setup, DatabaseCon::CheckpointerSetup const &checkpointerSetup)
makeShardIncompleteLedgerDBs Opens shard databases for partially downloaded or unverified shards and ...
bool init(Scheduler &scheduler, nudb::context &context)
Initialize shard.
Keylet const & fees() noexcept
The (fixed) index of the object containing the ledger fees.
std::string to_string(Manifest const &m)
Format the specified manifest to a string for debugging purposes.
int add32(std::uint32_t i)
Information about the notional ledger backing the view.
bool finalize(bool writeSQLite, std::optional< uint256 > const &referenceHash)
Finalize shard by walking its ledgers, verifying each Merkle tree and creating a deterministic backen...
std::pair< bool, std::optional< std::string > > selectAcquireDBLedgerSeqs(soci::session &session, std::uint32_t index)
selectAcquireDBLedgerSeqs Returns the set of acquired ledgers for the given shard.
static Manager & instance()
Returns the instance of the manager singleton.
bool setLedgerStored(std::shared_ptr< Ledger const > const &ledger)
const std::uint32_t index_
std::atomic< bool > stop_
std::atomic< std::uint32_t > backendCount_
std::chrono::steady_clock::time_point getLastUse() const
std::int32_t getWriteLoad()
T & get(EitherAmount &amt)
DatabasePair makeShardCompleteLedgerDBs(Config const &config, DatabaseCon::Setup const &setup)
makeShardCompleteLedgerDBs Opens shard databases for verified shards and returns their descriptors.
Section & section(std::string const &name)
Returns the section with the given name.
std::atomic< bool > busy_