make threadpool a class object

This commit is contained in:
CJ Cobb
2021-04-27 16:19:45 +00:00
parent 3be12dd4b4
commit 3aed708262
2 changed files with 80 additions and 49 deletions

View File

@@ -411,61 +411,91 @@ std::vector<TransactionAndMetadata>
PostgresBackend::fetchTransactions( PostgresBackend::fetchTransactions(
std::vector<ripple::uint256> const& hashes) const std::vector<ripple::uint256> const& hashes) const
{ {
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
bool first = true;
auto start = std::chrono::system_clock::now();
boost::asio::thread_pool pool{hashes.size()};
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " created threadpool. took " << std::to_string(duration);
std::vector<TransactionAndMetadata> results; std::vector<TransactionAndMetadata> results;
results.resize(hashes.size()); constexpr bool doAsync = true;
std::condition_variable cv; if (doAsync)
std::mutex mtx;
std::atomic_uint numRemaining = hashes.size();
for (size_t i = 0; i < hashes.size(); ++i)
{ {
auto const& hash = hashes[i]; auto start = std::chrono::system_clock::now();
boost::asio::post([this, auto end = std::chrono::system_clock::now();
&hash, auto duration = ((end - start).count()) / 1000000000.0;
&results, BOOST_LOG_TRIVIAL(info) << __func__ << " created threadpool. took "
&numRemaining, << std::to_string(duration);
&cv, results.resize(hashes.size());
&mtx, std::condition_variable cv;
i]() { std::mutex mtx;
PgQuery pgQuery(pgPool_); std::atomic_uint numRemaining = hashes.size();
std::stringstream sql; for (size_t i = 0; i < hashes.size(); ++i)
sql << "SELECT transaction,metadata,ledger_seq FROM transactions " {
auto const& hash = hashes[i];
boost::asio::post(
pool_, [this, &hash, &results, &numRemaining, &cv, &mtx, i]() {
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " getting txn = " << i;
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_seq FROM "
"transactions "
"WHERE HASH = \'\\x"
<< ripple::strHex(hash) << "\'";
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 3))
{
results[i] = {
res.asUnHexedBlob(0, 0),
res.asUnHexedBlob(0, 1),
res.asBigInt(0, 2)};
}
if (--numRemaining == 0)
{
std::unique_lock lck(mtx);
cv.notify_one();
}
});
}
std::unique_lock lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
auto end2 = std::chrono::system_clock::now();
duration = ((end2 - end).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " fetched " << std::to_string(hashes.size())
<< " transactions with threadpool. took "
<< std::to_string(duration);
}
else
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
for (size_t i = 0; i < hashes.size(); ++i)
{
auto const& hash = hashes[i];
sql << "SELECT transaction,metadata,ledger_seq FROM "
"transactions "
"WHERE HASH = \'\\x" "WHERE HASH = \'\\x"
<< ripple::strHex(hash) << "\'"; << ripple::strHex(hash) << "\'";
if (i + 1 < hashes.size())
auto res = pgQuery(sql.str().data()); sql << " UNION ALL ";
if (size_t numRows = checkResult(res, 3)) }
{ auto start = std::chrono::system_clock::now();
results[i] = { auto res = pgQuery(sql.str().data());
res.asUnHexedBlob(0, 0), auto end = std::chrono::system_clock::now();
res.asUnHexedBlob(0, 1), auto duration = ((end - start).count()) / 1000000000.0;
res.asBigInt(0, 2)}; BOOST_LOG_TRIVIAL(info)
} << __func__ << " fetched " << std::to_string(hashes.size())
if (--numRemaining == 0) << " transactions with union all. took "
{ << std::to_string(duration);
std::unique_lock lck(mtx); if (size_t numRows = checkResult(res, 3))
cv.notify_one(); {
} for (size_t i = 0; i < numRows; ++i)
}); results.push_back(
{res.asUnHexedBlob(i, 0),
res.asUnHexedBlob(i, 1),
res.asBigInt(i, 2)});
}
} }
std::unique_lock lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
auto end2 = std::chrono::system_clock::now();
duration = ((end2 - end).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " fetched " << std::to_string(hashes.size())
<< " transactions. took " << std::to_string(duration);
return results; return results;
} // namespace Backend }
std::vector<Blob> std::vector<Blob>
PostgresBackend::fetchLedgerObjects( PostgresBackend::fetchLedgerObjects(

View File

@@ -15,6 +15,7 @@ private:
std::shared_ptr<PgPool> pgPool_; std::shared_ptr<PgPool> pgPool_;
mutable PgQuery writeConnection_; mutable PgQuery writeConnection_;
mutable bool abortWrite_ = false; mutable bool abortWrite_ = false;
mutable boost::asio::thread_pool pool_{200};
public: public:
PostgresBackend(boost::json::object const& config); PostgresBackend(boost::json::object const& config);