Publish in strand. Catch ReadTimeout in publishLedger

This commit is contained in:
CJ Cobb
2021-06-24 18:29:19 +00:00
parent 25ca6590d1
commit 415b9ccc28
3 changed files with 25 additions and 11 deletions

View File

@@ -1,6 +1,6 @@
#include <functional>
#include <backend/CassandraBackend.h> #include <backend/CassandraBackend.h>
#include <backend/DBHelpers.h> #include <backend/DBHelpers.h>
#include <functional>
#include <unordered_map> #include <unordered_map>
/* /*
namespace std { namespace std {
@@ -268,7 +268,9 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
if (!result) if (!result)
{ {
BOOST_LOG_TRIVIAL(error) << __func__ << " - no rows"; BOOST_LOG_TRIVIAL(error)
<< __func__
<< " - no rows . ledger = " << std::to_string(ledgerSequence);
return {}; return {};
} }
std::vector<ripple::uint256> hashes; std::vector<ripple::uint256> hashes;
@@ -742,7 +744,7 @@ CassandraBackend::writeKeys(
std::vector<std::shared_ptr<WriteKeyCallbackData>> cbs; std::vector<std::shared_ptr<WriteKeyCallbackData>> cbs;
cbs.reserve(keys.size()); cbs.reserve(keys.size());
uint32_t concurrentLimit = uint32_t concurrentLimit =
isAsync ? indexerMaxRequestsOutstanding : keys.size(); isAsync ? indexerMaxRequestsOutstanding : maxRequestsOutstanding;
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " Ledger = " << std::to_string(index.keyIndex) << __func__ << " Ledger = " << std::to_string(index.keyIndex)
<< " . num keys = " << std::to_string(keys.size()) << " . num keys = " << std::to_string(keys.size())

View File

@@ -26,9 +26,9 @@
#include <boost/asio/ip/tcp.hpp> #include <boost/asio/ip/tcp.hpp>
#include <boost/beast/core.hpp> #include <boost/beast/core.hpp>
#include <boost/beast/websocket.hpp> #include <boost/beast/websocket.hpp>
#include <server/SubscriptionManager.h>
#include <cstdlib> #include <cstdlib>
#include <iostream> #include <iostream>
#include <server/SubscriptionManager.h>
#include <string> #include <string>
#include <variant> #include <variant>
@@ -173,9 +173,22 @@ ReportingETL::getFees(std::uint32_t seq)
void void
ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo) ReportingETL::publishLedger(ripple::LedgerInfo const& lgrInfo)
{ {
auto ledgerRange = backend_->fetchLedgerRange(); auto ledgerRange = backend_->fetchLedgerRangeNoThrow();
auto fees = getFees(lgrInfo.seq); auto fees = getFees(lgrInfo.seq);
auto transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq); std::vector<Backend::TransactionAndMetadata> transactions;
while (true)
{
try
{
transactions = backend_->fetchAllTransactionsInLedger(lgrInfo.seq);
break;
}
catch (Backend::DatabaseTimeout const&)
{
BOOST_LOG_TRIVIAL(warning) << "Read timeout fetching transactions";
}
}
if (!fees || !ledgerRange) if (!fees || !ledgerRange)
{ {
@@ -245,7 +258,7 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
auto lgr = backend_->fetchLedgerBySequence(ledgerSequence); auto lgr = backend_->fetchLedgerBySequence(ledgerSequence);
assert(lgr); assert(lgr);
publishLedger(*lgr); publishLedger(*lgr);
return true; return true;
} }
} }
@@ -253,7 +266,6 @@ ReportingETL::publishLedger(uint32_t ledgerSequence, uint32_t maxAttempts)
{ {
continue; continue;
} }
} }
return false; return false;
} }
@@ -553,7 +565,9 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
// success is false if the ledger was already written // success is false if the ledger was already written
if (success) if (success)
{ {
publishLedger(lgrInfo); boost::asio::post(publishStrand_, [this, lgrInfo = lgrInfo]() {
publishLedger(lgrInfo);
});
lastPublishedSequence = lgrInfo.seq; lastPublishedSequence = lgrInfo.seq;
} }
writeConflict = !success; writeConflict = !success;

View File

@@ -174,8 +174,6 @@ handle_request(
wsStyleRequest["command"] = request["method"]; wsStyleRequest["command"] = request["method"];
std::cout << "Transfromed to ws style stuff" << std::endl;
auto [builtResponse, cost] = auto [builtResponse, cost] =
buildResponse(wsStyleRequest, backend, nullptr, balancer, nullptr); buildResponse(wsStyleRequest, backend, nullptr, balancer, nullptr);