Relational DB interface

This commit is contained in:
cdy20
2020-06-03 17:57:07 -04:00
committed by manojsdoshi
parent 207e1730e9
commit 6d82fb83a0
66 changed files with 8528 additions and 3593 deletions

View File

@@ -44,6 +44,8 @@
#include <ripple/app/misc/ValidatorKeys.h>
#include <ripple/app/misc/ValidatorSite.h>
#include <ripple/app/paths/PathRequests.h>
#include <ripple/app/rdb/RelationalDBInterface_global.h>
#include <ripple/app/rdb/backend/RelationalDBInterfacePostgres.h>
#include <ripple/app/reporting/ReportingETL.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/ByteUtilities.h>
@@ -53,7 +55,6 @@
#include <ripple/beast/asio/io_latency_probe.h>
#include <ripple/beast/core/LexicalCast.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/core/Pg.h>
#include <ripple/core/Stoppable.h>
#include <ripple/json/json_reader.h>
#include <ripple/nodestore/DatabaseShard.h>
@@ -169,9 +170,6 @@ public:
// Required by the SHAMapStore
TransactionMaster m_txMaster;
#ifdef RIPPLED_REPORTING
std::shared_ptr<PgPool> pgPool_;
#endif
NodeStoreScheduler m_nodeStoreScheduler;
std::unique_ptr<SHAMapStore> m_shaMapStore;
PendingSaves pendingSaves_;
@@ -221,8 +219,7 @@ public:
boost::asio::steady_timer entropyTimer_;
bool startTimers_;
std::unique_ptr<DatabaseCon> mTxnDB;
std::unique_ptr<DatabaseCon> mLedgerDB;
std::unique_ptr<RelationalDBInterface> mRelationalDBInterface;
std::unique_ptr<DatabaseCon> mWalletDB;
std::unique_ptr<Overlay> overlay_;
std::vector<std::unique_ptr<Stoppable>> websocketServers_;
@@ -285,14 +282,6 @@ public:
[this]() { signalStop(); }))
, m_txMaster(*this)
#ifdef RIPPLED_REPORTING
, pgPool_(
config_->reporting() ? make_PgPool(
config_->section("ledger_tx_tables"),
*this,
logs_->journal("PgPool"))
: nullptr)
#endif
, m_nodeStoreScheduler(*this)
, m_shaMapStore(make_SHAMapStore(
@@ -860,27 +849,12 @@ public:
return *txQ_;
}
DatabaseCon&
getTxnDB() override
RelationalDBInterface&
getRelationalDBInterface() override
{
assert(mTxnDB.get() != nullptr);
return *mTxnDB;
assert(mRelationalDBInterface.get() != nullptr);
return *mRelationalDBInterface;
}
DatabaseCon&
getLedgerDB() override
{
assert(mLedgerDB.get() != nullptr);
return *mLedgerDB;
}
#ifdef RIPPLED_REPORTING
std::shared_ptr<PgPool> const&
getPgPool() override
{
assert(pgPool_);
return pgPool_;
}
#endif
DatabaseCon&
getWalletDB() override
@@ -907,85 +881,18 @@ public:
bool
initRDBMS()
{
assert(mTxnDB.get() == nullptr);
assert(mLedgerDB.get() == nullptr);
assert(mWalletDB.get() == nullptr);
try
{
auto setup = setup_DatabaseCon(*config_, m_journal);
if (!config_->reporting())
{
if (config_->useTxTables())
{
// transaction database
mTxnDB = std::make_unique<DatabaseCon>(
setup,
TxDBName,
TxDBPragma,
TxDBInit,
DatabaseCon::CheckpointerSetup{
m_jobQueue.get(), &logs()});
mTxnDB->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config_->getValueFor(SizedItem::txnDBCache)));
if (!setup.standAlone || setup.startUp == Config::LOAD ||
setup.startUp == Config::LOAD_FILE ||
setup.startUp == Config::REPLAY)
{
// Check if AccountTransactions has primary key
std::string cid, name, type;
std::size_t notnull, dflt_value, pk;
soci::indicator ind;
soci::statement st =
(mTxnDB->getSession().prepare
<< ("PRAGMA table_info(AccountTransactions);"),
soci::into(cid),
soci::into(name),
soci::into(type),
soci::into(notnull),
soci::into(dflt_value, ind),
soci::into(pk));
st.execute();
while (st.fetch())
{
if (pk == 1)
{
JLOG(m_journal.fatal())
<< "AccountTransactions database "
"should not have a primary key";
return false;
}
}
}
}
// ledger database
mLedgerDB = std::make_unique<DatabaseCon>(
setup,
LgrDBName,
LgrDBPragma,
LgrDBInit,
DatabaseCon::CheckpointerSetup{m_jobQueue.get(), &logs()});
mLedgerDB->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config_->getValueFor(SizedItem::lgrDBCache)));
}
else if (!config_->reportingReadOnly()) // use pg
{
#ifdef RIPPLED_REPORTING
initSchema(pgPool_);
#endif
}
mRelationalDBInterface =
RelationalDBInterface::init(*this, *config_, *m_jobQueue);
// wallet database
auto setup = setup_DatabaseCon(*config_, m_journal);
setup.useGlobalPragma = false;
mWalletDB = std::make_unique<DatabaseCon>(
setup,
WalletDBName,
std::array<char const*, 0>(),
WalletDBInit);
mWalletDB = makeWalletDB(setup);
}
catch (std::exception const& e)
{
@@ -1206,71 +1113,10 @@ public:
void
doSweep()
{
if (!config_->standalone())
if (!config_->standalone() &&
!getRelationalDBInterface().transactionDbHasSpace(*config_))
{
boost::filesystem::space_info space =
boost::filesystem::space(config_->legacy("database_path"));
if (space.available < megabytes(512))
{
JLOG(m_journal.fatal())
<< "Remaining free disk space is less than 512MB";
signalStop();
}
if (!config_->reporting() && config_->useTxTables())
{
DatabaseCon::Setup dbSetup = setup_DatabaseCon(*config_);
boost::filesystem::path dbPath = dbSetup.dataDir / TxDBName;
boost::system::error_code ec;
std::optional<std::uint64_t> dbSize =
boost::filesystem::file_size(dbPath, ec);
if (ec)
{
JLOG(m_journal.error())
<< "Error checking transaction db file size: "
<< ec.message();
dbSize.reset();
}
auto db = mTxnDB->checkoutDb();
static auto const pageSize = [&] {
std::uint32_t ps;
*db << "PRAGMA page_size;", soci::into(ps);
return ps;
}();
static auto const maxPages = [&] {
std::uint32_t mp;
*db << "PRAGMA max_page_count;", soci::into(mp);
return mp;
}();
std::uint32_t pageCount;
*db << "PRAGMA page_count;", soci::into(pageCount);
std::uint32_t freePages = maxPages - pageCount;
std::uint64_t freeSpace =
safe_cast<std::uint64_t>(freePages) * pageSize;
JLOG(m_journal.info())
<< "Transaction DB pathname: " << dbPath.string()
<< "; file size: " << dbSize.value_or(-1) << " bytes"
<< "; SQLite page size: " << pageSize << " bytes"
<< "; Free pages: " << freePages
<< "; Free space: " << freeSpace << " bytes; "
<< "Note that this does not take into account available "
"disk "
"space.";
if (freeSpace < megabytes(512))
{
JLOG(m_journal.fatal())
<< "Free SQLite space for transaction db is less than "
"512MB. To fix this, rippled must be executed with "
"the "
"vacuum parameter before restarting. "
"Note that this activity can take multiple days, "
"depending on database size.";
signalStop();
}
}
signalStop();
}
// VFALCO NOTE Does the order of calls matter?
@@ -1294,7 +1140,9 @@ public:
#ifdef RIPPLED_REPORTING
if (config().reporting())
pgPool_->idleSweeper();
dynamic_cast<RelationalDBInterfacePostgres*>(
&*mRelationalDBInterface)
->sweep();
#endif
// Set timer to do another sweep later.
@@ -1359,8 +1207,6 @@ ApplicationImp::setup()
signalStop();
});
assert(mTxnDB == nullptr);
auto debug_log = config_->getDebugLogFile();
if (!debug_log.empty())
@@ -1485,7 +1331,7 @@ ApplicationImp::setup()
if (!config().reporting())
m_orderBookDB.setup(getLedgerMaster().getCurrentLedger());
nodeIdentity_ = loadNodeIdentity(*this);
nodeIdentity_ = getNodeIdentity(*this);
if (!cluster_->load(config().section(SECTION_CLUSTER_NODES)))
{
@@ -2271,25 +2117,9 @@ ApplicationImp::nodeToShards()
void
ApplicationImp::setMaxDisallowedLedger()
{
if (config().reporting())
{
#ifdef RIPPLED_REPORTING
auto seq = PgQuery(pgPool_)("SELECT max_ledger()");
if (seq && !seq.isNull())
maxDisallowedLedger_ = seq.asBigInt();
#endif
}
else
{
// SOCI requires boost::optional (not std::optional) as the parameter.
boost::optional<LedgerIndex> seq;
{
auto db = getLedgerDB().checkoutDb();
*db << "SELECT MAX(LedgerSeq) FROM Ledgers;", soci::into(seq);
}
if (seq)
maxDisallowedLedger_ = *seq;
}
auto seq = getRelationalDBInterface().getMaxLedgerSeq();
if (seq)
maxDisallowedLedger_ = *seq;
JLOG(m_journal.trace())
<< "Max persisted ledger is " << maxDisallowedLedger_;