Make database reads async

* yield on db read using asio
* PostgresBackend fetches multiple transactions or objects in parallel
This commit is contained in:
natenichols
2022-02-07 07:00:15 -06:00
committed by CJ Cobb
parent 7c2bef70bc
commit d016253264
50 changed files with 3612 additions and 2593 deletions

View File

@@ -4,9 +4,26 @@
#include <thread>
namespace Backend {
PostgresBackend::PostgresBackend(boost::json::object const& config)
// Type alias for async completion handlers
using completion_token = boost::asio::yield_context;
using function_type = void(boost::system::error_code);
using result_type = boost::asio::async_result<completion_token, function_type>;
using handler_type = typename result_type::completion_handler_type;
struct HandlerWrapper
{
handler_type handler;
HandlerWrapper(handler_type&& handler_) : handler(std::move(handler_))
{
}
};
PostgresBackend::PostgresBackend(
boost::asio::io_context& ioc,
boost::json::object const& config)
: BackendInterface(config)
, pgPool_(make_PgPool(config))
, pgPool_(make_PgPool(ioc, config))
, writeConnection_(pgPool_)
{
if (config.contains("write_interval"))
@@ -19,22 +36,24 @@ PostgresBackend::writeLedger(
ripple::LedgerInfo const& ledgerInfo,
std::string&& ledgerHeader)
{
auto cmd = boost::format(
R"(INSERT INTO ledgers
VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
synchronous([&](boost::asio::yield_context yield) {
auto cmd = boost::format(
R"(INSERT INTO ledgers
VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
auto ledgerInsert = boost::str(
cmd % ledgerInfo.seq % ripple::strHex(ledgerInfo.hash) %
ripple::strHex(ledgerInfo.parentHash) % ledgerInfo.drops.drops() %
ledgerInfo.closeTime.time_since_epoch().count() %
ledgerInfo.parentCloseTime.time_since_epoch().count() %
ledgerInfo.closeTimeResolution.count() % ledgerInfo.closeFlags %
ripple::strHex(ledgerInfo.accountHash) %
ripple::strHex(ledgerInfo.txHash));
auto ledgerInsert = boost::str(
cmd % ledgerInfo.seq % ripple::strHex(ledgerInfo.hash) %
ripple::strHex(ledgerInfo.parentHash) % ledgerInfo.drops.drops() %
ledgerInfo.closeTime.time_since_epoch().count() %
ledgerInfo.parentCloseTime.time_since_epoch().count() %
ledgerInfo.closeTimeResolution.count() % ledgerInfo.closeFlags %
ripple::strHex(ledgerInfo.accountHash) %
ripple::strHex(ledgerInfo.txHash));
auto res = writeConnection_(ledgerInsert.data());
abortWrite_ = !res;
inProcessLedger = ledgerInfo.seq;
auto res = writeConnection_(ledgerInsert.data(), yield);
abortWrite_ = !res;
inProcessLedger = ledgerInfo.seq;
});
}
void
@@ -57,65 +76,71 @@ PostgresBackend::writeAccountTransactions(
}
}
}
void
PostgresBackend::doWriteLedgerObject(
std::string&& key,
uint32_t seq,
std::uint32_t const seq,
std::string&& blob)
{
if (abortWrite_)
return;
objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(blob) << '\n';
numRowsInObjectsBuffer_++;
// If the buffer gets too large, the insert fails. Not sure why. So we
// insert after 1 million records
if (numRowsInObjectsBuffer_ % writeInterval_ == 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Flushing large buffer. num objects = "
<< numRowsInObjectsBuffer_;
writeConnection_.bulkInsert("objects", objectsBuffer_.str());
BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
objectsBuffer_.str("");
}
synchronous([&](boost::asio::yield_context yield) {
if (abortWrite_)
return;
objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(blob) << '\n';
numRowsInObjectsBuffer_++;
// If the buffer gets too large, the insert fails. Not sure why. So we
// insert after 1 million records
if (numRowsInObjectsBuffer_ % writeInterval_ == 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Flushing large buffer. num objects = "
<< numRowsInObjectsBuffer_;
writeConnection_.bulkInsert("objects", objectsBuffer_.str(), yield);
BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
objectsBuffer_.str("");
}
});
}
void
PostgresBackend::writeSuccessor(
std::string&& key,
uint32_t seq,
std::uint32_t const seq,
std::string&& successor)
{
if (range)
{
if (successors_.count(key) > 0)
return;
successors_.insert(key);
}
successorBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(successor) << '\n';
BOOST_LOG_TRIVIAL(trace)
<< __func__ << ripple::strHex(key) << " - " << std::to_string(seq);
numRowsInSuccessorBuffer_++;
if (numRowsInSuccessorBuffer_ % writeInterval_ == 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Flushing large buffer. num successors = "
<< numRowsInSuccessorBuffer_;
writeConnection_.bulkInsert("successor", successorBuffer_.str());
BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
successorBuffer_.str("");
}
synchronous([&](boost::asio::yield_context yield) {
if (range)
{
if (successors_.count(key) > 0)
return;
successors_.insert(key);
}
successorBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(successor) << '\n';
BOOST_LOG_TRIVIAL(trace)
<< __func__ << ripple::strHex(key) << " - " << std::to_string(seq);
numRowsInSuccessorBuffer_++;
if (numRowsInSuccessorBuffer_ % writeInterval_ == 0)
{
BOOST_LOG_TRIVIAL(info)
<< __func__ << " Flushing large buffer. num successors = "
<< numRowsInSuccessorBuffer_;
writeConnection_.bulkInsert(
"successor", successorBuffer_.str(), yield);
BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
successorBuffer_.str("");
}
});
}
void
PostgresBackend::writeTransaction(
std::string&& hash,
uint32_t seq,
uint32_t date,
std::uint32_t const seq,
std::uint32_t const date,
std::string&& transaction,
std::string&& metadata)
{
@@ -127,8 +152,8 @@ PostgresBackend::writeTransaction(
<< '\t' << "\\\\x" << ripple::strHex(metadata) << '\n';
}
uint32_t
checkResult(PgResult const& res, uint32_t numFieldsExpected)
std::uint32_t
checkResult(PgResult const& res, std::uint32_t const numFieldsExpected)
{
if (!res)
{
@@ -201,50 +226,62 @@ parseLedgerInfo(PgResult const& res)
info.validated = true;
return info;
}
std::optional<uint32_t>
PostgresBackend::fetchLatestLedgerSequence() const
std::optional<std::uint32_t>
PostgresBackend::fetchLatestLedgerSequence(
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
auto res = pgQuery(
"SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1");
if (checkResult(res, 1))
pgQuery(set_timeout, yield);
auto const query =
"SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1";
if (auto res = pgQuery(query, yield); checkResult(res, 1))
return res.asBigInt(0, 0);
return {};
}
std::optional<ripple::LedgerInfo>
PostgresBackend::fetchLedgerBySequence(uint32_t sequence) const
PostgresBackend::fetchLedgerBySequence(
std::uint32_t const sequence,
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
std::stringstream sql;
sql << "SELECT * FROM ledgers WHERE ledger_seq = "
<< std::to_string(sequence);
auto res = pgQuery(sql.str().data());
if (checkResult(res, 10))
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 10))
return parseLedgerInfo(res);
return {};
}
std::optional<ripple::LedgerInfo>
PostgresBackend::fetchLedgerByHash(ripple::uint256 const& hash) const
PostgresBackend::fetchLedgerByHash(
ripple::uint256 const& hash,
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
std::stringstream sql;
sql << "SELECT * FROM ledgers WHERE ledger_hash = "
<< ripple::to_string(hash);
auto res = pgQuery(sql.str().data());
if (checkResult(res, 10))
sql << "SELECT * FROM ledgers WHERE ledger_hash = \'\\x"
<< ripple::to_string(hash) << "\'";
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 10))
return parseLedgerInfo(res);
return {};
}
std::optional<LedgerRange>
PostgresBackend::hardFetchLedgerRange() const
PostgresBackend::hardFetchLedgerRange(boost::asio::yield_context& yield) const
{
auto range = PgQuery(pgPool_)("SELECT complete_ledgers()");
auto range = PgQuery(pgPool_)("SELECT complete_ledgers()", yield);
if (!range)
return {};
@@ -279,17 +316,19 @@ PostgresBackend::hardFetchLedgerRange() const
std::optional<Blob>
PostgresBackend::doFetchLedgerObject(
ripple::uint256 const& key,
uint32_t sequence) const
std::uint32_t const sequence,
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
std::stringstream sql;
sql << "SELECT object FROM objects WHERE key = "
<< "\'\\x" << ripple::strHex(key) << "\'"
<< " AND ledger_seq <= " << std::to_string(sequence)
<< " ORDER BY ledger_seq DESC LIMIT 1";
auto res = pgQuery(sql.str().data());
if (checkResult(res, 1))
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 1))
{
auto blob = res.asUnHexedBlob(0, 0);
if (blob.size())
@@ -301,16 +340,19 @@ PostgresBackend::doFetchLedgerObject(
// returns a transaction, metadata pair
std::optional<TransactionAndMetadata>
PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const
PostgresBackend::fetchTransaction(
ripple::uint256 const& hash,
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_seq,date FROM transactions "
"WHERE hash = "
<< "\'\\x" << ripple::strHex(hash) << "\'";
auto res = pgQuery(sql.str().data());
if (checkResult(res, 4))
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 4))
{
return {
{res.asUnHexedBlob(0, 0),
@@ -322,15 +364,19 @@ PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const
return {};
}
std::vector<TransactionAndMetadata>
PostgresBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const
PostgresBackend::fetchAllTransactionsInLedger(
std::uint32_t const ledgerSequence,
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
std::stringstream sql;
sql << "SELECT transaction, metadata, ledger_seq,date FROM transactions "
"WHERE "
<< "ledger_seq = " << std::to_string(ledgerSequence);
auto res = pgQuery(sql.str().data());
auto res = pgQuery(sql.str().data(), yield);
if (size_t numRows = checkResult(res, 4))
{
std::vector<TransactionAndMetadata> txns;
@@ -348,14 +394,17 @@ PostgresBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const
}
std::vector<ripple::uint256>
PostgresBackend::fetchAllTransactionHashesInLedger(
uint32_t ledgerSequence) const
std::uint32_t const ledgerSequence,
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
std::stringstream sql;
sql << "SELECT hash FROM transactions WHERE "
<< "ledger_seq = " << std::to_string(ledgerSequence);
auto res = pgQuery(sql.str().data());
auto res = pgQuery(sql.str().data(), yield);
if (size_t numRows = checkResult(res, 1))
{
std::vector<ripple::uint256> hashes;
@@ -365,22 +414,26 @@ PostgresBackend::fetchAllTransactionHashesInLedger(
}
return hashes;
}
return {};
}
std::optional<ripple::uint256>
PostgresBackend::doFetchSuccessorKey(
ripple::uint256 key,
uint32_t ledgerSequence) const
std::uint32_t const ledgerSequence,
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
std::stringstream sql;
sql << "SELECT next FROM successor WHERE key = "
<< "\'\\x" << ripple::strHex(key) << "\'"
<< " AND ledger_seq <= " << std::to_string(ledgerSequence)
<< " ORDER BY ledger_seq DESC LIMIT 1";
auto res = pgQuery(sql.str().data());
if (checkResult(res, 1))
if (auto res = pgQuery(sql.str().data(), yield); checkResult(res, 1))
{
auto next = res.asUInt256(0, 0);
if (next == lastKey)
@@ -393,114 +446,107 @@ PostgresBackend::doFetchSuccessorKey(
std::vector<TransactionAndMetadata>
PostgresBackend::fetchTransactions(
std::vector<ripple::uint256> const& hashes) const
std::vector<ripple::uint256> const& hashes,
boost::asio::yield_context& yield) const
{
std::vector<TransactionAndMetadata> results;
constexpr bool doAsync = true;
if (doAsync)
{
auto start = std::chrono::system_clock::now();
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
results.resize(hashes.size());
std::condition_variable cv;
std::mutex mtx;
std::atomic_uint numRemaining = hashes.size();
for (size_t i = 0; i < hashes.size(); ++i)
{
auto const& hash = hashes[i];
boost::asio::post(
pool_, [this, &hash, &results, &numRemaining, &cv, &mtx, i]() {
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " getting txn = " << i;
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_seq,date FROM "
"transactions "
"WHERE HASH = \'\\x"
<< ripple::strHex(hash) << "\'";
if (!hashes.size())
return {};
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 4))
{
results[i] = {
res.asUnHexedBlob(0, 0),
res.asUnHexedBlob(0, 1),
res.asBigInt(0, 2),
res.asBigInt(0, 3)};
}
if (--numRemaining == 0)
{
std::unique_lock lck(mtx);
cv.notify_one();
}
});
}
std::unique_lock lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
auto end2 = std::chrono::system_clock::now();
duration = ((end2 - end).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " fetched " << std::to_string(hashes.size())
<< " transactions with threadpool. took "
<< std::to_string(duration);
}
else
std::vector<TransactionAndMetadata> results;
results.resize(hashes.size());
handler_type handler(std::forward<decltype(yield)>(yield));
result_type result(handler);
auto hw = new HandlerWrapper(std::move(handler));
auto start = std::chrono::system_clock::now();
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
std::atomic_uint numRemaining = hashes.size();
for (size_t i = 0; i < hashes.size(); ++i)
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
for (size_t i = 0; i < hashes.size(); ++i)
{
auto const& hash = hashes[i];
sql << "SELECT transaction,metadata,ledger_seq,date FROM "
"transactions "
"WHERE HASH = \'\\x"
<< ripple::strHex(hash) << "\'";
if (i + 1 < hashes.size())
sql << " UNION ALL ";
}
auto start = std::chrono::system_clock::now();
auto res = pgQuery(sql.str().data());
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " fetched " << std::to_string(hashes.size())
<< " transactions with union all. took "
<< std::to_string(duration);
if (size_t numRows = checkResult(res, 3))
{
for (size_t i = 0; i < numRows; ++i)
results.push_back(
{res.asUnHexedBlob(i, 0),
res.asUnHexedBlob(i, 1),
res.asBigInt(i, 2),
res.asBigInt(i, 3)});
}
auto const& hash = hashes[i];
boost::asio::spawn(
get_associated_executor(yield),
[this, &hash, &results, hw, &numRemaining, i](
boost::asio::yield_context yield) {
BOOST_LOG_TRIVIAL(trace) << __func__ << " getting txn = " << i;
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_seq,date FROM "
"transactions "
"WHERE HASH = \'\\x"
<< ripple::strHex(hash) << "\'";
auto res = pgQuery(sql.str().data(), yield);
if (size_t numRows = checkResult(res, 4))
{
results[i] = {
res.asUnHexedBlob(0, 0),
res.asUnHexedBlob(0, 1),
res.asBigInt(0, 2),
res.asBigInt(0, 3)};
}
if (--numRemaining == 0)
{
handler_type h(std::move(hw->handler));
h(boost::system::error_code{});
}
});
}
// Yields the worker to the io_context until handler is called.
result.get();
delete hw;
auto end2 = std::chrono::system_clock::now();
duration = ((end2 - end).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " fetched " << std::to_string(hashes.size())
<< " transactions with threadpool. took " << std::to_string(duration);
return results;
}
std::vector<Blob>
PostgresBackend::doFetchLedgerObjects(
std::vector<ripple::uint256> const& keys,
uint32_t sequence) const
std::uint32_t const sequence,
boost::asio::yield_context& yield) const
{
if (!keys.size())
return {};
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
std::vector<Blob> results;
results.resize(keys.size());
std::condition_variable cv;
std::mutex mtx;
handler_type handler(std::forward<decltype(yield)>(yield));
result_type result(handler);
auto hw = new HandlerWrapper(std::move(handler));
std::atomic_uint numRemaining = keys.size();
auto start = std::chrono::system_clock::now();
for (size_t i = 0; i < keys.size(); ++i)
{
auto const& key = keys[i];
boost::asio::post(
pool_,
[this, &key, &results, &numRemaining, &cv, &mtx, i, sequence]() {
boost::asio::spawn(
boost::asio::get_associated_executor(yield),
[this, &key, &results, &numRemaining, hw, i, sequence](
boost::asio::yield_context yield) {
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT object FROM "
"objects "
@@ -509,37 +555,48 @@ PostgresBackend::doFetchLedgerObjects(
<< " AND ledger_seq <= " << std::to_string(sequence)
<< " ORDER BY ledger_seq DESC LIMIT 1";
auto res = pgQuery(sql.str().data());
auto res = pgQuery(sql.str().data(), yield);
if (size_t numRows = checkResult(res, 1))
{
results[i] = res.asUnHexedBlob();
}
if (--numRemaining == 0)
{
std::unique_lock lck(mtx);
cv.notify_one();
handler_type h(std::move(hw->handler));
h(boost::system::error_code{});
}
});
}
std::unique_lock lck(mtx);
cv.wait(lck, [&numRemaining]() { return numRemaining == 0; });
// Yields the worker to the io_context until handler is called.
result.get();
delete hw;
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
BOOST_LOG_TRIVIAL(info)
<< __func__ << " fetched " << std::to_string(keys.size())
<< " objects with threadpool. took " << std::to_string(duration);
return results;
}
std::vector<LedgerObject>
PostgresBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
PostgresBackend::fetchLedgerDiff(
std::uint32_t const ledgerSequence,
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
std::stringstream sql;
sql << "SELECT key,object FROM objects "
"WHERE "
<< "ledger_seq = " << std::to_string(ledgerSequence);
auto res = pgQuery(sql.str().data());
auto res = pgQuery(sql.str().data(), yield);
if (size_t numRows = checkResult(res, 2))
{
std::vector<LedgerObject> objects;
@@ -549,18 +606,20 @@ PostgresBackend::fetchLedgerDiff(uint32_t ledgerSequence) const
}
return objects;
}
return {};
}
AccountTransactions
PostgresBackend::fetchAccountTransactions(
ripple::AccountID const& account,
std::uint32_t limit,
std::uint32_t const limit,
bool forward,
std::optional<AccountTransactionsCursor> const& cursor) const
std::optional<AccountTransactionsCursor> const& cursor,
boost::asio::yield_context& yield) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
pgQuery(set_timeout, yield);
pg_params dbParams;
char const*& command = dbParams.first;
@@ -587,7 +646,7 @@ PostgresBackend::fetchAccountTransactions(
}
auto start = std::chrono::system_clock::now();
auto res = pgQuery(dbParams);
auto res = pgQuery(dbParams, yield);
auto end = std::chrono::system_clock::now();
auto duration = ((end - start).count()) / 1000000000.0;
@@ -595,6 +654,7 @@ PostgresBackend::fetchAccountTransactions(
<< __func__ << " : executed stored_procedure in "
<< std::to_string(duration)
<< " num records = " << std::to_string(checkResult(res, 1));
checkResult(res, 1);
char const* resultStr = res.c_str();
@@ -618,13 +678,13 @@ PostgresBackend::fetchAccountTransactions(
if (responseObj.contains("cursor"))
{
return {
fetchTransactions(hashes),
fetchTransactions(hashes, yield),
{{responseObj.at("cursor").at("ledger_sequence").as_int64(),
responseObj.at("cursor")
.at("transaction_index")
.as_int64()}}};
}
return {fetchTransactions(hashes), {}};
return {fetchTransactions(hashes, yield), {}};
}
return {{}, {}};
} // namespace Backend
@@ -643,85 +703,93 @@ PostgresBackend::close()
}
void
PostgresBackend::startWrites()
PostgresBackend::startWrites() const
{
numRowsInObjectsBuffer_ = 0;
abortWrite_ = false;
auto res = writeConnection_("BEGIN");
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;
msg << "Postgres error creating transaction: " << res.msg();
throw std::runtime_error(msg.str());
}
synchronous([&](boost::asio::yield_context yield) {
numRowsInObjectsBuffer_ = 0;
abortWrite_ = false;
auto res = writeConnection_("BEGIN", yield);
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;
msg << "Postgres error creating transaction: " << res.msg();
throw std::runtime_error(msg.str());
}
});
}
bool
PostgresBackend::doFinishWrites()
PostgresBackend::doFinishWrites() const
{
if (!abortWrite_)
{
std::string txStr = transactionsBuffer_.str();
writeConnection_.bulkInsert("transactions", txStr);
writeConnection_.bulkInsert(
"account_transactions", accountTxBuffer_.str());
std::string objectsStr = objectsBuffer_.str();
if (objectsStr.size())
writeConnection_.bulkInsert("objects", objectsStr);
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " objects size = " << objectsStr.size()
<< " txns size = " << txStr.size();
std::string successorStr = successorBuffer_.str();
if (successorStr.size())
writeConnection_.bulkInsert("successor", successorStr);
successors_.clear();
if (!range)
synchronous([&](boost::asio::yield_context yield) {
if (!abortWrite_)
{
std::stringstream indexCreate;
indexCreate
<< "CREATE INDEX diff ON objects USING hash(ledger_seq) "
"WHERE NOT "
"ledger_seq = "
<< std::to_string(inProcessLedger);
writeConnection_(indexCreate.str().data());
std::string txStr = transactionsBuffer_.str();
writeConnection_.bulkInsert("transactions", txStr, yield);
writeConnection_.bulkInsert(
"account_transactions", accountTxBuffer_.str(), yield);
std::string objectsStr = objectsBuffer_.str();
if (objectsStr.size())
writeConnection_.bulkInsert("objects", objectsStr, yield);
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " objects size = " << objectsStr.size()
<< " txns size = " << txStr.size();
std::string successorStr = successorBuffer_.str();
if (successorStr.size())
writeConnection_.bulkInsert("successor", successorStr, yield);
if (!range)
{
std::stringstream indexCreate;
indexCreate
<< "CREATE INDEX diff ON objects USING hash(ledger_seq) "
"WHERE NOT "
"ledger_seq = "
<< std::to_string(inProcessLedger);
writeConnection_(indexCreate.str().data(), yield);
}
}
}
auto res = writeConnection_("COMMIT");
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;
msg << "Postgres error committing transaction: " << res.msg();
throw std::runtime_error(msg.str());
}
transactionsBuffer_.str("");
transactionsBuffer_.clear();
objectsBuffer_.str("");
objectsBuffer_.clear();
successorBuffer_.str("");
successorBuffer_.clear();
accountTxBuffer_.str("");
accountTxBuffer_.clear();
numRowsInObjectsBuffer_ = 0;
auto res = writeConnection_("COMMIT", yield);
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;
msg << "Postgres error committing transaction: " << res.msg();
throw std::runtime_error(msg.str());
}
transactionsBuffer_.str("");
transactionsBuffer_.clear();
objectsBuffer_.str("");
objectsBuffer_.clear();
successorBuffer_.str("");
successorBuffer_.clear();
successors_.clear();
accountTxBuffer_.str("");
accountTxBuffer_.clear();
numRowsInObjectsBuffer_ = 0;
});
return !abortWrite_;
}
bool
PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
PostgresBackend::doOnlineDelete(
std::uint32_t const numLedgersToKeep,
boost::asio::yield_context& yield) const
{
auto rng = fetchLedgerRange();
if (!rng)
return false;
uint32_t minLedger = rng->maxSequence - numLedgersToKeep;
std::uint32_t minLedger = rng->maxSequence - numLedgersToKeep;
if (minLedger <= rng->minSequence)
return false;
uint32_t limit = 2048;
std::uint32_t limit = 2048;
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 0");
pgQuery("SET statement_timeout TO 0", yield);
std::optional<ripple::uint256> cursor;
while (true)
{
auto [objects, curCursor, warning] = retryOnTimeout(
[&]() { return fetchLedgerPage(cursor, minLedger, 256); });
auto [objects, curCursor, warning] = retryOnTimeout([&]() {
return fetchLedgerPage(cursor, minLedger, 256, 0, yield);
});
if (warning)
{
BOOST_LOG_TRIVIAL(warning) << __func__
@@ -739,7 +807,7 @@ PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
<< std::to_string(minLedger) << '\t' << "\\\\x"
<< ripple::strHex(obj.blob) << '\n';
}
pgQuery.bulkInsert("objects", objectsBuffer.str());
pgQuery.bulkInsert("objects", objectsBuffer.str(), yield);
cursor = curCursor;
if (!cursor)
break;
@@ -749,7 +817,7 @@ PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
std::stringstream sql;
sql << "DELETE FROM ledgers WHERE ledger_seq < "
<< std::to_string(minLedger);
auto res = pgQuery(sql.str().data());
auto res = pgQuery(sql.str().data(), yield);
if (res.msg() != "ok")
throw std::runtime_error("Error deleting from ledgers table");
}
@@ -757,7 +825,7 @@ PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
std::stringstream sql;
sql << "DELETE FROM keys WHERE ledger_seq < "
<< std::to_string(minLedger);
auto res = pgQuery(sql.str().data());
auto res = pgQuery(sql.str().data(), yield);
if (res.msg() != "ok")
throw std::runtime_error("Error deleting from keys table");
}
@@ -765,7 +833,7 @@ PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
std::stringstream sql;
sql << "DELETE FROM books WHERE ledger_seq < "
<< std::to_string(minLedger);
auto res = pgQuery(sql.str().data());
auto res = pgQuery(sql.str().data(), yield);
if (res.msg() != "ok")
throw std::runtime_error("Error deleting from books table");
}