diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 36bc356f..adfd41c0 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -1188,8 +1188,6 @@ public: , isDeleted(isDeleted) , book(std::move(inBook)) { - if (book) - ++refs; } }; struct WriteAccountTxCallbackData @@ -1290,8 +1288,6 @@ public: std::move(book)); write(*data, false); - if (hasBook) - writeBook(*data, false); } void diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index e313d4ae..3d19075d 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -282,6 +282,7 @@ Pg::bulkInsert(char const* table, std::string const& records) ss << "bulkInsert to " << table << ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status; disconnect(); + BOOST_LOG_TRIVIAL(debug) << __func__ << " " << records; throw std::runtime_error(ss.str()); } } @@ -802,11 +803,16 @@ create table if not exists account_transactions7 partition of account_transactio -- Table that maps a book to a list of offers in that book. Deletes from the ledger table -- cascade here based on ledger_seq. CREATE TABLE IF NOT EXISTS books ( - book bytea NOT NULL, ledger_seq bigint NOT NULL, - deleted boolean NOT NULL, + book bytea NOT NULL, offer_key bytea NOT NULL, - PRIMARY KEY(book, offer_key, deleted) + PRIMARY KEY(ledger_seq, book, offer_key) +); + +CREATE TABLE IF NOT EXISTS keys ( + ledger_seq bigint NOT NULL, + key bytea NOT NULL, + PRIMARY KEY(ledger_seq, key) ); -- account_tx() RPC helper. From the rippled reporting process, only the diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index f9c9778c..3a69700c 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -82,13 +82,6 @@ PostgresBackend::doWriteLedgerObject( BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer"; objectsBuffer_ = {}; } - - if (book) - { - booksBuffer_ << "\\\\x" << ripple::strHex(*book) << '\t' - << std::to_string(seq) << '\t' << isDeleted << '\t' - << "\\\\x" << ripple::strHex(key) << '\n'; - } } void @@ -658,7 +651,6 @@ PostgresBackend::doFinishWrites() const if (!abortWrite_) { writeConnection_.bulkInsert("transactions", transactionsBuffer_.str()); - writeConnection_.bulkInsert("books", booksBuffer_.str()); writeConnection_.bulkInsert( "account_transactions", accountTxBuffer_.str()); std::string objectsStr = objectsBuffer_.str(); @@ -688,7 +680,9 @@ PostgresBackend::writeKeys( std::unordered_set const& keys, uint32_t ledgerSequence) const { + BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); + pgQuery("BEGIN"); std::stringstream keysBuffer; size_t numRows = 0; for (auto& key : keys) @@ -701,7 +695,8 @@ PostgresBackend::writeKeys( if (numRows == 1000000) { pgQuery.bulkInsert("keys", keysBuffer.str()); - keysBuffer = {}; + std::stringstream temp; + keysBuffer.swap(temp); numRows = 0; } } @@ -709,6 +704,8 @@ PostgresBackend::writeKeys( { pgQuery.bulkInsert("keys", keysBuffer.str()); } + pgQuery("COMMIT"); + return true; } bool PostgresBackend::writeBooks( @@ -717,15 +714,17 @@ PostgresBackend::writeBooks( std::unordered_set> const& books, uint32_t ledgerSequence) const { + BOOST_LOG_TRIVIAL(debug) << __func__; PgQuery pgQuery(pgPool_); + pgQuery("BEGIN"); std::stringstream booksBuffer; size_t numRows = 0; for (auto& book : books) { for (auto& offer : book.second) { - booksBuffer << "\\\\x" << ripple::strHex(book.first) << '\t' - << std::to_string(ledgerSequence) << '\t' << "\\\\x" + booksBuffer << std::to_string(ledgerSequence) << '\t' << "\\\\x" + << ripple::strHex(book.first) << '\t' << "\\\\x" << ripple::strHex(offer) << '\n'; numRows++; // If the buffer gets too large, the insert fails. Not sure why. So @@ -733,7 +732,8 @@ PostgresBackend::writeBooks( if (numRows == 1000000) { pgQuery.bulkInsert("books", booksBuffer.str()); - booksBuffer = {}; + std::stringstream temp; + booksBuffer.swap(temp); numRows = 0; } } @@ -742,6 +742,8 @@ PostgresBackend::writeBooks( { pgQuery.bulkInsert("books", booksBuffer.str()); } + pgQuery("COMMIT"); + return true; } bool PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const diff --git a/test.py b/test.py index 1af35c8c..a71640cf 100755 --- a/test.py +++ b/test.py @@ -436,7 +436,7 @@ async def ledger_data(ip, port, ledger, limit, binary, cursor): address = 'ws://' + str(ip) + ':' + str(port) try: async with websockets.connect(address) as ws: - await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor"cursor})) + await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor":cursor})) await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor})) res = json.loads(await ws.recv()) objects = []