Bug fixes. Insert objects in batches.

This commit is contained in:
CJ Cobb
2021-03-04 12:49:34 -05:00
parent 5606d4a7dd
commit 694ec7bfe5
3 changed files with 43 additions and 31 deletions

View File

@@ -744,7 +744,7 @@ CREATE TABLE IF NOT EXISTS ledgers (
CREATE TABLE IF NOT EXISTS objects (
key bytea NOT NULL,
ledger_seq bigint NOT NULL,
ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
object bytea,
PRIMARY KEY(key, ledger_seq)
);
@@ -769,7 +769,7 @@ CREATE TABLE IF NOT EXISTS account_transactions (
ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE,
transaction_index bigint NOT NULL,
hash bytea NOT NULL,
PRIMARY KEY (account, ledger_seq, transaction_index),
PRIMARY KEY (account, ledger_seq, transaction_index)
);
-- Table that maps a book to a list of offers in that book. Deletes from the ledgers table
-- cascade here based on ledger_seq.

View File

@@ -54,7 +54,7 @@ PostgresBackend::writeAccountTransactions(
accountTxBuffer_ << "\\\\x" << acct << '\t'
<< std::to_string(ledgerSeq) << '\t'
<< std::to_string(idx) << '\t' << "\\\\x"
<< ripple::strHex(txHash);
<< ripple::strHex(txHash) << '\n';
}
}
}
@@ -69,9 +69,19 @@ PostgresBackend::writeLedgerObject(
{
if (abortWrite_)
return;
static int numRows = 0;
numRows++;
objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t'
<< std::to_string(seq) << '\t' << "\\\\x"
<< ripple::strHex(blob) << '\n';
// If the buffer gets too large, the bulk insert fails (the cause is not yet
// known), so we flush the buffer after every 1 million records
if (numRows % 1000000 == 0)
{
PgQuery pgQuery(pgPool_);
pgQuery.bulkInsert("objects", objectsBuffer_.str());
objectsBuffer_ = {};
}
if (book)
{
@@ -138,16 +148,16 @@ checkResult(PgResult const& res, uint32_t numFieldsExpected)
ripple::LedgerInfo
parseLedgerInfo(PgResult const& res)
{
char const* hash = res.c_str(0, 0);
char const* prevHash = res.c_str(0, 1);
char const* accountHash = res.c_str(0, 2);
char const* txHash = res.c_str(0, 3);
std::int64_t totalCoins = res.asBigInt(0, 4);
std::int64_t closeTime = res.asBigInt(0, 5);
std::int64_t parentCloseTime = res.asBigInt(0, 6);
std::int64_t closeTimeRes = res.asBigInt(0, 7);
std::int64_t closeFlags = res.asBigInt(0, 8);
std::int64_t ledgerSeq = res.asBigInt(0, 9);
std::int64_t ledgerSeq = res.asBigInt(0, 0);
char const* hash = res.c_str(0, 1);
char const* prevHash = res.c_str(0, 2);
std::int64_t totalCoins = res.asBigInt(0, 3);
std::int64_t closeTime = res.asBigInt(0, 4);
std::int64_t parentCloseTime = res.asBigInt(0, 5);
std::int64_t closeTimeRes = res.asBigInt(0, 6);
std::int64_t closeFlags = res.asBigInt(0, 7);
char const* accountHash = res.c_str(0, 8);
char const* txHash = res.c_str(0, 9);
using time_point = ripple::NetClock::time_point;
using duration = ripple::NetClock::duration;
@@ -175,8 +185,8 @@ std::optional<uint32_t>
PostgresBackend::fetchLatestLedgerSequence() const
{
PgQuery pgQuery(pgPool_);
auto res =
pgQuery("SELECT sequence FROM ledgers ORDER BY sequence DESC LIMIT 1");
auto res = pgQuery(
"SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1");
if (checkResult(res, 1))
return res.asBigInt(0, 0);
return {};
@@ -187,7 +197,7 @@ PostgresBackend::fetchLedgerBySequence(uint32_t sequence) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT * FROM ledgers WHERE sequence = "
sql << "SELECT * FROM ledgers WHERE ledger_seq = "
<< std::to_string(sequence);
auto res = pgQuery(sql.str().data());
if (checkResult(res, 10))
@@ -238,8 +248,8 @@ PostgresBackend::fetchLedgerObject(
std::stringstream sql;
sql << "SELECT object FROM objects WHERE key = "
<< "\'\\x" << ripple::strHex(key) << "\'"
<< " AND sequence <= " << std::to_string(sequence)
<< " ORDER BY sequence DESC LIMIT 1";
<< " AND ledger_seq <= " << std::to_string(sequence)
<< " ORDER BY ledger_seq DESC LIMIT 1";
auto res = pgQuery(sql.str().data());
if (checkResult(res, 1))
{
@@ -258,7 +268,7 @@ PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_sequence FROM transactions "
sql << "SELECT transaction,metadata,ledger_seq FROM transactions "
"WHERE hash = "
<< "\'\\x" << ripple::strHex(hash) << "\'";
auto res = pgQuery(sql.str().data());
@@ -286,10 +296,10 @@ PostgresBackend::fetchLedgerPage(
std::stringstream sql;
sql << "SELECT key,object FROM"
<< " (SELECT DISTINCT ON (key) * FROM objects"
<< " WHERE sequence <= " << std::to_string(ledgerSequence);
<< " WHERE ledger_seq <= " << std::to_string(ledgerSequence);
if (cursor)
sql << " AND key > \'x\\" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY key, sequence DESC) sub"
sql << " ORDER BY key, ledger_seq DESC) sub"
<< " WHERE object != \'\\x\'"
<< " LIMIT " << std::to_string(limit);
auto res = pgQuery(sql.str().data());
@@ -325,10 +335,10 @@ PostgresBackend::fetchBookOffers(
sql << "SELECT key FROM"
<< " (SELECT DISTINCT ON (key) * FROM books WHERE book = "
<< "\'\\x" << ripple::strHex(book)
<< "\' AND sequence <= " << std::to_string(ledgerSequence);
<< "\' AND ledger_seq <= " << std::to_string(ledgerSequence);
if (cursor)
sql << " AND key > \'" << ripple::strHex(*cursor) << "\'";
sql << " ORDER BY key DESC, sequence DESC)"
sql << " ORDER BY key DESC, ledger_seq DESC)"
<< " sub WHERE NOT deleted"
<< " LIMIT " << std::to_string(limit);
auto res = pgQuery(sql.str().data());
@@ -363,7 +373,7 @@ PostgresBackend::fetchTransactions(
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT transaction,metadata,ledger_sequence FROM transactions "
sql << "SELECT transaction,metadata,ledger_seq FROM transactions "
"WHERE ";
bool first = true;
for (auto const& hash : hashes)
@@ -419,8 +429,8 @@ PostgresBackend::fetchLedgerObjects(
<< "\'\\x" << ripple::strHex(key) << "\'";
}
sql << " ) "
<< " AND sequence <= " << std::to_string(sequence)
<< " ORDER BY sequence DESC LIMIT 1";
<< " AND ledger_seq <= " << std::to_string(sequence)
<< " ORDER BY ledger_seq DESC LIMIT 1";
auto res = pgQuery(sql.str().data());
if (size_t numRows = checkResult(res, 1))
{
@@ -446,11 +456,11 @@ PostgresBackend::fetchAccountTransactions(
{
PgQuery pgQuery(pgPool_);
std::stringstream sql;
sql << "SELECT hash, ledger_sequence, transaction_index FROM "
sql << "SELECT hash, ledger_seq, transaction_index FROM "
"account_transactions WHERE account = "
<< ripple::strHex(account);
if (cursor)
sql << " AND ledger_sequence < " << cursor->ledgerSequence
sql << " AND ledger_seq < " << cursor->ledgerSequence
<< " AND transaction_index < " << cursor->transactionIndex;
sql << " LIMIT " << std::to_string(limit);
auto res = pgQuery(sql.str().data());
@@ -510,8 +520,10 @@ PostgresBackend::finishWrites() const
if (abortWrite_)
return false;
PgQuery pg(pgPool_);
std::string objectsStr = objectsBuffer_.str();
if (objectsStr.size())
pg.bulkInsert("objects", objectsStr);
pg.bulkInsert("transactions", transactionsBuffer_.str());
pg.bulkInsert("objects", objectsBuffer_.str());
pg.bulkInsert("books", booksBuffer_.str());
pg.bulkInsert("account_transactions", accountTxBuffer_.str());
auto res = pg("COMMIT");

View File

@@ -258,6 +258,8 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
<< "Deserialized ledger header. " << detail::toString(lgrInfo);
flatMapBackend_->startWrites();
flatMapBackend_->writeLedger(
lgrInfo, std::move(*rawData.mutable_ledger_header()));
std::vector<AccountTransactionsData> accountTxData{
insertTransactions(lgrInfo, rawData)};
@@ -302,8 +304,6 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData)
std::move(bookDir));
}
flatMapBackend_->writeAccountTransactions(std::move(accountTxData));
flatMapBackend_->writeLedger(
lgrInfo, std::move(*rawData.mutable_ledger_header()));
bool success = flatMapBackend_->finishWrites();
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " : "