diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index c50b2712..3d641502 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -1515,13 +1515,6 @@ public: throw std::runtime_error("decrementing num outstanding below 0"); } size_t cur = (--numRequestsOutstanding_); - // sanity check - if (!canAddRequest()) - { - assert(false); - throw std::runtime_error( - "decremented num outstanding but can't add more"); - } { // mutex lock required to prevent race condition around spurious // wakeup diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index ed429dd8..661f58dd 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -317,7 +317,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) // Database must be populated when this starts std::optional -ReportingETL::runETLPipeline(uint32_t startSequence) +ReportingETL::runETLPipeline(uint32_t startSequence, int offset) { /* * Behold, mortals! This function spawns three separate threads, which talk @@ -363,9 +363,10 @@ ReportingETL::runETLPipeline(uint32_t startSequence) std::thread extracter{[this, &startSequence, &writeConflict, - &transformQueue]() { + &transformQueue, + &offset]() { beast::setCurrentThreadName("rippled: ReportingETL extract"); - uint32_t currentSequence = startSequence; + uint32_t currentSequence = startSequence + offset; // there are two stopping conditions here. // First, if there is a write conflict in the load thread, the ETL @@ -401,7 +402,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence) } transformQueue.push(std::move(fetchResponse)); - ++currentSequence; + currentSequence += offset; } // empty optional tells the transformer to shut down transformQueue.push({}); @@ -598,8 +599,11 @@ ReportingETL::monitor() << " . Beginning ETL"; // doContinousETLPipelined returns the most recent sequence // published empty optional if no sequence was published - std::optional lastPublished = - runETLPipeline(nextSequence); + std::optional lastPublished = nextSequence; + for (size_t i = 0; i < 10; ++i) + { + runETLPipeline(nextSequence, i); + } BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Aborting ETL. Falling back to publishing"; diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h index 1873a66a..dfe3c3c7 100644 --- a/reporting/ReportingETL.h +++ b/reporting/ReportingETL.h @@ -166,7 +166,7 @@ private: /// @param startSequence the first ledger to extract /// @return the last ledger written to the database, if any std::optional - runETLPipeline(uint32_t startSequence); + runETLPipeline(uint32_t startSequence, int offset); /// Monitor the network for newly validated ledgers. Also monitor the /// database to see if any process is writing those ledgers. This function