diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index b6bf4ef2..d9a7382d 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -1,6 +1,6 @@ -#include #include #include +#include #include /* namespace std { @@ -268,7 +268,9 @@ CassandraBackend::fetchAllTransactionHashesInLedger( CassandraResult result = executeSyncRead(statement); if (!result) { - BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; + BOOST_LOG_TRIVIAL(error) + << __func__ + << " - no rows . ledger = " << std::to_string(ledgerSequence); return {}; } std::vector hashes; diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 87ab9cb4..6828db91 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -26,9 +26,9 @@ #include #include #include -#include #include #include +#include #include #include @@ -245,7 +245,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) auto lgr = backend_->fetchLedgerBySequence(ledgerSequence); assert(lgr); publishLedger(*lgr); - + return true; } } @@ -253,7 +253,6 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) { continue; } - } return false; } @@ -516,6 +515,30 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) beast::setCurrentThreadName("rippled: ReportingETL transform"); uint32_t currentSequence = startSequence; + int counter = 0; + std::atomic_int per = 1; + auto startTimer = [this, &per]() { + std::cout << "calling outer"; + auto innerFunc = [this, &per](auto& f) -> void { + std::cout << "calling inner"; + std::shared_ptr timer = + std::make_shared( + ioContext_, + std::chrono::steady_clock::now() + + std::chrono::seconds(30)); + timer->async_wait( + [timer, f, &per](const boost::system::error_code& error) { + std::cout << "***incrementing per*****"; + ++per; + if (per > 100) + per = 100; + f(f); + }); + }; + innerFunc(innerFunc); + }; + startTimer(); + while (!writeConflict) { std::optional fetchResponse{ @@ -570,6 +593,11 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) deleting_ = false; }); } + if (++counter >= per) + { + std::this_thread::sleep_for(std::chrono::seconds(4)); + counter = 0; + } } }}; diff --git a/src/server/HttpBase.h b/src/server/HttpBase.h index 0a4ab512..44f420fd 100644 --- a/src/server/HttpBase.h +++ b/src/server/HttpBase.h @@ -174,8 +174,6 @@ handle_request( wsStyleRequest["command"] = request["method"]; - std::cout << "Transfromed to ws style stuff" << std::endl; - auto [builtResponse, cost] = buildResponse(wsStyleRequest, backend, nullptr, balancer, nullptr);