handle postgres timeouts

This commit is contained in:
CJ Cobb
2021-03-11 16:44:43 -05:00
parent d62f7accfe
commit d2f0537f02
6 changed files with 52 additions and 11 deletions

View File

@@ -48,10 +48,12 @@ doLedgerData(
return response;
}
ripple::uint256 cursor;
std::optional<ripple::uint256> cursor;
if (request.contains("cursor"))
{
cursor.parseHex(request.at("cursor").as_string().c_str());
BOOST_LOG_TRIVIAL(debug) << __func__ << " : parsing cursor";
cursor = ripple::uint256{};
cursor->parseHex(request.at("cursor").as_string().c_str());
}
bool binary =
request.contains("binary") ? request.at("binary").as_bool() : false;

View File

@@ -34,6 +34,15 @@ struct LedgerRange
uint32_t maxSequence;
};
// Thrown by backend read paths when Postgres aborts a query due to
// `statement_timeout` (set to 10s before each read). Callers catch this
// and return a retryable error to the client instead of failing hard.
class DatabaseTimeout : public std::exception
{
public:
    // Must be public so handlers catching DatabaseTimeout (or std::exception)
    // can call what() directly; noexcept is the C++17 spelling of the
    // deprecated dynamic exception specification throw().
    const char*
    what() const noexcept override
    {
        return "Database read timed out. Please retry the request";
    }
};
class BackendInterface
{
public:

View File

@@ -181,6 +181,7 @@ Pg::query(char const* command, std::size_t nParams, char const* const* values)
BOOST_LOG_TRIVIAL(error) << ss.str();
PgResult retRes(ret.get(), conn_.get());
disconnect();
return retRes;
}
}

View File

@@ -81,10 +81,14 @@ checkResult(PgResult const& res, uint32_t numFieldsExpected)
{
if (!res)
{
auto msg = res.msg();
BOOST_LOG_TRIVIAL(debug) << msg;
if (msg.find("statement timeout"))
throw DatabaseTimeout();
assert(false);
throw std::runtime_error("null postgres response");
throw std::runtime_error(msg);
}
else if (res.status() != PGRES_TUPLES_OK)
if (res.status() != PGRES_TUPLES_OK)
{
std::stringstream msg;
msg << " : Postgres response should have been "
@@ -150,6 +154,7 @@ std::optional<uint32_t>
PostgresBackend::fetchLatestLedgerSequence() const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
auto res = pgQuery(
"SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1");
if (checkResult(res, 1))
@@ -161,6 +166,7 @@ std::optional<ripple::LedgerInfo>
PostgresBackend::fetchLedgerBySequence(uint32_t sequence) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT * FROM ledgers WHERE ledger_seq = "
<< std::to_string(sequence);
@@ -211,6 +217,7 @@ PostgresBackend::fetchLedgerObject(
uint32_t sequence) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT object FROM objects WHERE key = "
<< "\'\\x" << ripple::strHex(key) << "\'"
@@ -230,6 +237,7 @@ std::optional<TransactionAndMetadata>
PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_seq FROM transactions "
"WHERE hash = "
@@ -249,6 +257,7 @@ std::vector<TransactionAndMetadata>
PostgresBackend::fetchAllTransactionsInLedger(uint32_t ledgerSequence) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT transaction, metadata, ledger_seq FROM transactions WHERE "
<< "ledger_seq = " << std::to_string(ledgerSequence);
@@ -272,6 +281,7 @@ PostgresBackend::fetchAllTransactionHashesInLedger(
uint32_t ledgerSequence) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT hash FROM transactions WHERE "
<< "ledger_seq = " << std::to_string(ledgerSequence);
@@ -295,15 +305,17 @@ PostgresBackend::fetchLedgerPage(
std::uint32_t limit) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT key,object FROM"
<< " (SELECT DISTINCT ON (key) * FROM objects"
<< " WHERE ledger_seq <= " << std::to_string(ledgerSequence);
if (cursor)
sql << " AND key > \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY key, ledger_seq DESC) sub"
sql << " AND key < \'\\x" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY key DESC, ledger_seq DESC) sub"
<< " WHERE object != \'\\x\'"
<< " LIMIT " << std::to_string(limit);
BOOST_LOG_TRIVIAL(debug) << __func__ << sql.str();
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 2))
{
@@ -347,6 +359,7 @@ PostgresBackend::fetchBookOffers(
keys.push_back(res.asUInt256(i, 0));
}
std::vector<Blob> blobs = fetchLedgerObjects(keys, ledgerSequence);
std::vector<LedgerObject> results;
std::transform(
blobs.begin(),
@@ -366,6 +379,7 @@ PostgresBackend::fetchTransactions(
std::vector<ripple::uint256> const& hashes) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_seq FROM transactions "
"WHERE ";
@@ -400,6 +414,7 @@ PostgresBackend::fetchLedgerObjects(
uint32_t sequence) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT DISTINCT ON(key) object FROM objects WHERE";
@@ -445,6 +460,7 @@ PostgresBackend::fetchAccountTransactions(
std::optional<AccountTransactionsCursor> const& cursor) const
{
PgQuery pgQuery(pgPool_);
pgQuery("SET statement_timeout TO 10000");
std::stringstream sql;
sql << "SELECT hash, ledger_seq, transaction_index FROM "
"account_transactions WHERE account = "

12
test.py
View File

@@ -224,7 +224,7 @@ async def ledger_data(ip, port, ledger, limit, binary):
async def ledger_data_full(ip, port, ledger, binary):
async def ledger_data_full(ip, port, ledger, binary, limit):
address = 'ws://' + str(ip) + ':' + str(port)
try:
blobs = []
@@ -232,15 +232,19 @@ async def ledger_data_full(ip, port, ledger, binary):
async with websockets.connect(address) as ws:
marker = None
while True:
res = {}
if marker is None:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary)}))
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary), "limit":int(limit)}))
res = json.loads(await ws.recv())
else:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"cursor":marker, "binary":bool(binary)}))
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"cursor":marker, "binary":bool(binary), "limit":int(limit)}))
res = json.loads(await ws.recv())
if "error" in res:
print(res)
continue
objects = []
if "result" in res:
@@ -430,7 +434,7 @@ def run(args):
ledger_data(args.ip, args.port, args.ledger, args.limit, args.binary))
elif args.action == "ledger_data_full":
asyncio.get_event_loop().run_until_complete(
ledger_data_full(args.ip, args.port, args.ledger, args.binary))
ledger_data_full(args.ip, args.port, args.ledger, args.binary, args.limit))
elif args.action == "ledger":
res = asyncio.get_event_loop().run_until_complete(

View File

@@ -211,7 +211,16 @@ public:
// BOOST_LOG_TRIVIAL(debug) << __func__ << " parsed";
boost::json::object request = raw.as_object();
BOOST_LOG_TRIVIAL(debug) << " received request : " << request;
auto response = buildResponse(request, backend_);
boost::json::object response;
try
{
response = buildResponse(request, backend_);
}
catch (Backend::DatabaseTimeout const& t)
{
response["error"] =
"Database read timeout. Please retry the request";
}
BOOST_LOG_TRIVIAL(trace) << __func__ << response;
response_ = boost::json::serialize(response);