diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index e3c73662..d1dd6ba2 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -744,7 +744,7 @@ CREATE TABLE IF NOT EXISTS ledgers ( CREATE TABLE IF NOT EXISTS objects ( key bytea NOT NULL, - ledger_seq bigint NOT NULL, + ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, object bytea, PRIMARY KEY(key, ledger_seq) ); @@ -769,7 +769,7 @@ CREATE TABLE IF NOT EXISTS account_transactions ( ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, transaction_index bigint NOT NULL, hash bytea NOT NULL, - PRIMARY KEY (account, ledger_seq, transaction_index), + PRIMARY KEY (account, ledger_seq, transaction_index) ); -- Table that maps a book to a list of offers in that book. Deletes from the ledger table -- cascade here based on ledger_seq. diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index fe1d243c..048fccbf 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -54,7 +54,7 @@ PostgresBackend::writeAccountTransactions( accountTxBuffer_ << "\\\\x" << acct << '\t' << std::to_string(ledgerSeq) << '\t' << std::to_string(idx) << '\t' << "\\\\x" - << ripple::strHex(txHash); + << ripple::strHex(txHash) << '\n'; } } } @@ -69,9 +69,19 @@ PostgresBackend::writeLedgerObject( { if (abortWrite_) return; + static int numRows = 0; + numRows++; objectsBuffer_ << "\\\\x" << ripple::strHex(key) << '\t' << std::to_string(seq) << '\t' << "\\\\x" << ripple::strHex(blob) << '\n'; + // If the buffer gets too large, the insert fails. Not sure why. So we + // insert after 1 million records + if (numRows % 1000000 == 0) + { + PgQuery pgQuery(pgPool_); + pgQuery.bulkInsert("objects", objectsBuffer_.str()); + objectsBuffer_ = {}; + } if (book) { @@ -138,16 +148,16 @@ checkResult(PgResult const& res, uint32_t numFieldsExpected) ripple::LedgerInfo parseLedgerInfo(PgResult const& res) { - char const* hash = res.c_str(0, 0); - char const* prevHash = res.c_str(0, 1); - char const* accountHash = res.c_str(0, 2); - char const* txHash = res.c_str(0, 3); - std::int64_t totalCoins = res.asBigInt(0, 4); - std::int64_t closeTime = res.asBigInt(0, 5); - std::int64_t parentCloseTime = res.asBigInt(0, 6); - std::int64_t closeTimeRes = res.asBigInt(0, 7); - std::int64_t closeFlags = res.asBigInt(0, 8); - std::int64_t ledgerSeq = res.asBigInt(0, 9); + std::int64_t ledgerSeq = res.asBigInt(0, 0); + char const* hash = res.c_str(0, 1); + char const* prevHash = res.c_str(0, 2); + std::int64_t totalCoins = res.asBigInt(0, 3); + std::int64_t closeTime = res.asBigInt(0, 4); + std::int64_t parentCloseTime = res.asBigInt(0, 5); + std::int64_t closeTimeRes = res.asBigInt(0, 6); + std::int64_t closeFlags = res.asBigInt(0, 7); + char const* accountHash = res.c_str(0, 8); + char const* txHash = res.c_str(0, 9); using time_point = ripple::NetClock::time_point; using duration = ripple::NetClock::duration; @@ -175,8 +185,8 @@ std::optional PostgresBackend::fetchLatestLedgerSequence() const { PgQuery pgQuery(pgPool_); - auto res = - pgQuery("SELECT sequence FROM ledgers ORDER BY sequence DESC LIMIT 1"); + auto res = pgQuery( + "SELECT ledger_seq FROM ledgers ORDER BY ledger_seq DESC LIMIT 1"); if (checkResult(res, 1)) return res.asBigInt(0, 0); return {}; @@ -187,7 +197,7 @@ PostgresBackend::fetchLedgerBySequence(uint32_t sequence) const { PgQuery pgQuery(pgPool_); std::stringstream sql; - sql << "SELECT * FROM ledgers WHERE sequence = " + sql << "SELECT * FROM ledgers WHERE ledger_seq = " << std::to_string(sequence); auto res = pgQuery(sql.str().data()); if (checkResult(res, 10)) @@ -238,8 +248,8 @@ PostgresBackend::fetchLedgerObject( std::stringstream sql; sql << "SELECT object FROM objects WHERE key = " << "\'\\x" << ripple::strHex(key) << "\'" - << " AND sequence <= " << std::to_string(sequence) - << " ORDER BY sequence DESC LIMIT 1"; + << " AND ledger_seq <= " << std::to_string(sequence) + << " ORDER BY ledger_seq DESC LIMIT 1"; auto res = pgQuery(sql.str().data()); if (checkResult(res, 1)) { @@ -258,7 +268,7 @@ PostgresBackend::fetchTransaction(ripple::uint256 const& hash) const { PgQuery pgQuery(pgPool_); std::stringstream sql; - sql << "SELECT transaction,metadata,ledger_sequence FROM transactions " + sql << "SELECT transaction,metadata,ledger_seq FROM transactions " "WHERE hash = " << "\'\\x" << ripple::strHex(hash) << "\'"; auto res = pgQuery(sql.str().data()); @@ -286,10 +296,10 @@ PostgresBackend::fetchLedgerPage( std::stringstream sql; sql << "SELECT key,object FROM" << " (SELECT DISTINCT ON (key) * FROM objects" - << " WHERE sequence <= " << std::to_string(ledgerSequence); + << " WHERE ledger_seq <= " << std::to_string(ledgerSequence); if (cursor) sql << " AND key > \'x\\" << ripple::strHex(*cursor) << "\'"; - sql << " ORDER BY key, sequence DESC) sub" + sql << " ORDER BY key, ledger_seq DESC) sub" << " WHERE object != \'\\x\'" << " LIMIT " << std::to_string(limit); auto res = pgQuery(sql.str().data()); @@ -325,10 +335,10 @@ PostgresBackend::fetchBookOffers( sql << "SELECT key FROM" << " (SELECT DISTINCT ON (key) * FROM books WHERE book = " << "\'\\x" << ripple::strHex(book) - << "\' AND sequence <= " << std::to_string(ledgerSequence); + << "\' AND ledger_seq <= " << std::to_string(ledgerSequence); if (cursor) sql << " AND key > \'" << ripple::strHex(*cursor) << "\'"; - sql << " ORDER BY key DESC, sequence DESC)" + sql << " ORDER BY key DESC, ledger_seq DESC)" << " sub WHERE NOT deleted" << " LIMIT " << std::to_string(limit); auto res = pgQuery(sql.str().data()); @@ -363,7 +373,7 @@ PostgresBackend::fetchTransactions( { PgQuery pgQuery(pgPool_); std::stringstream sql; - sql << "SELECT transaction,metadata,ledger_sequence FROM transactions " + sql << "SELECT transaction,metadata,ledger_seq FROM transactions " "WHERE "; bool first = true; for (auto const& hash : hashes) @@ -419,8 +429,8 @@ PostgresBackend::fetchLedgerObjects( << "\'\\x" << ripple::strHex(key) << "\'"; } sql << " ) " - << " AND sequence <= " << std::to_string(sequence) - << " ORDER BY sequence DESC LIMIT 1"; + << " AND ledger_seq <= " << std::to_string(sequence) + << " ORDER BY ledger_seq DESC LIMIT 1"; auto res = pgQuery(sql.str().data()); if (size_t numRows = checkResult(res, 1)) { @@ -446,11 +456,11 @@ PostgresBackend::fetchAccountTransactions( { PgQuery pgQuery(pgPool_); std::stringstream sql; - sql << "SELECT hash, ledger_sequence, transaction_index FROM " + sql << "SELECT hash, ledger_seq, transaction_index FROM " "account_transactions WHERE account = " << ripple::strHex(account); if (cursor) - sql << " AND ledger_sequence < " << cursor->ledgerSequence + sql << " AND ledger_seq < " << cursor->ledgerSequence << " AND transaction_index < " << cursor->transactionIndex; sql << " LIMIT " << std::to_string(limit); auto res = pgQuery(sql.str().data()); @@ -510,8 +520,10 @@ PostgresBackend::finishWrites() const if (abortWrite_) return false; PgQuery pg(pgPool_); + std::string objectsStr = objectsBuffer_.str(); + if (objectsStr.size()) + pg.bulkInsert("objects", objectsStr); pg.bulkInsert("transactions", transactionsBuffer_.str()); - pg.bulkInsert("objects", objectsBuffer_.str()); pg.bulkInsert("books", booksBuffer_.str()); pg.bulkInsert("account_transactions", accountTxBuffer_.str()); auto res = pg("COMMIT"); diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 064fe910..25ab79fd 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -258,6 +258,8 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) << "Deserialized ledger header. " << detail::toString(lgrInfo); flatMapBackend_->startWrites(); + flatMapBackend_->writeLedger( + lgrInfo, std::move(*rawData.mutable_ledger_header())); std::vector accountTxData{ insertTransactions(lgrInfo, rawData)}; @@ -302,8 +304,6 @@ ReportingETL::buildNextLedger(org::xrpl::rpc::v1::GetLedgerResponse& rawData) std::move(bookDir)); } flatMapBackend_->writeAccountTransactions(std::move(accountTxData)); - flatMapBackend_->writeLedger( - lgrInfo, std::move(*rawData.mutable_ledger_header())); bool success = flatMapBackend_->finishWrites(); BOOST_LOG_TRIVIAL(debug) << __func__ << " : "