Add database timeout exception. clear out incomplete cassandra data. add ledger_entry rpc

This commit is contained in:
CJ Cobb
2021-03-12 16:37:27 -05:00
parent d2f0537f02
commit 4609010967
8 changed files with 239 additions and 74 deletions

View File

@@ -3,7 +3,7 @@
namespace Backend {
PostgresBackend::PostgresBackend(boost::json::object const& config)
: pgPool_(make_PgPool(config))
: pgPool_(make_PgPool(config)), writeConnection_(pgPool_)
{
}
void
@@ -12,13 +12,29 @@ PostgresBackend::writeLedger(
std::string&& ledgerHeader,
bool isFirst) const
{
ledgerHeader_ = ledgerInfo;
auto cmd = boost::format(
R"(INSERT INTO ledgers
VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
auto ledgerInsert = boost::str(
cmd % ledgerInfo.seq % ripple::strHex(ledgerInfo.hash) %
ripple::strHex(ledgerInfo.parentHash) % ledgerInfo.drops.drops() %
ledgerInfo.closeTime.time_since_epoch().count() %
ledgerInfo.parentCloseTime.time_since_epoch().count() %
ledgerInfo.closeTimeResolution.count() % ledgerInfo.closeFlags %
ripple::strHex(ledgerInfo.accountHash) %
ripple::strHex(ledgerInfo.txHash));
auto res = writeConnection_(ledgerInsert.data());
abortWrite_ = !res;
}
void
PostgresBackend::writeAccountTransactions(
std::vector<AccountTransactionsData>&& data) const
{
if (abortWrite_)
return;
PgQuery pg(pgPool_);
for (auto const& record : data)
{
@@ -42,6 +58,8 @@ PostgresBackend::writeLedgerObject(
bool isDeleted,
std::optional<ripple::uint256>&& book) const
{
if (abortWrite_)
return;
objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(blob) << '\n';
@@ -50,8 +68,7 @@ PostgresBackend::writeLedgerObject(
// insert after 1 million records
if (numRowsInObjectsBuffer_ % 1000000 == 0)
{
PgQuery pgQuery(pgPool_);
pgQuery.bulkInsert("objects", objectsBuffer_.str());
writeConnection_.bulkInsert("objects", objectsBuffer_.str());
objectsBuffer_ = {};
}
@@ -70,6 +87,8 @@ PostgresBackend::writeTransaction(
std::string&& transaction,
std::string&& metadata) const
{
if (abortWrite_)
return;
transactionsBuffer_ << "\\\\x" << ripple::strHex(hash) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(transaction) << '\t' << "\\\\x"
@@ -226,7 +245,9 @@ PostgresBackend::fetchLedgerObject(
auto res = pgQuery(sql.str().data());
if (checkResult(res, 1))
{
return res.asUnHexedBlob(0, 0);
auto blob = res.asUnHexedBlob(0, 0);
if (blob.size())
return blob;
}
return {};
@@ -286,7 +307,7 @@ PostgresBackend::fetchAllTransactionHashesInLedger(
sql << "SELECT hash FROM transactions WHERE "
<< "ledger_seq = " << std::to_string(ledgerSequence);
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 3))
if (size_t numRows = checkResult(res, 1))
{
std::vector<ripple::uint256> hashes;
for (size_t i = 0; i < numRows; ++i)
@@ -510,43 +531,30 @@ void
PostgresBackend::startWrites() const
{
numRowsInObjectsBuffer_ = 0;
}
bool
PostgresBackend::finishWrites() const
{
PgQuery pg(pgPool_);
auto res = pg("BEGIN");
abortWrite_ = false;
auto res = writeConnection_("BEGIN");
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;
msg << "Postgres error creating transaction: " << res.msg();
throw std::runtime_error(msg.str());
}
auto cmd = boost::format(
R"(INSERT INTO ledgers
VALUES (%u,'\x%s', '\x%s',%u,%u,%u,%u,%u,'\x%s','\x%s'))");
}
auto ledgerInsert = boost::str(
cmd % ledgerHeader_.seq % ripple::strHex(ledgerHeader_.hash) %
ripple::strHex(ledgerHeader_.parentHash) % ledgerHeader_.drops.drops() %
ledgerHeader_.closeTime.time_since_epoch().count() %
ledgerHeader_.parentCloseTime.time_since_epoch().count() %
ledgerHeader_.closeTimeResolution.count() % ledgerHeader_.closeFlags %
ripple::strHex(ledgerHeader_.accountHash) %
ripple::strHex(ledgerHeader_.txHash));
res = pg(ledgerInsert.data());
if (res)
bool
PostgresBackend::finishWrites() const
{
if (!abortWrite_)
{
pg.bulkInsert("transactions", transactionsBuffer_.str());
pg.bulkInsert("books", booksBuffer_.str());
pg.bulkInsert("account_transactions", accountTxBuffer_.str());
writeConnection_.bulkInsert("transactions", transactionsBuffer_.str());
writeConnection_.bulkInsert("books", booksBuffer_.str());
writeConnection_.bulkInsert(
"account_transactions", accountTxBuffer_.str());
std::string objectsStr = objectsBuffer_.str();
if (objectsStr.size())
pg.bulkInsert("objects", objectsStr);
writeConnection_.bulkInsert("objects", objectsStr);
}
res = pg("COMMIT");
auto res = writeConnection_("COMMIT");
if (!res || res.status() != PGRES_COMMAND_OK)
{
std::stringstream msg;