diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 5601a1a9..1285f0ff 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -303,7 +303,24 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) std::move(bookDir)); } flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); - bool success = flatMapBackend_->finishWrites(); + accumTxns_ += rawData.transactions_list().transactions_size(); + bool success = true; + if (accumTxns_ > txnThreshold_) + { + auto start = std::chrono::system_clock::now(); + success = flatMapBackend_->finishWrites(); + auto end = std::chrono::system_clock::now(); + + auto duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " Accumulated " << std::to_string(accumTxns_) + << " transactions. Wrote in " << std::to_string(duration) + << " transactions per second = " + << std::to_string(accumTxns_ / duration); + accumTxns_ = 0; + } + else + BOOST_LOG_TRIVIAL(info) << __func__ << " skipping commit"; BOOST_LOG_TRIVIAL(debug) << __func__ << " : " << "Inserted/modified/deleted all objects. Number of objects = " @@ -705,5 +722,7 @@ ReportingETL::ReportingETL( onlineDeleteInterval_ = config.at("online_delete").as_int64(); if (config.contains("extractor_threads")) extractorThreads_ = config.at("extractor_threads").as_int64(); + if (config.contains("txn_threshold")) + txnThreshold_ = config.at("txn_threshold").as_int64(); } diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index ca04f020..5c4da180 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -133,6 +133,9 @@ private: std::optional startSequence_; std::optional finishSequence_; + size_t accumTxns_ = 0; + size_t txnThreshold_ = 0; + /// The time that the most recently published ledger was published. Used by /// server_info std::chrono::time_point lastPublish_;