postgres support for new books and keys tables

This commit is contained in:
CJ Cobb
2021-04-29 14:51:03 +00:00
parent 1f2d670e2b
commit f1ff81ddc5
4 changed files with 24 additions and 20 deletions

View File

@@ -1188,8 +1188,6 @@ public:
, isDeleted(isDeleted) , isDeleted(isDeleted)
, book(std::move(inBook)) , book(std::move(inBook))
{ {
if (book)
++refs;
} }
}; };
struct WriteAccountTxCallbackData struct WriteAccountTxCallbackData
@@ -1290,8 +1288,6 @@ public:
std::move(book)); std::move(book));
write(*data, false); write(*data, false);
if (hasBook)
writeBook(*data, false);
} }
void void

View File

@@ -282,6 +282,7 @@ Pg::bulkInsert(char const* table, std::string const& records)
ss << "bulkInsert to " << table ss << "bulkInsert to " << table
<< ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status; << ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status;
disconnect(); disconnect();
BOOST_LOG_TRIVIAL(debug) << __func__ << " " << records;
throw std::runtime_error(ss.str()); throw std::runtime_error(ss.str());
} }
} }
@@ -802,11 +803,16 @@ create table if not exists account_transactions7 partition of account_transactio
-- Table that maps a book to a list of offers in that book. Deletes from the ledger table -- Table that maps a book to a list of offers in that book. Deletes from the ledger table
-- cascade here based on ledger_seq. -- cascade here based on ledger_seq.
CREATE TABLE IF NOT EXISTS books ( CREATE TABLE IF NOT EXISTS books (
book bytea NOT NULL,
ledger_seq bigint NOT NULL, ledger_seq bigint NOT NULL,
deleted boolean NOT NULL, book bytea NOT NULL,
offer_key bytea NOT NULL, offer_key bytea NOT NULL,
PRIMARY KEY(book, offer_key, deleted) PRIMARY KEY(ledger_seq, book, offer_key)
);
CREATE TABLE IF NOT EXISTS keys (
ledger_seq bigint NOT NULL,
key bytea NOT NULL,
PRIMARY KEY(ledger_seq, key)
); );
-- account_tx() RPC helper. From the rippled reporting process, only the -- account_tx() RPC helper. From the rippled reporting process, only the

View File

@@ -82,13 +82,6 @@ PostgresBackend::doWriteLedgerObject(
BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer"; BOOST_LOG_TRIVIAL(info) << __func__ << " Flushed large buffer";
objectsBuffer_ = {}; objectsBuffer_ = {};
} }
if (book)
{
booksBuffer_ << "\\\\x" << ripple::strHex(*book) << '\t'
<< std::to_string(seq) << '\t' << isDeleted << '\t'
<< "\\\\x" << ripple::strHex(key) << '\n';
}
} }
void void
@@ -658,7 +651,6 @@ PostgresBackend::doFinishWrites() const
if (!abortWrite_) if (!abortWrite_)
{ {
writeConnection_.bulkInsert("transactions", transactionsBuffer_.str()); writeConnection_.bulkInsert("transactions", transactionsBuffer_.str());
writeConnection_.bulkInsert("books", booksBuffer_.str());
writeConnection_.bulkInsert( writeConnection_.bulkInsert(
"account_transactions", accountTxBuffer_.str()); "account_transactions", accountTxBuffer_.str());
std::string objectsStr = objectsBuffer_.str(); std::string objectsStr = objectsBuffer_.str();
@@ -688,7 +680,9 @@ PostgresBackend::writeKeys(
std::unordered_set<ripple::uint256> const& keys, std::unordered_set<ripple::uint256> const& keys,
uint32_t ledgerSequence) const uint32_t ledgerSequence) const
{ {
BOOST_LOG_TRIVIAL(debug) << __func__;
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
pgQuery("BEGIN");
std::stringstream keysBuffer; std::stringstream keysBuffer;
size_t numRows = 0; size_t numRows = 0;
for (auto& key : keys) for (auto& key : keys)
@@ -701,7 +695,8 @@ PostgresBackend::writeKeys(
if (numRows == 1000000) if (numRows == 1000000)
{ {
pgQuery.bulkInsert("keys", keysBuffer.str()); pgQuery.bulkInsert("keys", keysBuffer.str());
keysBuffer = {}; std::stringstream temp;
keysBuffer.swap(temp);
numRows = 0; numRows = 0;
} }
} }
@@ -709,6 +704,8 @@ PostgresBackend::writeKeys(
{ {
pgQuery.bulkInsert("keys", keysBuffer.str()); pgQuery.bulkInsert("keys", keysBuffer.str());
} }
pgQuery("COMMIT");
return true;
} }
bool bool
PostgresBackend::writeBooks( PostgresBackend::writeBooks(
@@ -717,15 +714,17 @@ PostgresBackend::writeBooks(
std::unordered_set<ripple::uint256>> const& books, std::unordered_set<ripple::uint256>> const& books,
uint32_t ledgerSequence) const uint32_t ledgerSequence) const
{ {
BOOST_LOG_TRIVIAL(debug) << __func__;
PgQuery pgQuery(pgPool_); PgQuery pgQuery(pgPool_);
pgQuery("BEGIN");
std::stringstream booksBuffer; std::stringstream booksBuffer;
size_t numRows = 0; size_t numRows = 0;
for (auto& book : books) for (auto& book : books)
{ {
for (auto& offer : book.second) for (auto& offer : book.second)
{ {
booksBuffer << "\\\\x" << ripple::strHex(book.first) << '\t' booksBuffer << std::to_string(ledgerSequence) << '\t' << "\\\\x"
<< std::to_string(ledgerSequence) << '\t' << "\\\\x" << ripple::strHex(book.first) << '\t' << "\\\\x"
<< ripple::strHex(offer) << '\n'; << ripple::strHex(offer) << '\n';
numRows++; numRows++;
// If the buffer gets too large, the insert fails. Not sure why. So // If the buffer gets too large, the insert fails. Not sure why. So
@@ -733,7 +732,8 @@ PostgresBackend::writeBooks(
if (numRows == 1000000) if (numRows == 1000000)
{ {
pgQuery.bulkInsert("books", booksBuffer.str()); pgQuery.bulkInsert("books", booksBuffer.str());
booksBuffer = {}; std::stringstream temp;
booksBuffer.swap(temp);
numRows = 0; numRows = 0;
} }
} }
@@ -742,6 +742,8 @@ PostgresBackend::writeBooks(
{ {
pgQuery.bulkInsert("books", booksBuffer.str()); pgQuery.bulkInsert("books", booksBuffer.str());
} }
pgQuery("COMMIT");
return true;
} }
bool bool
PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const

View File

@@ -436,7 +436,7 @@ async def ledger_data(ip, port, ledger, limit, binary, cursor):
address = 'ws://' + str(ip) + ':' + str(port) address = 'ws://' + str(ip) + ':' + str(port)
try: try:
async with websockets.connect(address) as ws: async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor"cursor})) await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"limit":int(limit),"cursor":cursor}))
await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor})) await ws.send(json.dumps({"command":"ledger_data","ledger_index":int(ledger),"binary":bool(binary),"cursor":cursor}))
res = json.loads(await ws.recv()) res = json.loads(await ws.recv())
objects = [] objects = []