diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index fbeac45e..fbf14f07 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -1,3 +1,4 @@ +#include #include #include namespace Backend { @@ -414,33 +415,57 @@ PostgresBackend::fetchTransactions( pgQuery("SET statement_timeout TO 10000"); std::stringstream sql; bool first = true; - for (auto const& hash : hashes) + auto start = std::chrono::system_clock::now(); + boost::asio::thread_pool pool{hashes.size()}; + auto end = std::chrono::system_clock::now(); + auto duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " created threadpool. took " << std::to_string(duration); + std::vector results; + results.resize(hashes.size()); + std::condition_variable cv; + std::mutex mtx; + std::atomic_uint numRemaining = hashes.size(); + for (size_t i = 0; i < hashes.size(); ++i) { - if (!first) - sql << " UNION ALL "; - sql << "SELECT transaction,metadata,ledger_seq FROM transactions " - "WHERE "; - sql << "HASH = \'\\x" << ripple::strHex(hash) << "\'"; - first = false; - } - BOOST_LOG_TRIVIAL(debug) << __func__ << " : fetching. sql = " << sql.str(); - auto res = pgQuery(sql.str().data()); - BOOST_LOG_TRIVIAL(debug) << __func__ << " : fetched"; - if (size_t numRows = checkResult(res, 3)) - { - std::vector results; - for (size_t i = 0; i < numRows; ++i) - { - results.push_back( - {res.asUnHexedBlob(i, 0), - res.asUnHexedBlob(i, 1), - res.asBigInt(i, 2)}); - } - return results; - } + auto const& hash = hashes[i]; + boost::asio::post([this, + &hash, + &results, + &numRemaining, + &cv, + &mtx, + i]() { + PgQuery pgQuery(pgPool_); + std::stringstream sql; + sql << "SELECT transaction,metadata,ledger_seq FROM transactions " + "WHERE HASH = \'\\x" + << ripple::strHex(hash) << "\'"; - return {}; -} + auto res = pgQuery(sql.str().data()); + if (size_t numRows = checkResult(res, 3)) + { + results[i] = { + res.asUnHexedBlob(0, 0), + res.asUnHexedBlob(0, 1), + res.asBigInt(0, 2)}; + } + if (--numRemaining == 0) + { + std::unique_lock lck(mtx); + cv.notify_one(); + } + }); + } + std::unique_lock lck(mtx); + cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + auto end2 = std::chrono::system_clock::now(); + duration = ((end2 - end).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " fetched " << std::to_string(hashes.size()) + << " transactions. took " << std::to_string(duration); + return results; +} // namespace Backend std::vector PostgresBackend::fetchLedgerObjects( @@ -504,9 +529,15 @@ PostgresBackend::fetchAccountTransactions( << " AND transaction_index < " << cursor->transactionIndex; sql << " ORDER BY ledger_seq DESC, transaction_index DESC"; sql << " LIMIT " << std::to_string(limit); - BOOST_LOG_TRIVIAL(debug) << __func__ << " : fetching " << sql.str(); + auto start = std::chrono::system_clock::now(); auto res = pgQuery(sql.str().data()); - BOOST_LOG_TRIVIAL(debug) << __func__ << " : fetched " << sql.str(); + auto end = std::chrono::system_clock::now(); + + auto duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " : executed first query in " << std::to_string(duration) + << " num records = " << std::to_string(checkResult(res, 3)) + << " query = " << sql.str(); size_t numRows = checkResult(res, 3); std::vector hashes; @@ -526,7 +557,16 @@ PostgresBackend::fetchAccountTransactions( sql2 << " AND ledger_seq < " << cursor->ledgerSequence << " ORDER BY ledger_seq DESC, transaction_index DESC"; sql2 << " LIMIT " << std::to_string(newLimit); + start = std::chrono::system_clock::now(); res = pgQuery(sql2.str().data()); + end = std::chrono::system_clock::now(); + + duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " : executed second query in " + << std::to_string(duration) + << " num records = " << std::to_string(checkResult(res, 3)) + << " query = " << sql2.str(); if (numRows = checkResult(res, 3)) { for (size_t i = 0; i < numRows; ++i)