From bc131f666ad45857b90856ffe6b80187fc7c1f74 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 25 Jan 2022 20:10:02 +0000 Subject: [PATCH] General purpose function to retry on database timeout --- src/backend/BackendInterface.cpp | 12 +-- src/backend/BackendInterface.h | 21 +++++ src/backend/CassandraBackend.cpp | 69 +++++++--------- src/backend/PostgresBackend.cpp | 47 +++++------ src/etl/ReportingETL.cpp | 95 ++++++++++------------- src/subscriptions/SubscriptionManager.cpp | 7 +- src/webserver/WsBase.h | 2 + 7 files changed, 117 insertions(+), 136 deletions(-) diff --git a/src/backend/BackendInterface.cpp b/src/backend/BackendInterface.cpp index 8bbbda02..90fb8e76 100644 --- a/src/backend/BackendInterface.cpp +++ b/src/backend/BackendInterface.cpp @@ -27,17 +27,7 @@ std::optional BackendInterface::hardFetchLedgerRangeNoThrow() const { BOOST_LOG_TRIVIAL(debug) << __func__; - while (true) - { - try - { - return hardFetchLedgerRange(); - } - catch (DatabaseTimeout& t) - { - ; - } - } + return retryOnTimeout([&]() { return hardFetchLedgerRange(); }); } // *** state data methods std::optional diff --git a/src/backend/BackendInterface.h b/src/backend/BackendInterface.h index e32e3515..e9a2ee7b 100644 --- a/src/backend/BackendInterface.h +++ b/src/backend/BackendInterface.h @@ -5,6 +5,8 @@ #include #include #include +#include +#include namespace Backend { class DatabaseTimeout : public std::exception @@ -16,6 +18,25 @@ class DatabaseTimeout : public std::exception } }; +template +auto +retryOnTimeout(F func, size_t waitMs = 500) +{ + while (true) + { + try + { + return func(); + } + catch (DatabaseTimeout& t) + { + std::this_thread::sleep_for(std::chrono::milliseconds(waitMs)); + BOOST_LOG_TRIVIAL(error) + << __func__ << " function timed out. Retrying ... 
"; + } + } +} + class BackendInterface { protected: diff --git a/src/backend/CassandraBackend.cpp b/src/backend/CassandraBackend.cpp index b13ccf1a..80d3d93c 100644 --- a/src/backend/CassandraBackend.cpp +++ b/src/backend/CassandraBackend.cpp @@ -690,48 +690,39 @@ CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const std::optional cursor; while (true) { - try - { - auto [objects, curCursor, warning] = - fetchLedgerPage(cursor, minLedger, 256); - if (warning) - { - BOOST_LOG_TRIVIAL(warning) - << __func__ - << " online delete running but flag ledger is not complete"; - std::this_thread::sleep_for(std::chrono::seconds(10)); - continue; - } - - for (auto& obj : objects) - { - ++numOutstanding; - cbs.push_back(makeAndExecuteBulkAsyncWrite( - this, - std::make_tuple( - std::move(obj.key), minLedger, std::move(obj.blob)), - bind, - numOutstanding, - mtx, - cv)); - - std::unique_lock lck(mtx); - BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; - cv.wait(lck, [&numOutstanding, concurrentLimit]() { - return numOutstanding < concurrentLimit; - }); - } - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; - cursor = curCursor; - if (!cursor) - break; - } - catch (DatabaseTimeout const& e) + auto [objects, curCursor, warning] = retryOnTimeout( + [&]() { return fetchLedgerPage(cursor, minLedger, 256); }); + if (warning) { BOOST_LOG_TRIVIAL(warning) - << __func__ << " Database timeout fetching keys"; - std::this_thread::sleep_for(std::chrono::seconds(2)); + << __func__ + << " online delete running but flag ledger is not complete"; + std::this_thread::sleep_for(std::chrono::seconds(10)); + continue; } + + for (auto& obj : objects) + { + ++numOutstanding; + cbs.push_back(makeAndExecuteBulkAsyncWrite( + this, + std::make_tuple( + std::move(obj.key), minLedger, std::move(obj.blob)), + bind, + numOutstanding, + mtx, + cv)); + + std::unique_lock lck(mtx); + BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; + cv.wait(lck, [&numOutstanding, 
concurrentLimit]() { + return numOutstanding < concurrentLimit; + }); + } + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + cursor = curCursor; + if (!cursor) + break; } std::unique_lock lck(mtx); cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); diff --git a/src/backend/PostgresBackend.cpp b/src/backend/PostgresBackend.cpp index ffcc13d7..5904c757 100644 --- a/src/backend/PostgresBackend.cpp +++ b/src/backend/PostgresBackend.cpp @@ -720,38 +720,29 @@ PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const std::optional cursor; while (true) { - try + auto [objects, curCursor, warning] = retryOnTimeout( + [&]() { return fetchLedgerPage(cursor, minLedger, 256); }); + if (warning) { - auto [objects, curCursor, warning] = - fetchLedgerPage(cursor, minLedger, 256); - if (warning) - { - BOOST_LOG_TRIVIAL(warning) << __func__ - << " online delete running but " - "flag ledger is not complete"; - std::this_thread::sleep_for(std::chrono::seconds(10)); - continue; - } - BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; - std::stringstream objectsBuffer; + BOOST_LOG_TRIVIAL(warning) << __func__ + << " online delete running but " + "flag ledger is not complete"; + std::this_thread::sleep_for(std::chrono::seconds(10)); + continue; + } + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + std::stringstream objectsBuffer; - for (auto& obj : objects) - { - objectsBuffer << "\\\\x" << ripple::strHex(obj.key) << '\t' - << std::to_string(minLedger) << '\t' << "\\\\x" - << ripple::strHex(obj.blob) << '\n'; - } - pgQuery.bulkInsert("objects", objectsBuffer.str()); - cursor = curCursor; - if (!cursor) - break; - } - catch (DatabaseTimeout const& e) + for (auto& obj : objects) { - BOOST_LOG_TRIVIAL(warning) - << __func__ << " Database timeout fetching keys"; - std::this_thread::sleep_for(std::chrono::seconds(2)); + objectsBuffer << "\\\\x" << ripple::strHex(obj.key) << '\t' + << std::to_string(minLedger) << '\t' << "\\\\x" + << 
ripple::strHex(obj.blob) << '\n'; } + pgQuery.bulkInsert("objects", objectsBuffer.str()); + cursor = curCursor; + if (!cursor) + break; } BOOST_LOG_TRIVIAL(info) << __func__ << " finished inserting into objects"; { diff --git a/src/etl/ReportingETL.cpp b/src/etl/ReportingETL.cpp index 9540ac33..6169b490 100644 --- a/src/etl/ReportingETL.cpp +++ b/src/etl/ReportingETL.cpp @@ -131,37 +131,23 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) { BOOST_LOG_TRIVIAL(debug) << __func__ << " - Publishing ledger " << std::to_string(lgrInfo.seq); + if (!writing_) { BOOST_LOG_TRIVIAL(debug) << __func__ << " - Updating cache"; - auto diff = backend_->fetchLedgerDiff(lgrInfo.seq); + auto diff = Backend::retryOnTimeout( + [&]() { return backend_->fetchLedgerDiff(lgrInfo.seq); }); backend_->cache().update(diff, lgrInfo.seq); } backend_->updateRange(lgrInfo.seq); + auto fees = Backend::retryOnTimeout( + [&]() { return backend_->fetchFees(lgrInfo.seq); }); + auto transactions = Backend::retryOnTimeout( + [&]() { return backend_->fetchAllTransactionsInLedger(lgrInfo.seq); }); + auto ledgerRange = backend_->fetchLedgerRange(); - - std::optional fees; - std::vector transactions; - while (true) - { - try - { - fees = backend_->fetchFees(lgrInfo.seq); - transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq); - break; - } - catch (Backend::DatabaseTimeout const&) - { - BOOST_LOG_TRIVIAL(warning) << "Read timeout fetching transactions"; - } - } - - if (!fees || !ledgerRange) - { - BOOST_LOG_TRIVIAL(error) - << __func__ << " - could not fetch from database"; - return; - } + assert(ledgerRange); + assert(fees); std::string range = std::to_string(ledgerRange->minSequence) + "-" + std::to_string(ledgerRange->maxSequence); @@ -172,7 +158,7 @@ ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) subscriptions_->pubTransaction(txAndMeta, lgrInfo); setLastPublish(); - BOOST_LOG_TRIVIAL(debug) + BOOST_LOG_TRIVIAL(info) << __func__ << " - Published ledger " << 
std::to_string(lgrInfo.seq); } @@ -187,43 +173,37 @@ ReportingETL::publishLedger( size_t numAttempts = 0; while (!stopping_) { - try - { - auto range = backend_->hardFetchLedgerRangeNoThrow(); + auto range = backend_->hardFetchLedgerRangeNoThrow(); - if (!range || range->maxSequence < ledgerSequence) + if (!range || range->maxSequence < ledgerSequence) + { + BOOST_LOG_TRIVIAL(debug) << __func__ << " : " + << "Trying to publish. Could not find " + "ledger with sequence = " + << ledgerSequence; + // We try maxAttempts times to publish the ledger, waiting one + // second in between each attempt. + if (maxAttempts && numAttempts >= maxAttempts) { BOOST_LOG_TRIVIAL(debug) << __func__ << " : " - << "Trying to publish. Could not find " - "ledger with sequence = " - << ledgerSequence; - // We try maxAttempts times to publish the ledger, waiting one - // second in between each attempt. - if (maxAttempts && numAttempts >= maxAttempts) - { - BOOST_LOG_TRIVIAL(debug) - << __func__ << " : " - << "Failed to publish ledger after " << numAttempts - << " attempts."; - return false; - } - std::this_thread::sleep_for(std::chrono::seconds(1)); - ++numAttempts; - continue; + << "Failed to publish ledger after " + << numAttempts << " attempts."; + return false; } - else - { - auto lgr = backend_->fetchLedgerBySequence(ledgerSequence); - assert(lgr); - publishLedger(*lgr); - - return true; - } - } - catch (Backend::DatabaseTimeout const& e) - { + std::this_thread::sleep_for(std::chrono::seconds(1)); + ++numAttempts; continue; } + else + { + auto lgr = Backend::retryOnTimeout([&]() { + return backend_->fetchLedgerBySequence(ledgerSequence); + }); + assert(lgr); + publishLedger(*lgr); + + return true; + } } return false; } @@ -678,9 +658,12 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) // success is false if the ledger was already written if (success) { + /* boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() { publishLedger(lgrInfo); }); + */ + 
backend_->updateRange(lgrInfo.seq); lastPublishedSequence = lgrInfo.seq; } writeConflict = !success; diff --git a/src/subscriptions/SubscriptionManager.cpp b/src/subscriptions/SubscriptionManager.cpp index d9f66168..82088426 100644 --- a/src/subscriptions/SubscriptionManager.cpp +++ b/src/subscriptions/SubscriptionManager.cpp @@ -232,8 +232,11 @@ SubscriptionManager::pubTransaction( auto amount = tx->getFieldAmount(ripple::sfTakerGets); if (account != amount.issue().account) { - auto ownerFunds = - RPC::accountFunds(*backend_, lgrInfo.seq, amount, account); + auto ownerFunds = Backend::retryOnTimeout([&]() { + return RPC::accountFunds( + *backend_, lgrInfo.seq, amount, account); + }); + pubObj["transaction"].as_object()["owner_funds"] = ownerFunds.getText(); } diff --git a/src/webserver/WsBase.h b/src/webserver/WsBase.h index 43cbd580..9217cd09 100644 --- a/src/webserver/WsBase.h +++ b/src/webserver/WsBase.h @@ -282,6 +282,8 @@ public: catch (Backend::DatabaseTimeout const& t) { BOOST_LOG_TRIVIAL(error) << __func__ << " Database timeout"; + // TODO this should be a different error code. Rippled probably + // does not have an analogous error code return sendError(RPC::Error::rpcNOT_READY); } }