simulate async fetch batch transactions

This commit is contained in:
CJ Cobb
2021-04-16 18:51:51 +00:00
parent 5364830224
commit e833bfb38c

View File

@@ -1,3 +1,4 @@
#include <boost/asio.hpp>
#include <boost/format.hpp> #include <boost/format.hpp>
#include <reporting/PostgresBackend.h> #include <reporting/PostgresBackend.h>
namespace Backend { namespace Backend {
@@ -414,33 +415,57 @@ PostgresBackend::fetchTransactions(
pgQuery("SET statement_timeout TO 10000"); pgQuery("SET statement_timeout TO 10000");
std::stringstream sql; std::stringstream sql;
bool first = true; bool first = true;
for (auto const& hash : hashes) auto start = std::chrono::system_clock::now();
boost::asio::thread_pool pool{hashes.size()};
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " created threadpool. took " << std::to_string(duration);
std::vector<TransactionAndMetadata> results;
results.resize(hashes.size());
std::condition_variable cv;
std::mutex mtx;
std::atomic_uint numRemaining = hashes.size();
for (size_t i = 0; i < hashes.size(); ++i)
{ {
if (!first) auto const& hash = hashes[i];
sql << " UNION ALL "; boost::asio::post([this,
&hash,
&results,
&numRemaining,
&cv,
&mtx,
i]() {
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_seq FROM transactions " sql << "SELECT transaction,metadata,ledger_seq FROM transactions "
"WHERE "; "WHERE HASH = \'\\x"
sql << "HASH = \'\\x" << ripple::strHex(hash) << "\'"; << ripple::strHex(hash) << "\'";
first = false;
}
BOOST_LOG_TRIVIAL(debug) << __func__ << " : fetching. sql = " << sql.str();
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
BOOST_LOG_TRIVIAL(debug) << __func__ << " : fetched";
if (size_t numRows = checkResult(res, 3)) if (size_t numRows = checkResult(res, 3))
{ {
std::vector<TransactionAndMetadata> results; results[i] = {
for (size_t i = 0; i < numRows; ++i) res.asUnHexedBlob(0, 0),
res.asUnHexedBlob(0, 1),
res.asBigInt(0, 2)};
}
if (--numRemaining == 0)
{ {
results.push_back( std::unique_lock lck(mtx);
{res.asUnHexedBlob(i, 0), cv.notify_one();
res.asUnHexedBlob(i, 1),
res.asBigInt(i, 2)});
} }
});
}
std::unique_lock lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
auto end2 = std::chrono::system_clock::now();
duration = ((end2 - end).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " fetched " << std::to_string(hashes.size())
<< " transactions. took " << std::to_string(duration);
return results; return results;
} } // namespace Backend
return {};
}
std::vector<Blob> std::vector<Blob>
PostgresBackend::fetchLedgerObjects( PostgresBackend::fetchLedgerObjects(
@@ -504,9 +529,15 @@ PostgresBackend::fetchAccountTransactions(
<< " AND transaction_index < " << cursor->transactionIndex; << " AND transaction_index < " << cursor->transactionIndex;
sql << " ORDER BY ledger_seq DESC, transaction_index DESC"; sql << " ORDER BY ledger_seq DESC, transaction_index DESC";
sql << " LIMIT " << std::to_string(limit); sql << " LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << " : fetching " << sql.str(); auto start = std::chrono::system_clock::now();
auto res = pgQuery(sql.str().data()); auto res = pgQuery(sql.str().data());
BOOST_LOG_TRIVIAL(debug) << __func__ << " : fetched " << sql.str(); auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : executed first query in " << std::to_string(duration)
<< " num records = " << std::to_string(checkResult(res, 3))
<< " query = " << sql.str();
size_t numRows = checkResult(res, 3); size_t numRows = checkResult(res, 3);
std::vector<ripple::uint256> hashes; std::vector<ripple::uint256> hashes;
@@ -526,7 +557,16 @@ PostgresBackend::fetchAccountTransactions(
sql2 << " AND ledger_seq < " << cursor->ledgerSequence sql2 << " AND ledger_seq < " << cursor->ledgerSequence
<< " ORDER BY ledger_seq DESC, transaction_index DESC"; << " ORDER BY ledger_seq DESC, transaction_index DESC";
sql2 << " LIMIT " << std::to_string(newLimit); sql2 << " LIMIT " << std::to_string(newLimit);
start = std::chrono::system_clock::now();
res = pgQuery(sql2.str().data()); res = pgQuery(sql2.str().data());
end = std::chrono::system_clock::now();
duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " : executed second query in "
<< std::to_string(duration)
<< " num records = " << std::to_string(checkResult(res, 3))
<< " query = " << sql2.str();
if (numRows = checkResult(res, 3)) if (numRows = checkResult(res, 3))
{ {
for (size_t i = 0; i < numRows; ++i) for (size_t i = 0; i < numRows; ++i)