This commit is contained in:
CJ Cobb
2021-07-02 15:05:46 +00:00
parent 9f8724b7ab
commit 2eec383b35
3 changed files with 49 additions and 42 deletions

View File

@@ -205,6 +205,7 @@ BackendInterface::fetchLedgerPage(
uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4)); uint32_t adjustedLimit = std::max(limitHint, std::max(limit, (uint32_t)4));
LedgerPage page; LedgerPage page;
page.cursor = cursor; page.cursor = cursor;
int numCalls = 0;
do do
{ {
adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2; adjustedLimit = adjustedLimit >= 8192 ? 8192 : adjustedLimit * 2;
@@ -223,7 +224,7 @@ BackendInterface::fetchLedgerPage(
page.objects.insert( page.objects.insert(
page.objects.end(), partial.objects.begin(), partial.objects.end()); page.objects.end(), partial.objects.begin(), partial.objects.end());
page.cursor = partial.cursor; page.cursor = partial.cursor;
} while (page.objects.size() < limit && page.cursor); } while (page.objects.size() < limit && page.cursor && ++numCalls < 10);
if (incomplete) if (incomplete)
{ {
auto rng = fetchLedgerRange(); auto rng = fetchLedgerRange();

View File

@@ -301,13 +301,52 @@ CassandraBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const
auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence); auto hashes = fetchAllTransactionHashesInLedger(ledgerSequence);
return fetchTransactions(hashes); return fetchTransactions(hashes);
} }
std::vector<TransactionAndMetadata>
CassandraBackend::fetchTransactions(
std::vector<ripple::uint256> const& hashes) const
{
std::size_t const numHashes = hashes.size();
std::atomic_uint32_t numFinished = 0;
std::condition_variable cv;
std::mutex mtx;
std::vector<TransactionAndMetadata> results{numHashes};
std::vector<std::shared_ptr<ReadCallbackData>> cbs;
cbs.reserve(numHashes);
auto start = std::chrono::system_clock::now();
for (std::size_t i = 0; i < hashes.size(); ++i)
{
cbs.push_back(std::make_shared<ReadCallbackData>(
*this, hashes[i], results[i], cv, numFinished, numHashes));
read(*cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(
lck, [&numFinished, &numHashes]() { return numFinished == numHashes; });
auto end = std::chrono::system_clock::now();
for (auto const& res : results)
{
if (res.transaction.size() == 1 && res.transaction[0] == 0)
throw DatabaseTimeout();
}
BOOST_LOG_TRIVIAL(debug)
<< "Fetched " << numHashes << " transactions from Cassandra in "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count()
<< " milliseconds";
return results;
}
std::vector<ripple::uint256> std::vector<ripple::uint256>
CassandraBackend::fetchAllTransactionHashesInLedger( CassandraBackend::fetchAllTransactionHashesInLedger(
uint32_t ledgerSequence) const uint32_t ledgerSequence) const
{ {
CassandraStatement statement{selectAllTransactionHashesInLedger_}; CassandraStatement statement{selectAllTransactionHashesInLedger_};
statement.bindInt(ledgerSequence); statement.bindInt(ledgerSequence);
auto start = std::chrono::system_clock::now();
CassandraResult result = executeSyncRead(statement); CassandraResult result = executeSyncRead(statement);
auto end = std::chrono::system_clock::now();
if (!result) if (!result)
{ {
BOOST_LOG_TRIVIAL(error) BOOST_LOG_TRIVIAL(error)
@@ -320,6 +359,12 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
{ {
hashes.push_back(result.getUInt256()); hashes.push_back(result.getUInt256());
} while (result.nextRow()); } while (result.nextRow());
BOOST_LOG_TRIVIAL(debug)
<< "Fetched " << hashes.size()
<< " transaction hashes from Cassandra in "
<< std::chrono::duration_cast<std::chrono::milliseconds>(end - start)
.count()
<< " milliseconds";
return hashes; return hashes;
} }
@@ -1568,13 +1613,6 @@ CassandraBackend::open(bool readOnly)
continue; continue;
query.str(""); query.str("");
query << "SELECT transaction, metadata, ledger_sequence FROM "
<< tablePrefix << "transactions"
<< " WHERE ledger_sequence = ?";
if (!selectAllTransactionsInLedger_.prepareStatement(
query, session_.get()))
continue;
query.str("");
query << "SELECT hash FROM " << tablePrefix << "ledger_transactions" query << "SELECT hash FROM " << tablePrefix << "ledger_transactions"
<< " WHERE ledger_sequence = ?"; << " WHERE ledger_sequence = ?";
if (!selectAllTransactionHashesInLedger_.prepareStatement( if (!selectAllTransactionHashesInLedger_.prepareStatement(

View File

@@ -635,7 +635,6 @@ private:
CassandraPreparedStatement insertTransaction_; CassandraPreparedStatement insertTransaction_;
CassandraPreparedStatement insertLedgerTransaction_; CassandraPreparedStatement insertLedgerTransaction_;
CassandraPreparedStatement selectTransaction_; CassandraPreparedStatement selectTransaction_;
CassandraPreparedStatement selectAllTransactionsInLedger_;
CassandraPreparedStatement selectAllTransactionHashesInLedger_; CassandraPreparedStatement selectAllTransactionHashesInLedger_;
CassandraPreparedStatement selectObject_; CassandraPreparedStatement selectObject_;
CassandraPreparedStatement selectLedgerPageKeys_; CassandraPreparedStatement selectLedgerPageKeys_;
@@ -1059,39 +1058,8 @@ public:
}; };
std::vector<TransactionAndMetadata> std::vector<TransactionAndMetadata>
fetchTransactions(std::vector<ripple::uint256> const& hashes) const override fetchTransactions(
{ std::vector<ripple::uint256> const& hashes) const override;
std::size_t const numHashes = hashes.size();
BOOST_LOG_TRIVIAL(debug)
<< "Fetching " << numHashes << " transactions from Cassandra";
std::atomic_uint32_t numFinished = 0;
std::condition_variable cv;
std::mutex mtx;
std::vector<TransactionAndMetadata> results{numHashes};
std::vector<std::shared_ptr<ReadCallbackData>> cbs;
cbs.reserve(numHashes);
for (std::size_t i = 0; i < hashes.size(); ++i)
{
cbs.push_back(std::make_shared<ReadCallbackData>(
*this, hashes[i], results[i], cv, numFinished, numHashes));
read(*cbs[i]);
}
assert(results.size() == cbs.size());
std::unique_lock<std::mutex> lck(mtx);
cv.wait(lck, [&numFinished, &numHashes]() {
return numFinished == numHashes;
});
for (auto const& res : results)
{
if (res.transaction.size() == 1 && res.transaction[0] == 0)
throw DatabaseTimeout();
}
BOOST_LOG_TRIVIAL(debug)
<< "Fetched " << numHashes << " transactions from Cassandra";
return results;
}
void void
read(ReadCallbackData& data) const read(ReadCallbackData& data) const