mirror of
https://github.com/XRPLF/clio.git
synced 2026-04-29 15:37:53 +00:00
Catch database timeout in coroutine
This commit is contained in:
@@ -158,11 +158,11 @@ checkResult(PgResult const& res, std::uint32_t const numFieldsExpected)
|
||||
if (!res)
|
||||
{
|
||||
auto msg = res.msg();
|
||||
BOOST_LOG_TRIVIAL(debug) << msg;
|
||||
BOOST_LOG_TRIVIAL(error) << __func__ << " - " << msg;
|
||||
if (msg.find("statement timeout"))
|
||||
throw DatabaseTimeout();
|
||||
assert(false);
|
||||
throw std::runtime_error(msg);
|
||||
throw DatabaseTimeout();
|
||||
}
|
||||
if (res.status() != PGRES_TUPLES_OK)
|
||||
{
|
||||
@@ -170,8 +170,9 @@ checkResult(PgResult const& res, std::uint32_t const numFieldsExpected)
|
||||
msg << " : Postgres response should have been "
|
||||
"PGRES_TUPLES_OK but instead was "
|
||||
<< res.status() << " - msg = " << res.msg();
|
||||
BOOST_LOG_TRIVIAL(error) << __func__ << " - " << msg.str();
|
||||
assert(false);
|
||||
throw std::runtime_error(msg.str());
|
||||
throw DatabaseTimeout();
|
||||
}
|
||||
|
||||
BOOST_LOG_TRIVIAL(trace)
|
||||
@@ -461,17 +462,16 @@ PostgresBackend::fetchTransactions(
|
||||
auto hw = new HandlerWrapper(std::move(handler));
|
||||
|
||||
auto start = std::chrono::system_clock::now();
|
||||
auto end = std::chrono::system_clock::now();
|
||||
auto duration = ((end - start).count()) / 1000000000.0;
|
||||
|
||||
std::atomic_uint numRemaining = hashes.size();
|
||||
std::atomic_bool errored = false;
|
||||
|
||||
for (size_t i = 0; i < hashes.size(); ++i)
|
||||
{
|
||||
auto const& hash = hashes[i];
|
||||
boost::asio::spawn(
|
||||
get_associated_executor(yield),
|
||||
[this, &hash, &results, hw, &numRemaining, i](
|
||||
[this, &hash, &results, hw, &numRemaining, &errored, i](
|
||||
boost::asio::yield_context yield) {
|
||||
BOOST_LOG_TRIVIAL(trace) << __func__ << " getting txn = " << i;
|
||||
|
||||
@@ -483,14 +483,21 @@ PostgresBackend::fetchTransactions(
|
||||
"WHERE HASH = \'\\x"
|
||||
<< ripple::strHex(hash) << "\'";
|
||||
|
||||
if (auto const res = pgQuery(sql.str().data(), yield);
|
||||
checkResult(res, 4))
|
||||
try
|
||||
{
|
||||
results[i] = {
|
||||
res.asUnHexedBlob(0, 0),
|
||||
res.asUnHexedBlob(0, 1),
|
||||
res.asBigInt(0, 2),
|
||||
res.asBigInt(0, 3)};
|
||||
if (auto const res = pgQuery(sql.str().data(), yield);
|
||||
checkResult(res, 4))
|
||||
{
|
||||
results[i] = {
|
||||
res.asUnHexedBlob(0, 0),
|
||||
res.asUnHexedBlob(0, 1),
|
||||
res.asBigInt(0, 2),
|
||||
res.asBigInt(0, 3)};
|
||||
}
|
||||
}
|
||||
catch (DatabaseTimeout const&)
|
||||
{
|
||||
errored = true;
|
||||
}
|
||||
|
||||
if (--numRemaining == 0)
|
||||
@@ -506,12 +513,19 @@ PostgresBackend::fetchTransactions(
|
||||
|
||||
delete hw;
|
||||
|
||||
auto end2 = std::chrono::system_clock::now();
|
||||
duration = ((end2 - end).count()) / 1000000000.0;
|
||||
auto end = std::chrono::system_clock::now();
|
||||
auto duration =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
|
||||
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " fetched " << std::to_string(hashes.size())
|
||||
<< " transactions with threadpool. took " << std::to_string(duration);
|
||||
<< " transactions asynchronously. took "
|
||||
<< std::to_string(duration.count());
|
||||
if (errored)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(error) << __func__ << " Database fetch timed out";
|
||||
throw DatabaseTimeout();
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
@@ -537,13 +551,14 @@ PostgresBackend::doFetchLedgerObjects(
|
||||
auto hw = new HandlerWrapper(std::move(handler));
|
||||
|
||||
std::atomic_uint numRemaining = keys.size();
|
||||
std::atomic_bool errored = false;
|
||||
auto start = std::chrono::system_clock::now();
|
||||
for (size_t i = 0; i < keys.size(); ++i)
|
||||
{
|
||||
auto const& key = keys[i];
|
||||
boost::asio::spawn(
|
||||
boost::asio::get_associated_executor(yield),
|
||||
[this, &key, &results, &numRemaining, hw, i, sequence](
|
||||
[this, &key, &results, &numRemaining, &errored, hw, i, sequence](
|
||||
boost::asio::yield_context yield) {
|
||||
PgQuery pgQuery(pgPool_);
|
||||
|
||||
@@ -555,9 +570,16 @@ PostgresBackend::doFetchLedgerObjects(
|
||||
<< " AND ledger_seq <= " << std::to_string(sequence)
|
||||
<< " ORDER BY ledger_seq DESC LIMIT 1";
|
||||
|
||||
if (auto const res = pgQuery(sql.str().data(), yield);
|
||||
checkResult(res, 1))
|
||||
results[i] = res.asUnHexedBlob();
|
||||
try
|
||||
{
|
||||
if (auto const res = pgQuery(sql.str().data(), yield);
|
||||
checkResult(res, 1))
|
||||
results[i] = res.asUnHexedBlob();
|
||||
}
|
||||
catch (DatabaseTimeout const& ex)
|
||||
{
|
||||
errored = true;
|
||||
}
|
||||
|
||||
if (--numRemaining == 0)
|
||||
{
|
||||
@@ -573,11 +595,17 @@ PostgresBackend::doFetchLedgerObjects(
|
||||
delete hw;
|
||||
|
||||
auto end = std::chrono::system_clock::now();
|
||||
auto duration = ((end - start).count()) / 1000000000.0;
|
||||
auto duration =
|
||||
std::chrono::duration_cast<std::chrono::milliseconds>(end - start);
|
||||
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " fetched " << std::to_string(keys.size())
|
||||
<< " objects with threadpool. took " << std::to_string(duration);
|
||||
<< " objects asynchronously. ms = " << std::to_string(duration.count());
|
||||
if (errored)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(error) << __func__ << " Database fetch timed out";
|
||||
throw DatabaseTimeout();
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
@@ -838,50 +838,51 @@ ReportingETL::monitor()
|
||||
<< __func__ << " : "
|
||||
<< "Database is populated. "
|
||||
<< "Starting monitor loop. sequence = " << nextSequence;
|
||||
while (!stopping_ &&
|
||||
networkValidatedLedgers_->waitUntilValidatedByNetwork(nextSequence))
|
||||
while (true)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(info) << __func__ << " : "
|
||||
<< "Ledger with sequence = " << nextSequence
|
||||
<< " has been validated by the network. "
|
||||
<< "Attempting to find in database and publish";
|
||||
// Attempt to take over responsibility of ETL writer after 2 failed
|
||||
// attempts to publish the ledger. publishLedger() fails if the
|
||||
// ledger that has been validated by the network is not found in the
|
||||
// database after the specified number of attempts. publishLedger()
|
||||
// waits one second between each attempt to read the ledger from the
|
||||
// database
|
||||
//
|
||||
// In strict read-only mode, when the software fails to find a
|
||||
// ledger in the database that has been validated by the network,
|
||||
// the software will only try to publish subsequent ledgers once,
|
||||
// until one of those ledgers is found in the database. Once the
|
||||
// software successfully publishes a ledger, the software will fall
|
||||
// back to the normal behavior of trying several times to publish
|
||||
// the ledger that has been validated by the network. In this
|
||||
// manner, a reporting processing running in read-only mode does not
|
||||
// need to restart if the database is wiped.
|
||||
constexpr size_t timeoutSeconds = 2;
|
||||
bool success = publishLedger(nextSequence, timeoutSeconds);
|
||||
if (!success)
|
||||
if (Backend::synchronousAndRetryOnTimeout([&](auto yield) {
|
||||
return backend_->fetchLedgerBySequence(nextSequence, yield);
|
||||
}))
|
||||
{
|
||||
publishLedger(nextSequence, {});
|
||||
++nextSequence;
|
||||
}
|
||||
else if (networkValidatedLedgers_->waitUntilValidatedByNetwork(
|
||||
nextSequence, 1000))
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< __func__ << " : "
|
||||
<< "Failed to publish ledger with sequence = " << nextSequence
|
||||
<< " . Beginning ETL";
|
||||
// doContinousETLPipelined returns the most recent sequence
|
||||
// published empty optional if no sequence was published
|
||||
std::optional<uint32_t> lastPublished =
|
||||
runETLPipeline(nextSequence, extractorThreads_);
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " : "
|
||||
<< "Aborting ETL. Falling back to publishing";
|
||||
// if no ledger was published, don't increment nextSequence
|
||||
if (lastPublished)
|
||||
nextSequence = *lastPublished + 1;
|
||||
<< "Ledger with sequence = " << nextSequence
|
||||
<< " has been validated by the network. "
|
||||
<< "Attempting to find in database and publish";
|
||||
// Attempt to take over responsibility of ETL writer after 2 failed
|
||||
// attempts to publish the ledger. publishLedger() fails if the
|
||||
// ledger that has been validated by the network is not found in the
|
||||
// database after the specified number of attempts. publishLedger()
|
||||
// waits one second between each attempt to read the ledger from the
|
||||
// database
|
||||
constexpr size_t timeoutSeconds = 2;
|
||||
bool success = publishLedger(nextSequence, timeoutSeconds);
|
||||
if (!success)
|
||||
{
|
||||
BOOST_LOG_TRIVIAL(warning)
|
||||
<< __func__ << " : "
|
||||
<< "Failed to publish ledger with sequence = "
|
||||
<< nextSequence << " . Beginning ETL";
|
||||
// doContinousETLPipelined returns the most recent sequence
|
||||
// published empty optional if no sequence was published
|
||||
std::optional<uint32_t> lastPublished =
|
||||
runETLPipeline(nextSequence, extractorThreads_);
|
||||
BOOST_LOG_TRIVIAL(info)
|
||||
<< __func__ << " : "
|
||||
<< "Aborting ETL. Falling back to publishing";
|
||||
// if no ledger was published, don't increment nextSequence
|
||||
if (lastPublished)
|
||||
nextSequence = *lastPublished + 1;
|
||||
}
|
||||
else
|
||||
++nextSequence;
|
||||
}
|
||||
else
|
||||
++nextSequence;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user