From 6eb87bfaffc1507dd6dc03f6983992a88cbef8f1 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Wed, 24 Mar 2021 05:33:27 -0400 Subject: [PATCH] proper parallel extraction --- reporting/CassandraBackend.cpp | 2 +- reporting/CassandraBackend.h | 2 - reporting/ETLHelpers.h | 2 +- reporting/ReportingETL.cpp | 116 ++++++++++++++++++--------------- 4 files changed, 65 insertions(+), 57 deletions(-) diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index dcbec44f..0c4ac057 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -758,7 +758,7 @@ CassandraBackend::open() query = {}; query << " update " << tablePrefix << "ledger_range" - << " set sequence = ? where is_latest = ? if sequence != ?"; + << " set sequence = ? where is_latest = ?"; if (!updateLedgerRange_.prepareStatement(query, session_.get())) continue; diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 3d641502..86c41384 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -751,13 +751,11 @@ public: CassandraStatement statement{updateLedgerRange_}; statement.bindInt(ledgerSequence_); statement.bindBoolean(false); - statement.bindInt(ledgerSequence_); executeSyncWrite(statement); } CassandraStatement statement{updateLedgerRange_}; statement.bindInt(ledgerSequence_); statement.bindBoolean(true); - statement.bindInt(ledgerSequence_); return executeSyncUpdate(statement); } void diff --git a/reporting/ETLHelpers.h b/reporting/ETLHelpers.h index 24593fc5..1f225a86 100644 --- a/reporting/ETLHelpers.h +++ b/reporting/ETLHelpers.h @@ -110,7 +110,7 @@ class ThreadSafeQueue public: /// @param maxSize maximum size of the queue. Calls that would cause the /// queue to exceed this size will block until free space is available - explicit ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize) + ThreadSafeQueue(uint32_t maxSize) : maxSize_(maxSize) { } diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 661f58dd..518359a1 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -317,7 +317,7 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) // Database must be populated when this starts std::optional -ReportingETL::runETLPipeline(uint32_t startSequence, int offset) +ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) { /* * Behold, mortals! This function spawns three separate threads, which talk @@ -356,68 +356,80 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int offset) std::optional lastPublishedSequence; constexpr uint32_t maxQueueSize = 1000; auto begin = std::chrono::system_clock::now(); + using QueueType = + ThreadSafeQueue>; + std::vector> queues; - ThreadSafeQueue> - transformQueue{maxQueueSize}; + auto getNext = [&queues, &startSequence, &numExtractors]( + uint32_t sequence) -> std::shared_ptr { + std::cout << std::to_string((sequence - startSequence) % numExtractors); + return queues[(sequence - startSequence) % numExtractors]; + }; + std::vector threads; + for (size_t i = 1; i < numExtractors + 1; ++i) + { + auto transformQueue = std::make_shared(maxQueueSize); + queues.push_back(transformQueue); + std::cout << "added to queues"; - std::thread extracter{[this, - &startSequence, - &writeConflict, - &transformQueue, - &offset]() { - beast::setCurrentThreadName("rippled: ReportingETL extract"); - uint32_t currentSequence = startSequence + offset; + threads.emplace_back( + [this, &startSequence, &writeConflict, transformQueue, &i]() { + beast::setCurrentThreadName("rippled: ReportingETL extract"); + uint32_t currentSequence = startSequence + i; - // there are two stopping conditions here. - // First, if there is a write conflict in the load thread, the ETL - // mechanism should stop. - // The other stopping condition is if the entire server is shutting - // down. This can be detected in a variety of ways. See the comment - // at the top of the function - while (networkValidatedLedgers_.waitUntilValidatedByNetwork( - currentSequence) && - !writeConflict && !isStopping()) - { - auto start = std::chrono::system_clock::now(); - std::optional fetchResponse{ - fetchLedgerDataAndDiff(currentSequence)}; - auto end = std::chrono::system_clock::now(); + // there are two stopping conditions here. + // First, if there is a write conflict in the load thread, the + // ETL mechanism should stop. The other stopping condition is if + // the entire server is shutting down. This can be detected in a + // variety of ways. See the comment at the top of the function + while (networkValidatedLedgers_.waitUntilValidatedByNetwork( + currentSequence) && + !writeConflict && !isStopping()) + { + auto start = std::chrono::system_clock::now(); + std::optional + fetchResponse{fetchLedgerDataAndDiff(currentSequence)}; + auto end = std::chrono::system_clock::now(); - auto time = ((end - start).count()) / 1000000000.0; - auto tps = - fetchResponse->transactions_list().transactions_size() / time; + auto time = ((end - start).count()) / 1000000000.0; + auto tps = + fetchResponse->transactions_list().transactions_size() / + time; - BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time - << " . Extract phase tps = " << tps; - // if the fetch is unsuccessful, stop. fetchLedger only returns - // false if the server is shutting down, or if the ledger was - // found in the database (which means another process already - // wrote the ledger that this process was trying to extract; - // this is a form of a write conflict). Otherwise, - // fetchLedgerDataAndDiff will keep trying to fetch the - // specified ledger until successful - if (!fetchResponse) - { - break; - } + BOOST_LOG_TRIVIAL(info) << "Extract phase time = " << time + << " . Extract phase tps = " << tps; + // if the fetch is unsuccessful, stop. fetchLedger only + // returns false if the server is shutting down, or if the + // ledger was found in the database (which means another + // process already wrote the ledger that this process was + // trying to extract; this is a form of a write conflict). + // Otherwise, fetchLedgerDataAndDiff will keep trying to + // fetch the specified ledger until successful + if (!fetchResponse) + { + break; + } - transformQueue.push(std::move(fetchResponse)); - currentSequence += offset; - } - // empty optional tells the transformer to shut down - transformQueue.push({}); - }}; + transformQueue->push(std::move(fetchResponse)); + currentSequence += i; + } + // empty optional tells the transformer to shut down + transformQueue->push({}); + }); + } std::thread transformer{[this, &writeConflict, - &transformQueue, + &startSequence, + &getNext, &lastPublishedSequence]() { beast::setCurrentThreadName("rippled: ReportingETL transform"); + uint32_t currentSequence = startSequence; while (!writeConflict) { std::optional fetchResponse{ - transformQueue.pop()}; + getNext(currentSequence)->pop()}; // if fetchResponse is an empty optional, the extracter thread // has stopped and the transformer should stop as well if (!fetchResponse) @@ -467,7 +479,8 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int offset) }}; // wait for all of the threads to stop - extracter.join(); + for (auto& t : threads) + t.join(); transformer.join(); auto end = std::chrono::system_clock::now(); BOOST_LOG_TRIVIAL(debug) @@ -600,10 +613,7 @@ ReportingETL::monitor() // doContinousETLPipelined returns the most recent sequence // published empty optional if no sequence was published std::optional lastPublished = nextSequence; - for (size_t i = 0; i < 10; ++i) - { - runETLPipeline(nextSequence, i); - } + runETLPipeline(nextSequence, 10); BOOST_LOG_TRIVIAL(info) << __func__ << " : " << "Aborting ETL. Falling back to publishing";