From d95aab1139fa6332df3f47ab14bd8113e8311f4c Mon Sep 17 00:00:00 2001 From: Devon White Date: Fri, 12 Feb 2021 14:37:01 -0500 Subject: [PATCH] Add lookup DBs to the Relational DB Interface * Create SQLite database for mapping transaction IDs to shard indexes * Create SQLite database for mapping ledger hashes to shard indexes * Create additional test cases for the shard database --- src/ripple/app/main/DBInit.h | 48 ++ src/ripple/app/misc/Transaction.h | 3 +- .../app/rdb/RelationalDBInterface_shards.h | 55 +++ .../backend/RelationalDBInterfaceSqlite.cpp | 446 +++++++++++------- .../rdb/backend/RelationalDBInterfaceSqlite.h | 12 + .../rdb/impl/RelationalDBInterface_shards.cpp | 124 +++++ src/ripple/nodestore/DatabaseShard.h | 82 ++-- .../nodestore/impl/DatabaseShardImp.cpp | 80 ++-- src/ripple/nodestore/impl/DatabaseShardImp.h | 32 +- src/ripple/nodestore/impl/Shard.cpp | 25 +- src/ripple/nodestore/impl/Shard.h | 71 ++- src/test/nodestore/DatabaseShard_test.cpp | 97 ++++ src/test/nodestore/Database_test.cpp | 18 +- 13 files changed, 792 insertions(+), 301 deletions(-) diff --git a/src/ripple/app/main/DBInit.h b/src/ripple/app/main/DBInit.h index 29b3a19f4f..00cfc104df 100644 --- a/src/ripple/app/main/DBInit.h +++ b/src/ripple/app/main/DBInit.h @@ -114,6 +114,54 @@ inline constexpr std::array TxDBInit{ //////////////////////////////////////////////////////////////////////////////// +// The Ledger Meta database maps ledger hashes to shard indexes +inline constexpr auto LgrMetaDBName{"ledger_meta.db"}; + +inline constexpr std::array LgrMetaDBPragma +{ + "PRAGMA page_size=4096;", "PRAGMA journal_size_limit=1582080;", + "PRAGMA max_page_count=2147483646;", +#if (ULONG_MAX > UINT_MAX) && !defined(NO_SQLITE_MMAP) + "PRAGMA mmap_size=17179869184;" +#endif +}; + +inline constexpr std::array LgrMetaDBInit{ + {"BEGIN TRANSACTION;", + + "CREATE TABLE IF NOT EXISTS LedgerMeta ( \ + LedgerHash CHARACTER(64) PRIMARY KEY, \ + ShardIndex INTEGER \ + );", + + "END TRANSACTION;"}}; + +//////////////////////////////////////////////////////////////////////////////// + +// Transaction Meta database maps transaction IDs to shard indexes +inline constexpr auto TxMetaDBName{"transaction_meta.db"}; + +inline constexpr std::array TxMetaDBPragma +{ + "PRAGMA page_size=4096;", "PRAGMA journal_size_limit=1582080;", + "PRAGMA max_page_count=2147483646;", +#if (ULONG_MAX > UINT_MAX) && !defined(NO_SQLITE_MMAP) + "PRAGMA mmap_size=17179869184;" +#endif +}; + +inline constexpr std::array TxMetaDBInit{ + {"BEGIN TRANSACTION;", + + "CREATE TABLE IF NOT EXISTS TransactionMeta ( \ + TransID CHARACTER(64) PRIMARY KEY, \ + ShardIndex INTEGER \ + );", + + "END TRANSACTION;"}}; + +//////////////////////////////////////////////////////////////////////////////// + // Temporary database used with an incomplete shard that is being acquired inline constexpr auto AcquireShardDBName{"acquire.db"}; diff --git a/src/ripple/app/misc/Transaction.h b/src/ripple/app/misc/Transaction.h index 68a65ca502..15222d8d8f 100644 --- a/src/ripple/app/misc/Transaction.h +++ b/src/ripple/app/misc/Transaction.h @@ -28,9 +28,8 @@ #include #include #include -#include - #include +#include namespace ripple { diff --git a/src/ripple/app/rdb/RelationalDBInterface_shards.h b/src/ripple/app/rdb/RelationalDBInterface_shards.h index 51bbdce56f..16ef67d210 100644 --- a/src/ripple/app/rdb/RelationalDBInterface_shards.h +++ b/src/ripple/app/rdb/RelationalDBInterface_shards.h @@ -23,6 +23,7 @@ #include #include #include +#include #include namespace ripple { @@ -35,6 +36,60 @@ struct DatabasePair /* Shard DB */ +/** + * @brief makeMetaDBs Opens ledger and transaction 'meta' databases which + * map ledger hashes and transaction IDs to the index of the shard + * that holds the ledger or transaction. + * @param config Config object. + * @param setup Path to database and opening parameters. + * @param checkpointerSetup Database checkpointer setup. + * @return Struct DatabasePair which contains unique pointers to the ledger + * and transaction databases. + */ +DatabasePair +makeMetaDBs( + Config const& config, + DatabaseCon::Setup const& setup, + DatabaseCon::CheckpointerSetup const& checkpointerSetup); + +/** + * @brief saveLedgerMeta Stores (transaction ID -> shard index) and + * (ledger hash -> shard index) mappings in the meta databases. + * @param ledger The ledger. + * @param app Application object. + * @param lgrMetaSession Session to ledger meta database. + * @param txnMetaSession Session to transaction meta database. + * @param shardIndex The index of the shard that contains this ledger. + * @return True on success. + */ +bool +saveLedgerMeta( + std::shared_ptr const& ledger, + Application& app, + soci::session& lgrMetaSession, + soci::session& txnMetaSession, + std::uint32_t shardIndex); + +/** + * @brief getShardIndexforLedger Queries the ledger meta database to + * retrieve the index of the shard that contains this ledger. + * @param session Session to the database. + * @param hash Hash of the ledger. + * @return The index of the shard on success, otherwise an unseated value. + */ +std::optional +getShardIndexforLedger(soci::session& session, LedgerHash const& hash); + +/** + * @brief getShardIndexforTransaction Queries the transaction meta database to + * retrieve the index of the shard that contains this transaction. + * @param session Session to the database. + * @param id ID of the transaction. + * @return The index of the shard on success, otherwise an unseated value. + */ +std::optional +getShardIndexforTransaction(soci::session& session, TxID const& id); + /** * @brief makeShardCompleteLedgerDBs Opens shard databases for already * verified shard and returns its descriptors. diff --git a/src/ripple/app/rdb/backend/RelationalDBInterfaceSqlite.cpp b/src/ripple/app/rdb/backend/RelationalDBInterfaceSqlite.cpp index 81d422e54f..bece17cbe9 100644 --- a/src/ripple/app/rdb/backend/RelationalDBInterfaceSqlite.cpp +++ b/src/ripple/app/rdb/backend/RelationalDBInterfaceSqlite.cpp @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -48,16 +49,27 @@ public: : app_(app), j_(app_.journal("Ledger")) { DatabaseCon::Setup setup = setup_DatabaseCon(config, j_); - auto res = makeLedgerDBs( - config, - setup, - DatabaseCon::CheckpointerSetup{&jobQueue, &app_.logs()}); - if (!res) + if (!makeLedgerDBs( + config, + setup, + DatabaseCon::CheckpointerSetup{&jobQueue, &app_.logs()})) { - JLOG(app_.journal("Application").fatal()) - << "AccountTransactions database " - "should not have a primary key"; - Throw(); + JLOG(app_.journal("RelationalDBInterfaceSqlite").fatal()) + << "AccountTransactions database should not have a primary key"; + Throw( + "AccountTransactions database initialization failed."); + } + + if (app.getShardStore() && + !makeMetaDBs( + config, + setup, + DatabaseCon::CheckpointerSetup{&jobQueue, &app_.logs()})) + { + JLOG(app_.journal("RelationalDBInterfaceSqlite").fatal()) + << "Error during meta DB init"; + Throw( + "Shard meta database initialization failed."); } } @@ -153,7 +165,7 @@ public: std::variant getTransaction( uint256 const& id, - std::optional> const& range, + std::optional> const& range, error_code_i& ec) override; bool @@ -171,10 +183,17 @@ public: int getKBUsedTransaction() override; + void + closeLedgerDB() override; + + void + closeTransactionDB() override; + private: Application& app_; beast::Journal j_; std::unique_ptr lgrdb_, txdb_; + std::unique_ptr lgrMetaDB_, txMetaDB_; /** * @brief makeLedgerDBs Opens node ledger and transaction databases, @@ -190,6 +209,20 @@ private: DatabaseCon::Setup const& setup, DatabaseCon::CheckpointerSetup const& checkpointerSetup); + /** + * @brief makeMetaDBs Opens shard index lookup databases, and saves + * their descriptors into internal variables. + * @param config Config object. + * @param setup Path to database and other opening parameters. + * @param checkpointerSetup Checkpointer parameters. + * @return True if node databases opened successfully. + */ + bool + makeMetaDBs( + Config const& config, + DatabaseCon::Setup const& setup, + DatabaseCon::CheckpointerSetup const& checkpointerSetup); + /** * @brief seqToShardIndex Converts ledgers sequence to shard index. * @param ledgerSeq Ledger sequence. @@ -244,7 +277,7 @@ private: } /** - * @brief checkoutTransaction Checkouts and returns node ledger DB. + * @brief checkoutTransaction Checks out and returns node ledger DB. * @return Session to node ledger DB. */ auto @@ -254,7 +287,7 @@ private: } /** - * @brief checkoutTransaction Checkouts and returns node transaction DB. + * @brief checkoutTransaction Checks out and returns node transaction DB. * @return Session to node transaction DB. */ auto @@ -264,7 +297,7 @@ private: } /** - * @brief doLedger Checkouts ledger database for shard + * @brief doLedger Checks out ledger database for shard * containing given ledger and calls given callback function passing * shard index and session with the database to it. * @param ledgerSeq Ledger sequence. @@ -274,14 +307,14 @@ private: bool doLedger( LedgerIndex ledgerSeq, - std::function const& - callback) + std::function const& callback) { - return app_.getShardStore()->callForLedgerSQL(ledgerSeq, callback); + return app_.getShardStore()->callForLedgerSQLByLedgerSeq( + ledgerSeq, callback); } /** - * @brief doTransaction Checkouts transaction database for shard + * @brief doTransaction Checks out transaction database for shard * containing given ledger and calls given callback function passing * shard index and session with the database to it. * @param ledgerSeq Ledger sequence. @@ -291,14 +324,14 @@ private: bool doTransaction( LedgerIndex ledgerSeq, - std::function const& - callback) + std::function const& callback) { - return app_.getShardStore()->callForTransactionSQL(ledgerSeq, callback); + return app_.getShardStore()->callForTransactionSQLByLedgerSeq( + ledgerSeq, callback); } /** - * @brief iterateLedgerForward Checkouts ledger databases for + * @brief iterateLedgerForward Checks out ledger databases for * all shards in ascending order starting from given shard index * until shard with the largest index visited or callback returned * false. For each visited shard calls given callback function @@ -311,7 +344,8 @@ private: bool iterateLedgerForward( std::optional firstIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) { return app_.getShardStore()->iterateLedgerSQLsForward( @@ -319,7 +353,7 @@ private: } /** - * @brief iterateTransactionForward Checkouts transaction databases for + * @brief iterateTransactionForward Checks out transaction databases for * all shards in ascending order starting from given shard index * until shard with the largest index visited or callback returned * false. For each visited shard calls given callback function @@ -332,7 +366,8 @@ private: bool iterateTransactionForward( std::optional firstIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) { return app_.getShardStore()->iterateLedgerSQLsForward( @@ -340,7 +375,7 @@ private: } /** - * @brief iterateLedgerBack Checkouts ledger databases for + * @brief iterateLedgerBack Checks out ledger databases for * all shards in descending order starting from given shard index * until shard with the smallest index visited or callback returned * false. For each visited shard calls given callback function @@ -353,7 +388,8 @@ private: bool iterateLedgerBack( std::optional firstIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) { return app_.getShardStore()->iterateLedgerSQLsBack( @@ -361,7 +397,7 @@ private: } /** - * @brief iterateTransactionForward Checkouts transaction databases for + * @brief iterateTransactionForward Checks out transaction databases for * all shards in descending order starting from given shard index * until shard with the smallest index visited or callback returned * false. For each visited shard calls given callback function @@ -374,7 +410,8 @@ private: bool iterateTransactionBack( std::optional firstIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) { return app_.getShardStore()->iterateLedgerSQLsBack( @@ -395,6 +432,21 @@ RelationalDBInterfaceSqliteImp::makeLedgerDBs( return res; } +bool +RelationalDBInterfaceSqliteImp::makeMetaDBs( + Config const& config, + DatabaseCon::Setup const& setup, + DatabaseCon::CheckpointerSetup const& checkpointerSetup) +{ + auto [lgrMetaDB, txMetaDB] = + ripple::makeMetaDBs(config, setup, checkpointerSetup); + + txMetaDB_ = std::move(txMetaDB); + lgrMetaDB_ = std::move(lgrMetaDB); + + return true; +} + std::optional RelationalDBInterfaceSqliteImp::getMinLedgerSeq() { @@ -407,10 +459,11 @@ RelationalDBInterfaceSqliteImp::getMinLedgerSeq() /* else use shard databases */ std::optional res; - iterateLedgerForward({}, [&](soci::session& session, std::uint32_t index) { - res = ripple::getMinLedgerSeq(session, TableType::Ledgers); - return !res; - }); + iterateLedgerForward( + {}, [&](soci::session& session, std::uint32_t shardIndex) { + res = ripple::getMinLedgerSeq(session, TableType::Ledgers); + return !res; + }); return res; } @@ -427,7 +480,7 @@ RelationalDBInterfaceSqliteImp::getTransactionsMinLedgerSeq() /* else use shard databases */ std::optional res; iterateTransactionForward( - {}, [&](soci::session& session, std::uint32_t index) { + {}, [&](soci::session& session, std::uint32_t shardIndex) { res = ripple::getMinLedgerSeq(session, TableType::Transactions); return !res; }); @@ -447,7 +500,7 @@ RelationalDBInterfaceSqliteImp::getAccountTransactionsMinLedgerSeq() /* else use shard databases */ std::optional res; iterateTransactionForward( - {}, [&](soci::session& session, std::uint32_t index) { + {}, [&](soci::session& session, std::uint32_t shardIndex) { res = ripple::getMinLedgerSeq( session, TableType::AccountTransactions); return !res; @@ -467,10 +520,11 @@ RelationalDBInterfaceSqliteImp::getMaxLedgerSeq() /* else use shard databases */ std::optional res; - iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) { - res = ripple::getMaxLedgerSeq(session, TableType::Ledgers); - return !res; - }); + iterateLedgerBack( + {}, [&](soci::session& session, std::uint32_t shardIndex) { + res = ripple::getMaxLedgerSeq(session, TableType::Ledgers); + return !res; + }); return res; } @@ -478,7 +532,7 @@ void RelationalDBInterfaceSqliteImp::deleteTransactionByLedgerSeq( LedgerIndex ledgerSeq) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -487,7 +541,7 @@ RelationalDBInterfaceSqliteImp::deleteTransactionByLedgerSeq( } /* else use shard database */ - doTransaction(ledgerSeq, [&](soci::session& session, std::uint32_t index) { + doTransaction(ledgerSeq, [&](soci::session& session) { ripple::deleteByLedgerSeq(session, TableType::Transactions, ledgerSeq); return true; }); @@ -496,7 +550,7 @@ RelationalDBInterfaceSqliteImp::deleteTransactionByLedgerSeq( void RelationalDBInterfaceSqliteImp::deleteBeforeLedgerSeq(LedgerIndex ledgerSeq) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { auto db = checkoutLedger(); @@ -507,7 +561,7 @@ RelationalDBInterfaceSqliteImp::deleteBeforeLedgerSeq(LedgerIndex ledgerSeq) /* else use shard databases */ iterateLedgerBack( seqToShardIndex(ledgerSeq), - [&](soci::session& session, std::uint32_t index) { + [&](soci::session& session, std::uint32_t shardIndex) { ripple::deleteBeforeLedgerSeq( session, TableType::Ledgers, ledgerSeq); return true; @@ -518,7 +572,7 @@ void RelationalDBInterfaceSqliteImp::deleteTransactionsBeforeLedgerSeq( LedgerIndex ledgerSeq) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -529,7 +583,7 @@ RelationalDBInterfaceSqliteImp::deleteTransactionsBeforeLedgerSeq( /* else use shard databases */ iterateTransactionBack( seqToShardIndex(ledgerSeq), - [&](soci::session& session, std::uint32_t index) { + [&](soci::session& session, std::uint32_t shardIndex) { ripple::deleteBeforeLedgerSeq( session, TableType::Transactions, ledgerSeq); return true; @@ -540,7 +594,7 @@ void RelationalDBInterfaceSqliteImp::deleteAccountTransactionsBeforeLedgerSeq( LedgerIndex ledgerSeq) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -552,7 +606,7 @@ RelationalDBInterfaceSqliteImp::deleteAccountTransactionsBeforeLedgerSeq( /* else use shard databases */ iterateTransactionBack( seqToShardIndex(ledgerSeq), - [&](soci::session& session, std::uint32_t index) { + [&](soci::session& session, std::uint32_t shardIndex) { ripple::deleteBeforeLedgerSeq( session, TableType::AccountTransactions, ledgerSeq); return true; @@ -562,7 +616,7 @@ RelationalDBInterfaceSqliteImp::deleteAccountTransactionsBeforeLedgerSeq( std::size_t RelationalDBInterfaceSqliteImp::getTransactionCount() { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -572,7 +626,7 @@ RelationalDBInterfaceSqliteImp::getTransactionCount() /* else use shard databases */ std::size_t rows = 0; iterateTransactionForward( - {}, [&](soci::session& session, std::uint32_t index) { + {}, [&](soci::session& session, std::uint32_t shardIndex) { rows += ripple::getRows(session, TableType::Transactions); return true; }); @@ -582,7 +636,7 @@ RelationalDBInterfaceSqliteImp::getTransactionCount() std::size_t RelationalDBInterfaceSqliteImp::getAccountTransactionCount() { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -592,7 +646,7 @@ RelationalDBInterfaceSqliteImp::getAccountTransactionCount() /* else use shard databases */ std::size_t rows = 0; iterateTransactionForward( - {}, [&](soci::session& session, std::uint32_t index) { + {}, [&](soci::session& session, std::uint32_t shardIndex) { rows += ripple::getRows(session, TableType::AccountTransactions); return true; }); @@ -602,7 +656,7 @@ RelationalDBInterfaceSqliteImp::getAccountTransactionCount() RelationalDBInterface::CountMinMax RelationalDBInterfaceSqliteImp::getLedgerCountMinMax() { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { auto db = checkoutLedger(); @@ -611,17 +665,18 @@ RelationalDBInterfaceSqliteImp::getLedgerCountMinMax() /* else use shard databases */ CountMinMax res{0, 0, 0}; - iterateLedgerForward({}, [&](soci::session& session, std::uint32_t index) { - auto r = ripple::getRowsMinMax(session, TableType::Ledgers); - if (r.numberOfRows) - { - res.numberOfRows += r.numberOfRows; - if (res.minLedgerSequence == 0) - res.minLedgerSequence = r.minLedgerSequence; - res.maxLedgerSequence = r.maxLedgerSequence; - } - return true; - }); + iterateLedgerForward( + {}, [&](soci::session& session, std::uint32_t shardIndex) { + auto r = ripple::getRowsMinMax(session, TableType::Ledgers); + if (r.numberOfRows) + { + res.numberOfRows += r.numberOfRows; + if (res.minLedgerSequence == 0) + res.minLedgerSequence = r.minLedgerSequence; + res.maxLedgerSequence = r.maxLedgerSequence; + } + return true; + }); return res; } @@ -630,23 +685,42 @@ RelationalDBInterfaceSqliteImp::saveValidatedLedger( std::shared_ptr const& ledger, bool current) { - /* if databases exists, use it */ + /* if databases exists, use them */ if (existsLedger() && existsTransaction()) { - return ripple::saveValidatedLedger( - *lgrdb_, *txdb_, app_, ledger, current); + if (!ripple::saveValidatedLedger( + *lgrdb_, *txdb_, app_, ledger, current)) + return false; } - /* Todo: use shard databases. Skipped in this PR by propose of Mickey - * Portilla. */ + if (auto shardStore = app_.getShardStore()) + { + if (ledger->info().seq < shardStore->earliestLedgerSeq()) + // For the moment return false only when the ShardStore + // should accept the ledger, but fails when attempting + // to do so, i.e. when saveLedgerMeta fails. Later when + // the ShardStore supercedes the NodeStore, change this + // line to return false if the ledger is too early. + return true; - return false; + auto lgrMetaSession = lgrMetaDB_->checkoutDb(); + auto txMetaSession = txMetaDB_->checkoutDb(); + + return ripple::saveLedgerMeta( + ledger, + app_, + *lgrMetaSession, + *txMetaSession, + shardStore->seqToShardIndex(ledger->info().seq)); + } + + return true; } std::optional RelationalDBInterfaceSqliteImp::getLedgerInfoByIndex(LedgerIndex ledgerSeq) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { auto db = checkoutLedger(); @@ -655,7 +729,7 @@ RelationalDBInterfaceSqliteImp::getLedgerInfoByIndex(LedgerIndex ledgerSeq) /* else use shard database */ std::optional res; - doLedger(ledgerSeq, [&](soci::session& session, std::uint32_t index) { + doLedger(ledgerSeq, [&](soci::session& session) { res = ripple::getLedgerInfoByIndex(session, ledgerSeq, j_); return true; }); @@ -665,7 +739,7 @@ RelationalDBInterfaceSqliteImp::getLedgerInfoByIndex(LedgerIndex ledgerSeq) std::optional RelationalDBInterfaceSqliteImp::getNewestLedgerInfo() { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { auto db = checkoutLedger(); @@ -674,14 +748,15 @@ RelationalDBInterfaceSqliteImp::getNewestLedgerInfo() /* else use shard databases */ std::optional res; - iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) { - if (auto info = ripple::getNewestLedgerInfo(session, j_)) - { - res = info; - return false; - } - return true; - }); + iterateLedgerBack( + {}, [&](soci::session& session, std::uint32_t shardIndex) { + if (auto info = ripple::getNewestLedgerInfo(session, j_)) + { + res = info; + return false; + } + return true; + }); return res; } @@ -690,7 +765,7 @@ std::optional RelationalDBInterfaceSqliteImp::getLimitedOldestLedgerInfo( LedgerIndex ledgerFirstIndex) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { auto db = checkoutLedger(); @@ -701,7 +776,7 @@ RelationalDBInterfaceSqliteImp::getLimitedOldestLedgerInfo( std::optional res; iterateLedgerForward( seqToShardIndex(ledgerFirstIndex), - [&](soci::session& session, std::uint32_t index) { + [&](soci::session& session, std::uint32_t shardIndex) { if (auto info = ripple::getLimitedOldestLedgerInfo( session, ledgerFirstIndex, j_)) { @@ -718,7 +793,7 @@ std::optional RelationalDBInterfaceSqliteImp::getLimitedNewestLedgerInfo( LedgerIndex ledgerFirstIndex) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { auto db = checkoutLedger(); @@ -727,17 +802,16 @@ RelationalDBInterfaceSqliteImp::getLimitedNewestLedgerInfo( /* else use shard databases */ std::optional res; - iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) { - if (auto info = ripple::getLimitedNewestLedgerInfo( - session, ledgerFirstIndex, j_)) - { - res = info; - return false; - } - if (index < seqToShardIndex(ledgerFirstIndex)) - return false; - return true; - }); + iterateLedgerBack( + {}, [&](soci::session& session, std::uint32_t shardIndex) { + if (auto info = ripple::getLimitedNewestLedgerInfo( + session, ledgerFirstIndex, j_)) + { + res = info; + return false; + } + return shardIndex >= seqToShardIndex(ledgerFirstIndex); + }); return res; } @@ -745,7 +819,7 @@ RelationalDBInterfaceSqliteImp::getLimitedNewestLedgerInfo( std::optional RelationalDBInterfaceSqliteImp::getLedgerInfoByHash(uint256 const& ledgerHash) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { auto db = checkoutLedger(); @@ -753,17 +827,25 @@ RelationalDBInterfaceSqliteImp::getLedgerInfoByHash(uint256 const& ledgerHash) } /* else use shard databases */ - std::optional res; - iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) { - if (auto info = ripple::getLedgerInfoByHash(session, ledgerHash, j_)) - { - res = info; - return false; - } - return true; - }); + if (auto shardStore = app_.getShardStore()) + { + std::optional res; + auto lgrMetaSession = lgrMetaDB_->checkoutDb(); - return res; + if (auto const shardIndex = + ripple::getShardIndexforLedger(*lgrMetaSession, ledgerHash)) + { + shardStore->callForLedgerSQLByShardIndex( + *shardIndex, [&](soci::session& session) { + res = ripple::getLedgerInfoByHash(session, ledgerHash, j_); + return false; // unused + }); + } + + return res; + } + + return {}; } uint256 @@ -778,7 +860,7 @@ RelationalDBInterfaceSqliteImp::getHashByIndex(LedgerIndex ledgerIndex) /* else use shard database */ uint256 hash; - doLedger(ledgerIndex, [&](soci::session& session, std::uint32_t index) { + doLedger(ledgerIndex, [&](soci::session& session) { hash = ripple::getHashByIndex(session, ledgerIndex); return true; }); @@ -797,7 +879,7 @@ RelationalDBInterfaceSqliteImp::getHashesByIndex(LedgerIndex ledgerIndex) /* else use shard database */ std::optional res; - doLedger(ledgerIndex, [&](soci::session& session, std::uint32_t index) { + doLedger(ledgerIndex, [&](soci::session& session) { res = ripple::getHashesByIndex(session, ledgerIndex, j_); return true; }); @@ -823,7 +905,7 @@ RelationalDBInterfaceSqliteImp::getHashesByIndex( LedgerIndex shardMaxSeq = lastLedgerSeq(seqToShardIndex(minSeq)); if (shardMaxSeq > maxSeq) shardMaxSeq = maxSeq; - doLedger(minSeq, [&](soci::session& session, std::uint32_t index) { + doLedger(minSeq, [&](soci::session& session) { auto r = ripple::getHashesByIndex(session, minSeq, shardMaxSeq, j_); res.insert(r.begin(), r.end()); return true; @@ -837,7 +919,7 @@ RelationalDBInterfaceSqliteImp::getHashesByIndex( std::vector> RelationalDBInterfaceSqliteImp::getTxHistory(LedgerIndex startIndex) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -848,7 +930,7 @@ RelationalDBInterfaceSqliteImp::getTxHistory(LedgerIndex startIndex) std::vector> txs; int quantity = 20; iterateTransactionBack( - {}, [&](soci::session& session, std::uint32_t index) { + {}, [&](soci::session& session, std::uint32_t shardIndex) { auto [tx, total] = ripple::getTxHistory(session, app_, startIndex, quantity, true); txs.insert(txs.end(), tx.begin(), tx.end()); @@ -875,7 +957,7 @@ RelationalDBInterfaceSqliteImp::getOldestAccountTxs( { LedgerMaster& ledgerMaster = app_.getLedgerMaster(); - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -891,8 +973,8 @@ RelationalDBInterfaceSqliteImp::getOldestAccountTxs( iterateTransactionForward( opt.minLedger ? seqToShardIndex(opt.minLedger) : std::optional(), - [&](soci::session& session, std::uint32_t index) { - if (opt.maxLedger && index > seqToShardIndex(opt.maxLedger)) + [&](soci::session& session, std::uint32_t shardIndex) { + if (opt.maxLedger && shardIndex > seqToShardIndex(opt.maxLedger)) return false; auto [r, total] = ripple::getOldestAccountTxs( session, app_, ledgerMaster, opt, limit_used, j_); @@ -924,7 +1006,7 @@ RelationalDBInterfaceSqliteImp::getNewestAccountTxs( { LedgerMaster& ledgerMaster = app_.getLedgerMaster(); - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -940,8 +1022,8 @@ RelationalDBInterfaceSqliteImp::getNewestAccountTxs( iterateTransactionBack( opt.maxLedger ? seqToShardIndex(opt.maxLedger) : std::optional(), - [&](soci::session& session, std::uint32_t index) { - if (opt.minLedger && index < seqToShardIndex(opt.minLedger)) + [&](soci::session& session, std::uint32_t shardIndex) { + if (opt.minLedger && shardIndex < seqToShardIndex(opt.minLedger)) return false; auto [r, total] = ripple::getNewestAccountTxs( session, app_, ledgerMaster, opt, limit_used, j_); @@ -971,7 +1053,7 @@ RelationalDBInterface::MetaTxsList RelationalDBInterfaceSqliteImp::getOldestAccountTxsB( AccountTxOptions const& options) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -985,8 +1067,8 @@ RelationalDBInterfaceSqliteImp::getOldestAccountTxsB( iterateTransactionForward( opt.minLedger ? seqToShardIndex(opt.minLedger) : std::optional(), - [&](soci::session& session, std::uint32_t index) { - if (opt.maxLedger && index > seqToShardIndex(opt.maxLedger)) + [&](soci::session& session, std::uint32_t shardIndex) { + if (opt.maxLedger && shardIndex > seqToShardIndex(opt.maxLedger)) return false; auto [r, total] = ripple::getOldestAccountTxsB( session, app_, opt, limit_used, j_); @@ -1016,7 +1098,7 @@ RelationalDBInterface::MetaTxsList RelationalDBInterfaceSqliteImp::getNewestAccountTxsB( AccountTxOptions const& options) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -1030,8 +1112,8 @@ RelationalDBInterfaceSqliteImp::getNewestAccountTxsB( iterateTransactionBack( opt.maxLedger ? seqToShardIndex(opt.maxLedger) : std::optional(), - [&](soci::session& session, std::uint32_t index) { - if (opt.minLedger && index < seqToShardIndex(opt.minLedger)) + [&](soci::session& session, std::uint32_t shardIndex) { + if (opt.minLedger && shardIndex < seqToShardIndex(opt.minLedger)) return false; auto [r, total] = ripple::getNewestAccountTxsB( session, app_, opt, limit_used, j_); @@ -1077,7 +1159,7 @@ RelationalDBInterfaceSqliteImp::oldestAccountTxPage( convertBlobsToTxResult(ret, ledger_index, status, rawTxn, rawMeta, app); }; - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -1099,9 +1181,9 @@ RelationalDBInterfaceSqliteImp::oldestAccountTxPage( iterateTransactionForward( opt.minLedger ? seqToShardIndex(opt.minLedger) : std::optional(), - [&](soci::session& session, std::uint32_t index) { + [&](soci::session& session, std::uint32_t shardIndex) { if (opt.maxLedger != UINT32_MAX && - index > seqToShardIndex(opt.minLedger)) + shardIndex > seqToShardIndex(opt.minLedger)) return false; auto [marker, total] = ripple::oldestAccountTxPage( session, @@ -1141,7 +1223,7 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPage( convertBlobsToTxResult(ret, ledger_index, status, rawTxn, rawMeta, app); }; - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -1163,8 +1245,8 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPage( iterateTransactionBack( opt.maxLedger != UINT32_MAX ? seqToShardIndex(opt.maxLedger) : std::optional(), - [&](soci::session& session, std::uint32_t index) { - if (opt.minLedger && index < seqToShardIndex(opt.minLedger)) + [&](soci::session& session, std::uint32_t shardIndex) { + if (opt.minLedger && shardIndex < seqToShardIndex(opt.minLedger)) return false; auto [marker, total] = ripple::newestAccountTxPage( session, @@ -1203,7 +1285,7 @@ RelationalDBInterfaceSqliteImp::oldestAccountTxPageB( ret.emplace_back(std::move(rawTxn), std::move(rawMeta), ledgerIndex); }; - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -1225,9 +1307,9 @@ RelationalDBInterfaceSqliteImp::oldestAccountTxPageB( iterateTransactionForward( opt.minLedger ? seqToShardIndex(opt.minLedger) : std::optional(), - [&](soci::session& session, std::uint32_t index) { + [&](soci::session& session, std::uint32_t shardIndex) { if (opt.maxLedger != UINT32_MAX && - index > seqToShardIndex(opt.minLedger)) + shardIndex > seqToShardIndex(opt.minLedger)) return false; auto [marker, total] = ripple::oldestAccountTxPage( session, @@ -1266,7 +1348,7 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPageB( ret.emplace_back(std::move(rawTxn), std::move(rawMeta), ledgerIndex); }; - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -1288,8 +1370,8 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPageB( iterateTransactionBack( opt.maxLedger != UINT32_MAX ? seqToShardIndex(opt.maxLedger) : std::optional(), - [&](soci::session& session, std::uint32_t index) { - if (opt.minLedger && index < seqToShardIndex(opt.minLedger)) + [&](soci::session& session, std::uint32_t shardIndex) { + if (opt.minLedger && shardIndex < seqToShardIndex(opt.minLedger)) return false; auto [marker, total] = ripple::newestAccountTxPage( session, @@ -1312,10 +1394,10 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPageB( std::variant RelationalDBInterfaceSqliteImp::getTransaction( uint256 const& id, - std::optional> const& range, + std::optional> const& range, error_code_i& ec) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -1323,30 +1405,44 @@ RelationalDBInterfaceSqliteImp::getTransaction( } /* else use shard databases */ - std::variant res(TxSearched::unknown); - iterateTransactionBack( - {}, [&](soci::session& session, std::uint32_t index) { - std::optional> range1; - if (range) - { - uint32_t low = std::max(range->lower(), firstLedgerSeq(index)); - uint32_t high = std::min(range->upper(), lastLedgerSeq(index)); - if (low <= high) - range1 = ClosedInterval(low, high); - } - res = ripple::getTransaction(session, app_, id, range1, ec); - /* finish iterations if transaction found or error detected */ - return res.index() == 1 && - std::get(res) != TxSearched::unknown; - }); + if (auto shardStore = app_.getShardStore()) + { + std::variant res(TxSearched::unknown); + auto txMetaSession = txMetaDB_->checkoutDb(); - return res; + if (auto const shardIndex = + ripple::getShardIndexforTransaction(*txMetaSession, id)) + { + shardStore->callForTransactionSQLByShardIndex( + *shardIndex, [&](soci::session& session) { + std::optional> range1; + if (range) + { + std::uint32_t const low = std::max( + range->lower(), firstLedgerSeq(*shardIndex)); + std::uint32_t const high = std::min( + range->upper(), lastLedgerSeq(*shardIndex)); + if (low <= high) + range1 = ClosedInterval(low, high); + } + res = ripple::getTransaction(session, app_, id, range1, ec); + + return res.index() == 1 && + std::get(res) != + TxSearched::unknown; // unused + }); + } + + return res; + } + + return {TxSearched::unknown}; } bool RelationalDBInterfaceSqliteImp::ledgerDbHasSpace(Config const& config) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { auto db = checkoutLedger(); @@ -1355,7 +1451,7 @@ RelationalDBInterfaceSqliteImp::ledgerDbHasSpace(Config const& config) /* else use shard databases */ return iterateLedgerBack( - {}, [&](soci::session& session, std::uint32_t index) { + {}, [&](soci::session& session, std::uint32_t shardIndex) { return ripple::dbHasSpace(session, config, j_); }); } @@ -1363,7 +1459,7 @@ RelationalDBInterfaceSqliteImp::ledgerDbHasSpace(Config const& config) bool RelationalDBInterfaceSqliteImp::transactionDbHasSpace(Config const& config) { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { auto db = checkoutTransaction(); @@ -1372,7 +1468,7 @@ RelationalDBInterfaceSqliteImp::transactionDbHasSpace(Config const& config) /* else use shard databases */ return iterateTransactionBack( - {}, [&](soci::session& session, std::uint32_t index) { + {}, [&](soci::session& session, std::uint32_t shardIndex) { return ripple::dbHasSpace(session, config, j_); }); } @@ -1380,7 +1476,7 @@ RelationalDBInterfaceSqliteImp::transactionDbHasSpace(Config const& config) int RelationalDBInterfaceSqliteImp::getKBUsedAll() { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { return ripple::getKBUsedAll(lgrdb_->getSession()); @@ -1388,17 +1484,18 @@ RelationalDBInterfaceSqliteImp::getKBUsedAll() /* else use shard databases */ int sum = 0; - iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) { - sum += ripple::getKBUsedAll(session); - return true; - }); + iterateLedgerBack( + {}, [&](soci::session& session, std::uint32_t shardIndex) { + sum += ripple::getKBUsedAll(session); + return true; + }); return sum; } int RelationalDBInterfaceSqliteImp::getKBUsedLedger() { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsLedger()) { return ripple::getKBUsedDB(lgrdb_->getSession()); @@ -1406,17 +1503,18 @@ RelationalDBInterfaceSqliteImp::getKBUsedLedger() /* else use shard databases */ int sum = 0; - iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) { - sum += ripple::getKBUsedDB(session); - return true; - }); + iterateLedgerBack( + {}, [&](soci::session& session, std::uint32_t shardIndex) { + sum += ripple::getKBUsedDB(session); + return true; + }); return sum; } int RelationalDBInterfaceSqliteImp::getKBUsedTransaction() { - /* if databases exists, use it */ + /* if database exists, use it */ if (existsTransaction()) { return ripple::getKBUsedDB(txdb_->getSession()); @@ -1425,13 +1523,25 @@ RelationalDBInterfaceSqliteImp::getKBUsedTransaction() /* else use shard databases */ int sum = 0; iterateTransactionBack( - {}, [&](soci::session& session, std::uint32_t index) { + {}, [&](soci::session& session, std::uint32_t shardIndex) { sum += ripple::getKBUsedDB(session); return true; }); return sum; } +void +RelationalDBInterfaceSqliteImp::closeLedgerDB() +{ + lgrdb_.reset(); +} + +void +RelationalDBInterfaceSqliteImp::closeTransactionDB() +{ + txdb_.reset(); +} + std::unique_ptr getRelationalDBInterfaceSqlite( Application& app, diff --git a/src/ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h b/src/ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h index bb2dd0e41d..65b52f0e03 100644 --- a/src/ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h +++ b/src/ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h @@ -283,6 +283,18 @@ public: */ virtual int getKBUsedTransaction() = 0; + + /** + * @brief Closes the ledger database + */ + virtual void + closeLedgerDB() = 0; + + /** + * @brief Closes the transaction database + */ + virtual void + closeTransactionDB() = 0; }; } // namespace ripple diff --git a/src/ripple/app/rdb/impl/RelationalDBInterface_shards.cpp b/src/ripple/app/rdb/impl/RelationalDBInterface_shards.cpp index 70a65ce122..c433b70254 100644 --- a/src/ripple/app/rdb/impl/RelationalDBInterface_shards.cpp +++ b/src/ripple/app/rdb/impl/RelationalDBInterface_shards.cpp @@ -32,6 +32,130 @@ namespace ripple { +DatabasePair +makeMetaDBs( + Config const& config, + DatabaseCon::Setup const& setup, + DatabaseCon::CheckpointerSetup const& checkpointerSetup) +{ + // ledger meta database + auto lgrMetaDB{std::make_unique( + setup, + LgrMetaDBName, + LgrMetaDBPragma, + LgrMetaDBInit, + checkpointerSetup)}; + + if (config.useTxTables()) + { + // transaction meta database + auto txMetaDB{std::make_unique( + setup, + TxMetaDBName, + TxMetaDBPragma, + TxMetaDBInit, + checkpointerSetup)}; + + return {std::move(lgrMetaDB), std::move(txMetaDB)}; + } + + return {std::move(lgrMetaDB), nullptr}; +} + +bool +saveLedgerMeta( + std::shared_ptr const& ledger, + Application& app, + soci::session& lgrMetaSession, + soci::session& txnMetaSession, + std::uint32_t const shardIndex) +{ + std::string_view constexpr lgrSQL = + R"sql(INSERT OR REPLACE INTO LedgerMeta VALUES + (:ledgerHash,:shardIndex);)sql"; + + auto const hash = to_string(ledger->info().hash); + lgrMetaSession << lgrSQL, soci::use(hash), soci::use(shardIndex); + + if (app.config().useTxTables()) + { + AcceptedLedger::pointer const aLedger = [&app, ledger] { + try + { + auto aLedger = + app.getAcceptedLedgerCache().fetch(ledger->info().hash); + if (!aLedger) + { + aLedger = std::make_shared(ledger, app); + app.getAcceptedLedgerCache().canonicalize_replace_client( + ledger->info().hash, aLedger); + } + + return aLedger; + } + catch (std::exception const&) + { + JLOG(app.journal("Ledger").warn()) + << "An accepted ledger was missing nodes"; + } + + return AcceptedLedger::pointer{nullptr}; + }(); + + if (!aLedger) + return false; + + soci::transaction tr(txnMetaSession); + + for (auto const& [_, acceptedLedgerTx] : aLedger->getMap()) + { + (void)_; + + std::string_view constexpr txnSQL = + R"sql(INSERT OR REPLACE INTO TransactionMeta VALUES + (:transactionID,:shardIndex);)sql"; + + auto const transactionID = + to_string(acceptedLedgerTx->getTransactionID()); + + txnMetaSession << txnSQL, soci::use(transactionID), + soci::use(shardIndex); + } + + tr.commit(); + } + + return true; +} + +std::optional +getShardIndexforLedger(soci::session& session, LedgerHash const& hash) +{ + std::uint32_t shardIndex; + session << "SELECT ShardIndex FROM LedgerMeta WHERE LedgerHash = '" << hash + << "';", + soci::into(shardIndex); + + if (!session.got_data()) + return std::nullopt; + + return shardIndex; +} + +std::optional +getShardIndexforTransaction(soci::session& session, TxID const& id) +{ + std::uint32_t shardIndex; + session << "SELECT ShardIndex FROM TransactionMeta WHERE TransID = '" << id + << "';", + soci::into(shardIndex); + + if (!session.got_data()) + return std::nullopt; + + return shardIndex; +} + DatabasePair makeShardCompleteLedgerDBs( Config const& config, diff --git a/src/ripple/nodestore/DatabaseShard.h b/src/ripple/nodestore/DatabaseShard.h index 7c0dfaf7ab..f45d265a1d 100644 --- a/src/ripple/nodestore/DatabaseShard.h +++ b/src/ripple/nodestore/DatabaseShard.h @@ -128,36 +128,48 @@ public: virtual void setStored(std::shared_ptr const& ledger) = 0; - /** - * @brief callForLedgerSQL Checkouts ledger database for shard - * containing given ledger and calls given callback function passing - * shard index and session with the database to it. - * @param ledgerSeq Ledger sequence. - * @param callback Callback function to call. - * @return Value returned by callback function. - */ + /** Invoke a callback on the SQLite db holding the + corresponding ledger + + @return Value returned by callback function. + */ virtual bool - callForLedgerSQL( + callForLedgerSQLByLedgerSeq( LedgerIndex ledgerSeq, - std::function const& - callback) = 0; + std::function const& callback) = 0; + + /** Invoke a callback on the ledger SQLite db for the + corresponding shard + + @return Value returned by callback function. + */ + virtual bool + callForLedgerSQLByShardIndex( + std::uint32_t shardIndex, + std::function const& callback) = 0; + + /** Invoke a callback on the transaction SQLite db + for the corresponding ledger + + @return Value returned by callback function. + */ + virtual bool + callForTransactionSQLByLedgerSeq( + LedgerIndex ledgerSeq, + std::function const& callback) = 0; + + /** Invoke a callback on the transaction SQLite db + for the corresponding shard + + @return Value returned by callback function. + */ + virtual bool + callForTransactionSQLByShardIndex( + std::uint32_t shardIndex, + std::function const& callback) = 0; /** - * @brief callForTransactionSQL Checkouts transaction database for shard - * containing given ledger and calls given callback function passing - * shard index and session with the database to it. - * @param ledgerSeq Ledger sequence. - * @param callback Callback function to call. - * @return Value returned by callback function. - */ - virtual bool - callForTransactionSQL( - LedgerIndex ledgerSeq, - std::function const& - callback) = 0; - - /** - * @brief iterateLedgerSQLsForward Checkouts ledger databases for all + * @brief iterateLedgerSQLsForward Checks out ledger databases for all * shards in ascending order starting from given shard index until * shard with the largest index visited or callback returned false. * For each visited shard calls given callback function passing @@ -170,11 +182,12 @@ public: virtual bool iterateLedgerSQLsForward( std::optional minShardIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) = 0; /** - * @brief iterateTransactionSQLsForward Checkouts transaction databases for + * @brief iterateTransactionSQLsForward Checks out transaction databases for * all shards in ascending order starting from given shard index * until shard with the largest index visited or callback returned * false. For each visited shard calls given callback function @@ -187,11 +200,12 @@ public: virtual bool iterateTransactionSQLsForward( std::optional minShardIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) = 0; /** - * @brief iterateLedgerSQLsBack Checkouts ledger databases for + * @brief iterateLedgerSQLsBack Checks out ledger databases for * all shards in descending order starting from given shard index * until shard with the smallest index visited or callback returned * false. For each visited shard calls given callback function @@ -204,11 +218,12 @@ public: virtual bool iterateLedgerSQLsBack( std::optional maxShardIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) = 0; /** - * @brief iterateTransactionSQLsBack Checkouts transaction databases for + * @brief iterateTransactionSQLsBack Checks out transaction databases for * all shards in descending order starting from given shard index * until shard with the smallest index visited or callback returned * false. For each visited shard calls given callback function @@ -221,7 +236,8 @@ public: virtual bool iterateTransactionSQLsBack( std::optional maxShardIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) = 0; /** Query information about shards held diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.cpp b/src/ripple/nodestore/impl/DatabaseShardImp.cpp index bd5e0486d1..14c2448d3f 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.cpp +++ b/src/ripple/nodestore/impl/DatabaseShardImp.cpp @@ -895,13 +895,13 @@ DatabaseShardImp::doImportDatabase() std::uint32_t const lastSeq = std::max(firstSeq, lastLedgerSeq(shardIndex)); - // Verify SQLite ledgers are in the node store - { - auto const ledgerHashes{ - app_.getRelationalDBInterface().getHashesByIndex( - firstSeq, lastSeq)}; - if (ledgerHashes.size() != maxLedgers(shardIndex)) - continue; + // Verify SQLite ledgers are in the node store + { + auto const ledgerHashes{ + app_.getRelationalDBInterface().getHashesByIndex( + firstSeq, lastSeq)}; + if (ledgerHashes.size() != maxLedgers(shardIndex)) + continue; auto& source = app_.getNodeStore(); bool valid{true}; @@ -1948,39 +1948,48 @@ DatabaseShardImp::checkHistoricalPaths(std::lock_guard const&) const } bool -DatabaseShardImp::callForLedgerSQL( +DatabaseShardImp::callForLedgerSQLByLedgerSeq( LedgerIndex ledgerSeq, - std::function const& - callback) + std::function const& callback) { - std::lock_guard lock(mutex_); - auto shardIndex = seqToShardIndex(ledgerSeq); - - if (shards_.count(shardIndex) && - shards_[shardIndex]->getState() == ShardState::finalized) - { - return shards_[shardIndex]->callForLedgerSQL(callback); - } - - return false; + return callForLedgerSQLByShardIndex(seqToShardIndex(ledgerSeq), callback); } bool -DatabaseShardImp::callForTransactionSQL( - LedgerIndex ledgerSeq, - std::function const& - callback) +DatabaseShardImp::callForLedgerSQLByShardIndex( + const uint32_t shardIndex, + std::function const& callback) { std::lock_guard lock(mutex_); - auto shardIndex = seqToShardIndex(ledgerSeq); - if (shards_.count(shardIndex) && - shards_[shardIndex]->getState() == ShardState::finalized) - { - return shards_[shardIndex]->callForTransactionSQL(callback); - } + auto const it{shards_.find(shardIndex)}; - return false; + return it != shards_.end() && + it->second->getState() == ShardState::finalized && + it->second->callForLedgerSQL(callback); +} + +bool +DatabaseShardImp::callForTransactionSQLByLedgerSeq( + LedgerIndex ledgerSeq, + std::function const& callback) +{ + return callForTransactionSQLByShardIndex( + seqToShardIndex(ledgerSeq), callback); +} + +bool +DatabaseShardImp::callForTransactionSQLByShardIndex( + std::uint32_t const shardIndex, + std::function const& callback) +{ + std::lock_guard lock(mutex_); + + auto const it{shards_.find(shardIndex)}; + + return it != shards_.end() && + it->second->getState() == ShardState::finalized && + it->second->callForTransactionSQL(callback); } bool @@ -2010,10 +2019,11 @@ DatabaseShardImp::iterateShardsForward( return true; } + bool DatabaseShardImp::iterateLedgerSQLsForward( std::optional minShardIndex, - std::function const& + std::function const& callback) { return iterateShardsForward( @@ -2025,7 +2035,7 @@ DatabaseShardImp::iterateLedgerSQLsForward( bool DatabaseShardImp::iterateTransactionSQLsForward( std::optional minShardIndex, - std::function const& + std::function const& callback) { return iterateShardsForward( @@ -2066,7 +2076,7 @@ DatabaseShardImp::iterateShardsBack( bool DatabaseShardImp::iterateLedgerSQLsBack( std::optional maxShardIndex, - std::function const& + std::function const& callback) { return iterateShardsBack(maxShardIndex, [&callback](Shard& shard) -> bool { @@ -2077,7 +2087,7 @@ DatabaseShardImp::iterateLedgerSQLsBack( bool DatabaseShardImp::iterateTransactionSQLsBack( std::optional maxShardIndex, - std::function const& + std::function const& callback) { return iterateShardsBack(maxShardIndex, [&callback](Shard& shard) -> bool { diff --git a/src/ripple/nodestore/impl/DatabaseShardImp.h b/src/ripple/nodestore/impl/DatabaseShardImp.h index 1a3a688ff6..fde27260da 100644 --- a/src/ripple/nodestore/impl/DatabaseShardImp.h +++ b/src/ripple/nodestore/impl/DatabaseShardImp.h @@ -139,39 +139,51 @@ public: getDatabaseImportSequence() const override; bool - callForLedgerSQL( + callForLedgerSQLByLedgerSeq( LedgerIndex ledgerSeq, - std::function const& - callback) override; + std::function const& callback) override; bool - callForTransactionSQL( + callForLedgerSQLByShardIndex( + std::uint32_t const shardIndex, + std::function const& callback) override; + + bool + callForTransactionSQLByLedgerSeq( LedgerIndex ledgerSeq, - std::function const& - callback) override; + std::function const& callback) override; + + bool + callForTransactionSQLByShardIndex( + std::uint32_t const shardIndex, + std::function const& callback) override; bool iterateLedgerSQLsForward( std::optional minShardIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) override; bool iterateTransactionSQLsForward( std::optional minShardIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) override; bool iterateLedgerSQLsBack( std::optional maxShardIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) override; bool iterateTransactionSQLsBack( std::optional maxShardIndex, - std::function const& + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& callback) override; private: diff --git a/src/ripple/nodestore/impl/Shard.cpp b/src/ripple/nodestore/impl/Shard.cpp index a66e00cb66..6558720613 100644 --- a/src/ripple/nodestore/impl/Shard.cpp +++ b/src/ripple/nodestore/impl/Shard.cpp @@ -1238,28 +1238,19 @@ Shard::makeBackendCount() } bool -Shard::callForLedgerSQL( - std::function const& - callback) +Shard::doCallForSQL( + std::function const& callback, + LockedSociSession&& db) { - auto const scopedCount{makeBackendCount()}; - if (!scopedCount) - return false; - - auto db = lgrSQLiteDB_->checkoutDb(); - return callback(*db, index_); + return callback(*db); } bool -Shard::callForTransactionSQL( - std::function const& - callback) +Shard::doCallForSQL( + std::function const& + callback, + LockedSociSession&& db) { - auto const scopedCount{makeBackendCount()}; - if (!scopedCount) - return false; - - auto db = txSQLiteDB_->checkoutDb(); return callback(*db, index_); } diff --git a/src/ripple/nodestore/impl/Shard.h b/src/ripple/nodestore/impl/Shard.h index 54afe618d8..0f78f82c12 100644 --- a/src/ripple/nodestore/impl/Shard.h +++ b/src/ripple/nodestore/impl/Shard.h @@ -215,29 +215,29 @@ public: return to_string(acquireInfo_->storedSeqs); } - /** - * @brief callForLedgerSQL Checks out ledger database for the shard and - * calls given callback function passing shard index and session - * with the database to it. - * @param callback Callback function to call. - * @return Value returned by callback function. - */ - bool - callForLedgerSQL( - std::function const& - callback); + /** Invoke a callback on the ledger SQLite db - /** - * @brief callForTransactionSQL Checks out transaction database for the - * shard and calls given callback function passing shard index and - * session with the database to it. - * @param callback Callback function to call. - * @return Value returned by callback function. - */ + @param callback Callback function to call. + @return Value returned by callback function. + */ + template bool - callForTransactionSQL( - std::function const& - callback); + callForLedgerSQL(std::function const& callback) + { + return callForSQL(callback, lgrSQLiteDB_->checkoutDb()); + } + + /** Invoke a callback on the transaction SQLite db + + @param callback Callback function to call. + @return Value returned by callback function. + */ + template + bool + callForTransactionSQL(std::function const& callback) + { + return callForSQL(callback, txSQLiteDB_->checkoutDb()); + } // Current shard version static constexpr std::uint32_t version{2}; @@ -389,6 +389,35 @@ private: // Open databases if they are closed [[nodiscard]] Shard::Count makeBackendCount(); + + // Invoke a callback on the supplied session parameter + template + bool + callForSQL( + std::function const& callback, + LockedSociSession&& db) + { + auto const scopedCount{makeBackendCount()}; + if (!scopedCount) + return false; + + return doCallForSQL(callback, std::move(db)); + } + + // Invoke a callback that accepts a SQLite session parameter + bool + doCallForSQL( + std::function const& callback, + LockedSociSession&& db); + + // Invoke a callback that accepts a SQLite session and the + // shard index as parameters + bool + doCallForSQL( + std::function< + bool(soci::session& session, std::uint32_t shardIndex)> const& + callback, + LockedSociSession&& db); }; } // namespace NodeStore diff --git a/src/test/nodestore/DatabaseShard_test.cpp b/src/test/nodestore/DatabaseShard_test.cpp index 870e6cb593..d4f645b771 100644 --- a/src/test/nodestore/DatabaseShard_test.cpp +++ b/src/test/nodestore/DatabaseShard_test.cpp @@ -20,6 +20,7 @@ #include #include #include +#include #include #include #include @@ -1729,6 +1730,101 @@ class DatabaseShard_test : public TestBase BEAST_EXPECT(msg.peerchain_size() == 0); } + void + testRelationalDBInterfaceSqlite(std::uint64_t const seedValue) + { + testcase("Relational DB Interface SQLite"); + + using namespace test::jtx; + + beast::temp_dir shardDir; + Env env{*this, testConfig(shardDir.path())}; + + auto shardStore{env.app().getShardStore()}; + BEAST_EXPECT(shardStore); + + auto const shardCount = 3; + TestData data(seedValue, 3, shardCount); + if (!BEAST_EXPECT(data.makeLedgers(env))) + return; + + BEAST_EXPECT(shardStore->getShardInfo()->finalized().empty()); + BEAST_EXPECT(shardStore->getShardInfo()->incompleteToString().empty()); + + auto rdb = dynamic_cast( + &env.app().getRelationalDBInterface()); + + BEAST_EXPECT(rdb); + + for (std::uint32_t i = 0; i < shardCount; ++i) + { + // Populate the shard store + + auto n = createShard(data, *shardStore, shardCount); + if (!BEAST_EXPECT(n && *n >= 1 && *n <= shardCount)) + return; + } + + // Close these databases to force the RelationalDBInterfaceSqlite + // to use the shard databases and lookup tables. + rdb->closeLedgerDB(); + rdb->closeTransactionDB(); + + // Lambda for comparing Ledger objects + auto infoCmp = [](auto const& a, auto const& b) { + return a.hash == b.hash && a.txHash == b.txHash && + a.accountHash == b.accountHash && + a.parentHash == b.parentHash && a.drops == b.drops && + a.accepted == b.accepted && a.closeFlags == b.closeFlags && + a.closeTimeResolution == b.closeTimeResolution && + a.closeTime == b.closeTime; + }; + + for (auto const ledger : data.ledgers_) + { + // Compare each test ledger to the data retrieved + // from the RelationalDBInterfaceSqlite class + + if (shardStore->seqToShardIndex(ledger->seq()) < + shardStore->earliestShardIndex() || + ledger->info().seq < shardStore->earliestLedgerSeq()) + continue; + + auto info = rdb->getLedgerInfoByHash(ledger->info().hash); + + BEAST_EXPECT(info); + BEAST_EXPECT(infoCmp(*info, ledger->info())); + + for (auto const& transaction : ledger->txs) + { + // Compare each test transaction to the data + // retrieved from the RelationalDBInterfaceSqlite + // class + + error_code_i error{rpcSUCCESS}; + + auto reference = rdb->getTransaction( + transaction.first->getTransactionID(), {}, error); + + BEAST_EXPECT(error == rpcSUCCESS); + if (!BEAST_EXPECT(reference.index() == 0)) + continue; + + auto txn = std::get<0>(reference).first->getSTransaction(); + + BEAST_EXPECT( + transaction.first->getFullText() == txn->getFullText()); + } + } + + // Create additional ledgers to test a pathway in + // 'ripple::saveLedgerMeta' wherein fetching the + // accepted ledger fails + data = TestData(seedValue * 2, 4, 1); + if (!BEAST_EXPECT(data.makeLedgers(env, shardCount))) + return; + } + public: DatabaseShard_test() : journal_("DatabaseShard_test", *this) { @@ -1758,6 +1854,7 @@ public: testPrepareWithHistoricalPaths(seedValue()); testOpenShardManagement(seedValue()); testShardInfo(seedValue()); + testRelationalDBInterfaceSqlite(seedValue()); } }; diff --git a/src/test/nodestore/Database_test.cpp b/src/test/nodestore/Database_test.cpp index ecf3e82145..0cf2afb21a 100644 --- a/src/test/nodestore/Database_test.cpp +++ b/src/test/nodestore/Database_test.cpp @@ -621,11 +621,7 @@ public: { std::unique_ptr db = Manager::instance().make_Database( - megabytes(4), - scheduler, - 2, - nodeParams, - journal_); + megabytes(4), scheduler, 2, nodeParams, journal_); BEAST_EXPECT( db->ledgersPerShard() == DEFAULT_LEDGERS_PER_SHARD); } @@ -636,11 +632,7 @@ public: nodeParams.set("ledgers_per_shard", "100"); std::unique_ptr db = Manager::instance().make_Database( - megabytes(4), - scheduler, - 2, - nodeParams, - journal_); + megabytes(4), scheduler, 2, nodeParams, journal_); } catch (std::runtime_error const& e) { @@ -651,11 +643,7 @@ public: // Set a valid ledgers per shard nodeParams.set("ledgers_per_shard", "256"); std::unique_ptr db = Manager::instance().make_Database( - megabytes(4), - scheduler, - 2, - nodeParams, - journal_); + megabytes(4), scheduler, 2, nodeParams, journal_); // Verify database uses the ledgers per shard BEAST_EXPECT(db->ledgersPerShard() == 256);