diff --git a/src/backend/BackendIndexer.cpp b/src/backend/BackendIndexer.cpp index 4aa17fa9..f3ddf8c5 100644 --- a/src/backend/BackendIndexer.cpp +++ b/src/backend/BackendIndexer.cpp @@ -209,7 +209,6 @@ BackendIndexer::finish(uint32_t ledgerSequence, BackendInterface const& backend) BOOST_LOG_TRIVIAL(debug) << __func__ << " starting. sequence = " << std::to_string(ledgerSequence); - bool isFirst = false; auto keyIndex = getKeyIndexOfSeq(ledgerSequence); if (isFirst_) { diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index d45e1eb3..439caa7f 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -7,6 +7,7 @@ template void processAsyncWriteResponse(T& requestParams, CassFuture* fut, F func) { + BOOST_LOG_TRIVIAL(debug) << __func__ << " Processing async write response"; CassandraBackend const& backend = *requestParams.backend; auto rc = cass_future_error_code(fut); if (rc != CASS_OK) @@ -39,6 +40,7 @@ template void processAsyncWrite(CassFuture* fut, void* cbData) { + BOOST_LOG_TRIVIAL(debug) << __func__ << " processing async write"; T& requestParams = *static_cast(cbData); // TODO don't pass in func processAsyncWriteResponse(requestParams, fut, requestParams.retry); @@ -56,6 +58,7 @@ struct WriteCallbackData WriteCallbackData(CassandraBackend const* b, T&& d, B bind) : backend(b), data(std::move(d)) { + BOOST_LOG_TRIVIAL(debug) << "Making WriteCallbackData"; retry = [bind, this](auto& params, bool isRetry) { auto statement = bind(params); backend->executeAsyncWrite( @@ -69,7 +72,10 @@ struct WriteCallbackData virtual void start() { + BOOST_LOG_TRIVIAL(debug) << "Starting"; + BOOST_LOG_TRIVIAL(debug) << "address is " << this; retry(*this, false); + BOOST_LOG_TRIVIAL(debug) << "Started"; } virtual void @@ -82,6 +88,7 @@ struct WriteCallbackData } virtual ~WriteCallbackData() { + BOOST_LOG_TRIVIAL(debug) << __func__; } }; template @@ -342,8 +349,11 @@ CassandraBackend::fetchTransactions( statement.bindBytes(hashes[i]); cbs.push_back(std::make_shared( numOutstanding, mtx, cv, [i, &results](auto& result) { - results[i] = { - result.getBytes(), result.getBytes(), result.getUInt32()}; + if (result.hasResult()) + results[i] = { + result.getBytes(), + result.getBytes(), + result.getUInt32()}; })); executeAsyncRead(statement, processAsyncRead, *cbs[i]); } @@ -480,7 +490,8 @@ CassandraBackend::fetchLedgerObjects( { cbs.push_back(std::make_shared( numOutstanding, mtx, cv, [i, &results](auto& result) { - results[i] = result.getBytes(); + if (result.hasResult()) + results[i] = result.getBytes(); })); CassandraStatement statement{selectObject_}; statement.bindBytes(keys[i]); @@ -515,7 +526,7 @@ CassandraBackend::writeKeys( statement.bindBytes(key); return statement; }; - std::atomic_int numOutstanding = keys.size(); + std::atomic_int numOutstanding = 0; std::condition_variable cv; std::mutex mtx; std::vector void diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 0cb2a7a7..b577930e 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -114,10 +114,13 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) << "Deserialized ledger header. " << detail::toString(lgrInfo); backend_->startWrites(); + BOOST_LOG_TRIVIAL(debug) << __func__ << " started writes"; backend_->writeLedger( lgrInfo, std::move(*ledgerData->mutable_ledger_header()), true); + BOOST_LOG_TRIVIAL(debug) << __func__ << " wrote ledger"; std::vector accountTxData = insertTransactions(lgrInfo, *ledgerData); + BOOST_LOG_TRIVIAL(debug) << __func__ << " inserted txns"; auto start = std::chrono::system_clock::now(); @@ -126,6 +129,7 @@ ReportingETL::loadInitialLedger(uint32_t startingSequence) // consumes from the queue and inserts the data into the Ledger object. // Once the below call returns, all data has been pushed into the queue loadBalancer_->loadInitialLedger(startingSequence); + BOOST_LOG_TRIVIAL(debug) << __func__ << " loaded initial ledger"; if (!stopping_) {