#include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include #include namespace xrpl { void SHAMapStoreImp::SavedStateDB::init(BasicConfig const& config, std::string const& dbName) { std::scoped_lock const lock(mutex); initStateDB(sqlDb, config, dbName); } LedgerIndex SHAMapStoreImp::SavedStateDB::getCanDelete() { std::scoped_lock const lock(mutex); return xrpl::getCanDelete(sqlDb); } LedgerIndex SHAMapStoreImp::SavedStateDB::setCanDelete(LedgerIndex canDelete) { std::scoped_lock const lock(mutex); return xrpl::setCanDelete(sqlDb, canDelete); } SavedState SHAMapStoreImp::SavedStateDB::getState() { std::scoped_lock const lock(mutex); return xrpl::getSavedState(sqlDb); } void SHAMapStoreImp::SavedStateDB::setState(SavedState const& state) { std::scoped_lock const lock(mutex); xrpl::setSavedState(sqlDb, state); } void SHAMapStoreImp::SavedStateDB::setLastRotated(LedgerIndex seq) { std::scoped_lock const lock(mutex); xrpl::setLastRotated(sqlDb, seq); } //------------------------------------------------------------------------------ SHAMapStoreImp::SHAMapStoreImp( Application& app, NodeStore::Scheduler& scheduler, beast::Journal journal) : app_(app) , scheduler_(scheduler) , journal_(journal) , working_(true) , canDelete_(std::numeric_limits::max()) { Config& config{app.config()}; Section& section{config.section(ConfigSection::nodeDatabase())}; if (section.empty()) { Throw( "Missing [" + ConfigSection::nodeDatabase() + "] entry in configuration file"); } // RocksDB only. Use sensible defaults if no values specified. 
    if (boost::iequals(get(section, "type"), "RocksDB"))
    {
        if (!section.exists("cache_mb"))
        {
            section.set("cache_mb", std::to_string(config.getValueFor(SizedItem::HashNodeDbCache)));
        }

        if (!section.exists("filter_bits") && (config.nodeSize >= 2))
            section.set("filter_bits", "10");
    }

    getIfExists(section, "online_delete", deleteInterval_);

    if (deleteInterval_ != 0u)
    {
        // Configuration that affects the behavior of online delete
        getIfExists(section, "delete_batch", deleteBatch_);
        std::uint32_t temp = 0;
        if (getIfExists(section, "back_off_milliseconds", temp) ||
            // Included for backward compatibility with an undocumented setting
            getIfExists(section, "backOff", temp))
        {
            backOff_ = std::chrono::milliseconds{temp};
        }
        if (getIfExists(section, "age_threshold_seconds", temp))
            ageThreshold_ = std::chrono::seconds{temp};
        if (getIfExists(section, "recovery_wait_seconds", temp))
            recoveryWaitTime_ = std::chrono::seconds{temp};

        getIfExists(section, "advisory_delete", advisoryDelete_);

        auto const minInterval =
            config.standalone() ? kMinimumDeletionIntervalSa : kMinimumDeletionInterval;
        if (deleteInterval_ < minInterval)
        {
            Throw<std::runtime_error>(
                "online_delete must be at least " + std::to_string(minInterval));
        }

        if (config.ledgerHistory > deleteInterval_)
        {
            Throw<std::runtime_error>(
                "online_delete must not be less than ledger_history "
                "(currently " +
                std::to_string(config.ledgerHistory) + ")");
        }

        stateDb_.init(config, dbName_);
        dbPaths();
    }
}

std::unique_ptr<NodeStore::Database>
SHAMapStoreImp::makeNodeStore(int readThreads)
{
    auto nscfg = app_.config().section(ConfigSection::nodeDatabase());

    // Provide default values.
    if (!nscfg.exists("cache_size"))
    {
        nscfg.set(
            "cache_size",
            std::to_string(app_.config().getValueFor(SizedItem::TreeCacheSize, std::nullopt)));
    }

    if (!nscfg.exists("cache_age"))
    {
        nscfg.set(
            "cache_age",
            std::to_string(app_.config().getValueFor(SizedItem::TreeCacheAge, std::nullopt)));
    }

    std::unique_ptr<NodeStore::Database> db;

    if (deleteInterval_ != 0u)
    {
        SavedState state = stateDb_.getState();
        auto writableBackend = makeBackendRotating(state.writableDb);
        auto archiveBackend = makeBackendRotating(state.archiveDb);
        if (state.writableDb.empty())
        {
            state.writableDb = writableBackend->getName();
            state.archiveDb = archiveBackend->getName();
            stateDb_.setState(state);
        }

        // Create NodeStore with two backends to allow online deletion of
        // data
        auto dbr = std::make_unique<NodeStore::DatabaseRotatingImp>(
            scheduler_,
            readThreads,
            std::move(writableBackend),
            std::move(archiveBackend),
            nscfg,
            app_.getJournal(kNodeStoreName));
        fdRequired_ += dbr->fdRequired();
        dbRotating_ = dbr.get();
        db.reset(dynamic_cast<NodeStore::Database*>(dbr.release()));
    }
    else
    {
        db = NodeStore::Manager::instance().makeDatabase(
            megabytes(app_.config().getValueFor(SizedItem::BurstSize, std::nullopt)),
            scheduler_,
            readThreads,
            nscfg,
            app_.getJournal(kNodeStoreName));
        fdRequired_ += db->fdRequired();
    }
    return db;
}

void
SHAMapStoreImp::onLedgerClosed(std::shared_ptr<Ledger const> const& ledger)
{
    {
        std::scoped_lock const lock(mutex_);
        newLedger_ = ledger;
        working_ = true;
    }
    cond_.notify_one();
}

void
SHAMapStoreImp::rendezvous() const
{
    if (!working_)
        return;

    std::unique_lock lock(mutex_);
    rendezvous_.wait(lock, [&] { return !working_; });
}

int
SHAMapStoreImp::fdRequired() const
{
    return fdRequired_;
}

bool
SHAMapStoreImp::copyNode(std::uint64_t& nodeCount, SHAMapTreeNode const& node)
{
    // Copy a single record from node to dbRotating_
    dbRotating_->fetchNodeObject(
        node.getHash().asUInt256(), 0, NodeStore::FetchType::Synchronous, true);
    if ((++nodeCount % checkHealthInterval_) == 0u)
    {
        if (healthWait() == HealthResult::Stopping)
            return false;
    }

    return true;
}

void
SHAMapStoreImp::run()
{
    beast::setCurrentThreadName("SHAMapStore");
    LedgerIndex lastRotated = stateDb_.getState().lastRotated;
    netOPs_ = &app_.getOPs();
    ledgerMaster_ = &app_.getLedgerMaster();
    fullBelowCache_ = &(*app_.getNodeFamily().getFullBelowCache());
    treeNodeCache_ = &(*app_.getNodeFamily().getTreeNodeCache());

    if (advisoryDelete_)
        canDelete_ = stateDb_.getCanDelete();

    while (true)
    {
        healthy_ = true;
        std::shared_ptr<Ledger const> validatedLedger;

        {
            std::unique_lock lock(mutex_);
            working_ = false;
            rendezvous_.notify_all();
            if (stop_)
            {
                return;
            }
            cond_.wait(lock);
            if (newLedger_)
            {
                validatedLedger = std::move(newLedger_);
            }
            else
            {
                continue;
            }
        }

        LedgerIndex const validatedSeq = validatedLedger->header().seq;
        if (lastRotated == 0u)
        {
            lastRotated = validatedSeq;
            stateDb_.setLastRotated(lastRotated);
        }

        bool const readyToRotate = validatedSeq >= lastRotated + deleteInterval_ &&
            canDelete_ >= lastRotated - 1 && healthWait() == HealthResult::KeepGoing;

        // will delete up to (not including) lastRotated
        if (readyToRotate)
        {
            JLOG(journal_.warn())
                << "rotating validatedSeq " << validatedSeq << " lastRotated " << lastRotated
                << " deleteInterval " << deleteInterval_ << " canDelete_ " << canDelete_
                << " state " << app_.getOPs().strOperatingMode(false) << " age "
                << ledgerMaster_->getValidatedLedgerAge().count() << 's';

            clearPrior(lastRotated);
            if (healthWait() == HealthResult::Stopping)
                return;

            JLOG(journal_.debug()) << "copying ledger " << validatedSeq;
            std::uint64_t nodeCount = 0;

            try
            {
                validatedLedger->stateMap().snapShot(false)->visitNodes(
                    std::bind(
                        &SHAMapStoreImp::copyNode,
                        this,
                        std::ref(nodeCount),
                        std::placeholders::_1));
            }
            catch (SHAMapMissingNode const& e)
            {
                JLOG(journal_.error())
                    << "Missing node while copying ledger before rotate: " << e.what();
                continue;
            }

            if (healthWait() == HealthResult::Stopping)
                return;
            // Only log if we completed without a "health" abort
            JLOG(journal_.debug())
                << "copied ledger " << validatedSeq << " nodecount " << nodeCount;

            JLOG(journal_.debug()) << "freshening caches";
            freshenCaches();
            if (healthWait() == HealthResult::Stopping)
                return;
            // Only log if we completed without a "health" abort
            JLOG(journal_.debug()) << validatedSeq << " freshened caches";

            JLOG(journal_.trace()) << "Making a new backend";
            auto newBackend = makeBackendRotating();
            JLOG(journal_.debug())
                << validatedSeq << " new backend " << newBackend->getName();

            clearCaches(validatedSeq);
            if (healthWait() == HealthResult::Stopping)
                return;

            lastRotated = validatedSeq;

            dbRotating_->rotate(
                std::move(newBackend),
                [&](std::string const& writableName, std::string const& archiveName) {
                    SavedState savedState;
                    savedState.writableDb = writableName;
                    savedState.archiveDb = archiveName;
                    savedState.lastRotated = lastRotated;
                    stateDb_.setState(savedState);

                    clearCaches(validatedSeq);
                });

            JLOG(journal_.warn()) << "finished rotation " << validatedSeq;
        }
    }
}

void
SHAMapStoreImp::dbPaths()
{
    Section const section{app_.config().section(ConfigSection::nodeDatabase())};
    boost::filesystem::path dbPath = get(section, "path");

    if (boost::filesystem::exists(dbPath))
    {
        if (!boost::filesystem::is_directory(dbPath))
        {
            journal_.error() << "node db path must be a directory. " << dbPath.string();
            Throw<std::runtime_error>("node db path must be a directory.");
        }
    }
    else
    {
        boost::filesystem::create_directories(dbPath);
    }

    SavedState state = stateDb_.getState();

    {
        auto update = [&dbPath](std::string& sPath) {
            if (sPath.empty())
                return false;

            // Check if configured "path" matches stored directory path
            using namespace boost::filesystem;
            auto const stored{path(sPath)};
            if (stored.parent_path() == dbPath)
                return false;

            sPath = (dbPath / stored.filename()).string();
            return true;
        };

        if (update(state.writableDb))
        {
            update(state.archiveDb);
            stateDb_.setState(state);
        }
    }

    bool writableDbExists = false;
    bool archiveDbExists = false;

    std::vector<boost::filesystem::path> pathsToDelete;
    for (boost::filesystem::directory_iterator it(dbPath);
         it != boost::filesystem::directory_iterator();
         ++it)
    {
        if (state.writableDb.compare(it->path().string()) == 0)
        {
            writableDbExists = true;
        }
        else if (state.archiveDb.compare(it->path().string()) == 0)
        {
            archiveDbExists = true;
        }
        else if (dbPrefix_.compare(it->path().stem().string()) == 0)
        {
            pathsToDelete.push_back(it->path());
        }
    }

    if ((!writableDbExists && !state.writableDb.empty()) ||
        (!archiveDbExists && !state.archiveDb.empty()) ||
        (writableDbExists != archiveDbExists) ||
        state.writableDb.empty() != state.archiveDb.empty())
    {
        boost::filesystem::path stateDbPathName = app_.config().legacy("database_path");
        stateDbPathName /= dbName_;
        stateDbPathName += "*";

        journal_.error() << "state db error:\n"
                         << "  writableDbExists " << writableDbExists << " archiveDbExists "
                         << archiveDbExists << '\n'
                         << "  writableDb '" << state.writableDb << "' archiveDb '"
                         << state.archiveDb << "'\n\n"
                         << "The existing data is in a corrupted state.\n"
                         << "To resume operation, remove the files matching "
                         << stateDbPathName.string() << " and contents of the directory "
                         << get(section, "path") << '\n'
                         << "Optionally, you can move those files to another\n"
                         << "location if you wish to analyze or back up the data.\n"
                         << "However, there is no guarantee that the data in its\n"
                         << "existing form is usable.";

        Throw<std::runtime_error>("state db error");
    }

    // The necessary directories exist. Now, remove any others.
    for (boost::filesystem::path const& p : pathsToDelete)
        boost::filesystem::remove_all(p);
}

std::unique_ptr<NodeStore::Backend>
SHAMapStoreImp::makeBackendRotating(std::string path)
{
    Section section{app_.config().section(ConfigSection::nodeDatabase())};
    boost::filesystem::path newPath;

    if (!path.empty())
    {
        newPath = path;
    }
    else
    {
        boost::filesystem::path p = get(section, "path");
        p /= dbPrefix_;
        p += ".%%%%";
        newPath = boost::filesystem::unique_path(p);
    }
    section.set("path", newPath.string());

    auto backend{NodeStore::Manager::instance().makeBackend(
        section,
        megabytes(app_.config().getValueFor(SizedItem::BurstSize, std::nullopt)),
        scheduler_,
        app_.getJournal(kNodeStoreName))};
    backend->open();
    return backend;
}

void
SHAMapStoreImp::clearSql(
    LedgerIndex lastRotated,
    std::string const& tableName,
    std::function<std::optional<LedgerIndex>()> const& getMinSeq,
    std::function<void(LedgerIndex)> const& deleteBeforeSeq)
{
    XRPL_ASSERT(deleteInterval_, "xrpl::SHAMapStoreImp::clearSql : nonzero delete interval");
    LedgerIndex min = std::numeric_limits<LedgerIndex>::max();

    {
        JLOG(journal_.trace()) << "Begin: Look up lowest value of: " << tableName;
        auto m = getMinSeq();
        JLOG(journal_.trace()) << "End: Look up lowest value of: " << tableName;
        if (!m)
            return;
        min = *m;
    }

    if (min > lastRotated || healthWait() == HealthResult::Stopping)
        return;
    if (min == lastRotated)
    {
        // Micro-optimization mainly to clarify logs
        JLOG(journal_.trace()) << "Nothing to delete from " << tableName;
        return;
    }

    JLOG(journal_.debug()) << "start deleting in: " << tableName << " from " << min << " to "
                           << lastRotated;
    while (min < lastRotated)
    {
        min = std::min(lastRotated, min + deleteBatch_);
        JLOG(journal_.trace()) << "Begin: Delete up to " << deleteBatch_
                               << " rows with LedgerSeq < " << min << " from: " << tableName;
        deleteBeforeSeq(min);
        JLOG(journal_.trace()) << "End: Delete up to " << deleteBatch_
                               << " rows with LedgerSeq < " << min << " from: " << tableName;
        if (healthWait() == HealthResult::Stopping)
            return;
        if (min < lastRotated)
            std::this_thread::sleep_for(backOff_);
        if (healthWait() == HealthResult::Stopping)
            return;
    }
    JLOG(journal_.debug()) << "finished deleting from: " << tableName;
}

void
SHAMapStoreImp::clearCaches(LedgerIndex validatedSeq)
{
    ledgerMaster_->clearLedgerCachePrior(validatedSeq);
    // Also clear the FullBelowCache so its generation counter is bumped.
    // This prevents stale "full below" markers from persisting across
    // backend rotation/online deletion and interfering with SHAMap sync.
    fullBelowCache_->clear();
}

void
SHAMapStoreImp::freshenCaches()
{
    if (freshenCache(*treeNodeCache_))
        return;
    if (freshenCache(app_.getMasterTransaction().getCache()))
        return;
}

void
SHAMapStoreImp::clearPrior(LedgerIndex lastRotated)
{
    // Do not allow ledgers to be acquired from the network
    // that are about to be deleted.
    minimumOnline_ = lastRotated + 1;
    JLOG(journal_.trace()) << "Begin: Clear internal ledgers up to " << lastRotated;
    ledgerMaster_->clearPriorLedgers(lastRotated);
    JLOG(journal_.trace()) << "End: Clear internal ledgers up to " << lastRotated;
    if (healthWait() == HealthResult::Stopping)
        return;

    auto& db = app_.getRelationalDatabase();

    clearSql(
        lastRotated,
        "Ledgers",
        [&db]() -> std::optional<LedgerIndex> { return db.getMinLedgerSeq(); },
        [&db](LedgerIndex min) -> void { db.deleteBeforeLedgerSeq(min); });
    if (healthWait() == HealthResult::Stopping)
        return;

    if (!app_.config().useTxTables())
        return;

    clearSql(
        lastRotated,
        "Transactions",
        [&db]() -> std::optional<LedgerIndex> { return db.getTransactionsMinLedgerSeq(); },
        [&db](LedgerIndex min) -> void { db.deleteTransactionsBeforeLedgerSeq(min); });
    if (healthWait() == HealthResult::Stopping)
        return;

    clearSql(
        lastRotated,
        "AccountTransactions",
        [&db]() -> std::optional<LedgerIndex> {
            return db.getAccountTransactionsMinLedgerSeq();
        },
        [&db](LedgerIndex min) -> void { db.deleteAccountTransactionsBeforeLedgerSeq(min); });
    if (healthWait() == HealthResult::Stopping)
        return;
}

SHAMapStoreImp::HealthResult
SHAMapStoreImp::healthWait()
{
    auto age = ledgerMaster_->getValidatedLedgerAge();
    OperatingMode mode = netOPs_->getOperatingMode();
    std::unique_lock lock(mutex_);
    while (!stop_ && (mode != OperatingMode::FULL || age > ageThreshold_))
    {
        lock.unlock();
        JLOG(journal_.warn()) << "Waiting " << recoveryWaitTime_.count()
                              << "s for node to stabilize. state: "
                              << app_.getOPs().strOperatingMode(mode, false) << ". age "
                              << age.count() << 's';
        std::this_thread::sleep_for(recoveryWaitTime_);
        age = ledgerMaster_->getValidatedLedgerAge();
        mode = netOPs_->getOperatingMode();
        lock.lock();
    }

    return stop_ ? HealthResult::Stopping : HealthResult::KeepGoing;
}

void
SHAMapStoreImp::stop()
{
    if (thread_.joinable())
    {
        {
            std::scoped_lock const lock(mutex_);
            stop_ = true;
            cond_.notify_one();
        }
        thread_.join();
    }
}

std::optional<LedgerIndex>
SHAMapStoreImp::minimumOnline() const
{
    // minimumOnline_ with 0 value is equivalent to unknown/not set.
    // Don't attempt to acquire ledgers if that value is unknown.
    if ((deleteInterval_ != 0u) && (minimumOnline_ != 0u))
        return minimumOnline_.load();
    return app_.getLedgerMaster().minSqlSeq();
}

//------------------------------------------------------------------------------

std::unique_ptr<SHAMapStore>
makeSHAMapStore(Application& app, NodeStore::Scheduler& scheduler, beast::Journal journal)
{
    return std::make_unique<SHAMapStoreImp>(app, scheduler, journal);
}

}  // namespace xrpl