diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index b6bf4ef2..6815f2b5 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -1,6 +1,6 @@ -#include #include #include +#include #include /* namespace std { @@ -268,7 +268,9 @@ CassandraBackend::fetchAllTransactionHashesInLedger( CassandraResult result = executeSyncRead(statement); if (!result) { - BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; + BOOST_LOG_TRIVIAL(error) + << __func__ + << " - no rows . ledger = " << std::to_string(ledgerSequence); return {}; } std::vector hashes; @@ -742,7 +744,7 @@ CassandraBackend::writeKeys( std::vector> cbs; cbs.reserve(keys.size()); uint32_t concurrentLimit = - isAsync ? indexerMaxRequestsOutstanding : keys.size(); + isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding; BOOST_LOG_TRIVIAL(debug) << __func__ << " Ledger = " << std::to_string(index.keyIndex) << " . num keys = " << std::to_string(keys.size()) diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 87ab9cb4..7ee872f0 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -26,9 +26,9 @@ #include #include #include -#include #include #include +#include #include #include @@ -173,9 +173,22 @@ ReportingETL::getFees(std::uint32_t seq) void ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) { - auto ledgerRange = backend_->fetchLedgerRange(); + auto ledgerRange = backend_->fetchLedgerRangeNoThrow(); + auto fees = getFees(lgrInfo.seq); - auto transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq); + std::vector transactions; + while (true) + { + try + { + transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq); + break; + } + catch (Backend::DatabaseTimeout const&) + { + BOOST_LOG_TRIVIAL(warning) << "Read timeout fetching transactions"; + } + } if (!fees || !ledgerRange) { @@ -245,7 +258,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) auto lgr = backend_->fetchLedgerBySequence(ledgerSequence); assert(lgr); publishLedger(*lgr); - + return true; } } @@ -253,7 +266,6 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts) { continue; } - } return false; } @@ -553,7 +565,9 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) // success is false if the ledger was already written if (success) { - publishLedger(lgrInfo); + boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { + publishLedger(lgrInfo); + }); lastPublishedSequence = lgrInfo.seq; } writeConflict = !success; diff --git a/src/server/HttpBase.h b/src/server/HttpBase.h index 0a4ab512..44f420fd 100644 --- a/src/server/HttpBase.h +++ b/src/server/HttpBase.h @@ -174,8 +174,6 @@ handle_request( wsStyleRequest["command"] = request["method"]; - std::cout << "Transfromed to ws style stuff" << std::endl; - auto [builtResponse, cost] = buildResponse(wsStyleRequest, backend, nullptr, balancer, nullptr);