From 3aed708262ed1b670f078dff2589baf449374c38 Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 27 Apr 2021 16:19:45 +0000 Subject: [PATCH] make threadpool a class object --- reporting/PostgresBackend.cpp | 128 +++++++++++++++++++++------------- reporting/PostgresBackend.h | 1 + 2 files changed, 80 insertions(+), 49 deletions(-) diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 558ce85b..398b15f6 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -411,61 +411,91 @@ std::vector PostgresBackend::fetchTransactions( std::vector const& hashes) const { - PgQuery pgQuery(pgPool_); - pgQuery("SET statement_timeout TO 10000"); - std::stringstream sql; - bool first = true; - auto start = std::chrono::system_clock::now(); - boost::asio::thread_pool pool{hashes.size()}; - auto end = std::chrono::system_clock::now(); - auto duration = ((end - start).count()) / 1000000000.0; - BOOST_LOG_TRIVIAL(info) - << __func__ << " created threadpool. took " << std::to_string(duration); std::vector results; - results.resize(hashes.size()); - std::condition_variable cv; - std::mutex mtx; - std::atomic_uint numRemaining = hashes.size(); - for (size_t i = 0; i < hashes.size(); ++i) + constexpr bool doAsync = true; + if (doAsync) { - auto const& hash = hashes[i]; - boost::asio::post([this, - &hash, - &results, - &numRemaining, - &cv, - &mtx, - i]() { - PgQuery pgQuery(pgPool_); - std::stringstream sql; - sql << "SELECT transaction,metadata,ledger_seq FROM transactions " + auto start = std::chrono::system_clock::now(); + auto end = std::chrono::system_clock::now(); + auto duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) << __func__ << " created threadpool. took " + << std::to_string(duration); + results.resize(hashes.size()); + std::condition_variable cv; + std::mutex mtx; + std::atomic_uint numRemaining = hashes.size(); + for (size_t i = 0; i < hashes.size(); ++i) + { + auto const& hash = hashes[i]; + boost::asio::post( + pool_, [this, &hash, &results, &numRemaining, &cv, &mtx, i]() { + BOOST_LOG_TRIVIAL(debug) + << __func__ << " getting txn = " << i; + PgQuery pgQuery(pgPool_); + std::stringstream sql; + sql << "SELECT transaction,metadata,ledger_seq FROM " + "transactions " + "WHERE HASH = \'\\x" + << ripple::strHex(hash) << "\'"; + + auto res = pgQuery(sql.str().data()); + if (size_t numRows = checkResult(res, 3)) + { + results[i] = { + res.asUnHexedBlob(0, 0), + res.asUnHexedBlob(0, 1), + res.asBigInt(0, 2)}; + } + if (--numRemaining == 0) + { + std::unique_lock lck(mtx); + cv.notify_one(); + } + }); + } + std::unique_lock lck(mtx); + cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); + auto end2 = std::chrono::system_clock::now(); + duration = ((end2 - end).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " fetched " << std::to_string(hashes.size()) + << " transactions with threadpool. took " + << std::to_string(duration); + } + else + { + PgQuery pgQuery(pgPool_); + pgQuery("SET statement_timeout TO 10000"); + std::stringstream sql; + for (size_t i = 0; i < hashes.size(); ++i) + { + auto const& hash = hashes[i]; + sql << "SELECT transaction,metadata,ledger_seq FROM " + "transactions " "WHERE HASH = \'\\x" << ripple::strHex(hash) << "\'"; - - auto res = pgQuery(sql.str().data()); - if (size_t numRows = checkResult(res, 3)) - { - results[i] = { - res.asUnHexedBlob(0, 0), - res.asUnHexedBlob(0, 1), - res.asBigInt(0, 2)}; - } - if (--numRemaining == 0) - { - std::unique_lock lck(mtx); - cv.notify_one(); - } - }); + if (i + 1 < hashes.size()) + sql << " UNION ALL "; + } + auto start = std::chrono::system_clock::now(); + auto res = pgQuery(sql.str().data()); + auto end = std::chrono::system_clock::now(); + auto duration = ((end - start).count()) / 1000000000.0; + BOOST_LOG_TRIVIAL(info) + << __func__ << " fetched " << std::to_string(hashes.size()) + << " transactions with union all. took " + << std::to_string(duration); + if (size_t numRows = checkResult(res, 3)) + { + for (size_t i = 0; i < numRows; ++i) + results.push_back( + {res.asUnHexedBlob(i, 0), + res.asUnHexedBlob(i, 1), + res.asBigInt(i, 2)}); + } } - std::unique_lock lck(mtx); - cv.wait(lck, [&numRemaining]() { return numRemaining == 0; }); - auto end2 = std::chrono::system_clock::now(); - duration = ((end2 - end).count()) / 1000000000.0; - BOOST_LOG_TRIVIAL(info) - << __func__ << " fetched " << std::to_string(hashes.size()) - << " transactions. took " << std::to_string(duration); return results; -} // namespace Backend +} std::vector PostgresBackend::fetchLedgerObjects( diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index 6653dbe9..d7af001d 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -15,6 +15,7 @@ private: std::shared_ptr pgPool_; mutable PgQuery writeConnection_; mutable bool abortWrite_ = false; + mutable boost::asio::thread_pool pool_{200}; public: PostgresBackend(boost::json::object const& config);