diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 2bbb2f17..0ca3b765 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -372,52 +372,63 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) queues.push_back(transformQueue); std::cout << "added to queues"; - threads.emplace_back( - [this, &startSequence, &writeConflict, transformQueue, i, numExtractors]() { - beast::setCurrentThreadName("rippled: ReportingETL extract"); - uint32_t currentSequence = startSequence + i; + threads.emplace_back([this, + &startSequence, + &writeConflict, + transformQueue, + i, + numExtractors]() { + beast::setCurrentThreadName("rippled: ReportingETL extract"); + uint32_t currentSequence = startSequence + i; - // there are two stopping conditions here. - // First, if there is a write conflict in the load thread, the - // ETL mechanism should stop. The other stopping condition is if - // the entire server is shutting down. This can be detected in a - // variety of ways. See the comment at the top of the function - while (networkValidatedLedgers_.waitUntilValidatedByNetwork( - currentSequence) && - !writeConflict && !isStopping()) + double totalTime = 0; + + // there are two stopping conditions here. + // First, if there is a write conflict in the load thread, the + // ETL mechanism should stop. The other stopping condition is if + // the entire server is shutting down. This can be detected in a + // variety of ways. See the comment at the top of the function + while (networkValidatedLedgers_.waitUntilValidatedByNetwork( + currentSequence) && + !writeConflict && !isStopping()) + { + auto start = std::chrono::system_clock::now(); + std::optional + fetchResponse{fetchLedgerDataAndDiff(currentSequence)}; + auto end = std::chrono::system_clock::now(); + + auto time = ((end - start).count()) / 1000000000.0; + totalTime += time; + + auto tps = + fetchResponse->transactions_list().transactions_size() / + time; + + BOOST_LOG_TRIVIAL(info) + << "Extract phase time = " << time + << " . Extract phase tps = " << tps + << " . Avg extract time = " + << totalTime / (currentSequence - startSequence + 1) + << " . thread num = " << i + << " . seq = " << currentSequence; + // if the fetch is unsuccessful, stop. fetchLedger only + // returns false if the server is shutting down, or if the + // ledger was found in the database (which means another + // process already wrote the ledger that this process was + // trying to extract; this is a form of a write conflict). + // Otherwise, fetchLedgerDataAndDiff will keep trying to + // fetch the specified ledger until successful + if (!fetchResponse) { - auto start = std::chrono::system_clock::now(); - std::optional - fetchResponse{fetchLedgerDataAndDiff(currentSequence)}; - auto end = std::chrono::system_clock::now(); - - auto time = ((end - start).count()) / 1000000000.0; - auto tps = - fetchResponse->transactions_list().transactions_size() / - time; - - BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time - << " . Extract phase tps = " << tps - << " . thread num = " << i - << " . seq = " << currentSequence; - // if the fetch is unsuccessful, stop. fetchLedger only - // returns false if the server is shutting down, or if the - // ledger was found in the database (which means another - // process already wrote the ledger that this process was - // trying to extract; this is a form of a write conflict). - // Otherwise, fetchLedgerDataAndDiff will keep trying to - // fetch the specified ledger until successful - if (!fetchResponse) - { - break; - } - - transformQueue->push(std::move(fetchResponse)); - currentSequence += numExtractors; + break; } - // empty optional tells the transformer to shut down - transformQueue->push({}); - }); + + transformQueue->push(std::move(fetchResponse)); + currentSequence += numExtractors; + } + // empty optional tells the transformer to shut down + transformQueue->push({}); + }); } std::thread transformer{[this,