numExtractors is config

This commit is contained in:
Ubuntu
2021-03-24 17:03:15 +00:00
parent 6eb87bfaff
commit 694111a9b7
2 changed files with 10 additions and 5 deletions

View File

@@ -366,14 +366,14 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
return queues[(sequence - startSequence) % numExtractors]; return queues[(sequence - startSequence) % numExtractors];
}; };
std::vector<std::thread> threads; std::vector<std::thread> threads;
for (size_t i = 1; i < numExtractors + 1; ++i) for (size_t i = 0; i < numExtractors; ++i)
{ {
auto transformQueue = std::make_shared<QueueType>(maxQueueSize); auto transformQueue = std::make_shared<QueueType>(maxQueueSize);
queues.push_back(transformQueue); queues.push_back(transformQueue);
std::cout << "added to queues"; std::cout << "added to queues";
threads.emplace_back( threads.emplace_back(
[this, &startSequence, &writeConflict, transformQueue, &i]() { [this, &startSequence, &writeConflict, transformQueue, i, numExtractors]() {
beast::setCurrentThreadName("rippled: ReportingETL extract"); beast::setCurrentThreadName("rippled: ReportingETL extract");
uint32_t currentSequence = startSequence + i; uint32_t currentSequence = startSequence + i;
@@ -397,7 +397,9 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
time; time;
BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time
<< " . Extract phase tps = " << tps; << " . Extract phase tps = " << tps
<< " . thread num = " << i
<< " . seq = " << currentSequence;
// if the fetch is unsuccessful, stop. fetchLedger only // if the fetch is unsuccessful, stop. fetchLedger only
// returns false if the server is shutting down, or if the // returns false if the server is shutting down, or if the
// ledger was found in the database (which means another // ledger was found in the database (which means another
@@ -411,7 +413,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
} }
transformQueue->push(std::move(fetchResponse)); transformQueue->push(std::move(fetchResponse));
currentSequence += i; currentSequence += numExtractors;
} }
// empty optional tells the transformer to shut down // empty optional tells the transformer to shut down
transformQueue->push({}); transformQueue->push({});
@@ -613,7 +615,7 @@ ReportingETL::monitor()
// doContinousETLPipelined returns the most recent sequence // doContinousETLPipelined returns the most recent sequence
// published empty optional if no sequence was published // published empty optional if no sequence was published
std::optional<uint32_t> lastPublished = nextSequence; std::optional<uint32_t> lastPublished = nextSequence;
runETLPipeline(nextSequence, 10); runETLPipeline(nextSequence, extractorThreads_);
BOOST_LOG_TRIVIAL(info) BOOST_LOG_TRIVIAL(info)
<< __func__ << " : " << __func__ << " : "
<< "Aborting ETL. Falling back to publishing"; << "Aborting ETL. Falling back to publishing";
@@ -677,5 +679,7 @@ ReportingETL::ReportingETL(
readOnly_ = config.at("read_only").as_bool(); readOnly_ = config.at("read_only").as_bool();
if (config.contains("online_delete")) if (config.contains("online_delete"))
onlineDeleteInterval_ = config.at("online_delete").as_int64(); onlineDeleteInterval_ = config.at("online_delete").as_int64();
if (config.contains("extractor_threads"))
extractorThreads_ = config.at("extractor_threads").as_int64();
} }

View File

@@ -61,6 +61,7 @@ class ReportingETL
private: private:
std::unique_ptr<BackendInterface> flatMapBackend_; std::unique_ptr<BackendInterface> flatMapBackend_;
std::optional<uint32_t> onlineDeleteInterval_; std::optional<uint32_t> onlineDeleteInterval_;
uint32_t extractorThreads_ = 1;
std::thread worker_; std::thread worker_;
boost::asio::io_context& ioContext_; boost::asio::io_context& ioContext_;