checkpoint. handles account_info

CJ Cobb
2020-12-22 13:12:26 -05:00
parent 80a8ed8d1b
commit bd7ae08bcc
14 changed files with 432 additions and 284 deletions


@@ -17,12 +17,10 @@
*/
//==============================================================================
#include <ripple/app/reporting/DBHelpers.h>
#include <ripple/app/reporting/ReportingETL.h>
#include <reporting/DBHelpers.h>
#include <reporting/ReportingETL.h>
#include <ripple/beast/core/CurrentThreadName.h>
#include <ripple/json/json_reader.h>
#include <ripple/json/json_writer.h>
#include <boost/asio/connect.hpp>
#include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp>
@@ -32,12 +30,10 @@
#include <string>
#include <variant>
namespace ripple {
namespace detail {
/// Convenience function for printing out basic ledger info
std::string
toString(LedgerInfo const& info)
toString(ripple::LedgerInfo const& info)
{
std::stringstream ss;
ss << "LedgerInfo { Sequence : " << info.seq
@@ -46,6 +42,29 @@ toString(LedgerInfo const& info)
<< " ParentHash : " << strHex(info.parentHash) << " }";
return ss.str();
}
ripple::LedgerInfo
deserializeHeader(ripple::Slice data)
{
ripple::SerialIter sit(data.data(), data.size());
ripple::LedgerInfo info;
info.seq = sit.get32();
info.drops = sit.get64();
info.parentHash = sit.get256();
info.txHash = sit.get256();
info.accountHash = sit.get256();
info.parentCloseTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTime =
ripple::NetClock::time_point{ripple::NetClock::duration{sit.get32()}};
info.closeTimeResolution = ripple::NetClock::duration{sit.get8()};
info.closeFlags = sit.get8();
info.hash = sit.get256();
return info;
}
} // namespace detail
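For reference, a minimal sketch (not part of this commit) of how the new detail::deserializeHeader helper is driven by the raw header bytes of a gRPC GetLedgerResponse, mirroring its use in loadInitialLedger and buildNextLedger below; ledgerData stands in for the response object used there:
// Illustrative usage only; assumes the rippled and Boost headers included above.
ripple::LedgerInfo lgrInfo =
    detail::deserializeHeader(ripple::makeSlice(ledgerData->ledger_header()));
BOOST_LOG_TRIVIAL(debug)
    << "Deserialized ledger header. " << detail::toString(lgrInfo);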
std::vector<AccountTransactionsData>
@@ -68,31 +87,32 @@ ReportingETL::insertTransactions(
ripple::TxMeta txMeta{
sttx.getTransactionID(), ledger.seq, txn.metadata_blob()};
auto metaSerializer =
std::make_shared<Serializer>(txMeta.getAsObject().getSerializer());
auto metaSerializer = std::make_shared<ripple::Serializer>(
txMeta.getAsObject().getSerializer());
BOOST_LOG_TRIVIAL(trace)
<< __func__ << " : "
<< "Inserting transaction = " << sttx.getTransactionID();
ripple::uint256 nodestoreHash = sttx.getTransactionID();
accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal_);
auto journal = ripple::debugLog();
accountTxData.emplace_back(txMeta, std::move(nodestoreHash), journal);
std::string keyStr{(const char*)sttx.getTransactionID().data(), 32};
flatMapBackend_.storeTransaction(
std::move(keyStr),
ledger->info().seq,
ledger.seq,
std::move(*raw),
std::move(*txn.mutable_metadata_blob()));
}
return accountTxData;
}
std::shared_ptr<Ledger>
std::optional<ripple::LedgerInfo>
ReportingETL::loadInitialLedger(uint32_t startingSequence)
{
// check that database is actually empty
auto ledgers = getLedger(startingSequence);
if (ledgers.size())
auto ledger = getLedger(startingSequence, pgPool_);
if (ledger)
{
BOOST_LOG_TRIVIAL(fatal) << __func__ << " : "
<< "Database is not empty";
@@ -108,8 +128,8 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
if (!ledgerData)
return {};
ripple::LedgerInfo lgrInfo = ripple::deserializeHeader(
ripple::makeSlice(ledgerData->ledger_header()), true);
ripple::LedgerInfo lgrInfo = detail::deserializeHeader(
ripple::makeSlice(ledgerData->ledger_header()));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
@@ -134,19 +154,16 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence)
auto end = std::chrono::system_clock::now();
BOOST_LOG_TRIVIAL(debug) << "Time to download and store ledger = "
<< ((end - start).count()) / 1000000000.0;
return ledger;
return lgrInfo;
}
/*
void
ReportingETL::publishLedger(std::shared_ptr<Ledger>& ledger)
ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
{
app_.getOPs().pubLedger(ledger);
// app_.getOPs().pubLedger(ledger);
setLastPublish();
}
*/
/*
bool
ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
{
@@ -156,11 +173,11 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
size_t numAttempts = 0;
while (!stopping_)
{
auto ledger = app_.getLedgerMaster().getLedgerBySeq(ledgerSequence);
auto ledger = getLedger(ledgerSequence, pgPool_);
if (!ledger)
{
BOOST_LOG_TRIVIAL(warn)
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " : "
<< "Trying to publish. Could not find ledger with sequence = "
<< ledgerSequence;
@@ -203,6 +220,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
continue;
}
/*
publishStrand_.post([this, ledger]() {
app_.getOPs().pubLedger(ledger);
setLastPublish();
@@ -210,11 +228,11 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
<< __func__ << " : "
<< "Published ledger. " << detail::toString(ledger->info());
});
*/
return true;
}
return false;
}
*/
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
ReportingETL::fetchLedgerData(uint32_t idx)
@@ -250,15 +268,15 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Beginning ledger update";
LedgerInfo lgrInfo =
deserializeHeader(makeSlice(rawData.ledger_header()), true);
ripple::LedgerInfo lgrInfo =
detail::deserializeHeader(ripple::makeSlice(rawData.ledger_header()));
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Deserialized ledger header. " << detail::toString(lgrInfo);
std::vector<AccountTransactionsData> accountTxData{
insertTransactions(rawData)};
insertTransactions(lgrInfo, rawData)};
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
@@ -269,7 +287,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
{
flatMapBackend_.store(
std::move(*obj.mutable_key()),
next->info().seq,
lgrInfo.seq,
std::move(*obj.mutable_data()));
}
flatMapBackend_.sync();
@@ -280,8 +298,8 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Finished ledger update. " << detail::toString(next->info());
return {info, std::move(accountTxData)};
<< "Finished ledger update. " << detail::toString(lgrInfo);
return {lgrInfo, std::move(accountTxData)};
}
// Database must be populated when this starts
@@ -313,11 +331,11 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
<< "Starting etl pipeline";
writing_ = true;
auto parent = getLedger(startSequence - 1);
auto parent = getLedger(startSequence - 1, pgPool_);
if (!parent)
{
assert(false);
Throw<std::runtime_error>("runETLPipeline: parent ledger is null");
throw std::runtime_error("runETLPipeline: parent ledger is null");
}
std::atomic_bool writeConflict = false;
@@ -375,105 +393,48 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
transformQueue.push({});
}};
std::thread transformer{
[this, &parent, &writeConflict, &loadQueue, &transformQueue]() {
beast::setCurrentThreadName("rippled: ReportingETL transform");
std::thread transformer{[this,
&writeConflict,
&transformQueue,
&lastPublishedSequence]() {
beast::setCurrentThreadName("rippled: ReportingETL transform");
while (!writeConflict)
{
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchResponse{transformQueue.pop()};
// if fetchResponse is an empty optional, the extracter thread
// has stopped and the transformer should stop as well
if (!fetchResponse)
{
break;
}
if (isStopping())
continue;
auto start = std::chrono::system_clock::now();
auto [lgrInfo, accountTxData] = buildNextLedger(*fetchResponse);
auto end = std::chrono::system_clock::now();
if (!writeToPostgres(lgrInfo, accountTxData, pgPool_))
writeConflict = true;
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(debug) << "transform time = " << duration;
if (!writeConflict)
{
publishLedger(ledger);
lastPublishedSequence = lgrInfo.seq;
}
}
}};
std::thread loader{[this,
&lastPublishedSequence,
&loadQueue,
&writeConflict]() {
beast::setCurrentThreadName("rippled: ReportingETL load");
size_t totalTransactions = 0;
double totalTime = 0;
while (!writeConflict)
{
std::optional<std::pair<
std::shared_ptr<Ledger>,
std::vector<AccountTransactionsData>>>
result{loadQueue.pop()};
// if result is an empty optional, the transformer thread has
// stopped and the loader should stop as well
if (!result)
std::optional<org::xrpl::rpc::v1::GetLedgerResponse> fetchResponse{
transformQueue.pop()};
// if fetchResponse is an empty optional, the extracter thread
// has stopped and the transformer should stop as well
if (!fetchResponse)
{
break;
}
if (isStopping())
continue;
auto& ledger = result->first;
auto& accountTxData = result->second;
auto start = std::chrono::system_clock::now();
// write to the key-value store
// flushLedger(ledger);
auto mid = std::chrono::system_clock::now();
// write to RDBMS
// if there is a write conflict, some other process has already
// written this ledger and has taken over as the ETL writer
#ifdef RIPPLED_REPORTING
if (!writeToPostgres(
ledger->info(), accountTxData, app_.getPgPool(), journal_))
writeConflict = true;
#endif
auto [lgrInfo, accountTxData] = buildNextLedger(*fetchResponse);
auto end = std::chrono::system_clock::now();
if (!writeToPostgres(lgrInfo, accountTxData, pgPool_))
writeConflict = true;
if (!writeConflict)
{
publishLedger(ledger);
lastPublishedSequence = ledger->info().seq;
}
// print some performance numbers
auto kvTime = ((mid - start).count()) / 1000000000.0;
auto relationalTime = ((end - mid).count()) / 1000000000.0;
size_t numTxns = accountTxData.size();
totalTime += kvTime;
totalTransactions += numTxns;
auto duration = ((end - start).count()) / 1000000000.0;
auto numTxns = accountTxData.size();
BOOST_LOG_TRIVIAL(info)
<< "Load phase of etl : "
<< "Successfully published ledger! Ledger info: "
<< detail::toString(ledger->info())
<< ". txn count = " << numTxns
<< ". key-value write time = " << kvTime
<< ". relational write time = " << relationalTime
<< ". key-value tps = " << numTxns / kvTime
<< ". relational tps = " << numTxns / relationalTime
<< ". total key-value tps = " << totalTransactions / totalTime;
<< detail::toString(lgrInfo) << ". txn count = " << numTxns
<< ". load time = " << duration << ". load tps "
<< numTxns / duration;
if (!writeConflict)
{
publishLedger(lgrInfo);
lastPublishedSequence = lgrInfo.seq;
}
}
}};
// wait for all of the threads to stop
loader.join();
extracter.join();
transformer.join();
auto end = std::chrono::system_clock::now();
@@ -501,8 +462,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
void
ReportingETL::monitor()
{
auto ledgers = getLedger(std::monostate);
if (!ledgers.size())
auto ledger = getLedger(std::monostate(), pgPool_);
if (!ledger)
{
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Database is empty. Will download a ledger "
@@ -545,7 +506,7 @@ ReportingETL::monitor()
{
if (startSequence_)
{
Throw<std::runtime_error>(
throw std::runtime_error(
"start sequence specified but db is already populated");
}
BOOST_LOG_TRIVIAL(info)
@@ -563,7 +524,7 @@ ReportingETL::monitor()
{
// publishLedger(ledger);
}
uint32_t nextSequence = ledger->info().seq + 1;
uint32_t nextSequence = ledger->seq + 1;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
@@ -596,7 +557,7 @@ ReportingETL::monitor()
bool success = publishLedger(nextSequence, timeoutSeconds);
if (!success)
{
BOOST_LOG_TRIVIAL(warn)
BOOST_LOG_TRIVIAL(warning)
<< __func__ << " : "
<< "Failed to publish ledger with sequence = " << nextSequence
<< " . Beginning ETL";
@@ -653,91 +614,17 @@ ReportingETL::ReportingETL(
boost::asio::io_context& ioc)
: publishStrand_(ioc)
, ioContext_(ioc)
, loadBalancer_(*this)
, flatMapBackend_(
(*config).at("database").as_object().at("cassandra").as_object())
config.at("database").as_object().at("cassandra").as_object())
, pgPool_(make_PgPool(
config.at("database").as_object().at("postgres").as_object()))
, loadBalancer_(
config.at("etl_sources").as_array(),
flatMapBackend_,
networkValidatedLedgers_,
ioc)
{
// if present, get endpoint from config
if (app_.config().exists("reporting"))
{
flatMapBackend_.open();
Section section = app_.config().section("reporting");
BOOST_LOG_TRIVIAL(debug) << "Parsing config info";
auto& vals = section.values();
for (auto& v : vals)
{
BOOST_LOG_TRIVIAL(debug) << "val is " << v;
Section source = app_.config().section(v);
std::pair<std::string, bool> ipPair = source.find("source_ip");
if (!ipPair.second)
continue;
std::pair<std::string, bool> wsPortPair =
source.find("source_ws_port");
if (!wsPortPair.second)
continue;
std::pair<std::string, bool> grpcPortPair =
source.find("source_grpc_port");
if (!grpcPortPair.second)
{
// add source without grpc port
// used in read-only mode to detect when new ledgers have
// been validated. Used for publishing
if (app_.config().reportingReadOnly())
loadBalancer_.add(ipPair.first, wsPortPair.first);
continue;
}
loadBalancer_.add(
ipPair.first, wsPortPair.first, grpcPortPair.first);
}
// this is true iff --reportingReadOnly was passed via command line
readOnly_ = app_.config().reportingReadOnly();
// if --reportingReadOnly was not passed via command line, check config
// file. Command line takes precedence
if (!readOnly_)
{
std::pair<std::string, bool> ro = section.find("read_only");
if (ro.second)
{
readOnly_ = (ro.first == "true" || ro.first == "1");
app_.config().setReportingReadOnly(readOnly_);
}
}
// handle command line arguments
if (app_.config().START_UP == Config::StartUpType::FRESH && !readOnly_)
{
startSequence_ = std::stol(app_.config().START_LEDGER);
}
// if not passed via command line, check config for start sequence
if (!startSequence_)
{
std::pair<std::string, bool> start = section.find("start_sequence");
if (start.second)
{
startSequence_ = std::stoi(start.first);
}
}
std::pair<std::string, bool> flushInterval =
section.find("flush_interval");
if (flushInterval.second)
flushInterval_ = std::stoi(flushInterval.first);
std::pair<std::string, bool> numMarkers = section.find("num_markers");
if (numMarkers.second)
numMarkers_ = std::stoi(numMarkers.first);
ReportingETL::instance_ = this;
}
flatMapBackend_.open();
initSchema(pgPool_);
}
ReportingETL* ReportingETL::instance_ = 0;
} // namespace ripple
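The rewritten constructor above pulls its settings from a JSON configuration object (the as_object()/as_array() accessors match Boost.JSON, which is assumed here). A hypothetical sketch of the minimal shape that configuration would need; the section names are taken from the constructor, and the contents of each section are left empty rather than guessed:
#include <boost/json.hpp>
// Hypothetical example only: the keys inside each section are not shown in this
// commit, so they are left empty here.
boost::json::object
exampleConfig()
{
    return boost::json::parse(R"JSON({
        "database": {
            "cassandra": {},
            "postgres": {}
        },
        "etl_sources": []
    })JSON")
        .as_object();
}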