diff --git a/CMakeLists.txt b/CMakeLists.txt index 60aaf052d3..4f1bb2c649 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -2086,7 +2086,6 @@ else () src/ripple/app/main/Application.cpp src/ripple/app/main/BasicApp.cpp src/ripple/app/main/CollectorManager.cpp - src/ripple/app/main/DBInit.cpp src/ripple/app/main/LoadManager.cpp src/ripple/app/main/Main.cpp src/ripple/app/main/NodeIdentity.cpp diff --git a/src/ripple/app/main/Application.cpp b/src/ripple/app/main/Application.cpp index d87ff769fa..98b23a3481 100644 --- a/src/ripple/app/main/Application.cpp +++ b/src/ripple/app/main/Application.cpp @@ -582,7 +582,7 @@ public: void signalStop() override; bool checkSigs() const override; void checkSigs(bool) override; - int fdlimit () const override; + int fdRequired() const override; //-------------------------------------------------------------------------- @@ -858,12 +858,12 @@ public: // transaction database mTxnDB = std::make_unique ( setup, - TxnDBName, - TxnDBInit, - TxnDBCount); + TxDBName, + TxDBPragma, + TxDBInit); mTxnDB->getSession() << boost::str(boost::format("PRAGMA cache_size=-%d;") % - (config_->getSize(siTxnDBCache) * kilobytes(1))); + kilobytes(config_->getSize(siTxnDBCache))); mTxnDB->setupCheckpointing(m_jobQueue.get(), logs()); if (!setup.standAlone || @@ -900,20 +900,20 @@ public: // ledger database mLedgerDB = std::make_unique ( setup, - LedgerDBName, - LedgerDBInit, - LedgerDBCount); + LgrDBName, + LgrDBPragma, + LgrDBInit); mLedgerDB->getSession() << boost::str(boost::format("PRAGMA cache_size=-%d;") % - (config_->getSize(siLgrDBCache) * kilobytes(1))); + kilobytes(config_->getSize(siLgrDBCache))); mLedgerDB->setupCheckpointing(m_jobQueue.get(), logs()); // wallet database mWalletDB = std::make_unique ( setup, WalletDBName, - WalletDBInit, - WalletDBCount); + std::array(), + WalletDBInit); } catch (std::exception const& e) { @@ -963,11 +963,8 @@ public: family().treecache().setTargetAge( seconds{config_->getSize(siTreeCacheAge)}); - if 
(shardStore_) + if (sFamily_) { - shardStore_->tune( - config_->getSize(siNodeCacheSize), - seconds{config_->getSize(siNodeCacheAge)}); sFamily_->treecache().setTargetSize( config_->getSize(siTreeCacheSize)); sFamily_->treecache().setTargetAge( @@ -1174,9 +1171,10 @@ public: } DatabaseCon::Setup dbSetup = setup_DatabaseCon(*config_); - boost::filesystem::path dbPath = dbSetup.dataDir / TxnDBName; + boost::filesystem::path dbPath = dbSetup.dataDir / TxDBName; boost::system::error_code ec; - boost::optional dbSize = boost::filesystem::file_size(dbPath, ec); + boost::optional dbSize = + boost::filesystem::file_size(dbPath, ec); if (ec) { JLOG(m_journal.error()) @@ -1632,7 +1630,7 @@ void ApplicationImp::checkSigs(bool check) checkSigs_ = check; } -int ApplicationImp::fdlimit() const +int ApplicationImp::fdRequired() const { // Standard handles, config file, misc I/O etc: int needed = 128; @@ -1642,10 +1640,10 @@ int ApplicationImp::fdlimit() const // the number of fds needed by the backend (internally // doubled if online delete is enabled). - needed += std::max(5, m_shaMapStore->fdlimit()); + needed += std::max(5, m_shaMapStore->fdRequired()); if (shardStore_) - needed += shardStore_->fdlimit(); + needed += shardStore_->fdRequired(); // One fd per incoming connection a port can accept, or // if no limit is set, assume it'll handle 256 clients. 
diff --git a/src/ripple/app/main/Application.h b/src/ripple/app/main/Application.h index 82101c633c..9c8b03561c 100644 --- a/src/ripple/app/main/Application.h +++ b/src/ripple/app/main/Application.h @@ -183,8 +183,8 @@ public: virtual beast::Journal journal (std::string const& name) = 0; - /* Returns the number of file descriptors the application wants */ - virtual int fdlimit () const = 0; + /* Returns the number of file descriptors the application needs */ + virtual int fdRequired() const = 0; /** Retrieve the "wallet database" */ virtual DatabaseCon& getWalletDB () = 0; diff --git a/src/ripple/app/main/DBInit.cpp b/src/ripple/app/main/DBInit.cpp deleted file mode 100644 index 3f027942c2..0000000000 --- a/src/ripple/app/main/DBInit.cpp +++ /dev/null @@ -1,149 +0,0 @@ -//------------------------------------------------------------------------------ -/* - This file is part of rippled: https://github.com/ripple/rippled - Copyright (c) 2012, 2013 Ripple Labs Inc. - - Permission to use, copy, modify, and/or distribute this software for any - purpose with or without fee is hereby granted, provided that the above - copyright notice and this permission notice appear in all copies. - - THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES - WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF - MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR - ANY SPECIAL , DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES - WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN - ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF - OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 
-*/ -//============================================================================== - -#include -#include - -namespace ripple { - -// Transaction database holds transactions and public keys -const char* TxnDBName = "transaction.db"; -const char* TxnDBInit[] = -{ - "PRAGMA page_size=4096;", - "PRAGMA synchronous=NORMAL;", - "PRAGMA journal_mode=WAL;", - "PRAGMA journal_size_limit=1582080;", - "PRAGMA max_page_count=2147483646;", - -#if (ULONG_MAX > UINT_MAX) && !defined (NO_SQLITE_MMAP) - "PRAGMA mmap_size=17179869184;", -#endif - - "BEGIN TRANSACTION;", - - "CREATE TABLE IF NOT EXISTS Transactions ( \ - TransID CHARACTER(64) PRIMARY KEY, \ - TransType CHARACTER(24), \ - FromAcct CHARACTER(35), \ - FromSeq BIGINT UNSIGNED, \ - LedgerSeq BIGINT UNSIGNED, \ - Status CHARACTER(1), \ - RawTxn BLOB, \ - TxnMeta BLOB \ - );", - "CREATE INDEX IF NOT EXISTS TxLgrIndex ON \ - Transactions(LedgerSeq);", - - "CREATE TABLE IF NOT EXISTS AccountTransactions ( \ - TransID CHARACTER(64), \ - Account CHARACTER(64), \ - LedgerSeq BIGINT UNSIGNED, \ - TxnSeq INTEGER \ - );", - "CREATE INDEX IF NOT EXISTS AcctTxIDIndex ON \ - AccountTransactions(TransID);", - "CREATE INDEX IF NOT EXISTS AcctTxIndex ON \ - AccountTransactions(Account, LedgerSeq, TxnSeq, TransID);", - "CREATE INDEX IF NOT EXISTS AcctLgrIndex ON \ - AccountTransactions(LedgerSeq, Account, TransID);", - - "END TRANSACTION;" -}; -int TxnDBCount = std::extent::value; - -// Ledger database holds ledgers and ledger confirmations -const char* LedgerDBName = "ledger.db"; -const char* LedgerDBInit[] = -{ - "PRAGMA synchronous=NORMAL;", - "PRAGMA journal_mode=WAL;", - "PRAGMA journal_size_limit=1582080;", - - "BEGIN TRANSACTION;", - - "CREATE TABLE IF NOT EXISTS Ledgers ( \ - LedgerHash CHARACTER(64) PRIMARY KEY, \ - LedgerSeq BIGINT UNSIGNED, \ - PrevHash CHARACTER(64), \ - TotalCoins BIGINT UNSIGNED, \ - ClosingTime BIGINT UNSIGNED, \ - PrevClosingTime BIGINT UNSIGNED, \ - CloseTimeRes BIGINT UNSIGNED, \ - CloseFlags BIGINT 
UNSIGNED, \ - AccountSetHash CHARACTER(64), \ - TransSetHash CHARACTER(64) \ - );", - "CREATE INDEX IF NOT EXISTS SeqLedger ON Ledgers(LedgerSeq);", - - // Old table and indexes no longer needed - "DROP TABLE IF EXISTS Validations;", - - "END TRANSACTION;" -}; -int LedgerDBCount = std::extent::value; - -const char* WalletDBName = "wallet.db"; -const char* WalletDBInit[] = -{ - "BEGIN TRANSACTION;", - - // A node's identity must be persisted, including - // for clustering purposes. This table holds one - // entry: the server's unique identity, but the - // value can be overriden by specifying a node - // identity in the config file using a [node_seed] - // entry. - "CREATE TABLE IF NOT EXISTS NodeIdentity ( \ - PublicKey CHARACTER(53), \ - PrivateKey CHARACTER(52) \ - );", - - // Peer reservations. - "CREATE TABLE IF NOT EXISTS PeerReservations ( \ - PublicKey CHARACTER(53) UNIQUE NOT NULL, \ - Description CHARACTER(64) NOT NULL \ - );", - - // Validator Manifests - "CREATE TABLE IF NOT EXISTS ValidatorManifests ( \ - RawData BLOB NOT NULL \ - );", - - "CREATE TABLE IF NOT EXISTS PublisherManifests ( \ - RawData BLOB NOT NULL \ - );", - - // Old tables that were present in wallet.db and we - // no longer need or use. 
- "DROP INDEX IF EXISTS SeedNodeNext;", - "DROP INDEX IF EXISTS SeedDomainNext;", - "DROP TABLE IF EXISTS Features;", - "DROP TABLE IF EXISTS TrustedNodes;", - "DROP TABLE IF EXISTS ValidatorReferrals;", - "DROP TABLE IF EXISTS IpReferrals;", - "DROP TABLE IF EXISTS SeedNodes;", - "DROP TABLE IF EXISTS SeedDomains;", - "DROP TABLE IF EXISTS Misc;", - - "END TRANSACTION;" -}; -int WalletDBCount = std::extent::value; - -} // ripple diff --git a/src/ripple/app/main/DBInit.h b/src/ripple/app/main/DBInit.h index 865c3a2185..b632d168bf 100644 --- a/src/ripple/app/main/DBInit.h +++ b/src/ripple/app/main/DBInit.h @@ -20,20 +20,145 @@ #ifndef RIPPLE_APP_DATA_DBINIT_H_INCLUDED #define RIPPLE_APP_DATA_DBINIT_H_INCLUDED +#include + namespace ripple { -// VFALCO TODO Tidy these up into a class with functions and return types. -extern const char* TxnDBName; -extern const char* TxnDBInit[]; -extern int TxnDBCount; +//////////////////////////////////////////////////////////////////////////////// -extern const char* LedgerDBName; -extern const char* LedgerDBInit[]; -extern int LedgerDBCount; +// Ledger database holds ledgers and ledger confirmations +static constexpr auto LgrDBName {"ledger.db"}; -extern const char* WalletDBName; -extern const char* WalletDBInit[]; -extern int WalletDBCount; +static constexpr +std::array LgrDBPragma {{ + "PRAGMA synchronous=NORMAL;", + "PRAGMA journal_mode=WAL;", + "PRAGMA journal_size_limit=1582080;" +}}; + +static constexpr +std::array LgrDBInit {{ + "BEGIN TRANSACTION;", + + "CREATE TABLE IF NOT EXISTS Ledgers ( \ + LedgerHash CHARACTER(64) PRIMARY KEY, \ + LedgerSeq BIGINT UNSIGNED, \ + PrevHash CHARACTER(64), \ + TotalCoins BIGINT UNSIGNED, \ + ClosingTime BIGINT UNSIGNED, \ + PrevClosingTime BIGINT UNSIGNED, \ + CloseTimeRes BIGINT UNSIGNED, \ + CloseFlags BIGINT UNSIGNED, \ + AccountSetHash CHARACTER(64), \ + TransSetHash CHARACTER(64) \ + );", + "CREATE INDEX IF NOT EXISTS SeqLedger ON Ledgers(LedgerSeq);", + + // Old table and indexes no 
longer needed + "DROP TABLE IF EXISTS Validations;", + + "END TRANSACTION;" +}}; + +//////////////////////////////////////////////////////////////////////////////// + +// Transaction database holds transactions and public keys +static constexpr auto TxDBName {"transaction.db"}; + +static constexpr +#if (ULONG_MAX > UINT_MAX) && !defined (NO_SQLITE_MMAP) + std::array TxDBPragma {{ +#else + std::array TxDBPragma {{ +#endif + "PRAGMA page_size=4096;", + "PRAGMA synchronous=NORMAL;", + "PRAGMA journal_mode=WAL;", + "PRAGMA journal_size_limit=1582080;", + "PRAGMA max_page_count=2147483646;", +#if (ULONG_MAX > UINT_MAX) && !defined (NO_SQLITE_MMAP) + "PRAGMA mmap_size=17179869184;" +#endif +}}; + +static constexpr +std::array TxDBInit {{ + "BEGIN TRANSACTION;", + + "CREATE TABLE IF NOT EXISTS Transactions ( \ + TransID CHARACTER(64) PRIMARY KEY, \ + TransType CHARACTER(24), \ + FromAcct CHARACTER(35), \ + FromSeq BIGINT UNSIGNED, \ + LedgerSeq BIGINT UNSIGNED, \ + Status CHARACTER(1), \ + RawTxn BLOB, \ + TxnMeta BLOB \ + );", + "CREATE INDEX IF NOT EXISTS TxLgrIndex ON \ + Transactions(LedgerSeq);", + + "CREATE TABLE IF NOT EXISTS AccountTransactions ( \ + TransID CHARACTER(64), \ + Account CHARACTER(64), \ + LedgerSeq BIGINT UNSIGNED, \ + TxnSeq INTEGER \ + );", + "CREATE INDEX IF NOT EXISTS AcctTxIDIndex ON \ + AccountTransactions(TransID);", + "CREATE INDEX IF NOT EXISTS AcctTxIndex ON \ + AccountTransactions(Account, LedgerSeq, TxnSeq, TransID);", + "CREATE INDEX IF NOT EXISTS AcctLgrIndex ON \ + AccountTransactions(LedgerSeq, Account, TransID);", + + "END TRANSACTION;" +}}; + +//////////////////////////////////////////////////////////////////////////////// + +// Pragma for Ledger and Transaction databases with complete shards +static constexpr +std::array CompleteShardDBPragma {{ + "PRAGMA synchronous=OFF;", + "PRAGMA journal_mode=OFF;" +}}; + +//////////////////////////////////////////////////////////////////////////////// + +static constexpr auto WalletDBName 
{"wallet.db"}; + +static constexpr +std::array WalletDBInit {{ + "BEGIN TRANSACTION;", + + // A node's identity must be persisted, including + // for clustering purposes. This table holds one + // entry: the server's unique identity, but the + // value can be overriden by specifying a node + // identity in the config file using a [node_seed] + // entry. + "CREATE TABLE IF NOT EXISTS NodeIdentity ( \ + PublicKey CHARACTER(53), \ + PrivateKey CHARACTER(52) \ + );", + + // Peer reservations + "CREATE TABLE IF NOT EXISTS PeerReservations ( \ + PublicKey CHARACTER(53) UNIQUE NOT NULL, \ + Description CHARACTER(64) NOT NULL \ + );", + + // Validator Manifests + "CREATE TABLE IF NOT EXISTS ValidatorManifests ( \ + RawData BLOB NOT NULL \ + );", + + "CREATE TABLE IF NOT EXISTS PublisherManifests ( \ + RawData BLOB NOT NULL \ + );", + + "END TRANSACTION;" +}}; } // ripple diff --git a/src/ripple/app/main/Main.cpp b/src/ripple/app/main/Main.cpp index 03d9bc61df..06689b3848 100644 --- a/src/ripple/app/main/Main.cpp +++ b/src/ripple/app/main/Main.cpp @@ -489,9 +489,12 @@ int run (int argc, char** argv) std::cerr << "vacuum not applicable in standalone mode.\n"; return -1; } - boost::filesystem::path dbPath = dbSetup.dataDir / TxnDBName; - auto txnDB = std::make_unique (dbSetup, TxnDBName, - TxnDBInit, TxnDBCount); + boost::filesystem::path dbPath = dbSetup.dataDir / TxDBName; + auto txnDB = std::make_unique( + dbSetup, + TxDBName, + TxDBPragma, + TxDBInit); if (txnDB.get() == nullptr) { std::cerr << "Cannot create connection to " << dbPath.string() << @@ -711,7 +714,7 @@ int run (int argc, char** argv) // With our configuration parsed, ensure we have // enough file descriptors available: if (!adjustDescriptorLimit( - app->fdlimit(), + app->fdRequired(), app->logs().journal("Application"))) { StopSustain(); diff --git a/src/ripple/app/misc/SHAMapStore.h b/src/ripple/app/misc/SHAMapStore.h index 8a647a97a3..b41e9c4ca1 100644 --- a/src/ripple/app/misc/SHAMapStore.h +++ 
b/src/ripple/app/misc/SHAMapStore.h @@ -62,8 +62,8 @@ public: /** Highest ledger that may be deleted. */ virtual LedgerIndex getCanDelete() = 0; - /** The number of files that are needed. */ - virtual int fdlimit() const = 0; + /** Returns the number of file descriptors that are needed. */ + virtual int fdRequired() const = 0; }; //------------------------------------------------------------------------------ diff --git a/src/ripple/app/misc/SHAMapStoreImp.cpp b/src/ripple/app/misc/SHAMapStoreImp.cpp index c11a0221d3..1cc7eb9a38 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.cpp +++ b/src/ripple/app/misc/SHAMapStoreImp.cpp @@ -254,7 +254,7 @@ SHAMapStoreImp::makeNodeStore(std::string const& name, std::int32_t readThreads) std::move(archiveBackend), app_.config().section(ConfigSection::nodeDatabase()), app_.logs().journal(nodeStoreName_)); - fdlimit_ += dbr->fdlimit(); + fdRequired_ += dbr->fdRequired(); dbRotating_ = dbr.get(); db.reset(dynamic_cast(dbr.release())); } @@ -267,7 +267,7 @@ SHAMapStoreImp::makeNodeStore(std::string const& name, std::int32_t readThreads) app_.getJobQueue(), app_.config().section(ConfigSection::nodeDatabase()), app_.logs().journal(nodeStoreName_)); - fdlimit_ += db->fdlimit(); + fdRequired_ += db->fdRequired(); } return db; } @@ -298,9 +298,9 @@ SHAMapStoreImp::rendezvous() const } int -SHAMapStoreImp::fdlimit () const +SHAMapStoreImp::fdRequired() const { - return fdlimit_; + return fdRequired_; } bool diff --git a/src/ripple/app/misc/SHAMapStoreImp.h b/src/ripple/app/misc/SHAMapStoreImp.h index cbf876e33d..59b8f6d2ad 100644 --- a/src/ripple/app/misc/SHAMapStoreImp.h +++ b/src/ripple/app/misc/SHAMapStoreImp.h @@ -24,6 +24,7 @@ #include #include #include + #include #include @@ -97,7 +98,7 @@ private: std::shared_ptr newLedger_; std::atomic working_; std::atomic canDelete_; - int fdlimit_ = 0; + int fdRequired_ = 0; std::uint32_t deleteInterval_ = 0; bool advisoryDelete_ = false; @@ -172,7 +173,7 @@ public: void onLedgerClosed 
(std::shared_ptr const& ledger) override; void rendezvous() const override; - int fdlimit() const override; + int fdRequired() const override; private: // callback for visitNodes diff --git a/src/ripple/core/Config.h b/src/ripple/core/Config.h index daf71dbcff..cfe27a7d67 100644 --- a/src/ripple/core/Config.h +++ b/src/ripple/core/Config.h @@ -44,7 +44,7 @@ class Rules; enum SizedItemName { - siSweepInterval, + siSweepInterval = 0, siNodeCacheSize, siNodeCacheAge, siTreeCacheSize, @@ -56,14 +56,32 @@ enum SizedItemName siLedgerFetch, siHashNodeDBCache, siTxnDBCache, - siLgrDBCache, + siLgrDBCache }; -struct SizedItem -{ - SizedItemName item; - int sizes[5]; -}; +static constexpr +std::array<std::array<int, 5>, 13> sizedItems +{{ + // Rows follow SizedItemName order: tiny small medium large huge + {{ 10, 30, 60, 90, 120 }}, // siSweepInterval + + {{ 16384, 32768, 131072, 262144, 524288 }}, // siNodeCacheSize + {{ 60, 90, 120, 900, 1800 }}, // siNodeCacheAge + + {{ 128000, 256000, 512000, 768000, 2048000 }}, // siTreeCacheSize + {{ 30, 60, 90, 120, 900 }}, // siTreeCacheAge + + {{ 4096, 8192, 16384, 65536, 131072 }}, // siSLECacheSize + {{ 30, 60, 90, 120, 300 }}, // siSLECacheAge + + {{ 32, 128, 256, 384, 768 }}, // siLedgerSize + {{ 30, 90, 180, 240, 900 }}, // siLedgerAge + {{ 2, 3, 5, 5, 8 }}, // siLedgerFetch + + {{ 4, 12, 24, 64, 128 }}, // siHashNodeDBCache + {{ 4, 12, 24, 64, 128 }}, // siTxnDBCache + {{ 4, 8, 16, 32, 128 }} // siLgrDBCache +}}; // This entire derived class is deprecated. 
// For new config information use the style implied @@ -182,11 +200,23 @@ public: std::unordered_set> features; public: - Config() - : j_ {beast::Journal::getNullSink()} - { } + Config() : j_ {beast::Journal::getNullSink()} {} + + static + int + getSize(SizedItemName item, std::uint32_t nodeSize) + { + assert(item < sizedItems.size() && nodeSize < sizedItems[item].size()); + return sizedItems[item][nodeSize]; + } + + int + getSize(SizedItemName item) const + { + assert(item < sizedItems.size()); + return getSize(item, NODE_SIZE); + } - int getSize (SizedItemName) const; /* Be very careful to make sure these bool params are in the right order. */ void setup (std::string const& strConf, bool bQuiet, diff --git a/src/ripple/core/DatabaseCon.h b/src/ripple/core/DatabaseCon.h index 7753879e24..62d3413def 100644 --- a/src/ripple/core/DatabaseCon.h +++ b/src/ripple/core/DatabaseCon.h @@ -20,13 +20,13 @@ #ifndef RIPPLE_APP_DATA_DATABASECON_H_INCLUDED #define RIPPLE_APP_DATA_DATABASECON_H_INCLUDED +#include #include #include #include #include #include - namespace soci { class session; } @@ -86,10 +86,43 @@ public: boost::filesystem::path dataDir; }; - DatabaseCon (Setup const& setup, - std::string const& name, - const char* initString[], - int countInit); + template + DatabaseCon( + Setup const& setup, + std::string const& DBName, + std::array const& pragma, + std::array const& initSQL) + { + // Use temporary files or regular DB files? + auto const useTempFiles = + setup.standAlone && + setup.startUp != Config::LOAD && + setup.startUp != Config::LOAD_FILE && + setup.startUp != Config::REPLAY; + boost::filesystem::path pPath = + useTempFiles ? 
"" : (setup.dataDir / DBName); + + open(session_, "sqlite", pPath.string()); + + try + { + for (auto const& p : pragma) + { + soci::statement st = session_.prepare << p; + st.execute(true); + } + for (auto const& sql : initSQL) + { + soci::statement st = session_.prepare << sql; + st.execute(true); + } + } + catch (soci::soci_error&) + { + // TODO: We should at least log this error. It is annoying to wire + // a logger into every context, but there are other solutions. + } + } soci::session& getSession() { diff --git a/src/ripple/core/impl/Config.cpp b/src/ripple/core/impl/Config.cpp index adb3039d5b..987f2e7e09 100644 --- a/src/ripple/core/impl/Config.cpp +++ b/src/ripple/core/impl/Config.cpp @@ -559,42 +559,6 @@ void Config::loadFromString (std::string const& fileContents) } } -int Config::getSize (SizedItemName item) const -{ - SizedItem sizeTable[] = // tiny small medium large huge - { - - { siSweepInterval, { 10, 30, 60, 90, 120 } }, - - { siLedgerFetch, { 2, 3, 5, 5, 8 } }, - - { siNodeCacheSize, { 16384, 32768, 131072, 262144, 524288 } }, - { siNodeCacheAge, { 60, 90, 120, 900, 1800 } }, - - { siTreeCacheSize, { 128000, 256000, 512000, 768000, 2048000 } }, - { siTreeCacheAge, { 30, 60, 90, 120, 900 } }, - - { siSLECacheSize, { 4096, 8192, 16384, 65536, 131072 } }, - { siSLECacheAge, { 30, 60, 90, 120, 300 } }, - - { siLedgerSize, { 32, 128, 256, 384, 768 } }, - { siLedgerAge, { 30, 90, 180, 240, 900 } }, - - { siHashNodeDBCache, { 4, 12, 24, 64, 128 } }, - { siTxnDBCache, { 4, 12, 24, 64, 128 } }, - { siLgrDBCache, { 4, 8, 16, 32, 128 } }, - }; - - for (int i = 0; i < (sizeof (sizeTable) / sizeof (SizedItem)); ++i) - { - if (sizeTable[i].item == item) - return sizeTable[i].sizes[NODE_SIZE]; - } - - assert (false); - return -1; -} - boost::filesystem::path Config::getDebugLogFile () const { auto log_file = DEBUG_LOGFILE; diff --git a/src/ripple/core/impl/DatabaseCon.cpp b/src/ripple/core/impl/DatabaseCon.cpp index 91427c93e6..54c8a4f835 100644 --- 
a/src/ripple/core/impl/DatabaseCon.cpp +++ b/src/ripple/core/impl/DatabaseCon.cpp @@ -25,38 +25,6 @@ namespace ripple { -DatabaseCon::DatabaseCon ( - Setup const& setup, - std::string const& strName, - const char* initStrings[], - int initCount) -{ - auto const useTempFiles // Use temporary files or regular DB files? - = setup.standAlone && - setup.startUp != Config::LOAD && - setup.startUp != Config::LOAD_FILE && - setup.startUp != Config::REPLAY; - boost::filesystem::path pPath = useTempFiles - ? "" : (setup.dataDir / strName); - - open (session_, "sqlite", pPath.string()); - - for (int i = 0; i < initCount; ++i) - { - try - { - soci::statement st = session_.prepare << - initStrings[i]; - st.execute(true); - } - catch (soci::soci_error&) - { - // TODO: We should at least log this error. It is annoying to wire - // a logger into every context, but there are other solutions. - } - } -} - DatabaseCon::Setup setup_DatabaseCon (Config const& c) { DatabaseCon::Setup setup; diff --git a/src/ripple/nodestore/Backend.h b/src/ripple/nodestore/Backend.h index 43097292cb..5b2292e9fa 100644 --- a/src/ripple/nodestore/Backend.h +++ b/src/ripple/nodestore/Backend.h @@ -112,14 +112,14 @@ public: /** Perform consistency checks on database. */ virtual void verify() = 0; - /** Returns the number of file handles the backend expects to need. */ - virtual int fdlimit() const = 0; + /** Returns the number of file descriptors the backend expects to need. */ + virtual int fdRequired() const = 0; /** Returns true if the backend uses permanent storage. 
*/ bool backed() const { - return fdlimit(); + return fdRequired(); } }; diff --git a/src/ripple/nodestore/Database.h b/src/ripple/nodestore/Database.h index e4394a79b2..7b59a766f8 100644 --- a/src/ripple/nodestore/Database.h +++ b/src/ripple/nodestore/Database.h @@ -204,9 +204,9 @@ public: std::uint32_t getFetchSize() const { return fetchSz_; } - /** Return the number of files needed by our backend(s) */ + /** Returns the number of file descriptors the database expects to need */ int - fdlimit() const { return fdLimit_; } + fdRequired() const { return fdRequired_; } void onStop() override; @@ -222,7 +222,7 @@ public: protected: beast::Journal j_; Scheduler& scheduler_; - int fdLimit_ {0}; + int fdRequired_ {0}; void stopThreads(); diff --git a/src/ripple/nodestore/backend/MemoryFactory.cpp b/src/ripple/nodestore/backend/MemoryFactory.cpp index 0b255b78be..3e1d0d0a31 100644 --- a/src/ripple/nodestore/backend/MemoryFactory.cpp +++ b/src/ripple/nodestore/backend/MemoryFactory.cpp @@ -191,7 +191,7 @@ public: } int - fdlimit() const override + fdRequired() const override { return 0; } diff --git a/src/ripple/nodestore/backend/NuDBFactory.cpp b/src/ripple/nodestore/backend/NuDBFactory.cpp index 4a055fab82..3bc3228268 100644 --- a/src/ripple/nodestore/backend/NuDBFactory.cpp +++ b/src/ripple/nodestore/backend/NuDBFactory.cpp @@ -297,7 +297,7 @@ public: } int - fdlimit() const override + fdRequired() const override { return 3; } diff --git a/src/ripple/nodestore/backend/NullFactory.cpp b/src/ripple/nodestore/backend/NullFactory.cpp index c35b08b011..4db5274eb1 100644 --- a/src/ripple/nodestore/backend/NullFactory.cpp +++ b/src/ripple/nodestore/backend/NullFactory.cpp @@ -98,9 +98,9 @@ public: { } - /** Returns the number of file handles the backend expects to need */ + /** Returns the number of file descriptors the backend expects to need */ int - fdlimit() const override + fdRequired() const override { return 0; } diff --git 
a/src/ripple/nodestore/backend/RocksDBFactory.cpp b/src/ripple/nodestore/backend/RocksDBFactory.cpp index fb2a6bd99c..ff6710ca8c 100644 --- a/src/ripple/nodestore/backend/RocksDBFactory.cpp +++ b/src/ripple/nodestore/backend/RocksDBFactory.cpp @@ -99,7 +99,7 @@ public: BatchWriter m_batch; std::string m_name; std::unique_ptr m_db; - int fdlimit_ = 2048; + int fdRequired_ = 2048; rocksdb::Options m_options; RocksDBBackend (int keyBytes, Section const& keyValues, @@ -128,7 +128,7 @@ public: } if (get_if_exists (keyValues, "open_files", m_options.max_open_files)) - fdlimit_ = m_options.max_open_files; + fdRequired_ = m_options.max_open_files; if (keyValues.exists ("file_size_mb")) { @@ -405,11 +405,11 @@ public: { } - /** Returns the number of file handles the backend expects to need */ + /** Returns the number of file descriptors the backend expects to need */ int - fdlimit() const override + fdRequired() const override { - return fdlimit_; + return fdRequired_; } }; diff --git a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp index 4e83d8f489..76b2b4ec59 100644 --- a/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseRotatingImp.cpp @@ -42,9 +42,9 @@ DatabaseRotatingImp::DatabaseRotatingImp( , archiveBackend_(std::move(archiveBackend)) { if (writableBackend_) - fdLimit_ += writableBackend_->fdlimit(); + fdRequired_ += writableBackend_->fdRequired(); if (archiveBackend_) - fdLimit_ += archiveBackend_->fdlimit(); + fdRequired_ += archiveBackend_->fdRequired(); setParent(parent); } diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index 49ab5ddf03..b0e70a8e9d 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -49,10 +49,11 @@ DatabaseShardImp::DatabaseShardImp( j) , app_(app) , earliestShardIndex_(seqToShardIndex(earliestSeq())) - , 
avgShardSz_(ledgersPerShard_ * kilobytes(192)) + , avgShardFileSz_(ledgersPerShard_ * kilobytes(192)) { } + DatabaseShardImp::~DatabaseShardImp() { // Stop threads before data members are destroyed @@ -74,20 +75,16 @@ DatabaseShardImp::init() using namespace boost::beast::detail; std::lock_guard lock(m_); - if (init_) - { - JLOG(j_.error()) << - "Already initialized"; - return false; - } - - auto fail = [&](std::string const& msg) + auto fail = [this](std::string const& msg) { JLOG(j_.error()) << "[" << ConfigSection::shardDatabase() << "] " << msg; return false; }; + if (init_) + return fail("already initialized"); + Config const& config {app_.config()}; Section const& section {config.section(ConfigSection::shardDatabase())}; if (section.empty()) @@ -123,23 +120,19 @@ DatabaseShardImp::init() boost::filesystem::create_directories(dir_); { - std::uint64_t i; - if (!get_if_exists(section, "max_size_gb", i)) + std::uint64_t sz; + if (!get_if_exists(section, "max_size_gb", sz)) return fail("'max_size_gb' missing"); - // Minimum disk space required (in gigabytes) - static constexpr auto minDiskSpace = 10; - if (i < minDiskSpace) - { - return fail("'max_size_gb' must be at least " + - std::to_string(minDiskSpace)); - } - - if ((i << 30) < i) + if (((sz << 30) >> 30) != sz) return fail("'max_size_gb' overflow"); + // Minimum storage space required (in gigabytes) + if (sz < 10) + return fail("'max_size_gb' must be at least 10"); + // Convert to bytes - maxDiskSpace_ = i << 30; + maxFileSz_ = sz << 30; } if (section.exists("ledgers_per_shard")) @@ -163,7 +156,7 @@ DatabaseShardImp::init() return fail("'type' value unsupported"); } - // Find backend file handle requirement + // Check if backend uses permanent storage if (auto factory = Manager::instance().find(backendName_)) { auto backend {factory->createInstance( @@ -171,13 +164,13 @@ DatabaseShardImp::init() backed_ = backend->backed(); if (!backed_) { + setFileStats(lock); init_ = true; return true; } - fdLimit_ = 
backend->fdlimit(); } else - return fail("'type' value unsupported"); + return fail(backendName_ + " backend unsupported"); try { @@ -205,10 +198,9 @@ DatabaseShardImp::init() auto const shardIndex {std::stoul(dirName)}; if (shardIndex < earliestShardIndex()) { - JLOG(j_.fatal()) << - "Invalid shard index " << shardIndex << - ". Earliest shard index " << earliestShardIndex(); - return false; + return fail("shard " + std::to_string(shardIndex) + + " comes before earliest shard index " + + std::to_string(earliestShardIndex())); } // Check if a previous import failed @@ -222,46 +214,28 @@ DatabaseShardImp::init() continue; } - auto shard {std::make_unique( - *this, shardIndex, cacheSz_, cacheAge_, j_)}; - if (!shard->open(section, scheduler_, *ctx_)) + auto shard {std::make_unique(app_, *this, shardIndex, j_)}; + if (!shard->open(scheduler_, *ctx_)) return false; - usedDiskSpace_ += shard->fileSize(); if (shard->complete()) complete_.emplace(shard->index(), std::move(shard)); else { if (incomplete_) - { - JLOG(j_.fatal()) << - "More than one control file found"; - return false; - } + return fail("more than one control file found"); incomplete_ = std::move(shard); } } } catch (std::exception const& e) { - JLOG(j_.error()) << - "exception: " << e.what(); - return false; + return fail(std::string("exception ") + + e.what() + " in function " + __func__); } - if (!incomplete_ && complete_.empty()) - { - // New Shard Store, calculate file descriptor requirements - if (maxDiskSpace_ > available()) - { - JLOG(j_.error()) << - "Insufficient disk space"; - } - fdLimit_ = 1 + (fdLimit_ * - std::max(1, maxDiskSpace_ / avgShardSz_)); - } - else - updateStats(lock); + setFileStats(lock); + updateStatus(lock); init_ = true; return true; } @@ -271,24 +245,23 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) { std::lock_guard lock(m_); assert(init_); + if (incomplete_) return incomplete_->prepare(); if (!canAdd_) return boost::none; if (backed_) { - // Check available 
disk space - if (usedDiskSpace_ + avgShardSz_ > maxDiskSpace_) + // Check available storage space + if (fileSz_ + avgShardFileSz_ > maxFileSz_) { - JLOG(j_.debug()) << - "Maximum size reached"; + JLOG(j_.debug()) << "maximum storage size reached"; canAdd_ = false; return boost::none; } - if (avgShardSz_ > available()) + if (avgShardFileSz_ > available()) { - JLOG(j_.error()) << - "Insufficient disk space"; + JLOG(j_.error()) << "insufficient storage space available"; canAdd_ = false; return boost::none; } @@ -297,21 +270,15 @@ DatabaseShardImp::prepareLedger(std::uint32_t validLedgerSeq) auto const shardIndex {findShardIndexToAdd(validLedgerSeq, lock)}; if (!shardIndex) { - JLOG(j_.debug()) << - "No new shards to add"; + JLOG(j_.debug()) << "no new shards to add"; canAdd_ = false; return boost::none; } // With every new shard, clear family caches app_.shardFamily()->reset(); - int const sz {std::max(shardCacheSz, cacheSz_ / std::max( - 1, static_cast(complete_.size() + 1)))}; - incomplete_ = std::make_unique( - *this, *shardIndex, sz, cacheAge_, j_); - if (!incomplete_->open( - app_.config().section(ConfigSection::shardDatabase()), - scheduler_, - *ctx_)) + + incomplete_ = std::make_unique(app_, *this, *shardIndex, j_); + if (!incomplete_->open(scheduler_, *ctx_)) { incomplete_.reset(); return boost::none; @@ -325,18 +292,21 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex) { std::lock_guard lock(m_); assert(init_); - if (!canAdd_) + + auto fail = [this, shardIndex](std::string const& msg) { JLOG(j_.error()) << - "Unable to add more shards to the database"; + "shard " << shardIndex << " " << msg; return false; - } + }; + + if (!canAdd_) + return fail("cannot be stored at this time"); if (shardIndex < earliestShardIndex()) { - JLOG(j_.error()) << - "Invalid shard index " << shardIndex; - return false; + return fail("comes before earliest shard index " + + std::to_string(earliestShardIndex())); } // If we are synced to the network, check if the shard index 
@@ -345,11 +315,7 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex) { // seq will be greater than zero if valid if (seq > earliestSeq() && shardIndex >= seqToShardIndex(seq)) - { - JLOG(j_.error()) << - "Invalid shard index " << shardIndex; - return false; - } + return fail("has an invalid index"); return true; }; if (!seqCheck(app_.getLedgerMaster().getValidLedgerIndex()) || @@ -360,23 +326,18 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex) if (complete_.find(shardIndex) != complete_.end()) { - JLOG(j_.debug()) << - "Shard index " << shardIndex << - " stored"; + JLOG(j_.debug()) << "shard " << shardIndex << " is already stored"; return false; } if (incomplete_ && incomplete_->index() == shardIndex) { - JLOG(j_.debug()) << - "Shard index " << shardIndex << - " is being acquired"; + JLOG(j_.debug()) << "shard " << shardIndex << " is being acquired"; return false; } if (preShards_.find(shardIndex) != preShards_.end()) { JLOG(j_.debug()) << - "Shard index " << shardIndex << - " is prepared for import"; + "shard " << shardIndex << " is already prepared for import"; return false; } @@ -384,19 +345,15 @@ DatabaseShardImp::prepareShard(std::uint32_t shardIndex) if (backed_) { std::uint64_t const sz { - (preShards_.size() + 1 + (incomplete_ ? 1 : 0)) * avgShardSz_}; - if (usedDiskSpace_ + sz > maxDiskSpace_) + (preShards_.size() + 1 + (incomplete_ ? 
1 : 0)) * avgShardFileSz_}; + if (fileSz_ + sz > maxFileSz_) { JLOG(j_.debug()) << - "Exceeds maximum size"; + "shard " << shardIndex << " exceeds the maximum storage size"; return false; } if (sz > available()) - { - JLOG(j_.error()) << - "Insufficient disk space"; - return false; - } + return fail("insufficient storage space available"); } // Add to shards prepared @@ -409,6 +366,7 @@ DatabaseShardImp::removePreShard(std::uint32_t shardIndex) { std::lock_guard lock(m_); assert(init_); + preShards_.erase(shardIndex); } @@ -419,6 +377,7 @@ DatabaseShardImp::getPreShards() { std::lock_guard lock(m_); assert(init_); + if (preShards_.empty()) return {}; for (auto const& ps : preShards_) @@ -436,15 +395,14 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex, { if (!is_directory(srcDir) || is_empty(srcDir)) { - JLOG(j_.error()) << - "Invalid source directory " << srcDir.string(); + JLOG(j_.error()) << "invalid source directory " << srcDir.string(); return false; } } catch (std::exception const& e) { JLOG(j_.error()) << - "exception: " << e.what(); + "exception " << e.what() << " in function " << __func__; return false; } @@ -457,86 +415,84 @@ DatabaseShardImp::importShard(std::uint32_t shardIndex, catch (std::exception const& e) { JLOG(j_.error()) << - "exception: " << e.what(); + "exception " << e.what() << " in function " << __func__; return false; } return true; }; - std::unique_lock lock(m_); - assert(init_); - - // Check shard is prepared - auto it {preShards_.find(shardIndex)}; - if(it == preShards_.end()) { - JLOG(j_.error()) << - "Invalid shard index " << shardIndex; - return false; - } + std::unique_lock lock(m_); + assert(init_); - // Move source directory to the shard database directory - auto const dstDir {dir_ / std::to_string(shardIndex)}; - if (!move(srcDir, dstDir)) - return false; - - // Create the new shard - auto shard {std::make_unique( - *this, shardIndex, cacheSz_, cacheAge_, j_)}; - auto fail = [&](std::string const& msg) - { - if 
(!msg.empty()) + // Check shard is prepared + auto it {preShards_.find(shardIndex)}; + if(it == preShards_.end()) { - JLOG(j_.error()) << - "Import shard " << shardIndex << ": " << msg; + JLOG(j_.error()) << "shard " << shardIndex << " is an invalid index"; + return false; } - shard.reset(); - move(dstDir, srcDir); - return false; - }; - if (!shard->open( - app_.config().section(ConfigSection::shardDatabase()), - scheduler_, - *ctx_)) - { - return fail({}); - } - if (!shard->complete()) - return fail("incomplete shard"); + // Move source directory to the shard database directory + auto const dstDir {dir_ / std::to_string(shardIndex)}; + if (!move(srcDir, dstDir)) + return false; - try - { - // Verify database integrity - shard->getBackend()->verify(); - } - catch (std::exception const& e) - { - return fail(e.what()); - } - - // Validate shard ledgers - if (validate) - { - // Shard validation requires releasing the lock - // so the database can fetch data from it - it->second = shard.get(); - lock.unlock(); - auto const valid {shard->validate(app_)}; - lock.lock(); - if (!valid) + // Create the new shard + auto shard {std::make_unique(app_, *this, shardIndex, j_)}; + auto fail = [&](std::string const& msg) { - it = preShards_.find(shardIndex); - if(it != preShards_.end()) - it->second = nullptr; - return fail("failed validation"); + if (!msg.empty()) + { + JLOG(j_.error()) << "shard " << shardIndex << " " << msg; + } + shard.reset(); + move(dstDir, srcDir); + return false; + }; + + if (!shard->open(scheduler_, *ctx_)) + return fail({}); + if (!shard->complete()) + return fail("is incomplete"); + + try + { + // Verify database integrity + shard->getBackend()->verify(); } + catch (std::exception const& e) + { + return fail(std::string("exception ") + + e.what() + " in function " + __func__); + } + + // Validate shard ledgers + if (validate) + { + // Shard validation requires releasing the lock + // so the database can fetch data from it + it->second = shard.get(); + 
lock.unlock(); + auto const valid {shard->validate()}; + lock.lock(); + if (!valid) + { + it = preShards_.find(shardIndex); + if(it != preShards_.end()) + it->second = nullptr; + return fail("failed validation"); + } + } + + // Add the shard + complete_.emplace(shardIndex, std::move(shard)); + preShards_.erase(shardIndex); } - // Add the shard - usedDiskSpace_ += shard->fileSize(); - complete_.emplace(shardIndex, std::move(shard)); - preShards_.erase(shardIndex); + std::lock_guard lock(m_); + setFileStats(lock); + updateStatus(lock); return true; } @@ -545,42 +501,48 @@ DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t seq) { if (!contains(seq)) return {}; + auto nObj = fetch(hash, seq); if (!nObj) return {}; - auto ledger = std::make_shared( - InboundLedger::deserializeHeader(makeSlice(nObj->getData()), true), - app_.config(), *app_.shardFamily()); - if (ledger->info().hash != hash || ledger->info().seq != seq) + auto fail = [this, seq](std::string const& msg)-> std::shared_ptr { - JLOG(j_.error()) << - "shard " << seqToShardIndex(seq) << - " ledger seq " << seq << - " hash " << hash << - " has corrupt data"; + JLOG(j_.error()) << "shard " << seqToShardIndex(seq) << " " << msg; return {}; + }; + + auto ledger {std::make_shared( + InboundLedger::deserializeHeader(makeSlice(nObj->getData()), true), + app_.config(), + *app_.shardFamily())}; + + if (ledger->info().seq != seq) + { + return fail("encountered invalid ledger sequence " + + std::to_string(seq)); } + if (ledger->info().hash != hash) + { + return fail("encountered invalid ledger hash " + + to_string(hash) + " on sequence " + std::to_string(seq)); + } + ledger->setFull(); if (!ledger->stateMap().fetchRoot( SHAMapHash {ledger->info().accountHash}, nullptr)) { - JLOG(j_.error()) << - "shard " << seqToShardIndex(seq) << - " ledger seq " << seq << - " missing Account State root"; - return {}; + return fail("is missing root STATE node on hash " + + to_string(hash) + " on sequence " + 
std::to_string(seq)); } + if (ledger->info().txHash.isNonZero()) { if (!ledger->txMap().fetchRoot( SHAMapHash {ledger->info().txHash}, nullptr)) { - JLOG(j_.error()) << - "shard " << seqToShardIndex(seq) << - " ledger seq " << seq << - " missing TX root"; - return {}; + return fail("is missing root TXN node on hash " + + to_string(hash) + " on sequence " + std::to_string(seq)); } } return ledger; @@ -589,39 +551,38 @@ DatabaseShardImp::fetchLedger(uint256 const& hash, std::uint32_t seq) void DatabaseShardImp::setStored(std::shared_ptr const& ledger) { - if (ledger->info().hash.isZero() || - ledger->info().accountHash.isZero()) - { - assert(false); - JLOG(j_.error()) << - "Invalid ledger"; - return; - } auto const shardIndex {seqToShardIndex(ledger->info().seq)}; + auto fail = [this, shardIndex](std::string const& msg) + { + JLOG(j_.error()) << "shard " << shardIndex << " " << msg; + }; + + if (ledger->info().hash.isZero()) + { + return fail("encountered a zero ledger hash on sequence " + + std::to_string(ledger->info().seq)); + } + if (ledger->info().accountHash.isZero()) + { + return fail("encountered a zero account hash on sequence " + + std::to_string(ledger->info().seq)); + } + std::lock_guard lock(m_); assert(init_); + if (!incomplete_ || shardIndex != incomplete_->index()) { - JLOG(j_.warn()) << - "ledger seq " << ledger->info().seq << - " is not being acquired"; - return; + return fail("ledger sequence " + std::to_string(ledger->info().seq) + + " is not being acquired"); } - - auto const before {incomplete_->fileSize()}; if (!incomplete_->setStored(ledger)) return; - auto const after {incomplete_->fileSize()}; - if(after > before) - usedDiskSpace_ += (after - before); - else if(after < before) - usedDiskSpace_ -= std::min(before - after, usedDiskSpace_); - if (incomplete_->complete()) { complete_.emplace(incomplete_->index(), std::move(incomplete_)); incomplete_.reset(); - updateStats(lock); + updateStatus(lock); // Update peers with new shard index 
protocol::TMPeerShardInfo message; @@ -631,6 +592,8 @@ DatabaseShardImp::setStored(std::shared_ptr const& ledger) app_.overlay().foreach(send_always( std::make_shared(message, protocol::mtPEER_SHARD_INFO))); } + + setFileStats(lock); } bool @@ -639,6 +602,7 @@ DatabaseShardImp::contains(std::uint32_t seq) auto const shardIndex {seqToShardIndex(seq)}; std::lock_guard lock(m_); assert(init_); + if (complete_.find(shardIndex) != complete_.end()) return true; if (incomplete_ && incomplete_->index() == shardIndex) @@ -651,6 +615,7 @@ DatabaseShardImp::getCompleteShards() { std::lock_guard lock(m_); assert(init_); + return status_; } @@ -660,15 +625,15 @@ DatabaseShardImp::validate() { std::lock_guard lock(m_); assert(init_); + if (complete_.empty() && !incomplete_) { - JLOG(j_.error()) << - "No shards to validate"; + JLOG(j_.error()) << "no shards found to validate"; return; } std::string s {"Found shards "}; - for (auto& e : complete_) + for (auto const& e : complete_) s += std::to_string(e.second->index()) + ","; if (incomplete_) s += std::to_string(incomplete_->index()); @@ -680,12 +645,12 @@ DatabaseShardImp::validate() for (auto& e : complete_) { app_.shardFamily()->reset(); - e.second->validate(app_); + e.second->validate(); } if (incomplete_) { app_.shardFamily()->reset(); - incomplete_->validate(app_); + incomplete_->validate(); } app_.shardFamily()->reset(); } @@ -693,198 +658,184 @@ DatabaseShardImp::validate() void DatabaseShardImp::import(Database& source) { - std::unique_lock lock(m_); - assert(init_); - - // Only the application local node store can be imported - if (&source != &app_.getNodeStore()) { - assert(false); - JLOG(j_.error()) << - "Invalid source database"; - return; - } + std::lock_guard lock(m_); + assert(init_); - std::uint32_t earliestIndex; - std::uint32_t latestIndex; - { - auto loadLedger = [&](bool ascendSort = true) -> - boost::optional + // Only the application local node store can be imported + if (&source != &app_.getNodeStore()) { - 
auto const [ledger, seq, _] = loadLedgerHelper( - "WHERE LedgerSeq >= " + std::to_string(earliestSeq()) + - " order by LedgerSeq " + (ascendSort ? "asc" : "desc") + - " limit 1", app_, false); - (void)_; - if (!ledger || seq == 0) + assert(false); + JLOG(j_.error()) << "invalid source database"; + return; + } + + std::uint32_t earliestIndex; + std::uint32_t latestIndex; + { + auto loadLedger = [&](bool ascendSort = true) -> + boost::optional + { + std::shared_ptr ledger; + std::uint32_t seq; + std::tie(ledger, seq, std::ignore) = loadLedgerHelper( + "WHERE LedgerSeq >= " + std::to_string(earliestSeq()) + + " order by LedgerSeq " + (ascendSort ? "asc" : "desc") + + " limit 1", app_, false); + if (!ledger || seq == 0) + { + JLOG(j_.error()) << + "no suitable ledgers were found in" + " the SQLite database to import"; + return boost::none; + } + return seq; + }; + + // Find earliest ledger sequence stored + auto seq {loadLedger()}; + if (!seq) + return; + earliestIndex = seqToShardIndex(*seq); + + // Consider only complete shards + if (seq != firstLedgerSeq(earliestIndex)) + ++earliestIndex; + + // Find last ledger sequence stored + seq = loadLedger(false); + if (!seq) + return; + latestIndex = seqToShardIndex(*seq); + + // Consider only complete shards + if (seq != lastLedgerSeq(latestIndex)) + --latestIndex; + + if (latestIndex < earliestIndex) { JLOG(j_.error()) << - "No suitable ledgers were found in" << - " the sqlite database to import"; - return boost::none; + "no suitable ledgers were found in" + " the SQLite database to import"; + return; } - return seq; - }; - - // Find earliest ledger sequence stored - auto seq {loadLedger()}; - if (!seq) - return; - earliestIndex = seqToShardIndex(*seq); - - // Consider only complete shards - if (seq != firstLedgerSeq(earliestIndex)) - ++earliestIndex; - - // Find last ledger sequence stored - seq = loadLedger(false); - if (!seq) - return; - latestIndex = seqToShardIndex(*seq); - - // Consider only complete shards - if (seq 
!= lastLedgerSeq(latestIndex)) - --latestIndex; - - if (latestIndex < earliestIndex) - { - JLOG(j_.error()) << - "No suitable ledgers were found in" << - " the sqlite database to import"; - return; - } - } - - // Import the shards - for (std::uint32_t shardIndex = earliestIndex; - shardIndex <= latestIndex; ++shardIndex) - { - if (usedDiskSpace_ + avgShardSz_ > maxDiskSpace_) - { - JLOG(j_.error()) << - "Maximum size reached"; - canAdd_ = false; - break; - } - if (avgShardSz_ > available()) - { - JLOG(j_.error()) << - "Insufficient disk space"; - canAdd_ = false; - break; } - // Skip if already stored - if (complete_.find(shardIndex) != complete_.end() || - (incomplete_ && incomplete_->index() == shardIndex)) + // Import the shards + for (std::uint32_t shardIndex = earliestIndex; + shardIndex <= latestIndex; ++shardIndex) { - JLOG(j_.debug()) << - "shard " << shardIndex << - " already exists"; - continue; - } - - // Verify sqlite ledgers are in the node store - { - auto const firstSeq {firstLedgerSeq(shardIndex)}; - auto const lastSeq {std::max(firstSeq, lastLedgerSeq(shardIndex))}; - auto const numLedgers {shardIndex == earliestShardIndex() - ? 
lastSeq - firstSeq + 1 : ledgersPerShard_}; - auto ledgerHashes{getHashesByIndex(firstSeq, lastSeq, app_)}; - if (ledgerHashes.size() != numLedgers) - continue; - - bool valid {true}; - for (std::uint32_t n = firstSeq; n <= lastSeq; n += 256) + if (fileSz_ + avgShardFileSz_ > maxFileSz_) { - if (!source.fetch(ledgerHashes[n].first, n)) + JLOG(j_.error()) << "maximum storage size reached"; + canAdd_ = false; + break; + } + if (avgShardFileSz_ > available()) + { + JLOG(j_.error()) << "insufficient storage space available"; + canAdd_ = false; + break; + } + + // Skip if already stored + if (complete_.find(shardIndex) != complete_.end() || + (incomplete_ && incomplete_->index() == shardIndex)) + { + JLOG(j_.debug()) << "shard " << shardIndex << " already exists"; + continue; + } + + // Verify SQLite ledgers are in the node store + { + auto const firstSeq {firstLedgerSeq(shardIndex)}; + auto const lastSeq { + std::max(firstSeq, lastLedgerSeq(shardIndex))}; + auto const numLedgers {shardIndex == earliestShardIndex() + ? 
lastSeq - firstSeq + 1 : ledgersPerShard_}; + auto ledgerHashes{getHashesByIndex(firstSeq, lastSeq, app_)}; + if (ledgerHashes.size() != numLedgers) + continue; + + bool valid {true}; + for (std::uint32_t n = firstSeq; n <= lastSeq; n += 256) { - JLOG(j_.warn()) << - "SQL DB ledger sequence " << n << - " mismatches node store"; - valid = false; + if (!source.fetch(ledgerHashes[n].first, n)) + { + JLOG(j_.warn()) << + "SQLite ledger sequence " << n << + " mismatches node store"; + valid = false; + break; + } + } + if (!valid) + continue; + } + + // Create the new shard + app_.shardFamily()->reset(); + auto const shardDir {dir_ / std::to_string(shardIndex)}; + auto shard {std::make_unique(app_, *this, shardIndex, j_)}; + if (!shard->open(scheduler_, *ctx_)) + { + shard.reset(); + continue; + } + + // Create a marker file to signify an import in progress + auto const markerFile {shardDir / importMarker_}; + std::ofstream ofs {markerFile.string()}; + if (!ofs.is_open()) + { + JLOG(j_.error()) << + "shard " << shardIndex << + " is unable to create temp marker file"; + shard.reset(); + removeAll(shardDir, j_); + continue; + } + ofs.close(); + + // Copy the ledgers from node store + while (auto seq = shard->prepare()) + { + auto ledger = loadByIndex(*seq, app_, false); + if (!ledger || ledger->info().seq != seq || + !Database::copyLedger(*shard->getBackend(), *ledger, + nullptr, nullptr, shard->lastStored())) + break; + + if (!shard->setStored(ledger)) + break; + if (shard->complete()) + { + JLOG(j_.debug()) << + "shard " << shardIndex << " was successfully imported"; + removeAll(markerFile, j_); break; } } - if (!valid) - continue; - } - // Create the new shard - app_.shardFamily()->reset(); - auto const shardDir {dir_ / std::to_string(shardIndex)}; - auto shard = std::make_unique( - *this, shardIndex, shardCacheSz, cacheAge_, j_); - if (!shard->open( - app_.config().section(ConfigSection::shardDatabase()), - scheduler_, - *ctx_)) - { - shard.reset(); - continue; - } - - 
// Create a marker file to signify an import in progress - auto const markerFile {shardDir / importMarker_}; - std::ofstream ofs {markerFile.string()}; - if (!ofs.is_open()) - { - JLOG(j_.error()) << - "shard " << shardIndex << - " unable to create temp marker file"; - shard.reset(); - removeAll(shardDir, j_); - continue; - } - ofs.close(); - - // Copy the ledgers from node store - while (auto seq = shard->prepare()) - { - auto ledger = loadByIndex(*seq, app_, false); - if (!ledger || ledger->info().seq != seq || - !Database::copyLedger(*shard->getBackend(), *ledger, - nullptr, nullptr, shard->lastStored())) - break; - - auto const before {shard->fileSize()}; - if (!shard->setStored(ledger)) - break; - auto const after {shard->fileSize()}; - if (after > before) - usedDiskSpace_ += (after - before); - else if(after < before) - usedDiskSpace_ -= std::min(before - after, usedDiskSpace_); - - if (shard->complete()) + if (!shard->complete()) { - JLOG(j_.debug()) << - "shard " << shardIndex << - " successfully imported"; - removeAll(markerFile, j_); - break; + JLOG(j_.error()) << + "shard " << shardIndex << " failed to import"; + shard.reset(); + removeAll(shardDir, j_); } + else + setFileStats(lock); } - if (!shard->complete()) - { - JLOG(j_.error()) << - "shard " << shardIndex << - " failed to import"; - shard.reset(); - removeAll(shardDir, j_); - } + // Re initialize the shard store + init_ = false; + complete_.clear(); + incomplete_.reset(); } - // Re initialize the shard store - init_ = false; - complete_.clear(); - incomplete_.reset(); - usedDiskSpace_ = 0; - lock.unlock(); - if (!init()) - Throw("Failed to initialize"); + Throw("import: failed to initialize"); } std::int32_t @@ -894,8 +845,9 @@ DatabaseShardImp::getWriteLoad() const { std::lock_guard lock(m_); assert(init_); - for (auto const& c : complete_) - wl += c.second->getBackend()->getWriteLoad(); + + for (auto const& e : complete_) + wl += e.second->getBackend()->getWriteLoad(); if (incomplete_) wl += 
incomplete_->getBackend()->getWriteLoad(); } @@ -914,10 +866,12 @@ DatabaseShardImp::store(NodeObjectType type, { std::lock_guard lock(m_); assert(init_); + if (!incomplete_ || shardIndex != incomplete_->index()) { JLOG(j_.warn()) << - "ledger seq " << seq << + "shard " << shardIndex << + " ledger sequence " << seq << " is not being acquired"; return; } @@ -962,10 +916,12 @@ DatabaseShardImp::copyLedger(std::shared_ptr const& ledger) auto const shardIndex {seqToShardIndex(ledger->info().seq)}; std::lock_guard lock(m_); assert(init_); + if (!incomplete_ || shardIndex != incomplete_->index()) { JLOG(j_.warn()) << - "source ledger seq " << ledger->info().seq << + "shard " << shardIndex << + " source ledger sequence " << ledger->info().seq << " is not being acquired"; return false; } @@ -977,21 +933,16 @@ DatabaseShardImp::copyLedger(std::shared_ptr const& ledger) return false; } - auto const before {incomplete_->fileSize()}; if (!incomplete_->setStored(ledger)) return false; - auto const after {incomplete_->fileSize()}; - if(after > before) - usedDiskSpace_ += (after - before); - else if(after < before) - usedDiskSpace_ -= std::min(before - after, usedDiskSpace_); - if (incomplete_->complete()) { complete_.emplace(incomplete_->index(), std::move(incomplete_)); incomplete_.reset(); - updateStats(lock); + updateStatus(lock); } + + setFileStats(lock); return true; } @@ -1002,6 +953,7 @@ DatabaseShardImp::getDesiredAsyncReadCount(std::uint32_t seq) { std::lock_guard lock(m_); assert(init_); + auto it = complete_.find(shardIndex); if (it != complete_.end()) return it->second->pCache()->getTargetSize() / asyncDivider; @@ -1018,9 +970,10 @@ DatabaseShardImp::getCacheHitRate() { std::lock_guard lock(m_); assert(init_); + sz = complete_.size(); - for (auto const& c : complete_) - f += c.second->pCache()->getHitRate(); + for (auto const& e : complete_) + f += e.second->pCache()->getHitRate(); if (incomplete_) { f += incomplete_->pCache()->getHitRate(); @@ -1030,50 +983,17 @@ 
DatabaseShardImp::getCacheHitRate() return f / std::max(1.0f, sz); } -void -DatabaseShardImp::tune(int size, std::chrono::seconds age) -{ - std::lock_guard lock(m_); - assert(init_); - cacheSz_ = size; - cacheAge_ = age; - int const sz {calcTargetCacheSz(lock)}; - for (auto const& c : complete_) - { - c.second->pCache()->setTargetSize(sz); - c.second->pCache()->setTargetAge(cacheAge_); - c.second->nCache()->setTargetSize(sz); - c.second->nCache()->setTargetAge(cacheAge_); - } - if (incomplete_) - { - incomplete_->pCache()->setTargetSize(sz); - incomplete_->pCache()->setTargetAge(cacheAge_); - incomplete_->nCache()->setTargetSize(sz); - incomplete_->nCache()->setTargetAge(cacheAge_); - } -} - void DatabaseShardImp::sweep() { std::lock_guard lock(m_); assert(init_); - int const sz {calcTargetCacheSz(lock)}; - for (auto const& c : complete_) - { - c.second->pCache()->sweep(); - c.second->nCache()->sweep(); - if (c.second->pCache()->getTargetSize() > sz) - c.second->pCache()->setTargetSize(sz); - } + + for (auto const& e : complete_) + e.second->sweep(); + if (incomplete_) - { - incomplete_->pCache()->sweep(); - incomplete_->nCache()->sweep(); - if (incomplete_->pCache()->getTargetSize() > sz) - incomplete_->pCache()->setTargetSize(sz); - } + incomplete_->sweep(); } std::shared_ptr @@ -1153,65 +1073,71 @@ DatabaseShardImp::findShardIndexToAdd( } void -DatabaseShardImp::updateStats(std::lock_guard&) +DatabaseShardImp::setFileStats(std::lock_guard&) { - // Calculate shard file sizes and update status string - std::uint32_t filesPerShard {0}; + fileSz_ = 0; + fdRequired_ = 0; if (!complete_.empty()) { - status_.clear(); - filesPerShard = complete_.begin()->second->fdlimit(); - std::uint64_t avgShardSz {0}; - for (auto it = complete_.begin(); it != complete_.end(); ++it) + for (auto const& e : complete_) { - if (it == complete_.begin()) - status_ = std::to_string(it->first); - else - { - if (it->first - std::prev(it)->first > 1) - { - if (status_.back() == '-') - status_ 
+= std::to_string(std::prev(it)->first); - status_ += "," + std::to_string(it->first); - } - else - { - if (status_.back() != '-') - status_ += "-"; - if (std::next(it) == complete_.end()) - status_ += std::to_string(it->first); - } - } - avgShardSz += it->second->fileSize(); + fileSz_ += e.second->fileSize(); + fdRequired_ += e.second->fdRequired(); } - if (backed_) - avgShardSz_ = avgShardSz / complete_.size(); + avgShardFileSz_ = fileSz_ / complete_.size(); + } + else + avgShardFileSz_ = 0; + + if (incomplete_) + { + fileSz_ += incomplete_->fileSize(); + fdRequired_ += incomplete_->fdRequired(); } - else if(incomplete_) - filesPerShard = incomplete_->fdlimit(); if (!backed_) return; - fdLimit_ = 1 + (filesPerShard * - (complete_.size() + (incomplete_ ? 1 : 0))); + // Require at least 15 file descriptors + fdRequired_ = std::max(fdRequired_, 15); - if (usedDiskSpace_ >= maxDiskSpace_) + if (fileSz_ >= maxFileSz_) { - JLOG(j_.warn()) << - "Maximum size reached"; + JLOG(j_.warn()) << "maximum storage size reached"; canAdd_ = false; } - else + else if (maxFileSz_ - fileSz_ > available()) { - auto const sz = maxDiskSpace_ - usedDiskSpace_; - if (sz > available()) + JLOG(j_.warn()) << + "maximum shard store size exceeds available storage space"; + } +} + +void +DatabaseShardImp::updateStatus(std::lock_guard&) +{ + status_.clear(); + status_.reserve(complete_.size() * 8); + for (auto it = complete_.begin(); it != complete_.end(); ++it) + { + if (it == complete_.begin()) + status_ = std::to_string(it->first); + else { - JLOG(j_.warn()) << - "Max Shard Store size exceeds " - "remaining free disk space"; + if (it->first - std::prev(it)->first > 1) + { + if (status_.back() == '-') + status_ += std::to_string(std::prev(it)->first); + status_ += ',' + std::to_string(it->first); + } + else + { + if (status_.back() != '-') + status_ += '-'; + if (std::next(it) == complete_.end()) + status_ += std::to_string(it->first); + } } - fdLimit_ += (filesPerShard * (sz / avgShardSz_)); 
} } @@ -1221,6 +1147,7 @@ DatabaseShardImp::selectCache(std::uint32_t seq) auto const shardIndex {seqToShardIndex(seq)}; std::lock_guard lock(m_); assert(init_); + { auto it = complete_.find(shardIndex); if (it != complete_.end()) @@ -1229,6 +1156,7 @@ DatabaseShardImp::selectCache(std::uint32_t seq) it->second->nCache()); } } + if (incomplete_ && incomplete_->index() == shardIndex) { return std::make_pair(incomplete_->pCache(), @@ -1238,10 +1166,7 @@ DatabaseShardImp::selectCache(std::uint32_t seq) // Used to validate import shards auto it = preShards_.find(shardIndex); if (it != preShards_.end() && it->second) - { - return std::make_pair(it->second->pCache(), - it->second->nCache()); - } + return std::make_pair(it->second->pCache(), it->second->nCache()); return {}; } @@ -1254,8 +1179,8 @@ DatabaseShardImp::available() const } catch (std::exception const& e) { - JLOG(j_.error()) << - "exception: " << e.what(); + JLOG(j_.error()) << "exception " << e.what() << + " in function " << __func__; return 0; } } diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 841c17b0c3..40fb1d2a79 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -157,7 +157,7 @@ public: getCacheHitRate() override; void - tune(int size, std::chrono::seconds age) override; + tune(int size, std::chrono::seconds age) override {}; void sweep() override; @@ -194,11 +194,11 @@ private: // The name associated with the backend used with the shard store std::string backendName_; - // Maximum disk space the DB can use (in bytes) - std::uint64_t maxDiskSpace_; + // Maximum storage space the shard store can utilize (in bytes) + std::uint64_t maxFileSz_; - // Disk space used to store the shards (in bytes) - std::uint64_t usedDiskSpace_ {0}; + // Storage space utilized by the shard store (in bytes) + std::uint64_t fileSz_ {0}; // Each shard stores 16384 ledgers. 
The earliest shard may store // less if the earliest ledger sequence truncates its beginning. @@ -208,16 +208,12 @@ private: // The earliest shard index std::uint32_t const earliestShardIndex_; - // Average disk space a shard requires (in bytes) - std::uint64_t avgShardSz_; - - // Shard cache tuning - int cacheSz_ {shardCacheSz}; - std::chrono::seconds cacheAge_ {shardCacheAge}; + // Average storage space required by a shard (in bytes) + std::uint64_t avgShardFileSz_; // File name used to mark shards being imported from node store static constexpr auto importMarker_ = "import"; - + std::shared_ptr fetchFrom(uint256 const& hash, std::uint32_t seq) override; @@ -233,23 +229,19 @@ private: findShardIndexToAdd(std::uint32_t validLedgerSeq, std::lock_guard&); - // Updates stats + // Set storage and file descriptor usage stats // Lock must be held void - updateStats(std::lock_guard&); + setFileStats(std::lock_guard&); + + // Update status string + // Lock must be held + void + updateStatus(std::lock_guard&); std::pair, std::shared_ptr> selectCache(std::uint32_t seq); - // Returns the tune cache size divided by the number of shards - // Lock must be held - int - calcTargetCacheSz(std::lock_guard&) const - { - return std::max(shardCacheSz, cacheSz_ / std::max( - 1, static_cast(complete_.size() + (incomplete_ ? 
1 : 0)))); - } - // Returns available storage space std::uint64_t available() const; diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index f9e6b662e1..759ceceebd 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -19,27 +19,31 @@ #include #include +#include +#include +#include #include #include +#include +#include + #include namespace ripple { namespace NodeStore { -Shard::Shard(DatabaseShard const& db, std::uint32_t index, - int cacheSz, std::chrono::seconds cacheAge, beast::Journal& j) - : index_(index) +Shard::Shard( + Application& app, + DatabaseShard const& db, + std::uint32_t index, + beast::Journal& j) + : app_(app) + , index_(index) , firstSeq_(db.firstLedgerSeq(index)) , lastSeq_(std::max(firstSeq_, db.lastLedgerSeq(index))) , maxLedgers_(index == db.earliestShardIndex() ? lastSeq_ - firstSeq_ + 1 : db.ledgersPerShard()) - , pCache_(std::make_shared( - "shard " + std::to_string(index_), - cacheSz, cacheAge, stopwatch(), j)) - , nCache_(std::make_shared( - "shard " + std::to_string(index_), - stopwatch(), cacheSz, cacheAge)) , dir_(db.getRootDir() / std::to_string(index_)) , control_(dir_ / controlFileName) , j_(j) @@ -49,38 +53,47 @@ Shard::Shard(DatabaseShard const& db, std::uint32_t index, } bool -Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx) +Shard::open(Scheduler& scheduler, nudb::context& ctx) { assert(!backend_); using namespace boost::filesystem; using namespace boost::beast::detail; - std::string const type (get(config, "type", "nudb")); + Config const& config {app_.config()}; + Section section {config.section(ConfigSection::shardDatabase())}; + std::string const type (get(section, "type", "nudb")); auto factory {Manager::instance().find(type)}; if (!factory) { JLOG(j_.error()) << "shard " << index_ << - ": failed to create shard store type " << type; + " failed to create backend type " << type; return false; } - config.set("path", 
dir_.string()); + section.set("path", dir_.string()); backend_ = factory->createInstance( - NodeObject::keyBytes, config, scheduler, ctx, j_); + NodeObject::keyBytes, section, scheduler, ctx, j_); auto const preexist {exists(dir_)}; - auto fail = [&](std::string msg) + auto fail = [this, preexist](std::string const& msg) { + pCache_.reset(); + nCache_.reset(); + backend_.reset(); + lgrSQLiteDB_.reset(); + txSQLiteDB_.reset(); + storedSeqs_.clear(); + lastStored_.reset(); + + if (!preexist) + removeAll(dir_, j_); + if (!msg.empty()) { JLOG(j_.error()) << - "shard " << index_ << ": " << msg; + "shard " << index_ << " " << msg; } - if (backend_) - backend_->close(); - if (!preexist) - removeAll(dir_, j_); return false; }; @@ -112,100 +125,84 @@ Shard::open(Section config, Scheduler& scheduler, nudb::context& ctx) if (boost::icl::first(storedSeqs_) < firstSeq_ || boost::icl::last(storedSeqs_) > lastSeq_) { - return fail("invalid control file"); + return fail("has an invalid control file"); } if (boost::icl::length(storedSeqs_) >= maxLedgers_) { - JLOG(j_.error()) << + JLOG(j_.warn()) << "shard " << index_ << - ": found control file for complete shard"; - storedSeqs_.clear(); - complete_ = true; + " has a control file for complete shard"; + setComplete(); remove_all(control_); } } } else - complete_ = true; + setComplete(); - // Calculate file foot print of backend files - for (auto const& p : recursive_directory_iterator(dir_)) - if (!is_directory(p)) - fileSize_ += file_size(p); + setCache(); + if (!initSQLite() || !setFileStats()) + return fail({}); } catch (std::exception const& e) { - return fail(e.what()); + return fail(std::string("exception ") + + e.what() + " in function " + __func__); } return true; } bool -Shard::setStored(std::shared_ptr const& l) +Shard::setStored(std::shared_ptr const& ledger) { assert(backend_&& !complete_); - if (boost::icl::contains(storedSeqs_, l->info().seq)) + if (boost::icl::contains(storedSeqs_, ledger->info().seq)) { 
JLOG(j_.debug()) << "shard " << index_ << - " ledger seq " << l->info().seq << - " already stored"; + " has ledger sequence " << ledger->info().seq << " already stored"; return false; } + + if (!setSQLiteStored(ledger)) + return false; + + // Check if the shard is complete if (boost::icl::length(storedSeqs_) >= maxLedgers_ - 1) { + setComplete(); if (backend_->backed()) { if (!removeAll(control_, j_)) return false; - // Update file foot print of backend files - using namespace boost::filesystem; - std::uint64_t sz {0}; - try - { - for (auto const& p : recursive_directory_iterator(dir_)) - if (!is_directory(p)) - sz += file_size(p); - } - catch (const filesystem_error& e) - { - JLOG(j_.error()) << - "exception: " << e.what(); - fileSize_ = std::max(fileSize_, sz); + setCache(); + if (!initSQLite() || !setFileStats()) return false; - } - fileSize_ = sz; } - complete_ = true; - storedSeqs_.clear(); - - JLOG(j_.debug()) << - "shard " << index_ << - " ledger seq " << l->info().seq << - " stored. Shard complete"; } else { - storedSeqs_.insert(l->info().seq); - lastStored_ = l; + storedSeqs_.insert(ledger->info().seq); if (backend_->backed() && !saveControl()) return false; - - JLOG(j_.debug()) << - "shard " << index_ << - " ledger seq " << l->info().seq << - " stored"; } + JLOG(j_.debug()) << + "shard " << index_ << + " stored ledger sequence " << ledger->info().seq << + (complete_ ? 
" and is complete" : ""); + + lastStored_ = ledger; return true; } boost::optional Shard::prepare() { + assert(backend_); if (storedSeqs_.empty()) return lastSeq_; return prevMissing(storedSeqs_, 1 + lastSeq_, firstSeq_); @@ -214,6 +211,7 @@ Shard::prepare() bool Shard::contains(std::uint32_t seq) const { + assert(backend_); if (seq < firstSeq_ || seq > lastSeq_) return false; if (complete_) @@ -221,44 +219,53 @@ Shard::contains(std::uint32_t seq) const return boost::icl::contains(storedSeqs_, seq); } +void +Shard::sweep() +{ + assert(backend_); + pCache_->sweep(); + nCache_->sweep(); +} + bool -Shard::validate(Application& app) +Shard::validate() { uint256 hash; std::uint32_t seq; - std::shared_ptr l; + std::shared_ptr ledger; + auto fail = [this](std::string const& msg) + { + JLOG(j_.error()) << "shard " << index_ << " " << msg; + return false; + }; + // Find the hash of the last ledger in this shard { - std::tie(l, seq, hash) = loadLedgerHelper( + std::tie(ledger, seq, hash) = loadLedgerHelper( "WHERE LedgerSeq >= " + std::to_string(lastSeq_) + - " order by LedgerSeq desc limit 1", app, false); - if (!l) - { - JLOG(j_.error()) << - "shard " << index_ << - " unable to validate. 
No lookup data"; - return false; - } + " order by LedgerSeq desc limit 1", app_, false); + if (!ledger) + return fail("is unable to validate due to lacking lookup data"); + if (seq != lastSeq_) { - l->setImmutable(app.config()); + ledger->setImmutable(app_.config()); boost::optional h; + try { - h = hashOfSeq(*l, lastSeq_, j_); + h = hashOfSeq(*ledger, lastSeq_, j_); } catch (std::exception const& e) { - JLOG(j_.error()) << - "exception: " << e.what(); - return false; + return fail(std::string("exception ") + + e.what() + " in function " + __func__); } + if (!h) { - JLOG(j_.error()) << - "shard " << index_ << - " No hash for last ledger seq " << lastSeq_; - return false; + return fail("is missing hash for last ledger sequence " + + std::to_string(lastSeq_)); } hash = *h; seq = lastSeq_; @@ -266,9 +273,8 @@ Shard::validate(Application& app) } JLOG(j_.debug()) << - "Validating shard " << index_ << - " ledgers " << firstSeq_ << - "-" << lastSeq_; + "shard " << index_ << + " has ledger sequences " << firstSeq_ << "-" << lastSeq_; // Use a short age to keep memory consumption low auto const savedAge {pCache_->getTargetAge()}; @@ -282,44 +288,45 @@ Shard::validate(Application& app) auto nObj = valFetch(hash); if (!nObj) break; - l = std::make_shared( + ledger = std::make_shared( InboundLedger::deserializeHeader(makeSlice(nObj->getData()), - true), app.config(), *app.shardFamily()); - if (l->info().hash != hash || l->info().seq != seq) + true), app_.config(), *app_.shardFamily()); + if (ledger->info().seq != seq) { - JLOG(j_.error()) << - "ledger seq " << seq << - " hash " << hash << - " cannot be a ledger"; + fail("encountered invalid ledger sequence " + std::to_string(seq)); break; } - l->stateMap().setLedgerSeq(seq); - l->txMap().setLedgerSeq(seq); - l->setImmutable(app.config()); - if (!l->stateMap().fetchRoot( - SHAMapHash {l->info().accountHash}, nullptr)) + if (ledger->info().hash != hash) { - JLOG(j_.error()) << - "ledger seq " << seq << - " missing Account State 
root"; + fail("encountered invalid ledger hash " + to_string(hash) + + " on sequence " + std::to_string(seq)); break; } - if (l->info().txHash.isNonZero()) + ledger->stateMap().setLedgerSeq(seq); + ledger->txMap().setLedgerSeq(seq); + ledger->setImmutable(app_.config()); + if (!ledger->stateMap().fetchRoot( + SHAMapHash {ledger->info().accountHash}, nullptr)) { - if (!l->txMap().fetchRoot( - SHAMapHash {l->info().txHash}, nullptr)) + fail("is missing root STATE node on sequence " + + std::to_string(seq)); + break; + } + if (ledger->info().txHash.isNonZero()) + { + if (!ledger->txMap().fetchRoot( + SHAMapHash {ledger->info().txHash}, nullptr)) { - JLOG(j_.error()) << - "ledger seq " << seq << - " missing TX root"; + fail("is missing root TXN node on sequence " + + std::to_string(seq)); break; } } - if (!valLedger(l, next)) + if (!valLedger(ledger, next)) break; - hash = l->info().parentHash; + hash = ledger->info().parentHash; --seq; - next = l; + next = ledger; if (seq % 128 == 0) pCache_->sweep(); } @@ -330,79 +337,87 @@ Shard::validate(Application& app) if (seq >= firstSeq_) { - JLOG(j_.error()) << - "shard " << index_ << - (complete_ ? " is invalid, failed" : " is incomplete, stopped") << - " at seq " << seq << - " hash " << hash; - return false; + return fail(std::string(" is ") + + (complete_ ? 
"invalid, failed" : "incomplete, stopped") + + " on hash " + to_string(hash) + " on sequence " + + std::to_string(seq)); } JLOG(j_.debug()) << - "shard " << index_ << - " is complete."; + "shard " << index_ << " is valid and complete"; return true; } bool -Shard::valLedger(std::shared_ptr const& l, +Shard::valLedger(std::shared_ptr const& ledger, std::shared_ptr const& next) { - if (l->info().hash.isZero() || l->info().accountHash.isZero()) + auto fail = [this](std::string const& msg) { - JLOG(j_.error()) << - "invalid ledger"; + JLOG(j_.error()) << "shard " << index_ << " " << msg; return false; + }; + + if (ledger->info().hash.isZero()) + { + return fail("encountered a zero ledger hash on sequence " + + std::to_string(ledger->info().seq)); } + if (ledger->info().accountHash.isZero()) + { + return fail("encountered a zero account hash on sequence " + + std::to_string(ledger->info().seq)); + } + bool error {false}; - auto f = [&, this](SHAMapAbstractNode& node) { + auto f = [this, &error](SHAMapAbstractNode& node) + { if (!valFetch(node.getNodeHash().as_uint256())) error = true; return !error; }; + // Validate the state map - if (l->stateMap().getHash().isNonZero()) + if (ledger->stateMap().getHash().isNonZero()) { - if (!l->stateMap().isValid()) + if (!ledger->stateMap().isValid()) { - JLOG(j_.error()) << - "invalid state map"; - return false; + return fail("has an invalid state map on sequence " + + std::to_string(ledger->info().seq)); } + try { - if (next && next->info().parentHash == l->info().hash) - l->stateMap().visitDifferences(&next->stateMap(), f); + if (next && next->info().parentHash == ledger->info().hash) + ledger->stateMap().visitDifferences(&next->stateMap(), f); else - l->stateMap().visitNodes(f); + ledger->stateMap().visitNodes(f); } catch (std::exception const& e) { - JLOG(j_.error()) << - "exception: " << e.what(); - return false; + return fail(std::string("exception ") + + e.what() + " in function " + __func__); } if (error) return false; } - 
// Validate the tx map - if (l->info().txHash.isNonZero()) + // Validate the transaction map + if (ledger->info().txHash.isNonZero()) { - if (!l->txMap().isValid()) + if (!ledger->txMap().isValid()) { - JLOG(j_.error()) << - "invalid transaction map"; - return false; + return fail("has an invalid transaction map on sequence " + + std::to_string(ledger->info().seq)); } + try { - l->txMap().visitNodes(f); + ledger->txMap().visitNodes(f); } catch (std::exception const& e) { - JLOG(j_.error()) << - "exception: " << e.what(); - return false; + return fail(std::string("exception ") + + e.what() + " in function " + __func__); } if (error) return false; @@ -415,6 +430,11 @@ Shard::valFetch(uint256 const& hash) { assert(backend_); std::shared_ptr nObj; + auto fail = [this](std::string const& msg) + { + JLOG(j_.error()) << "shard " << index_ << " " << msg; + }; + try { switch (backend_->fetch(hash.begin(), &nObj)) @@ -423,29 +443,324 @@ Shard::valFetch(uint256 const& hash) break; case notFound: { - JLOG(j_.error()) << - "NodeObject not found. hash " << hash; + fail("is missing node object on hash " + to_string(hash)); break; } case dataCorrupt: { - JLOG(j_.error()) << - "NodeObject is corrupt. hash " << hash; + fail("has a corrupt node object on hash " + to_string(hash)); break; } default: - { - JLOG(j_.error()) << - "unknown error. hash " << hash; + fail("encountered unknown error on hash " + to_string(hash)); } + } + catch (std::exception const& e) + { + fail(std::string("exception ") + + e.what() + " in function " + __func__); + } + return nObj; +} + +void +Shard::setComplete() +{ + storedSeqs_.clear(); + complete_ = true; +} + +void +Shard::setCache() +{ + // complete shards use the smallest cache and + // fastest expiration to reduce memory consumption. + // The incomplete shard is set according to configuration. + if (!pCache_) + { + auto const name {"shard " + std::to_string(index_)}; + auto const sz {complete_ ? 
+ Config::getSize(siNodeCacheSize, 0) : + app_.config().getSize(siNodeCacheSize)}; + auto const age {std::chrono::seconds{complete_ ? + Config::getSize(siNodeCacheAge, 0) : + app_.config().getSize(siNodeCacheAge)}}; + + pCache_ = std::make_shared(name, sz, age, stopwatch(), j_); + nCache_ = std::make_shared(name, stopwatch(), sz, age); + } + else + { + auto const sz {Config::getSize(siNodeCacheSize, 0)}; + pCache_->setTargetSize(sz); + nCache_->setTargetSize(sz); + + auto const age {std::chrono::seconds{ + Config::getSize(siNodeCacheAge, 0)}}; + pCache_->setTargetAge(age); + nCache_->setTargetAge(age); + } +} + +bool +Shard::initSQLite() +{ + Config const& config {app_.config()}; + DatabaseCon::Setup setup; + setup.startUp = config.START_UP; + setup.standAlone = config.standalone(); + setup.dataDir = dir_; + + try + { + if (complete_) + { + using namespace boost::filesystem; + + // Remove WAL files if they exist + for (auto const& d : directory_iterator(dir_)) + { + if (is_regular_file(d) && + boost::iends_with(extension(d), "-wal")) + { + // Closing the session forces a checkpoint + if (!lgrSQLiteDB_) + { + lgrSQLiteDB_ = std::make_unique ( + setup, + LgrDBName, + LgrDBPragma, + LgrDBInit); + } + lgrSQLiteDB_->getSession().close(); + + if (!txSQLiteDB_) + { + txSQLiteDB_ = std::make_unique ( + setup, + TxDBName, + TxDBPragma, + TxDBInit); + } + txSQLiteDB_->getSession().close(); + break; + } + } + + lgrSQLiteDB_ = std::make_unique ( + setup, + LgrDBName, + CompleteShardDBPragma, + LgrDBInit); + lgrSQLiteDB_->getSession() << + boost::str(boost::format("PRAGMA cache_size=-%d;") % + kilobytes(Config::getSize(siLgrDBCache, 0))); + + txSQLiteDB_ = std::make_unique ( + setup, + TxDBName, + CompleteShardDBPragma, + TxDBInit); + txSQLiteDB_->getSession() << + boost::str(boost::format("PRAGMA cache_size=-%d;") % + kilobytes(Config::getSize(siTxnDBCache, 0))); + } + else + { + // The incomplete shard uses a Write Ahead Log for performance + lgrSQLiteDB_ = std::make_unique ( 
+ setup, + LgrDBName, + LgrDBPragma, + LgrDBInit); + lgrSQLiteDB_->getSession() << + boost::str(boost::format("PRAGMA cache_size=-%d;") % + kilobytes(config.getSize(siLgrDBCache))); + lgrSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs()); + + txSQLiteDB_ = std::make_unique ( + setup, + TxDBName, + TxDBPragma, + TxDBInit); + txSQLiteDB_->getSession() << + boost::str(boost::format("PRAGMA cache_size=-%d;") % + kilobytes(config.getSize(siTxnDBCache))); + txSQLiteDB_->setupCheckpointing(&app_.getJobQueue(), app_.logs()); } } catch (std::exception const& e) { JLOG(j_.error()) << - "exception: " << e.what(); + "shard " << index_ << + " exception " << e.what() << + " in function " << __func__; + return false; } - return nObj; + return true; +} + +bool +Shard::setSQLiteStored(std::shared_ptr const& ledger) +{ + auto const seq {ledger->info().seq}; + assert(backend_ && !complete_); + assert(!boost::icl::contains(storedSeqs_, seq)); + + try + { + { + auto& session {txSQLiteDB_->getSession()}; + soci::transaction tr(session); + + session << + "DELETE FROM Transactions WHERE LedgerSeq = :seq;" + , soci::use(seq); + session << + "DELETE FROM AccountTransactions WHERE LedgerSeq = :seq;" + , soci::use(seq); + + if (ledger->info().txHash.isNonZero()) + { + auto const sSeq {std::to_string(seq)}; + if (!ledger->txMap().isValid()) + { + JLOG(j_.error()) << + "shard " << index_ << + " has an invalid transaction map" << + " on sequence " << sSeq; + return false; + } + + for (auto const& item : ledger->txs) + { + auto const txID {item.first->getTransactionID()}; + auto const sTxID {to_string(txID)}; + auto const txMeta {std::make_shared( + txID, ledger->seq(), *item.second)}; + + session << + "DELETE FROM AccountTransactions WHERE TransID = :txID;" + , soci::use(sTxID); + + auto const& accounts = txMeta->getAffectedAccounts(j_); + if (!accounts.empty()) + { + auto const s(boost::str(boost::format( + "('%s','%s',%s,%s)") + % sTxID + % "%s" + % sSeq + % 
std::to_string(txMeta->getIndex()))); + std::string sql; + sql.reserve((accounts.size() + 1) * 128); + sql = "INSERT INTO AccountTransactions " + "(TransID, Account, LedgerSeq, TxnSeq) VALUES "; + sql += boost::algorithm::join( + accounts | boost::adaptors::transformed( + [&](AccountID const& accountID) + { + return boost::str(boost::format(s) + % ripple::toBase58(accountID)); + }), + ","); + sql += ';'; + session << sql; + + JLOG(j_.trace()) << + "shard " << index_ << + " account transaction: " << sql; + } + else + { + JLOG(j_.warn()) << + "shard " << index_ << + " transaction in ledger " << sSeq << + " affects no accounts"; + } + + Serializer s; + item.second->add(s); + session << + (STTx::getMetaSQLInsertReplaceHeader() + + item.first->getMetaSQL( + seq, + sqlEscape(std::move(s.modData()))) + + ';'); + } + } + + tr.commit (); + } + + auto& session {lgrSQLiteDB_->getSession()}; + soci::transaction tr(session); + + session << + "DELETE FROM Ledgers WHERE LedgerSeq = :seq;" + , soci::use(seq); + session << + "INSERT OR REPLACE INTO Ledgers (" + "LedgerHash, LedgerSeq, PrevHash, TotalCoins, ClosingTime," + "PrevClosingTime, CloseTimeRes, CloseFlags, AccountSetHash," + "TransSetHash)" + "VALUES (" + ":ledgerHash, :ledgerSeq, :prevHash, :totalCoins, :closingTime," + ":prevClosingTime, :closeTimeRes, :closeFlags, :accountSetHash," + ":transSetHash);", + soci::use(to_string(ledger->info().hash)), + soci::use(seq), + soci::use(to_string(ledger->info().parentHash)), + soci::use(to_string(ledger->info().drops)), + soci::use(ledger->info().closeTime.time_since_epoch().count()), + soci::use(ledger->info().parentCloseTime.time_since_epoch().count()), + soci::use(ledger->info().closeTimeResolution.count()), + soci::use(ledger->info().closeFlags), + soci::use(to_string(ledger->info().accountHash)), + soci::use(to_string(ledger->info().txHash)); + + tr.commit(); + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "shard " << index_ << + " exception " << e.what() << 
+ " in function " << __func__; + return false; + } + return true; +} + +bool +Shard::setFileStats() +{ + fileSz_ = 0; + fdRequired_ = 0; + if (backend_->backed()) + { + try + { + using namespace boost::filesystem; + for (auto const& d : directory_iterator(dir_)) + { + if (is_regular_file(d)) + { + fileSz_ += file_size(d); + ++fdRequired_; + } + } + } + catch (std::exception const& e) + { + JLOG(j_.error()) << + "shard " << index_ << + " exception " << e.what() << + " in function " << __func__; + return false; + } + } + return true; } bool @@ -455,10 +770,10 @@ Shard::saveControl() if (!ofs.is_open()) { JLOG(j_.fatal()) << - "shard " << index_ << - " unable to save control file"; + "shard " << index_ << " is unable to save control file"; return false; } + boost::archive::text_oarchive ar(ofs); ar & storedSeqs_; return true; diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index 1e5666e38d..31fe047d4b 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -23,6 +23,7 @@ #include #include #include +#include #include #include @@ -66,14 +67,17 @@ class DatabaseShard; class Shard { public: - Shard(DatabaseShard const& db, std::uint32_t index, int cacheSz, - std::chrono::seconds cacheAge, beast::Journal& j); + Shard( + Application& app, + DatabaseShard const& db, + std::uint32_t index, + beast::Journal& j); bool - open(Section config, Scheduler& scheduler, nudb::context& ctx); + open(Scheduler& scheduler, nudb::context& ctx); bool - setStored(std::shared_ptr const& l); + setStored(std::shared_ptr const& ledger); boost::optional prepare(); @@ -81,40 +85,35 @@ public: bool contains(std::uint32_t seq) const; + void + sweep(); + bool - validate(Application& app); + validate(); std::uint32_t index() const {return index_;} bool - complete() const {return complete_;} + complete() const {assert(backend_); return complete_;} std::shared_ptr& - pCache() {return pCache_;} + pCache() {assert(backend_); return 
pCache_;} std::shared_ptr& - nCache() {return nCache_;} + nCache() {assert(backend_); return nCache_;} std::uint64_t - fileSize() const {return fileSize_;} - - std::shared_ptr const& - getBackend() const - { - assert(backend_); - return backend_; - } + fileSize() const {assert(backend_); return fileSz_;} std::uint32_t - fdlimit() const - { - assert(backend_); - return backend_->fdlimit(); - } + fdRequired() const {assert(backend_); return fdRequired_;} + + std::shared_ptr const& + getBackend() const {assert(backend_); return backend_;} std::shared_ptr - lastStored() {return lastStored_;} + lastStored() {assert(backend_); return lastStored_;} private: friend class boost::serialization::access; @@ -126,6 +125,8 @@ private: static constexpr auto controlFileName = "control.txt"; + Application& app_; + // Shard Index std::uint32_t const index_; @@ -152,12 +153,21 @@ private: // Path to control file boost::filesystem::path const control_; - // Disk space utilized by the shard - std::uint64_t fileSize_ {0}; + // Storage space utilized by the shard + std::uint64_t fileSz_; + + // Number of file descriptors required by the shard + std::uint32_t fdRequired_; // NuDB key/value store for node objects std::shared_ptr backend_; + // Ledger SQLite database used for indexes + std::unique_ptr lgrSQLiteDB_; + + // Transaction SQLite database used for indexes + std::unique_ptr txSQLiteDB_; + beast::Journal j_; // True if shard has its entire ledger range stored @@ -172,7 +182,7 @@ private: // Validate this ledger by walking its SHAMaps // and verifying each merkle tree bool - valLedger(std::shared_ptr const& l, + valLedger(std::shared_ptr const& ledger, std::shared_ptr const& next); // Fetches from the backend and will log @@ -180,9 +190,25 @@ private: std::shared_ptr valFetch(uint256 const& hash); - // Calculate the file foot print of the backend files + // Marks shard immutable, having stored all of its ledgers void - updateFileSize(); + setComplete(); + + // Set the backend cache 
+ void + setCache(); + + // Open/Create SQLite databases + bool + initSQLite(); + + // Create SQLite entries for a ledger stored in this shard's backend + bool + setSQLiteStored(std::shared_ptr const& ledger); + + // Set storage and file descriptor usage stats + bool + setFileStats(); // Save the control file for an incomplete shard bool diff --git a/src/ripple/nodestore/impl/Tuning.h b/src/ripple/nodestore/impl/Tuning.h index 8b6a1b1220..ffa5b852d2 100644 --- a/src/ripple/nodestore/impl/Tuning.h +++ b/src/ripple/nodestore/impl/Tuning.h @@ -34,8 +34,6 @@ enum // Expiration time for cached nodes std::chrono::seconds constexpr cacheTargetAge = std::chrono::minutes{5}; -auto constexpr shardCacheSz = 16384; -std::chrono::seconds constexpr shardCacheAge = std::chrono::minutes{1}; } } diff --git a/src/ripple/unity/app_main1.cpp b/src/ripple/unity/app_main1.cpp index 188ea41dd6..b69669db7d 100644 --- a/src/ripple/unity/app_main1.cpp +++ b/src/ripple/unity/app_main1.cpp @@ -21,4 +21,3 @@ #include #include #include -#include diff --git a/src/test/app/Manifest_test.cpp b/src/test/app/Manifest_test.cpp index 75e42d4956..6f8759b15e 100644 --- a/src/test/app/Manifest_test.cpp +++ b/src/test/app/Manifest_test.cpp @@ -227,7 +227,11 @@ public: { DatabaseCon::Setup setup; setup.dataDir = getDatabasePath (); - DatabaseCon dbCon(setup, dbName, WalletDBInit, WalletDBCount); + DatabaseCon dbCon( + setup, + dbName.data(), + std::array(), + WalletDBInit); auto getPopulatedManifests = [](ManifestCache const& cache) -> std::vector