Report failed Postgres queries as DatabaseTimeout, propagate failures from the
spawned per-item fetches, and let ReportingETL::monitor() publish a ledger
straight from the database when it can already be fetched there.

diff --git a/src/backend/PostgresBackend.cpp b/src/backend/PostgresBackend.cpp
index 36d2d392d..630ffaa07 100644
--- a/src/backend/PostgresBackend.cpp
+++ b/src/backend/PostgresBackend.cpp
@@ -158,11 +158,13 @@ checkResult(PgResult const& res, std::uint32_t const numFieldsExpected)
     if (!res)
     {
         auto msg = res.msg();
-        BOOST_LOG_TRIVIAL(debug) << msg;
+        BOOST_LOG_TRIVIAL(error) << __func__ << " - " << msg;
-        if (msg.find("statement timeout"))
+        // std::string::find returns npos (non-zero) when the text is absent,
+        // so the result must be compared against npos explicitly.
+        if (msg.find("statement timeout") != std::string::npos)
             throw DatabaseTimeout();
         assert(false);
-        throw std::runtime_error(msg);
+        throw DatabaseTimeout();
     }
     if (res.status() != PGRES_TUPLES_OK)
     {
@@ -170,8 +172,9 @@ checkResult(PgResult const& res, std::uint32_t const numFieldsExpected)
         msg << " : Postgres response should have been "
                "PGRES_TUPLES_OK but instead was "
             << res.status() << " - msg = " << res.msg();
+        BOOST_LOG_TRIVIAL(error) << __func__ << " - " << msg.str();
         assert(false);
-        throw std::runtime_error(msg.str());
+        throw DatabaseTimeout();
     }
 
     BOOST_LOG_TRIVIAL(trace)
@@ -461,17 +464,17 @@ PostgresBackend::fetchTransactions(
     auto hw = new HandlerWrapper(std::move(handler));
 
     auto start = std::chrono::system_clock::now();
-    auto end = std::chrono::system_clock::now();
-    auto duration = ((end - start).count()) / 1000000000.0;
 
     std::atomic_uint numRemaining = hashes.size();
+    // Set when a spawned query throws DatabaseTimeout; rethrown on exit.
+    std::atomic_bool errored = false;
 
     for (size_t i = 0; i < hashes.size(); ++i)
     {
         auto const& hash = hashes[i];
         boost::asio::spawn(
             get_associated_executor(yield),
-            [this, &hash, &results, hw, &numRemaining, i](
+            [this, &hash, &results, hw, &numRemaining, &errored, i](
                 boost::asio::yield_context yield) {
                 BOOST_LOG_TRIVIAL(trace)
                     << __func__ << " getting txn = " << i;
@@ -483,14 +486,23 @@ PostgresBackend::fetchTransactions(
                        "WHERE HASH = \'\\x"
                     << ripple::strHex(hash) << "\'";
 
-                if (auto const res = pgQuery(sql.str().data(), yield);
-                    checkResult(res, 4))
+                try
                 {
-                    results[i] = {
-                        res.asUnHexedBlob(0, 0),
-                        res.asUnHexedBlob(0, 1),
-                        res.asBigInt(0, 2),
-                        res.asBigInt(0, 3)};
+                    if (auto const res = pgQuery(sql.str().data(), yield);
+                        checkResult(res, 4))
+                    {
+                        results[i] = {
+                            res.asUnHexedBlob(0, 0),
+                            res.asUnHexedBlob(0, 1),
+                            res.asBigInt(0, 2),
+                            res.asBigInt(0, 3)};
+                    }
+                }
+                catch (DatabaseTimeout const&)
+                {
+                    // Swallow here so the countdown below still runs; the
+                    // failure reaches the caller through `errored`.
+                    errored = true;
                 }
 
                 if (--numRemaining == 0)
@@ -506,12 +518,19 @@ PostgresBackend::fetchTransactions(
 
     delete hw;
 
-    auto end2 = std::chrono::system_clock::now();
-    duration = ((end2 - end).count()) / 1000000000.0;
+    auto end = std::chrono::system_clock::now();
+    auto duration =
+        std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
 
     BOOST_LOG_TRIVIAL(info)
         << __func__ << " fetched " << std::to_string(hashes.size())
-        << " transactions with threadpool. took " << std::to_string(duration);
+        << " transactions asynchronously. ms = "
+        << std::to_string(duration.count());
+    if (errored)
+    {
+        BOOST_LOG_TRIVIAL(error) << __func__ << " Database fetch timed out";
+        throw DatabaseTimeout();
+    }
 
     return results;
 }
@@ -537,13 +556,15 @@ PostgresBackend::doFetchLedgerObjects(
     auto hw = new HandlerWrapper(std::move(handler));
 
     std::atomic_uint numRemaining = keys.size();
+    // Set when a spawned query throws DatabaseTimeout; rethrown on exit.
+    std::atomic_bool errored = false;
     auto start = std::chrono::system_clock::now();
     for (size_t i = 0; i < keys.size(); ++i)
     {
         auto const& key = keys[i];
         boost::asio::spawn(
             boost::asio::get_associated_executor(yield),
-            [this, &key, &results, &numRemaining, hw, i, sequence](
+            [this, &key, &results, &numRemaining, &errored, hw, i, sequence](
                 boost::asio::yield_context yield) {
                 PgQuery pgQuery(pgPool_);
 
@@ -555,9 +576,18 @@ PostgresBackend::doFetchLedgerObjects(
                     << " AND ledger_seq <= " << std::to_string(sequence)
                     << " ORDER BY ledger_seq DESC LIMIT 1";
 
-                if (auto const res = pgQuery(sql.str().data(), yield);
-                    checkResult(res, 1))
-                    results[i] = res.asUnHexedBlob();
+                try
+                {
+                    if (auto const res = pgQuery(sql.str().data(), yield);
+                        checkResult(res, 1))
+                        results[i] = res.asUnHexedBlob();
+                }
+                catch (DatabaseTimeout const&)
+                {
+                    // Swallow here so the countdown below still runs; the
+                    // failure reaches the caller through `errored`.
+                    errored = true;
+                }
 
                 if (--numRemaining == 0)
                 {
@@ -573,11 +603,17 @@ PostgresBackend::doFetchLedgerObjects(
     delete hw;
 
     auto end = std::chrono::system_clock::now();
-    auto duration = ((end - start).count()) / 1000000000.0;
+    auto duration =
+        std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
 
     BOOST_LOG_TRIVIAL(info)
         << __func__ << " fetched " << std::to_string(keys.size())
-        << " objects with threadpool. took " << std::to_string(duration);
+        << " objects asynchronously. ms = " << std::to_string(duration.count());
+    if (errored)
+    {
+        BOOST_LOG_TRIVIAL(error) << __func__ << " Database fetch timed out";
+        throw DatabaseTimeout();
+    }
 
     return results;
 }
diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp
index cbb54198a..5791f9386 100644
--- a/src/etl/ReportingETL.cpp
+++ b/src/etl/ReportingETL.cpp
@@ -838,50 +838,56 @@ ReportingETL::monitor()
         << __func__ << " : "
         << "Database is populated. "
         << "Starting monitor loop. sequence = " << nextSequence;
-    while (!stopping_ &&
-           networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence))
+    // Keep testing stopping_ so that a shutdown request still ends the loop.
+    while (!stopping_)
     {
-        BOOST_LOG_TRIVIAL(info) << __func__ << " : "
-                                << "Ledger with sequence = " << nextSequence
-                                << " has been validated by the network. "
-                                << "Attempting to find in database and publish";
-        // Attempt to take over responsibility of ETL writer after 2 failed
-        // attempts to publish the ledger. publishLedger() fails if the
-        // ledger that has been validated by the network is not found in the
-        // database after the specified number of attempts. publishLedger()
-        // waits one second between each attempt to read the ledger from the
-        // database
-        //
-        // In strict read-only mode, when the software fails to find a
-        // ledger in the database that has been validated by the network,
-        // the software will only try to publish subsequent ledgers once,
-        // until one of those ledgers is found in the database. Once the
-        // software successfully publishes a ledger, the software will fall
-        // back to the normal behavior of trying several times to publish
-        // the ledger that has been validated by the network. In this
-        // manner, a reporting processing running in read-only mode does not
-        // need to restart if the database is wiped.
-        constexpr size_t timeoutSeconds = 2;
-        bool success = publishLedger(nextSequence, timeoutSeconds);
-        if (!success)
+        // If the ledger can already be fetched from the database, publish it
+        // right away.
+        if (Backend::synchronousAndRetryOnTimeout([&](auto yield) {
+                return backend_->fetchLedgerBySequence(nextSequence, yield);
+            }))
+        {
+            publishLedger(nextSequence, {});
+            ++nextSequence;
+        }
+        // Otherwise wait for network validation; 1000 is presumably a
+        // timeout in milliseconds - TODO confirm.
+        else if (networkValidatedLedgers_->waitUntilValidatedByNetwork(
+                     nextSequence, 1000))
         {
-            BOOST_LOG_TRIVIAL(warning)
-                << __func__ << " : "
-                << "Failed to publish ledger with sequence = " << nextSequence
-                << " . Beginning ETL";
-            // doContinousETLPipelined returns the most recent sequence
-            // published empty optional if no sequence was published
-            std::optional<uint32_t> lastPublished =
-                runETLPipeline(nextSequence, extractorThreads_);
             BOOST_LOG_TRIVIAL(info)
                 << __func__ << " : "
-                << "Aborting ETL. Falling back to publishing";
-            // if no ledger was published, don't increment nextSequence
-            if (lastPublished)
-                nextSequence = *lastPublished + 1;
+                << "Ledger with sequence = " << nextSequence
+                << " has been validated by the network. "
+                << "Attempting to find in database and publish";
+            // Attempt to take over responsibility of ETL writer after 2 failed
+            // attempts to publish the ledger. publishLedger() fails if the
+            // ledger that has been validated by the network is not found in the
+            // database after the specified number of attempts. publishLedger()
+            // waits one second between each attempt to read the ledger from the
+            // database
+            constexpr size_t timeoutSeconds = 2;
+            bool success = publishLedger(nextSequence, timeoutSeconds);
+            if (!success)
+            {
+                BOOST_LOG_TRIVIAL(warning)
+                    << __func__ << " : "
+                    << "Failed to publish ledger with sequence = "
+                    << nextSequence << " . Beginning ETL";
+                // runETLPipeline returns the most recent sequence
+                // published, or an empty optional if none was published
+                std::optional<uint32_t> lastPublished =
+                    runETLPipeline(nextSequence, extractorThreads_);
+                BOOST_LOG_TRIVIAL(info)
+                    << __func__ << " : "
+                    << "Aborting ETL. Falling back to publishing";
+                // if no ledger was published, don't increment nextSequence
+                if (lastPublished)
+                    nextSequence = *lastPublished + 1;
+            }
+            else
+                ++nextSequence;
         }
-        else
-            ++nextSequence;
     }
 }
 