merge master

This commit is contained in:
Nathan Nichols
2021-05-03 12:13:50 -05:00
16 changed files with 1416 additions and 573 deletions

View File

@@ -294,7 +294,24 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
std::move(bookDir));
}
flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
bool success = flatMapBackend_->finishWrites(lgrInfo.seq);
accumTxns_ += rawData.transactions_list().transactions_size();
bool success = true;
if (accumTxns_ > txnThreshold_)
{
auto start = std::chrono::system_clock::now();
success = flatMapBackend_->finishWrites(lgrInfo.seq);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Accumulated " << std::to_string(accumTxns_)
<< " transactions. Wrote in " << std::to_string(duration)
<< " transactions per second = "
<< std::to_string(accumTxns_ / duration);
accumTxns_ = 0;
}
else
BOOST_LOG_TRIVIAL(info) << __func__ << " skipping commit";
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "
<< "Inserted/modified/deleted all objects. Number of objects = "
@@ -310,6 +327,8 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
std::optional<uint32_t>
ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
{
if (startSequence > finishSequence_)
return {};
/*
* Behold, mortals! This function spawns three separate threads, which talk
* to each other via 2 different thread safe queues and 1 atomic variable.
@@ -342,6 +361,11 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
assert(false);
throw std::runtime_error("runETLPipeline: parent ledger is null");
}
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populating caches";
flatMapBackend_->getIndexer().populateCaches(*flatMapBackend_);
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
<< "Populated caches";
std::atomic_bool writeConflict = false;
std::optional<uint32_t> lastPublishedSequence;
@@ -379,7 +403,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
// ETL mechanism should stop. The other stopping condition is if
// the entire server is shutting down. This can be detected in a
// variety of ways. See the comment at the top of the function
while (networkValidatedLedgers_.waitUntilValidatedByNetwork(
while (currentSequence <= finishSequence_ &&
networkValidatedLedgers_.waitUntilValidatedByNetwork(
currentSequence) &&
!writeConflict && !isStopping())
{
@@ -416,6 +441,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
transformQueue->push(std::move(fetchResponse));
currentSequence += numExtractors;
if (currentSequence > finishSequence_)
break;
}
// empty optional tells the transformer to shut down
transformQueue->push({});
@@ -497,6 +524,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
<< "Extracted and wrote " << *lastPublishedSequence - startSequence
<< " in " << ((end - begin).count()) / 1000000000.0;
writing_ = false;
flatMapBackend_->getIndexer().clearCaches();
BOOST_LOG_TRIVIAL(debug) << __func__ << " : "
<< "Stopping etl pipeline";
@@ -581,7 +609,6 @@ ReportingETL::monitor()
}
else
{
// publishLedger(ledger);
}
uint32_t nextSequence = latestSequence.value() + 1;
@@ -680,14 +707,18 @@ ReportingETL::ReportingETL(
networkValidatedLedgers_,
ioc)
{
flatMapBackend_->open();
if (config.contains("start_sequence"))
startSequence_ = config.at("start_sequence").as_int64();
if (config.contains("finish_sequence"))
finishSequence_ = config.at("finish_sequence").as_int64();
if (config.contains("read_only"))
readOnly_ = config.at("read_only").as_bool();
if (config.contains("online_delete"))
onlineDeleteInterval_ = config.at("online_delete").as_int64();
if (config.contains("extractor_threads"))
extractorThreads_ = config.at("extractor_threads").as_int64();
if (config.contains("txn_threshold"))
txnThreshold_ = config.at("txn_threshold").as_int64();
flatMapBackend_->open(readOnly_);
}