Add lookup DBs to the Relational DB Interface

* Create SQLite database for mapping transaction IDs to shard indexes
* Create SQLite database for mapping ledger hashes to shard indexes
* Create additional test cases for the shard database
This commit is contained in:
Devon White
2021-02-12 14:37:01 -05:00
committed by manojsdoshi
parent 80c2302fd3
commit d95aab1139
13 changed files with 792 additions and 301 deletions

View File

@@ -114,6 +114,54 @@ inline constexpr std::array<char const*, 8> TxDBInit{
////////////////////////////////////////////////////////////////////////////////
// The Ledger Meta database maps ledger hashes to shard indexes
inline constexpr auto LgrMetaDBName{"ledger_meta.db"};
inline constexpr std::array LgrMetaDBPragma
{
"PRAGMA page_size=4096;", "PRAGMA journal_size_limit=1582080;",
"PRAGMA max_page_count=2147483646;",
#if (ULONG_MAX > UINT_MAX) && !defined(NO_SQLITE_MMAP)
"PRAGMA mmap_size=17179869184;"
#endif
};
inline constexpr std::array<char const*, 3> LgrMetaDBInit{
{"BEGIN TRANSACTION;",
"CREATE TABLE IF NOT EXISTS LedgerMeta ( \
LedgerHash CHARACTER(64) PRIMARY KEY, \
ShardIndex INTEGER \
);",
"END TRANSACTION;"}};
////////////////////////////////////////////////////////////////////////////////
// Transaction Meta database maps transaction IDs to shard indexes
inline constexpr auto TxMetaDBName{"transaction_meta.db"};
inline constexpr std::array TxMetaDBPragma
{
"PRAGMA page_size=4096;", "PRAGMA journal_size_limit=1582080;",
"PRAGMA max_page_count=2147483646;",
#if (ULONG_MAX > UINT_MAX) && !defined(NO_SQLITE_MMAP)
"PRAGMA mmap_size=17179869184;"
#endif
};
inline constexpr std::array<char const*, 3> TxMetaDBInit{
{"BEGIN TRANSACTION;",
"CREATE TABLE IF NOT EXISTS TransactionMeta ( \
TransID CHARACTER(64) PRIMARY KEY, \
ShardIndex INTEGER \
);",
"END TRANSACTION;"}};
////////////////////////////////////////////////////////////////////////////////
// Temporary database used with an incomplete shard that is being acquired
inline constexpr auto AcquireShardDBName{"acquire.db"};

View File

@@ -28,9 +28,8 @@
#include <ripple/protocol/STTx.h>
#include <ripple/protocol/TER.h>
#include <boost/optional.hpp>
#include <boost/variant.hpp>
#include <optional>
#include <variant>
namespace ripple {

View File

@@ -23,6 +23,7 @@
#include <ripple/app/ledger/Ledger.h>
#include <ripple/app/rdb/RelationalDBInterface.h>
#include <ripple/core/Config.h>
#include <ripple/protocol/RippleLedgerHash.h>
#include <boost/filesystem.hpp>
namespace ripple {
@@ -35,6 +36,60 @@ struct DatabasePair
/* Shard DB */
/**
* @brief makeMetaDBs Opens ledger and transaction 'meta' databases which
* map ledger hashes and transaction IDs to the index of the shard
* that holds the ledger or transaction.
* @param config Config object.
* @param setup Path to database and opening parameters.
* @param checkpointerSetup Database checkpointer setup.
* @return Struct DatabasePair which contains unique pointers to the ledger
* and transaction databases.
*/
DatabasePair
makeMetaDBs(
Config const& config,
DatabaseCon::Setup const& setup,
DatabaseCon::CheckpointerSetup const& checkpointerSetup);
/**
* @brief saveLedgerMeta Stores (transaction ID -> shard index) and
* (ledger hash -> shard index) mappings in the meta databases.
* @param ledger The ledger.
* @param app Application object.
* @param lgrMetaSession Session to ledger meta database.
* @param txnMetaSession Session to transaction meta database.
* @param shardIndex The index of the shard that contains this ledger.
* @return True on success.
*/
bool
saveLedgerMeta(
std::shared_ptr<Ledger const> const& ledger,
Application& app,
soci::session& lgrMetaSession,
soci::session& txnMetaSession,
std::uint32_t shardIndex);
/**
* @brief getShardIndexforLedger Queries the ledger meta database to
* retrieve the index of the shard that contains this ledger.
* @param session Session to the database.
* @param hash Hash of the ledger.
* @return The index of the shard on success, otherwise an unseated value.
*/
std::optional<std::uint32_t>
getShardIndexforLedger(soci::session& session, LedgerHash const& hash);
/**
* @brief getShardIndexforTransaction Queries the transaction meta database to
* retrieve the index of the shard that contains this transaction.
* @param session Session to the database.
* @param id ID of the transaction.
* @return The index of the shard on success, otherwise an unseated value.
*/
std::optional<std::uint32_t>
getShardIndexforTransaction(soci::session& session, TxID const& id);
/**
* @brief makeShardCompleteLedgerDBs Opens shard databases for already
* verified shard and returns its descriptors.

View File

@@ -25,6 +25,7 @@
#include <ripple/app/misc/impl/AccountTxPaging.h>
#include <ripple/app/rdb/RelationalDBInterface_nodes.h>
#include <ripple/app/rdb/RelationalDBInterface_postgres.h>
#include <ripple/app/rdb/RelationalDBInterface_shards.h>
#include <ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h>
#include <ripple/basics/BasicConfig.h>
#include <ripple/basics/StringUtilities.h>
@@ -48,16 +49,27 @@ public:
: app_(app), j_(app_.journal("Ledger"))
{
DatabaseCon::Setup setup = setup_DatabaseCon(config, j_);
auto res = makeLedgerDBs(
config,
setup,
DatabaseCon::CheckpointerSetup{&jobQueue, &app_.logs()});
if (!res)
if (!makeLedgerDBs(
config,
setup,
DatabaseCon::CheckpointerSetup{&jobQueue, &app_.logs()}))
{
JLOG(app_.journal("Application").fatal())
<< "AccountTransactions database "
"should not have a primary key";
Throw<std::exception>();
JLOG(app_.journal("RelationalDBInterfaceSqlite").fatal())
<< "AccountTransactions database should not have a primary key";
Throw<std::runtime_error>(
"AccountTransactions database initialization failed.");
}
if (app.getShardStore() &&
!makeMetaDBs(
config,
setup,
DatabaseCon::CheckpointerSetup{&jobQueue, &app_.logs()}))
{
JLOG(app_.journal("RelationalDBInterfaceSqlite").fatal())
<< "Error during meta DB init";
Throw<std::runtime_error>(
"Shard meta database initialization failed.");
}
}
@@ -153,7 +165,7 @@ public:
std::variant<AccountTx, TxSearched>
getTransaction(
uint256 const& id,
std::optional<ClosedInterval<uint32_t>> const& range,
std::optional<ClosedInterval<std::uint32_t>> const& range,
error_code_i& ec) override;
bool
@@ -171,10 +183,17 @@ public:
int
getKBUsedTransaction() override;
void
closeLedgerDB() override;
void
closeTransactionDB() override;
private:
Application& app_;
beast::Journal j_;
std::unique_ptr<DatabaseCon> lgrdb_, txdb_;
std::unique_ptr<DatabaseCon> lgrMetaDB_, txMetaDB_;
/**
* @brief makeLedgerDBs Opens node ledger and transaction databases,
@@ -190,6 +209,20 @@ private:
DatabaseCon::Setup const& setup,
DatabaseCon::CheckpointerSetup const& checkpointerSetup);
/**
* @brief makeMetaDBs Opens shard index lookup databases, and saves
* their descriptors into internal variables.
* @param config Config object.
* @param setup Path to database and other opening parameters.
* @param checkpointerSetup Checkpointer parameters.
* @return True if node databases opened successfully.
*/
bool
makeMetaDBs(
Config const& config,
DatabaseCon::Setup const& setup,
DatabaseCon::CheckpointerSetup const& checkpointerSetup);
/**
* @brief seqToShardIndex Converts ledgers sequence to shard index.
* @param ledgerSeq Ledger sequence.
@@ -244,7 +277,7 @@ private:
}
/**
* @brief checkoutTransaction Checkouts and returns node ledger DB.
* @brief checkoutTransaction Checks out and returns node ledger DB.
* @return Session to node ledger DB.
*/
auto
@@ -254,7 +287,7 @@ private:
}
/**
* @brief checkoutTransaction Checkouts and returns node transaction DB.
* @brief checkoutTransaction Checks out and returns node transaction DB.
* @return Session to node transaction DB.
*/
auto
@@ -264,7 +297,7 @@ private:
}
/**
* @brief doLedger Checkouts ledger database for shard
* @brief doLedger Checks out ledger database for shard
* containing given ledger and calls given callback function passing
* shard index and session with the database to it.
* @param ledgerSeq Ledger sequence.
@@ -274,14 +307,14 @@ private:
bool
doLedger(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback)
std::function<bool(soci::session& session)> const& callback)
{
return app_.getShardStore()->callForLedgerSQL(ledgerSeq, callback);
return app_.getShardStore()->callForLedgerSQLByLedgerSeq(
ledgerSeq, callback);
}
/**
* @brief doTransaction Checkouts transaction database for shard
* @brief doTransaction Checks out transaction database for shard
* containing given ledger and calls given callback function passing
* shard index and session with the database to it.
* @param ledgerSeq Ledger sequence.
@@ -291,14 +324,14 @@ private:
bool
doTransaction(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback)
std::function<bool(soci::session& session)> const& callback)
{
return app_.getShardStore()->callForTransactionSQL(ledgerSeq, callback);
return app_.getShardStore()->callForTransactionSQLByLedgerSeq(
ledgerSeq, callback);
}
/**
* @brief iterateLedgerForward Checkouts ledger databases for
* @brief iterateLedgerForward Checks out ledger databases for
* all shards in ascending order starting from given shard index
* until shard with the largest index visited or callback returned
* false. For each visited shard calls given callback function
@@ -311,7 +344,8 @@ private:
bool
iterateLedgerForward(
std::optional<std::uint32_t> firstIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback)
{
return app_.getShardStore()->iterateLedgerSQLsForward(
@@ -319,7 +353,7 @@ private:
}
/**
* @brief iterateTransactionForward Checkouts transaction databases for
* @brief iterateTransactionForward Checks out transaction databases for
* all shards in ascending order starting from given shard index
* until shard with the largest index visited or callback returned
* false. For each visited shard calls given callback function
@@ -332,7 +366,8 @@ private:
bool
iterateTransactionForward(
std::optional<std::uint32_t> firstIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback)
{
return app_.getShardStore()->iterateLedgerSQLsForward(
@@ -340,7 +375,7 @@ private:
}
/**
* @brief iterateLedgerBack Checkouts ledger databases for
* @brief iterateLedgerBack Checks out ledger databases for
* all shards in descending order starting from given shard index
* until shard with the smallest index visited or callback returned
* false. For each visited shard calls given callback function
@@ -353,7 +388,8 @@ private:
bool
iterateLedgerBack(
std::optional<std::uint32_t> firstIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback)
{
return app_.getShardStore()->iterateLedgerSQLsBack(
@@ -361,7 +397,7 @@ private:
}
/**
* @brief iterateTransactionForward Checkouts transaction databases for
* @brief iterateTransactionForward Checks out transaction databases for
* all shards in descending order starting from given shard index
* until shard with the smallest index visited or callback returned
* false. For each visited shard calls given callback function
@@ -374,7 +410,8 @@ private:
bool
iterateTransactionBack(
std::optional<std::uint32_t> firstIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback)
{
return app_.getShardStore()->iterateLedgerSQLsBack(
@@ -395,6 +432,21 @@ RelationalDBInterfaceSqliteImp::makeLedgerDBs(
return res;
}
bool
RelationalDBInterfaceSqliteImp::makeMetaDBs(
Config const& config,
DatabaseCon::Setup const& setup,
DatabaseCon::CheckpointerSetup const& checkpointerSetup)
{
auto [lgrMetaDB, txMetaDB] =
ripple::makeMetaDBs(config, setup, checkpointerSetup);
txMetaDB_ = std::move(txMetaDB);
lgrMetaDB_ = std::move(lgrMetaDB);
return true;
}
std::optional<LedgerIndex>
RelationalDBInterfaceSqliteImp::getMinLedgerSeq()
{
@@ -407,10 +459,11 @@ RelationalDBInterfaceSqliteImp::getMinLedgerSeq()
/* else use shard databases */
std::optional<LedgerIndex> res;
iterateLedgerForward({}, [&](soci::session& session, std::uint32_t index) {
res = ripple::getMinLedgerSeq(session, TableType::Ledgers);
return !res;
});
iterateLedgerForward(
{}, [&](soci::session& session, std::uint32_t shardIndex) {
res = ripple::getMinLedgerSeq(session, TableType::Ledgers);
return !res;
});
return res;
}
@@ -427,7 +480,7 @@ RelationalDBInterfaceSqliteImp::getTransactionsMinLedgerSeq()
/* else use shard databases */
std::optional<LedgerIndex> res;
iterateTransactionForward(
{}, [&](soci::session& session, std::uint32_t index) {
{}, [&](soci::session& session, std::uint32_t shardIndex) {
res = ripple::getMinLedgerSeq(session, TableType::Transactions);
return !res;
});
@@ -447,7 +500,7 @@ RelationalDBInterfaceSqliteImp::getAccountTransactionsMinLedgerSeq()
/* else use shard databases */
std::optional<LedgerIndex> res;
iterateTransactionForward(
{}, [&](soci::session& session, std::uint32_t index) {
{}, [&](soci::session& session, std::uint32_t shardIndex) {
res = ripple::getMinLedgerSeq(
session, TableType::AccountTransactions);
return !res;
@@ -467,10 +520,11 @@ RelationalDBInterfaceSqliteImp::getMaxLedgerSeq()
/* else use shard databases */
std::optional<LedgerIndex> res;
iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) {
res = ripple::getMaxLedgerSeq(session, TableType::Ledgers);
return !res;
});
iterateLedgerBack(
{}, [&](soci::session& session, std::uint32_t shardIndex) {
res = ripple::getMaxLedgerSeq(session, TableType::Ledgers);
return !res;
});
return res;
}
@@ -478,7 +532,7 @@ void
RelationalDBInterfaceSqliteImp::deleteTransactionByLedgerSeq(
LedgerIndex ledgerSeq)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -487,7 +541,7 @@ RelationalDBInterfaceSqliteImp::deleteTransactionByLedgerSeq(
}
/* else use shard database */
doTransaction(ledgerSeq, [&](soci::session& session, std::uint32_t index) {
doTransaction(ledgerSeq, [&](soci::session& session) {
ripple::deleteByLedgerSeq(session, TableType::Transactions, ledgerSeq);
return true;
});
@@ -496,7 +550,7 @@ RelationalDBInterfaceSqliteImp::deleteTransactionByLedgerSeq(
void
RelationalDBInterfaceSqliteImp::deleteBeforeLedgerSeq(LedgerIndex ledgerSeq)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
auto db = checkoutLedger();
@@ -507,7 +561,7 @@ RelationalDBInterfaceSqliteImp::deleteBeforeLedgerSeq(LedgerIndex ledgerSeq)
/* else use shard databases */
iterateLedgerBack(
seqToShardIndex(ledgerSeq),
[&](soci::session& session, std::uint32_t index) {
[&](soci::session& session, std::uint32_t shardIndex) {
ripple::deleteBeforeLedgerSeq(
session, TableType::Ledgers, ledgerSeq);
return true;
@@ -518,7 +572,7 @@ void
RelationalDBInterfaceSqliteImp::deleteTransactionsBeforeLedgerSeq(
LedgerIndex ledgerSeq)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -529,7 +583,7 @@ RelationalDBInterfaceSqliteImp::deleteTransactionsBeforeLedgerSeq(
/* else use shard databases */
iterateTransactionBack(
seqToShardIndex(ledgerSeq),
[&](soci::session& session, std::uint32_t index) {
[&](soci::session& session, std::uint32_t shardIndex) {
ripple::deleteBeforeLedgerSeq(
session, TableType::Transactions, ledgerSeq);
return true;
@@ -540,7 +594,7 @@ void
RelationalDBInterfaceSqliteImp::deleteAccountTransactionsBeforeLedgerSeq(
LedgerIndex ledgerSeq)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -552,7 +606,7 @@ RelationalDBInterfaceSqliteImp::deleteAccountTransactionsBeforeLedgerSeq(
/* else use shard databases */
iterateTransactionBack(
seqToShardIndex(ledgerSeq),
[&](soci::session& session, std::uint32_t index) {
[&](soci::session& session, std::uint32_t shardIndex) {
ripple::deleteBeforeLedgerSeq(
session, TableType::AccountTransactions, ledgerSeq);
return true;
@@ -562,7 +616,7 @@ RelationalDBInterfaceSqliteImp::deleteAccountTransactionsBeforeLedgerSeq(
std::size_t
RelationalDBInterfaceSqliteImp::getTransactionCount()
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -572,7 +626,7 @@ RelationalDBInterfaceSqliteImp::getTransactionCount()
/* else use shard databases */
std::size_t rows = 0;
iterateTransactionForward(
{}, [&](soci::session& session, std::uint32_t index) {
{}, [&](soci::session& session, std::uint32_t shardIndex) {
rows += ripple::getRows(session, TableType::Transactions);
return true;
});
@@ -582,7 +636,7 @@ RelationalDBInterfaceSqliteImp::getTransactionCount()
std::size_t
RelationalDBInterfaceSqliteImp::getAccountTransactionCount()
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -592,7 +646,7 @@ RelationalDBInterfaceSqliteImp::getAccountTransactionCount()
/* else use shard databases */
std::size_t rows = 0;
iterateTransactionForward(
{}, [&](soci::session& session, std::uint32_t index) {
{}, [&](soci::session& session, std::uint32_t shardIndex) {
rows += ripple::getRows(session, TableType::AccountTransactions);
return true;
});
@@ -602,7 +656,7 @@ RelationalDBInterfaceSqliteImp::getAccountTransactionCount()
RelationalDBInterface::CountMinMax
RelationalDBInterfaceSqliteImp::getLedgerCountMinMax()
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
auto db = checkoutLedger();
@@ -611,17 +665,18 @@ RelationalDBInterfaceSqliteImp::getLedgerCountMinMax()
/* else use shard databases */
CountMinMax res{0, 0, 0};
iterateLedgerForward({}, [&](soci::session& session, std::uint32_t index) {
auto r = ripple::getRowsMinMax(session, TableType::Ledgers);
if (r.numberOfRows)
{
res.numberOfRows += r.numberOfRows;
if (res.minLedgerSequence == 0)
res.minLedgerSequence = r.minLedgerSequence;
res.maxLedgerSequence = r.maxLedgerSequence;
}
return true;
});
iterateLedgerForward(
{}, [&](soci::session& session, std::uint32_t shardIndex) {
auto r = ripple::getRowsMinMax(session, TableType::Ledgers);
if (r.numberOfRows)
{
res.numberOfRows += r.numberOfRows;
if (res.minLedgerSequence == 0)
res.minLedgerSequence = r.minLedgerSequence;
res.maxLedgerSequence = r.maxLedgerSequence;
}
return true;
});
return res;
}
@@ -630,23 +685,42 @@ RelationalDBInterfaceSqliteImp::saveValidatedLedger(
std::shared_ptr<Ledger const> const& ledger,
bool current)
{
/* if databases exists, use it */
/* if databases exists, use them */
if (existsLedger() && existsTransaction())
{
return ripple::saveValidatedLedger(
*lgrdb_, *txdb_, app_, ledger, current);
if (!ripple::saveValidatedLedger(
*lgrdb_, *txdb_, app_, ledger, current))
return false;
}
/* Todo: use shard databases. Skipped in this PR by propose of Mickey
* Portilla. */
if (auto shardStore = app_.getShardStore())
{
if (ledger->info().seq < shardStore->earliestLedgerSeq())
// For the moment return false only when the ShardStore
// should accept the ledger, but fails when attempting
// to do so, i.e. when saveLedgerMeta fails. Later when
// the ShardStore supercedes the NodeStore, change this
// line to return false if the ledger is too early.
return true;
return false;
auto lgrMetaSession = lgrMetaDB_->checkoutDb();
auto txMetaSession = txMetaDB_->checkoutDb();
return ripple::saveLedgerMeta(
ledger,
app_,
*lgrMetaSession,
*txMetaSession,
shardStore->seqToShardIndex(ledger->info().seq));
}
return true;
}
std::optional<LedgerInfo>
RelationalDBInterfaceSqliteImp::getLedgerInfoByIndex(LedgerIndex ledgerSeq)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
auto db = checkoutLedger();
@@ -655,7 +729,7 @@ RelationalDBInterfaceSqliteImp::getLedgerInfoByIndex(LedgerIndex ledgerSeq)
/* else use shard database */
std::optional<LedgerInfo> res;
doLedger(ledgerSeq, [&](soci::session& session, std::uint32_t index) {
doLedger(ledgerSeq, [&](soci::session& session) {
res = ripple::getLedgerInfoByIndex(session, ledgerSeq, j_);
return true;
});
@@ -665,7 +739,7 @@ RelationalDBInterfaceSqliteImp::getLedgerInfoByIndex(LedgerIndex ledgerSeq)
std::optional<LedgerInfo>
RelationalDBInterfaceSqliteImp::getNewestLedgerInfo()
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
auto db = checkoutLedger();
@@ -674,14 +748,15 @@ RelationalDBInterfaceSqliteImp::getNewestLedgerInfo()
/* else use shard databases */
std::optional<LedgerInfo> res;
iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) {
if (auto info = ripple::getNewestLedgerInfo(session, j_))
{
res = info;
return false;
}
return true;
});
iterateLedgerBack(
{}, [&](soci::session& session, std::uint32_t shardIndex) {
if (auto info = ripple::getNewestLedgerInfo(session, j_))
{
res = info;
return false;
}
return true;
});
return res;
}
@@ -690,7 +765,7 @@ std::optional<LedgerInfo>
RelationalDBInterfaceSqliteImp::getLimitedOldestLedgerInfo(
LedgerIndex ledgerFirstIndex)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
auto db = checkoutLedger();
@@ -701,7 +776,7 @@ RelationalDBInterfaceSqliteImp::getLimitedOldestLedgerInfo(
std::optional<LedgerInfo> res;
iterateLedgerForward(
seqToShardIndex(ledgerFirstIndex),
[&](soci::session& session, std::uint32_t index) {
[&](soci::session& session, std::uint32_t shardIndex) {
if (auto info = ripple::getLimitedOldestLedgerInfo(
session, ledgerFirstIndex, j_))
{
@@ -718,7 +793,7 @@ std::optional<LedgerInfo>
RelationalDBInterfaceSqliteImp::getLimitedNewestLedgerInfo(
LedgerIndex ledgerFirstIndex)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
auto db = checkoutLedger();
@@ -727,17 +802,16 @@ RelationalDBInterfaceSqliteImp::getLimitedNewestLedgerInfo(
/* else use shard databases */
std::optional<LedgerInfo> res;
iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) {
if (auto info = ripple::getLimitedNewestLedgerInfo(
session, ledgerFirstIndex, j_))
{
res = info;
return false;
}
if (index < seqToShardIndex(ledgerFirstIndex))
return false;
return true;
});
iterateLedgerBack(
{}, [&](soci::session& session, std::uint32_t shardIndex) {
if (auto info = ripple::getLimitedNewestLedgerInfo(
session, ledgerFirstIndex, j_))
{
res = info;
return false;
}
return shardIndex >= seqToShardIndex(ledgerFirstIndex);
});
return res;
}
@@ -745,7 +819,7 @@ RelationalDBInterfaceSqliteImp::getLimitedNewestLedgerInfo(
std::optional<LedgerInfo>
RelationalDBInterfaceSqliteImp::getLedgerInfoByHash(uint256 const& ledgerHash)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
auto db = checkoutLedger();
@@ -753,17 +827,25 @@ RelationalDBInterfaceSqliteImp::getLedgerInfoByHash(uint256 const& ledgerHash)
}
/* else use shard databases */
std::optional<LedgerInfo> res;
iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) {
if (auto info = ripple::getLedgerInfoByHash(session, ledgerHash, j_))
{
res = info;
return false;
}
return true;
});
if (auto shardStore = app_.getShardStore())
{
std::optional<LedgerInfo> res;
auto lgrMetaSession = lgrMetaDB_->checkoutDb();
return res;
if (auto const shardIndex =
ripple::getShardIndexforLedger(*lgrMetaSession, ledgerHash))
{
shardStore->callForLedgerSQLByShardIndex(
*shardIndex, [&](soci::session& session) {
res = ripple::getLedgerInfoByHash(session, ledgerHash, j_);
return false; // unused
});
}
return res;
}
return {};
}
uint256
@@ -778,7 +860,7 @@ RelationalDBInterfaceSqliteImp::getHashByIndex(LedgerIndex ledgerIndex)
/* else use shard database */
uint256 hash;
doLedger(ledgerIndex, [&](soci::session& session, std::uint32_t index) {
doLedger(ledgerIndex, [&](soci::session& session) {
hash = ripple::getHashByIndex(session, ledgerIndex);
return true;
});
@@ -797,7 +879,7 @@ RelationalDBInterfaceSqliteImp::getHashesByIndex(LedgerIndex ledgerIndex)
/* else use shard database */
std::optional<LedgerHashPair> res;
doLedger(ledgerIndex, [&](soci::session& session, std::uint32_t index) {
doLedger(ledgerIndex, [&](soci::session& session) {
res = ripple::getHashesByIndex(session, ledgerIndex, j_);
return true;
});
@@ -823,7 +905,7 @@ RelationalDBInterfaceSqliteImp::getHashesByIndex(
LedgerIndex shardMaxSeq = lastLedgerSeq(seqToShardIndex(minSeq));
if (shardMaxSeq > maxSeq)
shardMaxSeq = maxSeq;
doLedger(minSeq, [&](soci::session& session, std::uint32_t index) {
doLedger(minSeq, [&](soci::session& session) {
auto r = ripple::getHashesByIndex(session, minSeq, shardMaxSeq, j_);
res.insert(r.begin(), r.end());
return true;
@@ -837,7 +919,7 @@ RelationalDBInterfaceSqliteImp::getHashesByIndex(
std::vector<std::shared_ptr<Transaction>>
RelationalDBInterfaceSqliteImp::getTxHistory(LedgerIndex startIndex)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -848,7 +930,7 @@ RelationalDBInterfaceSqliteImp::getTxHistory(LedgerIndex startIndex)
std::vector<std::shared_ptr<Transaction>> txs;
int quantity = 20;
iterateTransactionBack(
{}, [&](soci::session& session, std::uint32_t index) {
{}, [&](soci::session& session, std::uint32_t shardIndex) {
auto [tx, total] =
ripple::getTxHistory(session, app_, startIndex, quantity, true);
txs.insert(txs.end(), tx.begin(), tx.end());
@@ -875,7 +957,7 @@ RelationalDBInterfaceSqliteImp::getOldestAccountTxs(
{
LedgerMaster& ledgerMaster = app_.getLedgerMaster();
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -891,8 +973,8 @@ RelationalDBInterfaceSqliteImp::getOldestAccountTxs(
iterateTransactionForward(
opt.minLedger ? seqToShardIndex(opt.minLedger)
: std::optional<std::uint32_t>(),
[&](soci::session& session, std::uint32_t index) {
if (opt.maxLedger && index > seqToShardIndex(opt.maxLedger))
[&](soci::session& session, std::uint32_t shardIndex) {
if (opt.maxLedger && shardIndex > seqToShardIndex(opt.maxLedger))
return false;
auto [r, total] = ripple::getOldestAccountTxs(
session, app_, ledgerMaster, opt, limit_used, j_);
@@ -924,7 +1006,7 @@ RelationalDBInterfaceSqliteImp::getNewestAccountTxs(
{
LedgerMaster& ledgerMaster = app_.getLedgerMaster();
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -940,8 +1022,8 @@ RelationalDBInterfaceSqliteImp::getNewestAccountTxs(
iterateTransactionBack(
opt.maxLedger ? seqToShardIndex(opt.maxLedger)
: std::optional<std::uint32_t>(),
[&](soci::session& session, std::uint32_t index) {
if (opt.minLedger && index < seqToShardIndex(opt.minLedger))
[&](soci::session& session, std::uint32_t shardIndex) {
if (opt.minLedger && shardIndex < seqToShardIndex(opt.minLedger))
return false;
auto [r, total] = ripple::getNewestAccountTxs(
session, app_, ledgerMaster, opt, limit_used, j_);
@@ -971,7 +1053,7 @@ RelationalDBInterface::MetaTxsList
RelationalDBInterfaceSqliteImp::getOldestAccountTxsB(
AccountTxOptions const& options)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -985,8 +1067,8 @@ RelationalDBInterfaceSqliteImp::getOldestAccountTxsB(
iterateTransactionForward(
opt.minLedger ? seqToShardIndex(opt.minLedger)
: std::optional<std::uint32_t>(),
[&](soci::session& session, std::uint32_t index) {
if (opt.maxLedger && index > seqToShardIndex(opt.maxLedger))
[&](soci::session& session, std::uint32_t shardIndex) {
if (opt.maxLedger && shardIndex > seqToShardIndex(opt.maxLedger))
return false;
auto [r, total] = ripple::getOldestAccountTxsB(
session, app_, opt, limit_used, j_);
@@ -1016,7 +1098,7 @@ RelationalDBInterface::MetaTxsList
RelationalDBInterfaceSqliteImp::getNewestAccountTxsB(
AccountTxOptions const& options)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -1030,8 +1112,8 @@ RelationalDBInterfaceSqliteImp::getNewestAccountTxsB(
iterateTransactionBack(
opt.maxLedger ? seqToShardIndex(opt.maxLedger)
: std::optional<std::uint32_t>(),
[&](soci::session& session, std::uint32_t index) {
if (opt.minLedger && index < seqToShardIndex(opt.minLedger))
[&](soci::session& session, std::uint32_t shardIndex) {
if (opt.minLedger && shardIndex < seqToShardIndex(opt.minLedger))
return false;
auto [r, total] = ripple::getNewestAccountTxsB(
session, app_, opt, limit_used, j_);
@@ -1077,7 +1159,7 @@ RelationalDBInterfaceSqliteImp::oldestAccountTxPage(
convertBlobsToTxResult(ret, ledger_index, status, rawTxn, rawMeta, app);
};
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -1099,9 +1181,9 @@ RelationalDBInterfaceSqliteImp::oldestAccountTxPage(
iterateTransactionForward(
opt.minLedger ? seqToShardIndex(opt.minLedger)
: std::optional<std::uint32_t>(),
[&](soci::session& session, std::uint32_t index) {
[&](soci::session& session, std::uint32_t shardIndex) {
if (opt.maxLedger != UINT32_MAX &&
index > seqToShardIndex(opt.minLedger))
shardIndex > seqToShardIndex(opt.minLedger))
return false;
auto [marker, total] = ripple::oldestAccountTxPage(
session,
@@ -1141,7 +1223,7 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPage(
convertBlobsToTxResult(ret, ledger_index, status, rawTxn, rawMeta, app);
};
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -1163,8 +1245,8 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPage(
iterateTransactionBack(
opt.maxLedger != UINT32_MAX ? seqToShardIndex(opt.maxLedger)
: std::optional<std::uint32_t>(),
[&](soci::session& session, std::uint32_t index) {
if (opt.minLedger && index < seqToShardIndex(opt.minLedger))
[&](soci::session& session, std::uint32_t shardIndex) {
if (opt.minLedger && shardIndex < seqToShardIndex(opt.minLedger))
return false;
auto [marker, total] = ripple::newestAccountTxPage(
session,
@@ -1203,7 +1285,7 @@ RelationalDBInterfaceSqliteImp::oldestAccountTxPageB(
ret.emplace_back(std::move(rawTxn), std::move(rawMeta), ledgerIndex);
};
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -1225,9 +1307,9 @@ RelationalDBInterfaceSqliteImp::oldestAccountTxPageB(
iterateTransactionForward(
opt.minLedger ? seqToShardIndex(opt.minLedger)
: std::optional<std::uint32_t>(),
[&](soci::session& session, std::uint32_t index) {
[&](soci::session& session, std::uint32_t shardIndex) {
if (opt.maxLedger != UINT32_MAX &&
index > seqToShardIndex(opt.minLedger))
shardIndex > seqToShardIndex(opt.minLedger))
return false;
auto [marker, total] = ripple::oldestAccountTxPage(
session,
@@ -1266,7 +1348,7 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPageB(
ret.emplace_back(std::move(rawTxn), std::move(rawMeta), ledgerIndex);
};
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -1288,8 +1370,8 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPageB(
iterateTransactionBack(
opt.maxLedger != UINT32_MAX ? seqToShardIndex(opt.maxLedger)
: std::optional<std::uint32_t>(),
[&](soci::session& session, std::uint32_t index) {
if (opt.minLedger && index < seqToShardIndex(opt.minLedger))
[&](soci::session& session, std::uint32_t shardIndex) {
if (opt.minLedger && shardIndex < seqToShardIndex(opt.minLedger))
return false;
auto [marker, total] = ripple::newestAccountTxPage(
session,
@@ -1312,10 +1394,10 @@ RelationalDBInterfaceSqliteImp::newestAccountTxPageB(
std::variant<RelationalDBInterface::AccountTx, TxSearched>
RelationalDBInterfaceSqliteImp::getTransaction(
uint256 const& id,
std::optional<ClosedInterval<uint32_t>> const& range,
std::optional<ClosedInterval<std::uint32_t>> const& range,
error_code_i& ec)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -1323,30 +1405,44 @@ RelationalDBInterfaceSqliteImp::getTransaction(
}
/* else use shard databases */
std::variant<AccountTx, TxSearched> res(TxSearched::unknown);
iterateTransactionBack(
{}, [&](soci::session& session, std::uint32_t index) {
std::optional<ClosedInterval<uint32_t>> range1;
if (range)
{
uint32_t low = std::max(range->lower(), firstLedgerSeq(index));
uint32_t high = std::min(range->upper(), lastLedgerSeq(index));
if (low <= high)
range1 = ClosedInterval<uint32_t>(low, high);
}
res = ripple::getTransaction(session, app_, id, range1, ec);
/* finish iterations if transaction found or error detected */
return res.index() == 1 &&
std::get<TxSearched>(res) != TxSearched::unknown;
});
if (auto shardStore = app_.getShardStore())
{
std::variant<AccountTx, TxSearched> res(TxSearched::unknown);
auto txMetaSession = txMetaDB_->checkoutDb();
return res;
if (auto const shardIndex =
ripple::getShardIndexforTransaction(*txMetaSession, id))
{
shardStore->callForTransactionSQLByShardIndex(
*shardIndex, [&](soci::session& session) {
std::optional<ClosedInterval<std::uint32_t>> range1;
if (range)
{
std::uint32_t const low = std::max(
range->lower(), firstLedgerSeq(*shardIndex));
std::uint32_t const high = std::min(
range->upper(), lastLedgerSeq(*shardIndex));
if (low <= high)
range1 = ClosedInterval<std::uint32_t>(low, high);
}
res = ripple::getTransaction(session, app_, id, range1, ec);
return res.index() == 1 &&
std::get<TxSearched>(res) !=
TxSearched::unknown; // unused
});
}
return res;
}
return {TxSearched::unknown};
}
bool
RelationalDBInterfaceSqliteImp::ledgerDbHasSpace(Config const& config)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
auto db = checkoutLedger();
@@ -1355,7 +1451,7 @@ RelationalDBInterfaceSqliteImp::ledgerDbHasSpace(Config const& config)
/* else use shard databases */
return iterateLedgerBack(
{}, [&](soci::session& session, std::uint32_t index) {
{}, [&](soci::session& session, std::uint32_t shardIndex) {
return ripple::dbHasSpace(session, config, j_);
});
}
@@ -1363,7 +1459,7 @@ RelationalDBInterfaceSqliteImp::ledgerDbHasSpace(Config const& config)
bool
RelationalDBInterfaceSqliteImp::transactionDbHasSpace(Config const& config)
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
auto db = checkoutTransaction();
@@ -1372,7 +1468,7 @@ RelationalDBInterfaceSqliteImp::transactionDbHasSpace(Config const& config)
/* else use shard databases */
return iterateTransactionBack(
{}, [&](soci::session& session, std::uint32_t index) {
{}, [&](soci::session& session, std::uint32_t shardIndex) {
return ripple::dbHasSpace(session, config, j_);
});
}
@@ -1380,7 +1476,7 @@ RelationalDBInterfaceSqliteImp::transactionDbHasSpace(Config const& config)
int
RelationalDBInterfaceSqliteImp::getKBUsedAll()
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
return ripple::getKBUsedAll(lgrdb_->getSession());
@@ -1388,17 +1484,18 @@ RelationalDBInterfaceSqliteImp::getKBUsedAll()
/* else use shard databases */
int sum = 0;
iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) {
sum += ripple::getKBUsedAll(session);
return true;
});
iterateLedgerBack(
{}, [&](soci::session& session, std::uint32_t shardIndex) {
sum += ripple::getKBUsedAll(session);
return true;
});
return sum;
}
int
RelationalDBInterfaceSqliteImp::getKBUsedLedger()
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsLedger())
{
return ripple::getKBUsedDB(lgrdb_->getSession());
@@ -1406,17 +1503,18 @@ RelationalDBInterfaceSqliteImp::getKBUsedLedger()
/* else use shard databases */
int sum = 0;
iterateLedgerBack({}, [&](soci::session& session, std::uint32_t index) {
sum += ripple::getKBUsedDB(session);
return true;
});
iterateLedgerBack(
{}, [&](soci::session& session, std::uint32_t shardIndex) {
sum += ripple::getKBUsedDB(session);
return true;
});
return sum;
}
int
RelationalDBInterfaceSqliteImp::getKBUsedTransaction()
{
/* if databases exists, use it */
/* if database exists, use it */
if (existsTransaction())
{
return ripple::getKBUsedDB(txdb_->getSession());
@@ -1425,13 +1523,25 @@ RelationalDBInterfaceSqliteImp::getKBUsedTransaction()
/* else use shard databases */
int sum = 0;
iterateTransactionBack(
{}, [&](soci::session& session, std::uint32_t index) {
{}, [&](soci::session& session, std::uint32_t shardIndex) {
sum += ripple::getKBUsedDB(session);
return true;
});
return sum;
}
void
RelationalDBInterfaceSqliteImp::closeLedgerDB()
{
lgrdb_.reset();
}
void
RelationalDBInterfaceSqliteImp::closeTransactionDB()
{
txdb_.reset();
}
std::unique_ptr<RelationalDBInterface>
getRelationalDBInterfaceSqlite(
Application& app,

View File

@@ -283,6 +283,18 @@ public:
*/
virtual int
getKBUsedTransaction() = 0;
/**
* @brief Closes the ledger database
*/
virtual void
closeLedgerDB() = 0;
/**
* @brief Closes the transaction database
*/
virtual void
closeTransactionDB() = 0;
};
} // namespace ripple

View File

@@ -32,6 +32,130 @@
namespace ripple {
DatabasePair
makeMetaDBs(
Config const& config,
DatabaseCon::Setup const& setup,
DatabaseCon::CheckpointerSetup const& checkpointerSetup)
{
// ledger meta database
auto lgrMetaDB{std::make_unique<DatabaseCon>(
setup,
LgrMetaDBName,
LgrMetaDBPragma,
LgrMetaDBInit,
checkpointerSetup)};
if (config.useTxTables())
{
// transaction meta database
auto txMetaDB{std::make_unique<DatabaseCon>(
setup,
TxMetaDBName,
TxMetaDBPragma,
TxMetaDBInit,
checkpointerSetup)};
return {std::move(lgrMetaDB), std::move(txMetaDB)};
}
return {std::move(lgrMetaDB), nullptr};
}
bool
saveLedgerMeta(
std::shared_ptr<Ledger const> const& ledger,
Application& app,
soci::session& lgrMetaSession,
soci::session& txnMetaSession,
std::uint32_t const shardIndex)
{
std::string_view constexpr lgrSQL =
R"sql(INSERT OR REPLACE INTO LedgerMeta VALUES
(:ledgerHash,:shardIndex);)sql";
auto const hash = to_string(ledger->info().hash);
lgrMetaSession << lgrSQL, soci::use(hash), soci::use(shardIndex);
if (app.config().useTxTables())
{
AcceptedLedger::pointer const aLedger = [&app, ledger] {
try
{
auto aLedger =
app.getAcceptedLedgerCache().fetch(ledger->info().hash);
if (!aLedger)
{
aLedger = std::make_shared<AcceptedLedger>(ledger, app);
app.getAcceptedLedgerCache().canonicalize_replace_client(
ledger->info().hash, aLedger);
}
return aLedger;
}
catch (std::exception const&)
{
JLOG(app.journal("Ledger").warn())
<< "An accepted ledger was missing nodes";
}
return AcceptedLedger::pointer{nullptr};
}();
if (!aLedger)
return false;
soci::transaction tr(txnMetaSession);
for (auto const& [_, acceptedLedgerTx] : aLedger->getMap())
{
(void)_;
std::string_view constexpr txnSQL =
R"sql(INSERT OR REPLACE INTO TransactionMeta VALUES
(:transactionID,:shardIndex);)sql";
auto const transactionID =
to_string(acceptedLedgerTx->getTransactionID());
txnMetaSession << txnSQL, soci::use(transactionID),
soci::use(shardIndex);
}
tr.commit();
}
return true;
}
std::optional<std::uint32_t>
getShardIndexforLedger(soci::session& session, LedgerHash const& hash)
{
std::uint32_t shardIndex;
session << "SELECT ShardIndex FROM LedgerMeta WHERE LedgerHash = '" << hash
<< "';",
soci::into(shardIndex);
if (!session.got_data())
return std::nullopt;
return shardIndex;
}
std::optional<std::uint32_t>
getShardIndexforTransaction(soci::session& session, TxID const& id)
{
std::uint32_t shardIndex;
session << "SELECT ShardIndex FROM TransactionMeta WHERE TransID = '" << id
<< "';",
soci::into(shardIndex);
if (!session.got_data())
return std::nullopt;
return shardIndex;
}
DatabasePair
makeShardCompleteLedgerDBs(
Config const& config,

View File

@@ -128,36 +128,48 @@ public:
virtual void
setStored(std::shared_ptr<Ledger const> const& ledger) = 0;
/**
* @brief callForLedgerSQL Checkouts ledger database for shard
* containing given ledger and calls given callback function passing
* shard index and session with the database to it.
* @param ledgerSeq Ledger sequence.
* @param callback Callback function to call.
* @return Value returned by callback function.
*/
/** Invoke a callback on the SQLite db holding the
corresponding ledger
@return Value returned by callback function.
*/
virtual bool
callForLedgerSQL(
callForLedgerSQLByLedgerSeq(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback) = 0;
std::function<bool(soci::session& session)> const& callback) = 0;
/** Invoke a callback on the ledger SQLite db for the
corresponding shard
@return Value returned by callback function.
*/
virtual bool
callForLedgerSQLByShardIndex(
std::uint32_t shardIndex,
std::function<bool(soci::session& session)> const& callback) = 0;
/** Invoke a callback on the transaction SQLite db
for the corresponding ledger
@return Value returned by callback function.
*/
virtual bool
callForTransactionSQLByLedgerSeq(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session)> const& callback) = 0;
/** Invoke a callback on the transaction SQLite db
for the corresponding shard
@return Value returned by callback function.
*/
virtual bool
callForTransactionSQLByShardIndex(
std::uint32_t shardIndex,
std::function<bool(soci::session& session)> const& callback) = 0;
/**
* @brief callForTransactionSQL Checkouts transaction database for shard
* containing given ledger and calls given callback function passing
* shard index and session with the database to it.
* @param ledgerSeq Ledger sequence.
* @param callback Callback function to call.
* @return Value returned by callback function.
*/
virtual bool
callForTransactionSQL(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback) = 0;
/**
* @brief iterateLedgerSQLsForward Checkouts ledger databases for all
* @brief iterateLedgerSQLsForward Checks out ledger databases for all
* shards in ascending order starting from given shard index until
* shard with the largest index visited or callback returned false.
* For each visited shard calls given callback function passing
@@ -170,11 +182,12 @@ public:
virtual bool
iterateLedgerSQLsForward(
std::optional<std::uint32_t> minShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback) = 0;
/**
* @brief iterateTransactionSQLsForward Checkouts transaction databases for
* @brief iterateTransactionSQLsForward Checks out transaction databases for
* all shards in ascending order starting from given shard index
* until shard with the largest index visited or callback returned
* false. For each visited shard calls given callback function
@@ -187,11 +200,12 @@ public:
virtual bool
iterateTransactionSQLsForward(
std::optional<std::uint32_t> minShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback) = 0;
/**
* @brief iterateLedgerSQLsBack Checkouts ledger databases for
* @brief iterateLedgerSQLsBack Checks out ledger databases for
* all shards in descending order starting from given shard index
* until shard with the smallest index visited or callback returned
* false. For each visited shard calls given callback function
@@ -204,11 +218,12 @@ public:
virtual bool
iterateLedgerSQLsBack(
std::optional<std::uint32_t> maxShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback) = 0;
/**
* @brief iterateTransactionSQLsBack Checkouts transaction databases for
* @brief iterateTransactionSQLsBack Checks out transaction databases for
* all shards in descending order starting from given shard index
* until shard with the smallest index visited or callback returned
* false. For each visited shard calls given callback function
@@ -221,7 +236,8 @@ public:
virtual bool
iterateTransactionSQLsBack(
std::optional<std::uint32_t> maxShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback) = 0;
/** Query information about shards held

View File

@@ -895,13 +895,13 @@ DatabaseShardImp::doImportDatabase()
std::uint32_t const lastSeq =
std::max(firstSeq, lastLedgerSeq(shardIndex));
// Verify SQLite ledgers are in the node store
{
auto const ledgerHashes{
app_.getRelationalDBInterface().getHashesByIndex(
firstSeq, lastSeq)};
if (ledgerHashes.size() != maxLedgers(shardIndex))
continue;
// Verify SQLite ledgers are in the node store
{
auto const ledgerHashes{
app_.getRelationalDBInterface().getHashesByIndex(
firstSeq, lastSeq)};
if (ledgerHashes.size() != maxLedgers(shardIndex))
continue;
auto& source = app_.getNodeStore();
bool valid{true};
@@ -1948,39 +1948,48 @@ DatabaseShardImp::checkHistoricalPaths(std::lock_guard<std::mutex> const&) const
}
bool
DatabaseShardImp::callForLedgerSQL(
DatabaseShardImp::callForLedgerSQLByLedgerSeq(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback)
std::function<bool(soci::session& session)> const& callback)
{
std::lock_guard lock(mutex_);
auto shardIndex = seqToShardIndex(ledgerSeq);
if (shards_.count(shardIndex) &&
shards_[shardIndex]->getState() == ShardState::finalized)
{
return shards_[shardIndex]->callForLedgerSQL(callback);
}
return false;
return callForLedgerSQLByShardIndex(seqToShardIndex(ledgerSeq), callback);
}
bool
DatabaseShardImp::callForTransactionSQL(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback)
DatabaseShardImp::callForLedgerSQLByShardIndex(
const uint32_t shardIndex,
std::function<bool(soci::session& session)> const& callback)
{
std::lock_guard lock(mutex_);
auto shardIndex = seqToShardIndex(ledgerSeq);
if (shards_.count(shardIndex) &&
shards_[shardIndex]->getState() == ShardState::finalized)
{
return shards_[shardIndex]->callForTransactionSQL(callback);
}
auto const it{shards_.find(shardIndex)};
return false;
return it != shards_.end() &&
it->second->getState() == ShardState::finalized &&
it->second->callForLedgerSQL(callback);
}
bool
DatabaseShardImp::callForTransactionSQLByLedgerSeq(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session)> const& callback)
{
return callForTransactionSQLByShardIndex(
seqToShardIndex(ledgerSeq), callback);
}
bool
DatabaseShardImp::callForTransactionSQLByShardIndex(
std::uint32_t const shardIndex,
std::function<bool(soci::session& session)> const& callback)
{
std::lock_guard lock(mutex_);
auto const it{shards_.find(shardIndex)};
return it != shards_.end() &&
it->second->getState() == ShardState::finalized &&
it->second->callForTransactionSQL(callback);
}
bool
@@ -2010,10 +2019,11 @@ DatabaseShardImp::iterateShardsForward(
return true;
}
bool
DatabaseShardImp::iterateLedgerSQLsForward(
std::optional<std::uint32_t> minShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
callback)
{
return iterateShardsForward(
@@ -2025,7 +2035,7 @@ DatabaseShardImp::iterateLedgerSQLsForward(
bool
DatabaseShardImp::iterateTransactionSQLsForward(
std::optional<std::uint32_t> minShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
callback)
{
return iterateShardsForward(
@@ -2066,7 +2076,7 @@ DatabaseShardImp::iterateShardsBack(
bool
DatabaseShardImp::iterateLedgerSQLsBack(
std::optional<std::uint32_t> maxShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
callback)
{
return iterateShardsBack(maxShardIndex, [&callback](Shard& shard) -> bool {
@@ -2077,7 +2087,7 @@ DatabaseShardImp::iterateLedgerSQLsBack(
bool
DatabaseShardImp::iterateTransactionSQLsBack(
std::optional<std::uint32_t> maxShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
callback)
{
return iterateShardsBack(maxShardIndex, [&callback](Shard& shard) -> bool {

View File

@@ -139,39 +139,51 @@ public:
getDatabaseImportSequence() const override;
bool
callForLedgerSQL(
callForLedgerSQLByLedgerSeq(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback) override;
std::function<bool(soci::session& session)> const& callback) override;
bool
callForTransactionSQL(
callForLedgerSQLByShardIndex(
std::uint32_t const shardIndex,
std::function<bool(soci::session& session)> const& callback) override;
bool
callForTransactionSQLByLedgerSeq(
LedgerIndex ledgerSeq,
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback) override;
std::function<bool(soci::session& session)> const& callback) override;
bool
callForTransactionSQLByShardIndex(
std::uint32_t const shardIndex,
std::function<bool(soci::session& session)> const& callback) override;
bool
iterateLedgerSQLsForward(
std::optional<std::uint32_t> minShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback) override;
bool
iterateTransactionSQLsForward(
std::optional<std::uint32_t> minShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback) override;
bool
iterateLedgerSQLsBack(
std::optional<std::uint32_t> maxShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback) override;
bool
iterateTransactionSQLsBack(
std::optional<std::uint32_t> maxShardIndex,
std::function<bool(soci::session& session, std::uint32_t index)> const&
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback) override;
private:

View File

@@ -1238,28 +1238,19 @@ Shard::makeBackendCount()
}
bool
Shard::callForLedgerSQL(
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback)
Shard::doCallForSQL(
std::function<bool(soci::session& session)> const& callback,
LockedSociSession&& db)
{
auto const scopedCount{makeBackendCount()};
if (!scopedCount)
return false;
auto db = lgrSQLiteDB_->checkoutDb();
return callback(*db, index_);
return callback(*db);
}
bool
Shard::callForTransactionSQL(
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback)
Shard::doCallForSQL(
std::function<bool(soci::session& session, std::uint32_t shardIndex)> const&
callback,
LockedSociSession&& db)
{
auto const scopedCount{makeBackendCount()};
if (!scopedCount)
return false;
auto db = txSQLiteDB_->checkoutDb();
return callback(*db, index_);
}

View File

@@ -215,29 +215,29 @@ public:
return to_string(acquireInfo_->storedSeqs);
}
/**
* @brief callForLedgerSQL Checks out ledger database for the shard and
* calls given callback function passing shard index and session
* with the database to it.
* @param callback Callback function to call.
* @return Value returned by callback function.
*/
bool
callForLedgerSQL(
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback);
/** Invoke a callback on the ledger SQLite db
/**
* @brief callForTransactionSQL Checks out transaction database for the
* shard and calls given callback function passing shard index and
* session with the database to it.
* @param callback Callback function to call.
* @return Value returned by callback function.
*/
@param callback Callback function to call.
@return Value returned by callback function.
*/
template <typename... Args>
bool
callForTransactionSQL(
std::function<bool(soci::session& session, std::uint32_t index)> const&
callback);
callForLedgerSQL(std::function<bool(Args... args)> const& callback)
{
return callForSQL(callback, lgrSQLiteDB_->checkoutDb());
}
/** Invoke a callback on the transaction SQLite db
@param callback Callback function to call.
@return Value returned by callback function.
*/
template <typename... Args>
bool
callForTransactionSQL(std::function<bool(Args... args)> const& callback)
{
return callForSQL(callback, txSQLiteDB_->checkoutDb());
}
// Current shard version
static constexpr std::uint32_t version{2};
@@ -389,6 +389,35 @@ private:
// Open databases if they are closed
[[nodiscard]] Shard::Count
makeBackendCount();
// Invoke a callback on the supplied session parameter
template <typename... Args>
bool
callForSQL(
std::function<bool(Args... args)> const& callback,
LockedSociSession&& db)
{
auto const scopedCount{makeBackendCount()};
if (!scopedCount)
return false;
return doCallForSQL(callback, std::move(db));
}
// Invoke a callback that accepts a SQLite session parameter
bool
doCallForSQL(
std::function<bool(soci::session& session)> const& callback,
LockedSociSession&& db);
// Invoke a callback that accepts a SQLite session and the
// shard index as parameters
bool
doCallForSQL(
std::function<
bool(soci::session& session, std::uint32_t shardIndex)> const&
callback,
LockedSociSession&& db);
};
} // namespace NodeStore

View File

@@ -20,6 +20,7 @@
#include <ripple/app/ledger/LedgerMaster.h>
#include <ripple/app/ledger/LedgerToJson.h>
#include <ripple/app/misc/SHAMapStore.h>
#include <ripple/app/rdb/backend/RelationalDBInterfaceSqlite.h>
#include <ripple/basics/Slice.h>
#include <ripple/basics/random.h>
#include <ripple/beast/hash/hash_append.h>
@@ -1729,6 +1730,101 @@ class DatabaseShard_test : public TestBase
BEAST_EXPECT(msg.peerchain_size() == 0);
}
void
testRelationalDBInterfaceSqlite(std::uint64_t const seedValue)
{
testcase("Relational DB Interface SQLite");
using namespace test::jtx;
beast::temp_dir shardDir;
Env env{*this, testConfig(shardDir.path())};
auto shardStore{env.app().getShardStore()};
BEAST_EXPECT(shardStore);
auto const shardCount = 3;
TestData data(seedValue, 3, shardCount);
if (!BEAST_EXPECT(data.makeLedgers(env)))
return;
BEAST_EXPECT(shardStore->getShardInfo()->finalized().empty());
BEAST_EXPECT(shardStore->getShardInfo()->incompleteToString().empty());
auto rdb = dynamic_cast<RelationalDBInterfaceSqlite*>(
&env.app().getRelationalDBInterface());
BEAST_EXPECT(rdb);
for (std::uint32_t i = 0; i < shardCount; ++i)
{
// Populate the shard store
auto n = createShard(data, *shardStore, shardCount);
if (!BEAST_EXPECT(n && *n >= 1 && *n <= shardCount))
return;
}
// Close these databases to force the RelationalDBInterfaceSqlite
// to use the shard databases and lookup tables.
rdb->closeLedgerDB();
rdb->closeTransactionDB();
// Lambda for comparing Ledger objects
auto infoCmp = [](auto const& a, auto const& b) {
return a.hash == b.hash && a.txHash == b.txHash &&
a.accountHash == b.accountHash &&
a.parentHash == b.parentHash && a.drops == b.drops &&
a.accepted == b.accepted && a.closeFlags == b.closeFlags &&
a.closeTimeResolution == b.closeTimeResolution &&
a.closeTime == b.closeTime;
};
for (auto const ledger : data.ledgers_)
{
// Compare each test ledger to the data retrieved
// from the RelationalDBInterfaceSqlite class
if (shardStore->seqToShardIndex(ledger->seq()) <
shardStore->earliestShardIndex() ||
ledger->info().seq < shardStore->earliestLedgerSeq())
continue;
auto info = rdb->getLedgerInfoByHash(ledger->info().hash);
BEAST_EXPECT(info);
BEAST_EXPECT(infoCmp(*info, ledger->info()));
for (auto const& transaction : ledger->txs)
{
// Compare each test transaction to the data
// retrieved from the RelationalDBInterfaceSqlite
// class
error_code_i error{rpcSUCCESS};
auto reference = rdb->getTransaction(
transaction.first->getTransactionID(), {}, error);
BEAST_EXPECT(error == rpcSUCCESS);
if (!BEAST_EXPECT(reference.index() == 0))
continue;
auto txn = std::get<0>(reference).first->getSTransaction();
BEAST_EXPECT(
transaction.first->getFullText() == txn->getFullText());
}
}
// Create additional ledgers to test a pathway in
// 'ripple::saveLedgerMeta' wherein fetching the
// accepted ledger fails
data = TestData(seedValue * 2, 4, 1);
if (!BEAST_EXPECT(data.makeLedgers(env, shardCount)))
return;
}
public:
DatabaseShard_test() : journal_("DatabaseShard_test", *this)
{
@@ -1758,6 +1854,7 @@ public:
testPrepareWithHistoricalPaths(seedValue());
testOpenShardManagement(seedValue());
testShardInfo(seedValue());
testRelationalDBInterfaceSqlite(seedValue());
}
};

View File

@@ -621,11 +621,7 @@ public:
{
std::unique_ptr<Database> db =
Manager::instance().make_Database(
megabytes(4),
scheduler,
2,
nodeParams,
journal_);
megabytes(4), scheduler, 2, nodeParams, journal_);
BEAST_EXPECT(
db->ledgersPerShard() == DEFAULT_LEDGERS_PER_SHARD);
}
@@ -636,11 +632,7 @@ public:
nodeParams.set("ledgers_per_shard", "100");
std::unique_ptr<Database> db =
Manager::instance().make_Database(
megabytes(4),
scheduler,
2,
nodeParams,
journal_);
megabytes(4), scheduler, 2, nodeParams, journal_);
}
catch (std::runtime_error const& e)
{
@@ -651,11 +643,7 @@ public:
// Set a valid ledgers per shard
nodeParams.set("ledgers_per_shard", "256");
std::unique_ptr<Database> db = Manager::instance().make_Database(
megabytes(4),
scheduler,
2,
nodeParams,
journal_);
megabytes(4), scheduler, 2, nodeParams, journal_);
// Verify database uses the ledgers per shard
BEAST_EXPECT(db->ledgersPerShard() == 256);