Add Reporting Mode

* Add a new operating mode to rippled called reporting mode
* Add ETL mechanism for a reporting node to extract data from a p2p node (sketched below)
* Add new gRPC methods to facilitate ETL
* Use Postgres in place of SQLite in reporting mode
* Add Cassandra as a nodestore option
* Update logic of RPC handlers when running in reporting mode
* Add ability to forward RPCs to a p2p node
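
At a high level, the ETL flow these bullets describe is: extract validated
ledgers from a p2p node over gRPC, then load them into the relational store
and the nodestore. Below is a minimal self-contained sketch of that loop;
the types and helpers in it are illustrative stand-ins, not the actual
ReportingETL interface.

// Hypothetical sketch of a reporting-node ETL loop; fetchLedger,
// writeToRelationalDB, and writeToNodeStore are stand-ins, not rippled APIs.
#include <cstdint>
#include <iostream>
#include <optional>

struct LedgerData
{
    std::uint32_t seq;  // a real ledger also carries header, txns, state diff
};

// Extract: in rippled this is a gRPC request served by a p2p node.
std::optional<LedgerData>
fetchLedger(std::uint32_t seq)
{
    if (seq > 5)
        return std::nullopt;  // pretend the p2p node has ledgers 1..5
    return LedgerData{seq};
}

// Load: in rippled these target Postgres and the nodestore (e.g. Cassandra).
void
writeToRelationalDB(LedgerData const& l)
{
    std::cout << "SQL load " << l.seq << '\n';
}

void
writeToNodeStore(LedgerData const& l)
{
    std::cout << "nodestore load " << l.seq << '\n';
}

int
main()
{
    for (std::uint32_t seq = 1;; ++seq)
    {
        auto data = fetchLedger(seq);
        if (!data)
            break;  // a real loop would wait for the next validated ledger
        writeToRelationalDB(*data);
        writeToNodeStore(*data);
    }
}
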
CJ Cobb authored 2020-09-15 16:54:43 -04:00; committed by manojsdoshi
parent b0a39c5f86, commit 27543170d0
136 changed files with 9943 additions and 755 deletions

src/ripple/app/main/Application.cpp

@@ -33,6 +33,7 @@
#include <ripple/app/main/LoadManager.h>
#include <ripple/app/main/NodeIdentity.h>
#include <ripple/app/main/NodeStoreScheduler.h>
#include <ripple/app/main/Tuning.h>
#include <ripple/app/misc/AmendmentTable.h>
#include <ripple/app/misc/HashRouter.h>
#include <ripple/app/misc/LoadFeeTrack.h>
@@ -42,6 +43,7 @@
#include <ripple/app/misc/ValidatorKeys.h>
#include <ripple/app/misc/ValidatorSite.h>
#include <ripple/app/paths/PathRequests.h>
#include <ripple/app/reporting/ReportingETL.h>
#include <ripple/app/tx/apply.h>
#include <ripple/basics/ByteUtilities.h>
#include <ripple/basics/PerfLog.h>
@@ -50,6 +52,7 @@
#include <ripple/beast/asio/io_latency_probe.h>
#include <ripple/beast/core/LexicalCast.h>
#include <ripple/core/DatabaseCon.h>
#include <ripple/core/Pg.h>
#include <ripple/core/Stoppable.h>
#include <ripple/json/json_reader.h>
#include <ripple/nodestore/DatabaseShard.h>
@@ -74,7 +77,10 @@
#include <condition_variable>
#include <cstring>
#include <iostream>
#include <limits>
#include <mutex>
#include <utility>
#include <variant>
namespace ripple {
@@ -158,6 +164,9 @@ public:
// Required by the SHAMapStore
TransactionMaster m_txMaster;
#ifdef RIPPLED_REPORTING
std::shared_ptr<PgPool> pgPool_;
#endif
NodeStoreScheduler m_nodeStoreScheduler;
std::unique_ptr<SHAMapStore> m_shaMapStore;
PendingSaves pendingSaves_;
@@ -225,6 +234,7 @@ public:
io_latency_sampler m_io_latency_sampler;
std::unique_ptr<GRPCServer> grpcServer_;
std::unique_ptr<ReportingETL> reportingETL_;
//--------------------------------------------------------------------------
@@ -269,9 +279,16 @@ public:
[this]() { signalStop(); }))
, m_txMaster(*this)
#ifdef RIPPLED_REPORTING
, pgPool_(
config_->reporting() ? make_PgPool(
config_->section("ledger_tx_tables"),
*this,
logs_->journal("PgPool"))
: nullptr)
#endif
, m_nodeStoreScheduler(*this)
, m_shaMapStore(make_SHAMapStore(
*this,
*this,
@@ -440,6 +457,7 @@ public:
std::chrono::milliseconds(100),
get_io_service())
, grpcServer_(std::make_unique<GRPCServer>(*this, *m_jobQueue))
, reportingETL_(std::make_unique<ReportingETL>(*this, *m_ledgerMaster))
{
add(m_resourceManager.get());
@@ -797,12 +815,16 @@ public:
OpenLedger&
openLedger() override
{
if (config_->reporting())
Throw<ReportingShouldProxy>();
return *openLedger_;
}
OpenLedger const&
openLedger() const override
{
if (config_->reporting())
Throw<ReportingShouldProxy>();
return *openLedger_;
}
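
These accessors are where the RPC-forwarding behavior starts: a reporting
node keeps no open ledger, so any handler that reaches for one throws
ReportingShouldProxy, and the RPC layer is expected to catch that and
forward the call to a p2p node. A rough sketch of the pattern, with
handleLocally and forwardToP2p as hypothetical stand-ins for the real
plumbing:

// Sketch only; handleLocally and forwardToP2p are hypothetical.
Json::Value
serviceRequest(Json::Value const& request)
{
    try
    {
        // May call app.openLedger() somewhere down the stack.
        return handleLocally(request);
    }
    catch (ReportingShouldProxy const&)
    {
        // No open ledger in reporting mode: relay the request to a
        // connected p2p node and return its response.
        return forwardToP2p(request);
    }
}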
@@ -832,6 +854,16 @@ public:
assert(mLedgerDB.get() != nullptr);
return *mLedgerDB;
}
#ifdef RIPPLED_REPORTING
std::shared_ptr<PgPool> const&
getPgPool() override
{
assert(pgPool_);
return pgPool_;
}
#endif
DatabaseCon&
getWalletDB() override
{
@@ -839,6 +871,13 @@ public:
return *mWalletDB;
}
ReportingETL&
getReportingETL() override
{
assert(reportingETL_.get() != nullptr);
return *reportingETL_;
}
bool
serverOkay(std::string& reason) override;
@@ -848,7 +887,7 @@ public:
//--------------------------------------------------------------------------
bool
initSQLiteDBs()
initRDBMS()
{
assert(mTxnDB.get() == nullptr);
assert(mLedgerDB.get() == nullptr);
@@ -857,59 +896,70 @@ public:
try
{
auto setup = setup_DatabaseCon(*config_, m_journal);
// transaction database
mTxnDB = std::make_unique<DatabaseCon>(
setup,
TxDBName,
TxDBPragma,
TxDBInit,
DatabaseCon::CheckpointerSetup{m_jobQueue.get(), &logs()});
mTxnDB->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config_->getValueFor(SizedItem::txnDBCache)));
if (!setup.standAlone || setup.startUp == Config::LOAD ||
setup.startUp == Config::LOAD_FILE ||
setup.startUp == Config::REPLAY)
if (!config_->reporting())
{
// Check if AccountTransactions has primary key
std::string cid, name, type;
std::size_t notnull, dflt_value, pk;
soci::indicator ind;
soci::statement st =
(mTxnDB->getSession().prepare
<< ("PRAGMA table_info(AccountTransactions);"),
soci::into(cid),
soci::into(name),
soci::into(type),
soci::into(notnull),
soci::into(dflt_value, ind),
soci::into(pk));
st.execute();
while (st.fetch())
if (config_->useTxTables())
{
if (pk == 1)
// transaction database
mTxnDB = std::make_unique<DatabaseCon>(
setup,
TxDBName,
TxDBPragma,
TxDBInit,
DatabaseCon::CheckpointerSetup{
m_jobQueue.get(), &logs()});
mTxnDB->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config_->getValueFor(SizedItem::txnDBCache)));
if (!setup.standAlone || setup.startUp == Config::LOAD ||
setup.startUp == Config::LOAD_FILE ||
setup.startUp == Config::REPLAY)
{
JLOG(m_journal.fatal())
<< "AccountTransactions database "
"should not have a primary key";
return false;
// Check if AccountTransactions has primary key
std::string cid, name, type;
std::size_t notnull, dflt_value, pk;
soci::indicator ind;
soci::statement st =
(mTxnDB->getSession().prepare
<< ("PRAGMA table_info(AccountTransactions);"),
soci::into(cid),
soci::into(name),
soci::into(type),
soci::into(notnull),
soci::into(dflt_value, ind),
soci::into(pk));
st.execute();
while (st.fetch())
{
if (pk == 1)
{
JLOG(m_journal.fatal())
<< "AccountTransactions database "
"should not have a primary key";
return false;
}
}
}
}
}
// ledger database
mLedgerDB = std::make_unique<DatabaseCon>(
setup,
LgrDBName,
LgrDBPragma,
LgrDBInit,
DatabaseCon::CheckpointerSetup{m_jobQueue.get(), &logs()});
mLedgerDB->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config_->getValueFor(SizedItem::lgrDBCache)));
// ledger database
mLedgerDB = std::make_unique<DatabaseCon>(
setup,
LgrDBName,
LgrDBPragma,
LgrDBInit,
DatabaseCon::CheckpointerSetup{m_jobQueue.get(), &logs()});
mLedgerDB->getSession() << boost::str(
boost::format("PRAGMA cache_size=-%d;") %
kilobytes(config_->getValueFor(SizedItem::lgrDBCache)));
}
else if (!config_->reportingReadOnly()) // use pg
{
#ifdef RIPPLED_REPORTING
initSchema(pgPool_);
#endif
}
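// To summarize the branching above: a p2p node (!reporting()) opens
// SQLite as before, creating the transaction tables only when
// useTxTables() is set; a writable reporting node initializes the
// Postgres schema via initSchema(); a read-only reporting node creates
// nothing, since a writer node owns the schema.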
// wallet database
setup.useGlobalPragma = false;
@@ -922,7 +972,7 @@ public:
catch (std::exception const& e)
{
JLOG(m_journal.fatal())
<< "Failed to initialize SQLite databases: " << e.what();
<< "Failed to initialize SQL databases: " << e.what();
return false;
}
@@ -1150,53 +1200,58 @@ public:
signalStop();
}
DatabaseCon::Setup dbSetup = setup_DatabaseCon(*config_);
boost::filesystem::path dbPath = dbSetup.dataDir / TxDBName;
boost::system::error_code ec;
boost::optional<std::uint64_t> dbSize =
boost::filesystem::file_size(dbPath, ec);
if (ec)
if (!config_->reporting() && config_->useTxTables())
{
JLOG(m_journal.error())
<< "Error checking transaction db file size: "
<< ec.message();
dbSize.reset();
}
DatabaseCon::Setup dbSetup = setup_DatabaseCon(*config_);
boost::filesystem::path dbPath = dbSetup.dataDir / TxDBName;
boost::system::error_code ec;
boost::optional<std::uint64_t> dbSize =
boost::filesystem::file_size(dbPath, ec);
if (ec)
{
JLOG(m_journal.error())
<< "Error checking transaction db file size: "
<< ec.message();
dbSize.reset();
}
auto db = mTxnDB->checkoutDb();
static auto const pageSize = [&] {
std::uint32_t ps;
*db << "PRAGMA page_size;", soci::into(ps);
return ps;
}();
static auto const maxPages = [&] {
std::uint32_t mp;
*db << "PRAGMA max_page_count;", soci::into(mp);
return mp;
}();
std::uint32_t pageCount;
*db << "PRAGMA page_count;", soci::into(pageCount);
std::uint32_t freePages = maxPages - pageCount;
std::uint64_t freeSpace =
safe_cast<std::uint64_t>(freePages) * pageSize;
JLOG(m_journal.info())
<< "Transaction DB pathname: " << dbPath.string()
<< "; file size: " << dbSize.value_or(-1) << " bytes"
<< "; SQLite page size: " << pageSize << " bytes"
<< "; Free pages: " << freePages
<< "; Free space: " << freeSpace << " bytes; "
<< "Note that this does not take into account available disk "
"space.";
auto db = mTxnDB->checkoutDb();
static auto const pageSize = [&] {
std::uint32_t ps;
*db << "PRAGMA page_size;", soci::into(ps);
return ps;
}();
static auto const maxPages = [&] {
std::uint32_t mp;
*db << "PRAGMA max_page_count;", soci::into(mp);
return mp;
}();
std::uint32_t pageCount;
*db << "PRAGMA page_count;", soci::into(pageCount);
std::uint32_t freePages = maxPages - pageCount;
std::uint64_t freeSpace =
safe_cast<std::uint64_t>(freePages) * pageSize;
JLOG(m_journal.info())
<< "Transaction DB pathname: " << dbPath.string()
<< "; file size: " << dbSize.value_or(-1) << " bytes"
<< "; SQLite page size: " << pageSize << " bytes"
<< "; Free pages: " << freePages
<< "; Free space: " << freeSpace << " bytes; "
<< "Note that this does not take into account available "
"disk "
"space.";
if (freeSpace < megabytes(512))
{
JLOG(m_journal.fatal())
<< "Free SQLite space for transaction db is less than "
"512MB. To fix this, rippled must be executed with the "
"\"--vacuum\" parameter before restarting. "
"Note that this activity can take multiple days, "
"depending on database size.";
signalStop();
if (freeSpace < megabytes(512))
{
JLOG(m_journal.fatal())
<< "Free SQLite space for transaction db is less than "
"512MB. To fix this, rippled must be executed with "
"the "
"vacuum parameter before restarting. "
"Note that this activity can take multiple days, "
"depending on database size.";
signalStop();
}
}
}
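
For the free-space check above: free space is (max_page_count - page_count)
* page_size, so with a typical 4096-byte SQLite page the 512MB threshold
corresponds to 131072 free pages. A small standalone sketch of the same
arithmetic, with made-up PRAGMA values:

#include <cstdint>
#include <iostream>

int
main()
{
    // Stand-ins for what PRAGMA page_size / max_page_count / page_count
    // might return; 1073741823 is SQLite's default max_page_count.
    std::uint64_t const pageSize = 4096;
    std::uint64_t const maxPages = 1073741823;
    std::uint64_t const pageCount = 1073700000;  // hypothetical, nearly full

    std::uint64_t const freePages = maxPages - pageCount;
    std::uint64_t const freeSpace = freePages * pageSize;

    // 512MB / 4096 bytes per page = 131072 pages; below that, rippled
    // logs a fatal error and asks the operator to rerun with --vacuum.
    bool const vacuumNeeded = freeSpace < 512ull * 1024 * 1024;
    std::cout << "free pages: " << freePages << ", free bytes: " << freeSpace
              << (vacuumNeeded ? " -> vacuum needed" : " -> ok") << '\n';
}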
@@ -1218,6 +1273,11 @@ public:
m_acceptedLedgerCache.sweep();
cachedSLEs_.expire();
#ifdef RIPPLED_REPORTING
if (config().reporting())
pgPool_->idleSweeper();
#endif
// Set timer to do another sweep later.
setSweepTimer();
}
@@ -1308,12 +1368,13 @@ ApplicationImp::setup()
// Optionally turn off logging to console.
logs_->silent(config_->silent());
m_jobQueue->setThreadCount(config_->WORKERS, config_->standalone());
m_jobQueue->setThreadCount(
config_->WORKERS, config_->standalone() && !config_->reporting());
if (!config_->standalone())
timeKeeper_->run(config_->SNTP_SERVERS);
if (!initSQLiteDBs() || !initNodeStore())
if (!initRDBMS() || !initNodeStore())
return false;
if (shardStore_)
@@ -1363,43 +1424,47 @@ ApplicationImp::setup()
Pathfinder::initPathTable();
auto const startUp = config_->START_UP;
if (startUp == Config::FRESH)
if (!config_->reporting())
{
JLOG(m_journal.info()) << "Starting new Ledger";
startGenesisLedger();
}
else if (
startUp == Config::LOAD || startUp == Config::LOAD_FILE ||
startUp == Config::REPLAY)
{
JLOG(m_journal.info()) << "Loading specified Ledger";
if (!loadOldLedger(
config_->START_LEDGER,
startUp == Config::REPLAY,
startUp == Config::LOAD_FILE))
if (startUp == Config::FRESH)
{
JLOG(m_journal.error())
<< "The specified ledger could not be loaded.";
return false;
JLOG(m_journal.info()) << "Starting new Ledger";
startGenesisLedger();
}
else if (
startUp == Config::LOAD || startUp == Config::LOAD_FILE ||
startUp == Config::REPLAY)
{
JLOG(m_journal.info()) << "Loading specified Ledger";
if (!loadOldLedger(
config_->START_LEDGER,
startUp == Config::REPLAY,
startUp == Config::LOAD_FILE))
{
JLOG(m_journal.error())
<< "The specified ledger could not be loaded.";
return false;
}
}
else if (startUp == Config::NETWORK)
{
// This should probably become the default once we have a stable
// network.
if (!config_->standalone())
m_networkOPs->setNeedNetworkLedger();
startGenesisLedger();
}
else
{
startGenesisLedger();
}
}
else if (startUp == Config::NETWORK)
{
// This should probably become the default once we have a stable
// network.
if (!config_->standalone())
m_networkOPs->setNeedNetworkLedger();
startGenesisLedger();
}
else
{
startGenesisLedger();
}
m_orderBookDB.setup(getLedgerMaster().getCurrentLedger());
if (!config().reporting())
m_orderBookDB.setup(getLedgerMaster().getCurrentLedger());
nodeIdentity_ = loadNodeIdentity(*this);
@@ -1409,42 +1474,47 @@ ApplicationImp::setup()
return false;
}
if (!config().reporting())
{
if (validatorKeys_.configInvalid())
return false;
if (!validatorManifests_->load(
getWalletDB(),
"ValidatorManifests",
validatorKeys_.manifest,
config().section(SECTION_VALIDATOR_KEY_REVOCATION).values()))
{
JLOG(m_journal.fatal()) << "Invalid configured validator manifest.";
return false;
if (validatorKeys_.configInvalid())
return false;
if (!validatorManifests_->load(
getWalletDB(),
"ValidatorManifests",
validatorKeys_.manifest,
config()
.section(SECTION_VALIDATOR_KEY_REVOCATION)
.values()))
{
JLOG(m_journal.fatal())
<< "Invalid configured validator manifest.";
return false;
}
publisherManifests_->load(getWalletDB(), "PublisherManifests");
// Setup trusted validators
if (!validators_->load(
validatorKeys_.publicKey,
config().section(SECTION_VALIDATORS).values(),
config().section(SECTION_VALIDATOR_LIST_KEYS).values()))
{
JLOG(m_journal.fatal())
<< "Invalid entry in validator configuration.";
return false;
}
}
publisherManifests_->load(getWalletDB(), "PublisherManifests");
// Setup trusted validators
if (!validators_->load(
validatorKeys_.publicKey,
config().section(SECTION_VALIDATORS).values(),
config().section(SECTION_VALIDATOR_LIST_KEYS).values()))
if (!validatorSites_->load(
config().section(SECTION_VALIDATOR_LIST_SITES).values()))
{
JLOG(m_journal.fatal())
<< "Invalid entry in validator configuration.";
<< "Invalid entry in [" << SECTION_VALIDATOR_LIST_SITES << "]";
return false;
}
}
if (!validatorSites_->load(
config().section(SECTION_VALIDATOR_LIST_SITES).values()))
{
JLOG(m_journal.fatal())
<< "Invalid entry in [" << SECTION_VALIDATOR_LIST_SITES << "]";
return false;
}
//----------------------------------------------------------------------
//
// Server
@@ -1456,17 +1526,20 @@ ApplicationImp::setup()
// move the instantiation inside a conditional:
//
// if (!config_.standalone())
overlay_ = make_Overlay(
*this,
setup_Overlay(*config_),
*m_jobQueue,
*serverHandler_,
*m_resourceManager,
*m_resolver,
get_io_service(),
*config_,
m_collectorManager->collector());
add(*overlay_); // add to PropertyStream
if (!config_->reporting())
{
overlay_ = make_Overlay(
*this,
setup_Overlay(*config_),
*m_jobQueue,
*serverHandler_,
*m_resourceManager,
*m_resolver,
get_io_service(),
*config_,
m_collectorManager->collector());
add(*overlay_); // add to PropertyStream
}
if (!config_->standalone())
{
@@ -1476,7 +1549,8 @@ ApplicationImp::setup()
}
// start first consensus round
if (!m_networkOPs->beginConsensus(
if (!config_->reporting() &&
!m_networkOPs->beginConsensus(
m_ledgerMaster->getClosedLedger()->info().hash))
{
JLOG(m_journal.fatal()) << "Unable to start consensus";
@@ -1622,6 +1696,11 @@ ApplicationImp::setup()
validatorSites_->start();
if (config_->reporting())
{
reportingETL_->run();
}
return true;
}
@@ -1696,7 +1775,8 @@ ApplicationImp::fdRequired() const
int needed = 128;
// 2x the configured peer limit for peer connections:
needed += 2 * overlay_->limit();
if (overlay_)
needed += 2 * overlay_->limit();
// the number of fds needed by the backend (internally
// doubled if online delete is enabled).
@@ -1743,8 +1823,7 @@ ApplicationImp::getLastFullLedger()
try
{
auto const [ledger, seq, hash] =
loadLedgerHelper("order by LedgerSeq desc limit 1", *this);
auto const [ledger, seq, hash] = getLatestLedger(*this);
if (!ledger)
return ledger;
@@ -1764,7 +1843,7 @@ ApplicationImp::getLastFullLedger()
{
stream << "Failed on ledger";
Json::Value p;
addJson(p, {*ledger, LedgerFill::full});
addJson(p, {*ledger, nullptr, LedgerFill::full});
stream << p;
}
@@ -2171,13 +2250,24 @@ ApplicationImp::nodeToShards()
void
ApplicationImp::setMaxDisallowedLedger()
{
boost::optional<LedgerIndex> seq;
if (config().reporting())
{
auto db = getLedgerDB().checkoutDb();
*db << "SELECT MAX(LedgerSeq) FROM Ledgers;", soci::into(seq);
#ifdef RIPPLED_REPORTING
auto seq = PgQuery(pgPool_)("SELECT max_ledger()");
if (seq && !seq.isNull())
maxDisallowedLedger_ = seq.asBigInt();
#endif
}
else
{
boost::optional<LedgerIndex> seq;
{
auto db = getLedgerDB().checkoutDb();
*db << "SELECT MAX(LedgerSeq) FROM Ledgers;", soci::into(seq);
}
if (seq)
maxDisallowedLedger_ = *seq;
}
if (seq)
maxDisallowedLedger_ = *seq;
JLOG(m_journal.trace())
<< "Max persisted ledger is " << maxDisallowedLedger_;