diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index a9bda926a..cac31fc92 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -51,6 +51,7 @@ #include #include #include +#include #include #include #include @@ -419,9 +420,11 @@ public: , m_nodeStoreScheduler (*this) - , m_shaMapStore (make_SHAMapStore (*this, setup_SHAMapStore (*config_), - *this, m_nodeStoreScheduler, logs_->journal("SHAMapStore"), - logs_->journal("NodeObject"), m_txMaster, *config_)) + , m_shaMapStore(make_SHAMapStore( + *this, + *this, + m_nodeStoreScheduler, + logs_->journal("SHAMapStore"))) , accountIDCache_(128000) @@ -443,14 +446,15 @@ public: m_collectorManager->group ("jobq"), m_nodeStoreScheduler, logs_->journal("JobQueue"), *logs_, *perfLog_)) - // - // Anything which calls addJob must be a descendant of the JobQueue - // - , m_nodeStore ( - m_shaMapStore->makeDatabase ("NodeStore.main", 4, *m_jobQueue)) + , m_nodeStore(m_shaMapStore->makeNodeStore("NodeStore.main", 4)) - , shardStore_ ( - m_shaMapStore->makeDatabaseShard ("ShardStore", 4, *m_jobQueue)) + // The shard store is optional and make_ShardStore can return null. + , shardStore_(make_ShardStore( + *this, + *m_jobQueue, + m_nodeStoreScheduler, + 4, + logs_->journal("ShardStore"))) , family_ (*this, *m_nodeStore, *m_collectorManager) @@ -535,8 +539,13 @@ public: logs_->journal("Application"), std::chrono::milliseconds (100), get_io_service()) { if (shardStore_) + { sFamily_ = std::make_unique( - *this, *shardStore_, *m_collectorManager); + *this, + *shardStore_, + *m_collectorManager); + } + add (m_resourceManager.get ()); // @@ -853,9 +862,6 @@ public: setup.startUp == Config::LOAD_FILE || setup.startUp == Config::REPLAY) { - // perform any needed table updates - updateTxnDB(); - // Check if AccountTransactions has primary key std::string cid, name, type; std::size_t notnull, dflt_value, pk; @@ -913,14 +919,6 @@ public: bool initNodeStoreDBs() { - if (config_->section(ConfigSection::nodeDatabase()).empty()) - { - JLOG(m_journal.fatal()) << - "The [node_db] configuration setting " << - "has been updated and must be set"; - return false; - } - if (config_->doImport) { auto j = logs_->journal("NodeObject"); @@ -1251,7 +1249,6 @@ private: // and new validations must be greater than this. std::atomic maxDisallowedLedger_ {0}; - void updateTxnDB (); bool nodeToShards (); bool validateShards (); void startGenesisLedger (); @@ -1632,6 +1629,9 @@ int ApplicationImp::fdlimit() const // doubled if online delete is enabled). needed += std::max(5, m_shaMapStore->fdlimit()); + if (shardStore_) + needed += shardStore_->fdlimit(); + // One fd per incoming connection a port can accept, or // if no limit is set, assume it'll handle 256 clients. 
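// Sketch (not the rippled API) of the pattern the Application.cpp hunks above rely on:
// the shard store is optional, its factory returns a null pointer when the relevant
// config section is absent, and every caller guards on the pointer before using it.
// All names below are illustrative assumptions.
#include <map>
#include <memory>
#include <string>

namespace sketch {

using Section = std::map<std::string, std::string>;

struct ShardStore
{
    int fdlimit() const { return 2; }   // file descriptors this component needs
};

// Returns null when the feature is not configured; mirrors make_ShardStore.
std::unique_ptr<ShardStore>
makeOptionalShardStore(Section const& shardSection)
{
    if (shardSection.empty())
        return nullptr;
    return std::make_unique<ShardStore>();
}

// Callers add the component's fd requirement only when it exists,
// as fdlimit() above does with `if (shardStore_)`.
int
fdBudget(ShardStore const* shardStore, int base)
{
    int needed = base;
    if (shardStore)
        needed += shardStore->fdlimit();
    return needed;
}

} // namespace sketch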
for(auto const& p : serverHandler_->setup().ports) @@ -2068,111 +2068,6 @@ ApplicationImp::journal (std::string const& name) return logs_->journal (name); } -void -ApplicationImp::updateTxnDB() -{ - auto schemaHas = [&](std::string const& column) - { - std::string cid, name; - soci::statement st = (mTxnDB->getSession().prepare << - ("PRAGMA table_info(AccountTransactions);"), - soci::into(cid), - soci::into(name)); - - st.execute(); - while (st.fetch()) - { - if (name == column) - return true; - } - - return false; - }; - - assert(schemaHas("TransID")); - assert(!schemaHas("foobar")); - - if (schemaHas("TxnSeq")) - return; - - JLOG (m_journal.warn()) << "Transaction sequence field is missing"; - - auto& session = getTxnDB ().getSession (); - - std::vector< std::pair > txIDs; - txIDs.reserve (300000); - - JLOG (m_journal.info()) << "Parsing transactions"; - int i = 0; - uint256 transID; - - boost::optional strTransId; - soci::blob sociTxnMetaBlob(session); - soci::indicator tmi; - Blob txnMeta; - - soci::statement st = - (session.prepare << - "SELECT TransID, TxnMeta FROM Transactions;", - soci::into(strTransId), - soci::into(sociTxnMetaBlob, tmi)); - - st.execute (); - while (st.fetch ()) - { - if (soci::i_ok == tmi) - convert (sociTxnMetaBlob, txnMeta); - else - txnMeta.clear (); - - std::string tid = strTransId.value_or(""); - transID.SetHex (tid, true); - - if (txnMeta.size () == 0) - { - txIDs.push_back (std::make_pair (transID, -1)); - JLOG (m_journal.info()) << "No metadata for " << transID; - } - else - { - TxMeta m (transID, 0, txnMeta); - txIDs.push_back (std::make_pair (transID, m.getIndex ())); - } - - if ((++i % 1000) == 0) - { - JLOG (m_journal.info()) << i << " transactions read"; - } - } - - JLOG (m_journal.info()) << "All " << i << " transactions read"; - - soci::transaction tr(session); - - JLOG (m_journal.info()) << "Dropping old index"; - session << "DROP INDEX AcctTxIndex;"; - - JLOG (m_journal.info()) << "Altering table"; - session << "ALTER TABLE AccountTransactions ADD COLUMN TxnSeq INTEGER;"; - - boost::format fmt ("UPDATE AccountTransactions SET TxnSeq = %d WHERE TransID = '%s';"); - i = 0; - for (auto& t : txIDs) - { - session << boost::str (fmt % t.second % to_string (t.first)); - - if ((++i % 1000) == 0) - { - JLOG (m_journal.info()) << i << " transactions updated"; - } - } - - JLOG (m_journal.info()) << "Building new index"; - session << "CREATE INDEX AcctTxIndex ON AccountTransactions(Account, LedgerSeq, TxnSeq, TransID);"; - - tr.commit (); -} - bool ApplicationImp::nodeToShards() { assert(m_overlay); diff --git a/src/ripple/app/misc/SHAMapStore.h b/src/ripple/app/misc/SHAMapStore.h index da8f320c3..8a647a97a 100644 --- a/src/ripple/app/misc/SHAMapStore.h +++ b/src/ripple/app/misc/SHAMapStore.h @@ -31,28 +31,12 @@ class TransactionMaster; /** * class to create database, launch online delete thread, and - * related sqlite databse + * related SQLite database */ class SHAMapStore : public Stoppable { public: - struct Setup - { - explicit Setup() = default; - - bool standalone = false; - std::uint32_t deleteInterval = 0; - bool advisoryDelete = false; - std::uint32_t ledgerHistory = 0; - Section nodeDatabase; - std::string databasePath; - std::uint32_t deleteBatch = 100; - std::uint32_t backOff = 100; - std::int32_t ageThreshold = 60; - Section shardDatabase; - }; - SHAMapStore (Stoppable& parent) : Stoppable ("SHAMapStore", parent) {} /** Called by LedgerMaster every time a ledger validates. 
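// Sketch of the schema probe the removed updateTxnDB() relied on: ask SQLite for a
// table's column list via PRAGMA table_info and look for a column by name. This
// standalone illustration uses the plain SQLite C API; the deleted code did the same
// through soci.
#include <sqlite3.h>
#include <cstring>
#include <string>

// Returns true if `table` has a column named `column`.
bool schemaHas(sqlite3* db, char const* table, char const* column)
{
    std::string sql = std::string("PRAGMA table_info(") + table + ");";
    sqlite3_stmt* stmt = nullptr;
    if (sqlite3_prepare_v2(db, sql.c_str(), -1, &stmt, nullptr) != SQLITE_OK)
        return false;

    bool found = false;
    while (sqlite3_step(stmt) == SQLITE_ROW)
    {
        // Column 1 of the table_info result set is the column name.
        auto name = reinterpret_cast<char const*>(sqlite3_column_text(stmt, 1));
        if (name && std::strcmp(name, column) == 0)
        {
            found = true;
            break;
        }
    }
    sqlite3_finalize(stmt);
    return found;
}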
*/ @@ -62,13 +46,9 @@ public: virtual std::uint32_t clampFetchDepth (std::uint32_t fetch_depth) const = 0; - virtual std::unique_ptr makeDatabase ( - std::string const& name, - std::int32_t readThreads, Stoppable& parent) = 0; - - virtual std::unique_ptr makeDatabaseShard( - std::string const& name, std::int32_t readThreads, - Stoppable& parent) = 0; + virtual + std::unique_ptr + makeNodeStore(std::string const& name, std::int32_t readThreads) = 0; /** Highest ledger that may be deleted. */ virtual LedgerIndex setCanDelete (LedgerIndex canDelete) = 0; @@ -88,19 +68,12 @@ public: //------------------------------------------------------------------------------ -SHAMapStore::Setup -setup_SHAMapStore(Config const& c); - std::unique_ptr make_SHAMapStore( Application& app, - SHAMapStore::Setup const& s, Stoppable& parent, NodeStore::Scheduler& scheduler, - beast::Journal journal, - beast::Journal nodeStoreJournal, - TransactionMaster& transactionMaster, - BasicConfig const& conf); + beast::Journal journal); } #endif diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index 00ca2cb9b..5f4a06a7e 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -24,7 +24,8 @@ #include #include #include -#include + +#include namespace ripple { void SHAMapStoreImp::SavedStateDB::init (BasicConfig const& config, @@ -160,100 +161,78 @@ SHAMapStoreImp::SavedStateDB::setLastRotated (LedgerIndex seq) //------------------------------------------------------------------------------ -SHAMapStoreImp::SHAMapStoreImp ( - Application& app, - Setup const& setup, - Stoppable& parent, - NodeStore::Scheduler& scheduler, - beast::Journal journal, - beast::Journal nodeStoreJournal, - TransactionMaster& transactionMaster, - BasicConfig const& config) +SHAMapStoreImp::SHAMapStoreImp( + Application& app, + Stoppable& parent, + NodeStore::Scheduler& scheduler, + beast::Journal journal) : SHAMapStore (parent) , app_ (app) - , setup_ (setup) , scheduler_ (scheduler) , journal_ (journal) - , nodeStoreJournal_ (nodeStoreJournal) , working_(true) - , transactionMaster_ (transactionMaster) , canDelete_ (std::numeric_limits ::max()) { - if (setup_.deleteInterval) + Config& config {app.config()}; + Section& section {config.section(ConfigSection::nodeDatabase())}; + if (section.empty()) { - auto const minInterval = setup.standalone ? - minimumDeletionIntervalSA_ : minimumDeletionInterval_; - if (setup_.deleteInterval < minInterval) + Throw( + "Missing [" + ConfigSection::nodeDatabase() + + "] entry in configuration file"); + + } + + // RocksDB only. Use sensible defaults if no values specified. + if (boost::beast::detail::iequals( + get(section, "type"), "RocksDB")) + { + if (!section.exists("cache_mb")) { - Throw ("online_delete must be at least " + + section.set("cache_mb", std::to_string( + config.getSize(siHashNodeDBCache))); + } + + if (!section.exists("filter_bits") && (config.NODE_SIZE >= 2)) + section.set("filter_bits", "10"); + } + + get_if_exists(section, "delete_batch", deleteBatch_); + get_if_exists(section, "backOff", backOff_); + get_if_exists(section, "age_threshold", ageThreshold_); + get_if_exists(section, "online_delete", deleteInterval_); + + if (deleteInterval_) + { + get_if_exists(section, "advisory_delete", advisoryDelete_); + + auto const minInterval = config.standalone() ? 
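// Sketch of the pattern the new SHAMapStoreImp constructor follows: keep hard-coded
// defaults in the members, overwrite them only when the operator set a key, then
// validate the combination and throw on nonsense. The config type and minimum values
// here are illustrative stand-ins (rippled uses Section/get_if_exists; the standalone
// minimum of 8 is visible in SHAMapStoreImp.h).
#include <cstdint>
#include <map>
#include <stdexcept>
#include <string>

namespace sketch {

using Section = std::map<std::string, std::string>;

// Overwrite `value` only when `key` is present; otherwise keep the default.
inline bool getIfExists(Section const& s, std::string const& key, std::uint32_t& value)
{
    auto it = s.find(key);
    if (it == s.end())
        return false;
    value = static_cast<std::uint32_t>(std::stoul(it->second));
    return true;
}

struct OnlineDeleteSettings
{
    std::uint32_t deleteBatch = 100;
    std::uint32_t backOff = 100;
    std::uint32_t deleteInterval = 0;   // 0 means online delete is off

    void load(Section const& nodeDb, bool standalone, std::uint32_t ledgerHistory)
    {
        getIfExists(nodeDb, "delete_batch", deleteBatch);
        getIfExists(nodeDb, "backOff", backOff);
        getIfExists(nodeDb, "online_delete", deleteInterval);

        if (deleteInterval)
        {
            // Illustrative minimums mirroring the constants in SHAMapStoreImp.h.
            std::uint32_t const minInterval = standalone ? 8 : 256;
            if (deleteInterval < minInterval)
                throw std::runtime_error("online_delete must be at least " +
                    std::to_string(minInterval));
            if (ledgerHistory > deleteInterval)
                throw std::runtime_error(
                    "online_delete must not be less than ledger_history");
        }
    }
};

} // namespace sketch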
+ minimumDeletionIntervalSA_ : minimumDeletionInterval_; + if (deleteInterval_ < minInterval) + { + Throw("online_delete must be at least " + std::to_string (minInterval)); } - if (setup_.ledgerHistory > setup_.deleteInterval) + if (config.LEDGER_HISTORY > deleteInterval_) { - Throw ( + Throw( "online_delete must not be less than ledger_history (currently " + - std::to_string (setup_.ledgerHistory) + ")"); + std::to_string (config.LEDGER_HISTORY) + ")"); } - state_db_.init (config, dbName_); - + state_db_.init(config, dbName_); dbPaths(); } - if (! setup_.shardDatabase.empty()) - { - // The node and shard stores must use - // the same earliest ledger sequence - std::array seq; - if (get_if_exists( - setup_.nodeDatabase, "earliest_seq", seq[0])) - { - if (get_if_exists( - setup_.shardDatabase, "earliest_seq", seq[1]) && - seq[0] != seq[1]) - { - Throw("earliest_seq set more than once"); - } - } - - boost::filesystem::path dbPath = - get(setup_.shardDatabase, "path"); - if (dbPath.empty()) - Throw("shard path missing"); - if (boost::filesystem::exists(dbPath)) - { - if (! boost::filesystem::is_directory(dbPath)) - Throw("shard db path must be a directory."); - } - else - boost::filesystem::create_directories(dbPath); - - auto const maxDiskSpace = get( - setup_.shardDatabase, "max_size_gb", 0); - // Must be large enough for one shard - if (maxDiskSpace < 3) - Throw("max_size_gb too small"); - if ((maxDiskSpace << 30) < maxDiskSpace) - Throw("overflow max_size_gb"); - - std::uint32_t lps; - if (get_if_exists( - setup_.shardDatabase, "ledgers_per_shard", lps)) - { - // ledgers_per_shard to be set only in standalone for testing - if (! setup_.standalone) - Throw( - "ledgers_per_shard only honored in stand alone"); - } - } } std::unique_ptr -SHAMapStoreImp::makeDatabase (std::string const& name, - std::int32_t readThreads, Stoppable& parent) +SHAMapStoreImp::makeNodeStore(std::string const& name, std::int32_t readThreads) { + // Anything which calls addJob must be a descendant of the JobQueue. + // Therefore Database objects use the JobQueue as Stoppable parent. std::unique_ptr db; - if (setup_.deleteInterval) + if (deleteInterval_) { SavedState state = state_db_.getState(); auto writableBackend = makeBackendRotating(state.writableDb); @@ -267,40 +246,32 @@ SHAMapStoreImp::makeDatabase (std::string const& name, // Create NodeStore with two backends to allow online deletion of data auto dbr = std::make_unique( - "NodeStore.main", scheduler_, readThreads, parent, - std::move(writableBackend), std::move(archiveBackend), - setup_.nodeDatabase, nodeStoreJournal_); + name, + scheduler_, + readThreads, + app_.getJobQueue(), + std::move(writableBackend), + std::move(archiveBackend), + app_.config().section(ConfigSection::nodeDatabase()), + app_.logs().journal(nodeStoreName_)); fdlimit_ += dbr->fdlimit(); dbRotating_ = dbr.get(); db.reset(dynamic_cast(dbr.release())); } else { - db = NodeStore::Manager::instance().make_Database (name, scheduler_, - readThreads, parent, setup_.nodeDatabase, nodeStoreJournal_); + db = NodeStore::Manager::instance().make_Database( + name, + scheduler_, + readThreads, + app_.getJobQueue(), + app_.config().section(ConfigSection::nodeDatabase()), + app_.logs().journal(nodeStoreName_)); fdlimit_ += db->fdlimit(); } return db; } -std::unique_ptr -SHAMapStoreImp::makeDatabaseShard(std::string const& name, - std::int32_t readThreads, Stoppable& parent) -{ - std::unique_ptr db; - if(! 
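// Sketch of what makeNodeStore() builds when online_delete is enabled: a node store
// with two backends, where new data lands in the writable backend, reads consult
// both, and a rotation retires the archive backend so its contents can be deleted
// wholesale. This is a simplified model, not the DatabaseRotating interface.
#include <map>
#include <memory>
#include <string>
#include <vector>

namespace sketch {

using Blob = std::vector<unsigned char>;

struct Backend
{
    std::map<std::string, Blob> objects;
};

class RotatingStore
{
public:
    RotatingStore(std::unique_ptr<Backend> writable, std::unique_ptr<Backend> archive)
        : writable_(std::move(writable)), archive_(std::move(archive)) {}

    void store(std::string const& key, Blob value)
    {
        writable_->objects[key] = std::move(value);  // writes always go to the newer backend
    }

    Blob const* fetch(std::string const& key) const
    {
        if (auto it = writable_->objects.find(key); it != writable_->objects.end())
            return &it->second;
        if (archive_)
            if (auto it = archive_->objects.find(key); it != archive_->objects.end())
                return &it->second;
        return nullptr;
    }

    // The old writable backend becomes the archive; the previous archive
    // (holding only data older than the retention window) is dropped.
    void rotate(std::unique_ptr<Backend> freshWritable)
    {
        archive_ = std::move(writable_);
        writable_ = std::move(freshWritable);
    }

private:
    std::unique_ptr<Backend> writable_;
    std::unique_ptr<Backend> archive_;
};

} // namespace sketch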
setup_.shardDatabase.empty()) - { - db = std::make_unique( - app_, name, parent, scheduler_, readThreads, - setup_.shardDatabase, app_.journal("ShardStore")); - if (db->init()) - fdlimit_ += db->fdlimit(); - else - db.reset(); - } - return db; -} - void SHAMapStoreImp::onLedgerClosed( std::shared_ptr const& ledger) @@ -359,7 +330,7 @@ SHAMapStoreImp::run() transactionDb_ = &app_.getTxnDB(); ledgerDb_ = &app_.getLedgerDB(); - if (setup_.advisoryDelete) + if (advisoryDelete_) canDelete_ = state_db_.getCanDelete (); while (1) @@ -393,12 +364,12 @@ SHAMapStoreImp::run() } // will delete up to (not including) lastRotated) - if (validatedSeq >= lastRotated + setup_.deleteInterval + if (validatedSeq >= lastRotated + deleteInterval_ && canDelete_ >= lastRotated - 1) { JLOG(journal_.debug()) << "rotating validatedSeq " << validatedSeq << " lastRotated " << lastRotated << " deleteInterval " - << setup_.deleteInterval << " canDelete_ " << canDelete_; + << deleteInterval_ << " canDelete_ " << canDelete_; switch (health()) { @@ -498,8 +469,8 @@ SHAMapStoreImp::run() void SHAMapStoreImp::dbPaths() { - boost::filesystem::path dbPath = - get(setup_.nodeDatabase, "path"); + Section section {app_.config().section(ConfigSection::nodeDatabase())}; + boost::filesystem::path dbPath = get(section, "path"); if (boost::filesystem::exists (dbPath)) { @@ -536,7 +507,8 @@ SHAMapStoreImp::dbPaths() (writableDbExists != archiveDbExists) || state.writableDb.empty() != state.archiveDb.empty()) { - boost::filesystem::path stateDbPathName = setup_.databasePath; + boost::filesystem::path stateDbPathName = + app_.config().legacy("database_path"); stateDbPathName /= dbName_; stateDbPathName += "*"; @@ -550,7 +522,7 @@ SHAMapStoreImp::dbPaths() << "remove the files matching " << stateDbPathName.string() << " and contents of the directory " - << get(setup_.nodeDatabase, "path") + << dbPath << std::endl; Throw ("state db error"); @@ -560,8 +532,8 @@ SHAMapStoreImp::dbPaths() std::unique_ptr SHAMapStoreImp::makeBackendRotating (std::string path) { + Section section {app_.config().section(ConfigSection::nodeDatabase())}; boost::filesystem::path newPath; - Section parameters = setup_.nodeDatabase; if (path.size()) { @@ -569,15 +541,15 @@ SHAMapStoreImp::makeBackendRotating (std::string path) } else { - boost::filesystem::path p = get(parameters, "path"); + boost::filesystem::path p = get(section, "path"); p /= dbPrefix_; p += ".%%%%"; newPath = boost::filesystem::unique_path (p); } - parameters.set("path", newPath.string()); + section.set("path", newPath.string()); auto backend {NodeStore::Manager::instance().make_Backend( - parameters, scheduler_, nodeStoreJournal_)}; + section, scheduler_, app_.logs().journal(nodeStoreName_))}; backend->open(); return backend; } @@ -608,7 +580,7 @@ SHAMapStoreImp::clearSql (DatabaseCon& database, "start: " << deleteQuery << " from " << min << " to " << lastRotated; while (min < lastRotated) { - min = std::min(lastRotated, min + setup_.deleteBatch); + min = std::min(lastRotated, min + deleteBatch_); { auto db = database.checkoutDb (); *db << boost::str (formattedDeleteQuery % min); @@ -617,7 +589,7 @@ SHAMapStoreImp::clearSql (DatabaseCon& database, return true; if (min < lastRotated) std::this_thread::sleep_for ( - std::chrono::milliseconds (setup_.backOff)); + std::chrono::milliseconds (backOff_)); } JLOG(journal_.debug()) << "finished: " << deleteQuery; return true; @@ -637,7 +609,7 @@ SHAMapStoreImp::freshenCaches() return; if (freshenCache (*treeNodeCache_)) return; - if (freshenCache 
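// Sketch of the clearSql() batching above: delete rows a batch at a time up to (but
// not including) lastRotated, sleeping between batches so the SQLite writers used by
// the rest of the server are not starved. The executeDelete callback stands in for
// the real DatabaseCon query.
#include <algorithm>
#include <chrono>
#include <cstdint>
#include <functional>
#include <thread>

namespace sketch {

void clearInBatches(
    std::uint32_t minSeq,
    std::uint32_t lastRotated,
    std::uint32_t deleteBatch,                       // e.g. 100 rows per pass
    std::chrono::milliseconds backOff,               // e.g. 100 ms between passes
    std::function<void(std::uint32_t upTo)> const& executeDelete)
{
    while (minSeq < lastRotated)
    {
        minSeq = std::min(lastRotated, minSeq + deleteBatch);
        executeDelete(minSeq);                       // DELETE ... WHERE LedgerSeq < minSeq
        if (minSeq < lastRotated)
            std::this_thread::sleep_for(backOff);    // yield to foreground work
    }
}

} // namespace sketch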
(transactionMaster_.getCache())) + if (freshenCache (app_.getMasterTransaction().getCache())) return; } @@ -684,11 +656,11 @@ SHAMapStoreImp::health() NetworkOPs::OperatingMode mode = netOPs_->getOperatingMode(); auto age = ledgerMaster_->getValidatedLedgerAge(); - if (mode != NetworkOPs::omFULL || age.count() >= setup_.ageThreshold) + if (mode != NetworkOPs::omFULL || age.count() >= ageThreshold_) { JLOG(journal_.warn()) << "Not deleting. state: " << mode << " age " << age.count() - << " age threshold " << setup_.ageThreshold; + << " age threshold " << ageThreshold_; healthy_ = false; } @@ -701,7 +673,7 @@ SHAMapStoreImp::health() void SHAMapStoreImp::onStop() { - if (setup_.deleteInterval) + if (deleteInterval_) { { std::lock_guard lock (mutex_); @@ -718,7 +690,7 @@ SHAMapStoreImp::onStop() void SHAMapStoreImp::onChildrenStopped() { - if (setup_.deleteInterval) + if (deleteInterval_) { { std::lock_guard lock (mutex_); @@ -733,52 +705,15 @@ SHAMapStoreImp::onChildrenStopped() } //------------------------------------------------------------------------------ -SHAMapStore::Setup -setup_SHAMapStore (Config const& c) -{ - SHAMapStore::Setup setup; - - setup.standalone = c.standalone(); - - // Get existing settings and add some default values if not specified: - setup.nodeDatabase = c.section (ConfigSection::nodeDatabase ()); - - // These two parameters apply only to RocksDB. We want to give them sensible - // defaults if no values are specified. - if (!setup.nodeDatabase.exists ("cache_mb")) - setup.nodeDatabase.set ("cache_mb", std::to_string (c.getSize (siHashNodeDBCache))); - - if (!setup.nodeDatabase.exists ("filter_bits") && (c.NODE_SIZE >= 2)) - setup.nodeDatabase.set ("filter_bits", "10"); - - get_if_exists (setup.nodeDatabase, "online_delete", setup.deleteInterval); - - if (setup.deleteInterval) - get_if_exists (setup.nodeDatabase, "advisory_delete", setup.advisoryDelete); - - setup.ledgerHistory = c.LEDGER_HISTORY; - setup.databasePath = c.legacy("database_path"); - - get_if_exists (setup.nodeDatabase, "delete_batch", setup.deleteBatch); - get_if_exists (setup.nodeDatabase, "backOff", setup.backOff); - get_if_exists (setup.nodeDatabase, "age_threshold", setup.ageThreshold); - - setup.shardDatabase = c.section(ConfigSection::shardDatabase()); - return setup; -} std::unique_ptr -make_SHAMapStore (Application& app, - SHAMapStore::Setup const& setup, - Stoppable& parent, - NodeStore::Scheduler& scheduler, - beast::Journal journal, - beast::Journal nodeStoreJournal, - TransactionMaster& transactionMaster, - BasicConfig const& config) +make_SHAMapStore( + Application& app, + Stoppable& parent, + NodeStore::Scheduler& scheduler, + beast::Journal journal) { - return std::make_unique(app, setup, parent, scheduler, - journal, nodeStoreJournal, transactionMaster, config); + return std::make_unique(app, parent, scheduler, journal); } } diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h index 32146ac06..cbf876e33 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.h +++ b/src/ripple/app/misc/SHAMapStoreImp.h @@ -84,10 +84,8 @@ private: // minimum # of ledgers required for standalone mode. 
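// Sketch of the health() gate above: online deletion only proceeds while the server
// is fully synced and the last validated ledger is recent; otherwise the pass is
// skipped rather than competing with catch-up work. The enum and threshold are
// illustrative stand-ins for NetworkOPs::omFULL and ageThreshold_.
#include <chrono>

namespace sketch {

enum class OperatingMode { disconnected, syncing, tracking, full };

bool okToDelete(
    OperatingMode mode,
    std::chrono::seconds validatedLedgerAge,
    std::chrono::seconds ageThreshold)   // roughly 60s by default above
{
    return mode == OperatingMode::full && validatedLedgerAge < ageThreshold;
}

} // namespace sketch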
static std::uint32_t const minimumDeletionIntervalSA_ = 8; - Setup setup_; NodeStore::Scheduler& scheduler_; beast::Journal journal_; - beast::Journal nodeStoreJournal_; NodeStore::DatabaseRotating* dbRotating_ = nullptr; SavedStateDB state_db_; std::thread thread_; @@ -98,8 +96,15 @@ private: mutable std::mutex mutex_; std::shared_ptr newLedger_; std::atomic working_; - TransactionMaster& transactionMaster_; std::atomic canDelete_; + int fdlimit_ = 0; + + std::uint32_t deleteInterval_ = 0; + bool advisoryDelete_ = false; + std::uint32_t deleteBatch_ = 100; + std::uint32_t backOff_ = 100; + std::int32_t ageThreshold_ = 60; + // these do not exist upon SHAMapStore creation, but do exist // as of onPrepare() or before NetworkOPs* netOPs_ = nullptr; @@ -108,17 +113,15 @@ private: TreeNodeCache* treeNodeCache_ = nullptr; DatabaseCon* transactionDb_ = nullptr; DatabaseCon* ledgerDb_ = nullptr; - int fdlimit_ = 0; + + static constexpr auto nodeStoreName_ = "NodeStore"; public: - SHAMapStoreImp (Application& app, - Setup const& setup, - Stoppable& parent, - NodeStore::Scheduler& scheduler, - beast::Journal journal, - beast::Journal nodeStoreJournal, - TransactionMaster& transactionMaster, - BasicConfig const& config); + SHAMapStoreImp( + Application& app, + Stoppable& parent, + NodeStore::Scheduler& scheduler, + beast::Journal journal); ~SHAMapStoreImp() { @@ -129,22 +132,17 @@ public: std::uint32_t clampFetchDepth (std::uint32_t fetch_depth) const override { - return setup_.deleteInterval ? std::min (fetch_depth, - setup_.deleteInterval) : fetch_depth; + return deleteInterval_ ? std::min (fetch_depth, + deleteInterval_) : fetch_depth; } - std::unique_ptr makeDatabase ( - std::string const&name, - std::int32_t readThreads, Stoppable& parent) override; - - std::unique_ptr - makeDatabaseShard(std::string const& name, - std::int32_t readThreads, Stoppable& parent) override; + std::unique_ptr + makeNodeStore(std::string const& name, std::int32_t readThreads) override; LedgerIndex setCanDelete (LedgerIndex seq) override { - if (setup_.advisoryDelete) + if (advisoryDelete_) canDelete_ = seq; return state_db_.setCanDelete (seq); } @@ -152,7 +150,7 @@ public: bool advisoryDelete() const override { - return setup_.advisoryDelete; + return advisoryDelete_; } // All ledgers prior to this one are eligible @@ -230,7 +228,7 @@ private: void onStart() override { - if (setup_.deleteInterval) + if (deleteInterval_) thread_ = std::thread (&SHAMapStoreImp::run, this); } diff --git a/src/ripple/core/Stoppable.h b/src/ripple/core/Stoppable.h index 61ee09f5a..cca36e013 100644 --- a/src/ripple/core/Stoppable.h +++ b/src/ripple/core/Stoppable.h @@ -209,6 +209,16 @@ public: /** Destroy the Stoppable. */ virtual ~Stoppable (); + RootStoppable& getRoot() {return m_root;} + + /** Set the parent of this Stoppable. + + @note The Stoppable must not already have a parent. + The parent to be set cannot not be stopping. + Both roots must match. + */ + void setParent(Stoppable& parent); + /** Returns `true` if the stoppable should stop. 
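// Sketch of the onStart()/onStop() pattern above: the worker thread is only spawned
// when the feature (online_delete) is configured, and shutdown is a flag plus
// condition_variable notification that the loop checks each pass. Simplified; the
// real class routes this through Stoppable.
#include <condition_variable>
#include <mutex>
#include <thread>

namespace sketch {

class ConditionalWorker
{
public:
    void start(bool featureEnabled)
    {
        if (featureEnabled)
            thread_ = std::thread(&ConditionalWorker::run, this);
    }

    void stop()
    {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            stop_ = true;
        }
        cv_.notify_one();
        if (thread_.joinable())
            thread_.join();
    }

private:
    void run()
    {
        std::unique_lock<std::mutex> lock(mutex_);
        while (!stop_)
        {
            // ... perform one unit of work, then wait for more work or shutdown ...
            cv_.wait(lock);
        }
    }

    std::thread thread_;
    std::mutex mutex_;
    std::condition_variable cv_;
    bool stop_ = false;
};

} // namespace sketch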
*/ bool isStopping () const; @@ -318,6 +328,7 @@ private: std::condition_variable m_cv; std::mutex m_mut; bool m_is_stopping = false; + bool hasParent_ {false}; }; //------------------------------------------------------------------------------ diff --git a/src/ripple/core/impl/Stoppable.cpp b/src/ripple/core/impl/Stoppable.cpp index 3c163f0b0..5a4755b4c 100644 --- a/src/ripple/core/impl/Stoppable.cpp +++ b/src/ripple/core/impl/Stoppable.cpp @@ -17,7 +17,9 @@ */ //============================================================================== +#include #include + #include namespace ripple { @@ -34,16 +36,23 @@ Stoppable::Stoppable (std::string name, Stoppable& parent) , m_root (parent.m_root) , m_child (this) { - // Must not have stopping parent. - assert (! parent.isStopping()); - - parent.m_children.push_front (&m_child); + setParent(parent); } Stoppable::~Stoppable () { } +void Stoppable::setParent (Stoppable& parent) +{ + assert(!hasParent_); + assert(!parent.isStopping()); + assert(std::addressof(m_root) == std::addressof(parent.m_root)); + + parent.m_children.push_front(&m_child); + hasParent_ = true; +} + bool Stoppable::isStopping() const { return m_root.isStopping(); diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index 5a4f6fe06..e4394a79b 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -60,7 +60,8 @@ public: @param name The Stoppable name for this Database. @param parent The parent Stoppable. @param scheduler The scheduler to use for performing asynchronous tasks. - @param readThreads The number of async read threads to create. + @param readThreads The number of asynchronous read threads to create. + @param config The configuration settings @param journal Destination for logging output. */ Database(std::string name, Stoppable& parent, Scheduler& scheduler, @@ -282,7 +283,7 @@ private: // The default is 32570 to match the XRP ledger network's earliest // allowed sequence. Alternate networks may set this value. 
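// Sketch of the deferred-parenting rule added to Stoppable above: a child can be
// created attached only to the root and adopted by its real parent later, but only
// once, only if the parent is not already stopping, and only within the same stop
// hierarchy. Simplified model, not the Stoppable class itself.
#include <cassert>
#include <forward_list>

namespace sketch {

class Node
{
public:
    explicit Node(Node* root = nullptr) : root_(root ? root : this) {}

    void setParent(Node& parent)
    {
        assert(!hasParent_);                  // may be adopted at most once
        assert(!parent.isStopping());         // cannot join a stopping subtree
        assert(parent.root_ == root_);        // both must share one root
        parent.children_.push_front(this);
        hasParent_ = true;
    }

    bool isStopping() const { return root_->stopping_; }
    void beginStop() { root_->stopping_ = true; }

private:
    Node* root_;
    std::forward_list<Node*> children_;
    bool hasParent_ = false;
    bool stopping_ = false;
};

} // namespace sketch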
- std::uint32_t earliestSeq_ {XRP_LEDGER_EARLIEST_SEQ}; + std::uint32_t const earliestSeq_; virtual std::shared_ptr diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h index 0af67d721..a6bd7dd28 100644 --- a/src/ripple/nodestore/DatabaseShard.h +++ b/src/ripple/nodestore/DatabaseShard.h @@ -42,8 +42,8 @@ public: @param name The Stoppable name for this Database @param parent The parent Stoppable @param scheduler The scheduler to use for performing asynchronous tasks - @param readThreads The number of async read threads to create - @param config The configuration for the database + @param readThreads The number of asynchronous read threads to create + @param config The shard configuration section for the database @param journal Destination for logging output */ DatabaseShard( @@ -89,9 +89,9 @@ public: bool prepareShard(std::uint32_t shardIndex) = 0; - /** Remove shard indexes from prepared import + /** Remove a previously prepared shard index for import - @param indexes Shard indexes to be removed from import + @param shardIndex Shard index to be removed from import */ virtual void @@ -219,6 +219,15 @@ seqToShardIndex(std::uint32_t seq, return (seq - 1) / ledgersPerShard; } +extern +std::unique_ptr +make_ShardStore( + Application& app, + Stoppable& parent, + Scheduler& scheduler, + int readThreads, + beast::Journal j); + } } diff --git a/src/ripple/nodestore/impl/Database.cpp b/src/ripple/nodestore/impl/Database.cpp index 2d7bdced4..5cf0ddf38 100644 --- a/src/ripple/nodestore/impl/Database.cpp +++ b/src/ripple/nodestore/impl/Database.cpp @@ -33,17 +33,16 @@ Database::Database( int readThreads, Section const& config, beast::Journal journal) - : Stoppable(name, parent) + : Stoppable(name, parent.getRoot()) , j_(journal) , scheduler_(scheduler) + , earliestSeq_(get( + config, + "earliest_seq", + XRP_LEDGER_EARLIEST_SEQ)) { - std::uint32_t seq; - if (get_if_exists(config, "earliest_seq", seq)) - { - if (seq < 1) - Throw("Invalid earliest_seq"); - earliestSeq_ = seq; - } + if (earliestSeq_ < 1) + Throw("Invalid earliest_seq"); while (readThreads-- > 0) readThreads_.emplace_back(&Database::threadEntry, this); diff --git a/src/ripple/nodestore/impl/DatabaseNodeImp.h b/src/ripple/nodestore/impl/DatabaseNodeImp.h index 2458ddbd5..4e62f9ace 100644 --- a/src/ripple/nodestore/impl/DatabaseNodeImp.h +++ b/src/ripple/nodestore/impl/DatabaseNodeImp.h @@ -49,6 +49,7 @@ public: , backend_(std::move(backend)) { assert(backend_); + setParent(parent); } ~DatabaseNodeImp() override diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp index ea15494b3..4e83d8f48 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp @@ -45,6 +45,7 @@ DatabaseRotatingImp::DatabaseRotatingImp( fdLimit_ += writableBackend_->fdlimit(); if (archiveBackend_) fdLimit_ += archiveBackend_->fdlimit(); + setParent(parent); } // Make sure to call it already locked! 
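// Worked example of seqToShardIndex() above: with the default 16384 ledgers per
// shard, ledgers 1..16384 map to shard 0 and 16385..32768 to shard 1, so the default
// earliest sequence (32570) lands in shard 1, which becomes earliestShardIndex_.
#include <cstdint>

namespace sketch {

constexpr std::uint32_t ledgersPerShardDefault = 16384;

constexpr std::uint32_t
seqToShardIndex(std::uint32_t seq,
    std::uint32_t ledgersPerShard = ledgersPerShardDefault)
{
    return (seq - 1) / ledgersPerShard;
}

static_assert(seqToShardIndex(1) == 0, "");
static_assert(seqToShardIndex(16384) == 0, "");
static_assert(seqToShardIndex(16385) == 1, "");
static_assert(seqToShardIndex(32570) == 1, "");   // XRP_LEDGER_EARLIEST_SEQ

} // namespace sketch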
diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index dbdbe05cb..291cc7399 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -17,12 +17,13 @@ */ //============================================================================== - #include #include #include +#include #include #include +#include #include #include #include @@ -32,30 +33,24 @@ namespace ripple { namespace NodeStore { -constexpr std::uint32_t DatabaseShard::ledgersPerShardDefault; - DatabaseShardImp::DatabaseShardImp( Application& app, - std::string const& name, Stoppable& parent, + std::string const& name, Scheduler& scheduler, int readThreads, - Section const& config, beast::Journal j) - : DatabaseShard(name, parent, scheduler, readThreads, config, j) + : DatabaseShard( + name, + parent, + scheduler, + readThreads, + app.config().section(ConfigSection::shardDatabase()), + j) , app_(app) - , ctx_(std::make_unique()) - , config_(config) - , dir_(get(config, "path")) - , backendName_(Manager::instance().find( - get(config, "type", "nudb"))->getName()) - , maxDiskSpace_(get(config, "max_size_gb") << 30) - , ledgersPerShard_(get( - config, "ledgers_per_shard", ledgersPerShardDefault)) , earliestShardIndex_(seqToShardIndex(earliestSeq())) - , avgShardSz_(ledgersPerShard_ * (192 * 1024)) + , avgShardSz_(ledgersPerShard_ * kilobytes(192)) { - ctx_->start(); } DatabaseShardImp::~DatabaseShardImp() @@ -85,36 +80,94 @@ DatabaseShardImp::init() "Already initialized"; return false; } - if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) + + auto fail = [&](std::string const& msg) { JLOG(j_.error()) << - "ledgers_per_shard must be a multiple of 256"; + "[" << ConfigSection::shardDatabase() << "] " << msg; return false; + }; + + Config const& config {app_.config()}; + Section const& section {config.section(ConfigSection::shardDatabase())}; + if (section.empty()) + return fail("missing configuration"); + + { + // Node and shard stores must use same earliest ledger sequence + std::uint32_t seq; + if (get_if_exists( + config.section(ConfigSection::nodeDatabase()), + "earliest_seq", + seq)) + { + std::uint32_t seq2; + if (get_if_exists(section, "earliest_seq", seq2) && + seq != seq2) + { + return fail("and [" + ConfigSection::shardDatabase() + + "] both define 'earliest_seq'"); + } + } + } + + if (!get_if_exists(section, "path", dir_)) + return fail("'path' missing"); + + if (boost::filesystem::exists(dir_)) + { + if (!boost::filesystem::is_directory(dir_)) + return fail("'path' must be a directory"); + } + else + boost::filesystem::create_directories(dir_); + + { + std::uint64_t i; + if (!get_if_exists(section, "max_size_gb", i)) + return fail("'max_size_gb' missing"); + + // Minimum disk space required (in gigabytes) + static constexpr auto minDiskSpace = 10; + if (i < minDiskSpace) + { + return fail("'max_size_gb' must be at least " + + std::to_string(minDiskSpace)); + } + + if ((i << 30) < i) + return fail("'max_size_gb' overflow"); + + // Convert to bytes + maxDiskSpace_ = i << 30; + } + + if (section.exists("ledgers_per_shard")) + { + // To be set only in standalone for testing + if (!config.standalone()) + return fail("'ledgers_per_shard' only honored in stand alone"); + + ledgersPerShard_ = get(section, "ledgers_per_shard"); + if (ledgersPerShard_ == 0 || ledgersPerShard_ % 256 != 0) + return fail("'ledgers_per_shard' must be a multiple of 256"); } // NuDB is the default and only supported permanent storage 
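// Sketch of the max_size_gb handling in init() above: enforce a floor large enough
// for at least one shard and convert gigabytes to bytes. One conventional way to
// guard the shift is to compare against the largest value that can still be shifted
// by 30 bits without wrapping; shown here as an illustration, not as the exact check
// used in the diff.
#include <cstdint>
#include <limits>
#include <stdexcept>

namespace sketch {

std::uint64_t
maxSizeGbToBytes(std::uint64_t maxSizeGb)
{
    constexpr std::uint64_t minDiskSpaceGb = 10;   // floor used by init() above
    if (maxSizeGb < minDiskSpaceGb)
        throw std::invalid_argument("max_size_gb too small");

    // Largest value whose << 30 still fits in 64 bits.
    constexpr std::uint64_t maxShiftable =
        std::numeric_limits<std::uint64_t>::max() >> 30;
    if (maxSizeGb > maxShiftable)
        throw std::overflow_error("max_size_gb overflow");

    return maxSizeGb << 30;                        // 1 GB == 2^30 bytes
}

} // namespace sketch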
backend // "Memory" and "none" types are supported for tests + backendName_ = get(section, "type", "nudb"); if (!iequals(backendName_, "NuDB") && !iequals(backendName_, "Memory") && !iequals(backendName_, "none")) { - JLOG(j_.error()) << - "Unsupported shard store type: " << backendName_; - return false; + return fail("'type' value unsupported"); } + // Find backend file handle requirement + if (auto factory = Manager::instance().find(backendName_)) { - // Find backend file handle requirement - auto factory {Manager::instance().find(backendName_)}; - if (!factory) - { - JLOG(j_.error()) << - "Failed to create shard store type " << backendName_; - return false; - } - - auto backend {factory->createInstance(NodeObject::keyBytes, - config_, scheduler_, *ctx_, j_)}; + auto backend {factory->createInstance( + NodeObject::keyBytes, section, scheduler_, j_)}; backed_ = backend->backed(); if (!backed_) { @@ -123,9 +176,14 @@ DatabaseShardImp::init() } fdLimit_ = backend->fdlimit(); } + else + return fail("'type' value unsupported"); try { + ctx_ = std::make_unique(); + ctx_->start(); + // Find shards for (auto const& d : directory_iterator(dir_)) { @@ -166,7 +224,7 @@ DatabaseShardImp::init() auto shard {std::make_unique( *this, shardIndex, cacheSz_, cacheAge_, j_)}; - if (!shard->open(config_, scheduler_, *ctx_)) + if (!shard->open(section, scheduler_, *ctx_)) return false; usedDiskSpace_ += shard->fileSize(); @@ -250,7 +308,10 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) 1, static_cast(complete_.size() + 1)))}; incomplete_ = std::make_unique( *this, *shardIndex, sz, cacheAge_, j_); - if (!incomplete_->open(config_, scheduler_, *ctx_)) + if (!incomplete_->open( + app_.config().section(ConfigSection::shardDatabase()), + scheduler_, + *ctx_)) { incomplete_.reset(); return boost::none; @@ -422,7 +483,7 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex, // Create the new shard auto shard {std::make_unique( *this, shardIndex, cacheSz_, cacheAge_, j_)}; - auto fail = [&](std::string msg) + auto fail = [&](std::string const& msg) { if (!msg.empty()) { @@ -434,8 +495,13 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex, return false; }; - if (!shard->open(config_, scheduler_, *ctx_)) + if (!shard->open( + app_.config().section(ConfigSection::shardDatabase()), + scheduler_, + *ctx_)) + { return fail({}); + } if (!shard->complete()) return fail("incomplete shard"); @@ -750,7 +816,10 @@ DatabaseShardImp::import(Database& source) auto const shardDir {dir_ / std::to_string(shardIndex)}; auto shard = std::make_unique( *this, shardIndex, shardCacheSz, cacheAge_, j_); - if (!shard->open(config_, scheduler_, *ctx_)) + if (!shard->open( + app_.config().section(ConfigSection::shardDatabase()), + scheduler_, + *ctx_)) { shard.reset(); continue; @@ -1192,5 +1261,36 @@ DatabaseShardImp::available() const } } +//------------------------------------------------------------------------------ + +std::unique_ptr +make_ShardStore( + Application& app, + Stoppable& parent, + Scheduler& scheduler, + int readThreads, + beast::Journal j) +{ + // The shard store is optional. Future changes will require it. 
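// Sketch of the `fail` lambda idiom used by init() and importShard() above: funnel
// every early-out through one closure so each error is logged with the same prefix
// and the function still reads as straight-line checks. The logger here is a
// stand-in for JLOG(j_.error()).
#include <cstdint>
#include <iostream>
#include <string>

namespace sketch {

bool initFromConfig(std::string const& path, std::uint64_t maxSizeGb)
{
    auto fail = [](std::string const& msg)
    {
        std::cerr << "[shard_db] " << msg << '\n';   // one place, one format
        return false;                                // callers just `return fail(...)`
    };

    if (path.empty())
        return fail("'path' missing");
    if (maxSizeGb < 10)
        return fail("'max_size_gb' must be at least 10");

    return true;
}

} // namespace sketch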
+ Section const& section { + app.config().section(ConfigSection::shardDatabase())}; + if (section.empty()) + return nullptr; + + auto shardStore = std::make_unique( + app, + parent, + "ShardStore", + scheduler, + readThreads, + j); + if (shardStore->init()) + shardStore->setParent(parent); + else + shardStore.reset(); + + return shardStore; +} + } // NodeStore } // ripple diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 68e3a4b19..841c17b0c 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -37,11 +37,10 @@ public: DatabaseShardImp( Application& app, - std::string const& name, Stoppable& parent, + std::string const& name, Scheduler& scheduler, int readThreads, - Section const& config, beast::Journal j); ~DatabaseShardImp() override; @@ -180,8 +179,8 @@ private: // Shards prepared for import std::map preShards_; - Section const config_; - boost::filesystem::path const dir_; + // The shard store root directory + boost::filesystem::path dir_; // If new shards can be stored bool canAdd_ {true}; @@ -193,10 +192,10 @@ private: bool backed_; // The name associated with the backend used with the shard store - std::string const backendName_; + std::string backendName_; // Maximum disk space the DB can use (in bytes) - std::uint64_t const maxDiskSpace_; + std::uint64_t maxDiskSpace_; // Disk space used to store the shards (in bytes) std::uint64_t usedDiskSpace_ {0}; @@ -204,7 +203,7 @@ private: // Each shard stores 16384 ledgers. The earliest shard may store // less if the earliest ledger sequence truncates its beginning. // The value should only be altered for unit tests. - std::uint32_t const ledgersPerShard_; + std::uint32_t ledgersPerShard_ = ledgersPerShardDefault; // The earliest shard index std::uint32_t const earliestShardIndex_; @@ -218,7 +217,7 @@ private: // File name used to mark shards being imported from node store static constexpr auto importMarker_ = "import"; - + std::shared_ptr fetchFrom(uint256 const& hash, std::uint32_t seq) override; diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index 376a5a7c1..53e085038 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -65,14 +65,7 @@ Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx) return false; } - boost::system::error_code ec; - auto const preexist {exists(dir_, ec)}; - if (ec) - { - JLOG(j_.error()) << - "shard " << index_ << ": " << ec.message(); - return false; - } + auto const preexist {exists(dir_)}; config.set("path", dir_.string()); backend_ = factory->createInstance( @@ -85,6 +78,8 @@ Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx) JLOG(j_.error()) << "shard " << index_ << ": " << msg; } + if (backend_) + backend_->close(); if (!preexist) removeAll(dir_, j_); return false; @@ -92,6 +87,7 @@ Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx) try { + // Open/Create the NuDB key/value store for node objects backend_->open(!preexist); if (!backend_->backed()) diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index b95ab9151..1e5666e38 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -152,8 +152,12 @@ private: // Path to control file boost::filesystem::path const control_; + // Disk space utilized by the shard std::uint64_t fileSize_ {0}; + + // NuDB key/value store for node objects std::shared_ptr 
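// Sketch of the failure path in Shard::open() above: remember whether the shard
// directory existed before the open attempt, and on failure tear down only what this
// call created, leaving pre-existing data on disk untouched. boost::filesystem stands
// in for the helpers the real code uses.
#include <boost/filesystem.hpp>

namespace sketch {

bool openShardDir(boost::filesystem::path const& dir)
{
    namespace fs = boost::filesystem;
    bool const preexist = fs::exists(dir);

    try
    {
        fs::create_directories(dir);
        // ... create and open the backend here; throw on error ...
        return true;
    }
    catch (...)
    {
        if (!preexist)
            fs::remove_all(dir);   // only delete a directory this call made
        return false;
    }
}

} // namespace sketch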
backend_; + beast::Journal j_; // True if shard has its entire ledger range stored