add avg extract time

This commit is contained in:
CJ Cobb
2021-03-24 13:13:28 -04:00
parent 694111a9b7
commit 3062b5e678

View File

@@ -372,52 +372,63 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
queues.push_back(transformQueue);
std::cout << "added to queues";
threads.emplace_back(
[this, &startSequence, &writeConflict, transformQueue, i, numExtractors]() {
beast::setCurrentThreadName("rippled: ReportingETL extract");
uint32_t currentSequence = startSequence + i;
threads.emplace_back([this,
&startSequence,
&writeConflict,
transformQueue,
i,
numExtractors]() {
beast::setCurrentThreadName("rippled: ReportingETL extract");
uint32_t currentSequence = startSequence + i;
// there are two stopping conditions here.
// First, if there is a write conflict in the load thread, the
// ETL mechanism should stop. The other stopping condition is if
// the entire server is shutting down. This can be detected in a
// variety of ways. See the comment at the top of the function
while (networkValidatedLedgers_.waitUntilValidatedByNetwork(
currentSequence) &&
!writeConflict && !isStopping())
double totalTime = 0;
// there are two stopping conditions here.
// First, if there is a write conflict in the load thread, the
// ETL mechanism should stop. The other stopping condition is if
// the entire server is shutting down. This can be detected in a
// variety of ways. See the comment at the top of the function
while (networkValidatedLedgers_.waitUntilValidatedByNetwork(
currentSequence) &&
!writeConflict && !isStopping())
{
auto start = std::chrono::system_clock::now();
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchResponse{fetchLedgerDataAndDiff(currentSequence)};
auto end = std::chrono::system_clock::now();
auto time = ((end - start).count()) / 1000000000.0;
totalTime += time;
auto tps =
fetchResponse->transactions_list().transactions_size() /
time;
BOOST_LOG_TRIVIAL(info)
<< "Extract phase time = " << time
<< " . Extract phase tps = " << tps
<< " . Avg extract time = "
<< totalTime / (currentSequence - startSequence + 1)
<< " . thread num = " << i
<< " . seq = " << currentSequence;
// if the fetch is unsuccessful, stop. fetchLedger only
// returns false if the server is shutting down, or if the
// ledger was found in the database (which means another
// process already wrote the ledger that this process was
// trying to extract; this is a form of a write conflict).
// Otherwise, fetchLedgerDataAndDiff will keep trying to
// fetch the specified ledger until successful
if (!fetchResponse)
{
auto start = std::chrono::system_clock::now();
std::optional<org::xrpl::rpc::v1::GetLedgerResponse>
fetchResponse{fetchLedgerDataAndDiff(currentSequence)};
auto end = std::chrono::system_clock::now();
auto time = ((end - start).count()) / 1000000000.0;
auto tps =
fetchResponse->transactions_list().transactions_size() /
time;
BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time
<< " . Extract phase tps = " << tps
<< " . thread num = " << i
<< " . seq = " << currentSequence;
// if the fetch is unsuccessful, stop. fetchLedger only
// returns false if the server is shutting down, or if the
// ledger was found in the database (which means another
// process already wrote the ledger that this process was
// trying to extract; this is a form of a write conflict).
// Otherwise, fetchLedgerDataAndDiff will keep trying to
// fetch the specified ledger until successful
if (!fetchResponse)
{
break;
}
transformQueue->push(std::move(fetchResponse));
currentSequence += numExtractors;
break;
}
// empty optional tells the transformer to shut down
transformQueue->push({});
});
transformQueue->push(std::move(fetchResponse));
currentSequence += numExtractors;
}
// empty optional tells the transformer to shut down
transformQueue->push({});
});
}
std::thread transformer{[this,