No duplicate entries in the keys table for Postgres

This commit is contained in:
CJ Cobb
2021-06-01 15:47:50 -04:00
parent d692f7f675
commit 9edb743dcf
6 changed files with 39 additions and 18 deletions

View File

@@ -37,7 +37,7 @@ doServerInfo(
cur = keyIndex->keyIndex; cur = keyIndex->keyIndex;
auto page = backend.fetchLedgerPage({}, cur, 1); auto page = backend.fetchLedgerPage({}, cur, 1);
boost::json::object entry; boost::json::object entry;
entry["complete"] = page.warning.has_value(); entry["complete"] = !page.warning.has_value();
entry["sequence"] = cur; entry["sequence"] = cur;
indexes.emplace_back(entry); indexes.emplace_back(entry);
cur = cur + 1; cur = cur + 1;

View File

@@ -114,6 +114,8 @@ BackendIndexer::writeKeyFlagLedgerAsync(
try try
{ {
{ {
BOOST_LOG_TRIVIAL(info)
<< "writeKeyFlagLedger - checking for complete...";
auto page = auto page =
backend.fetchLedgerPage({}, nextFlag.keyIndex, 1); backend.fetchLedgerPage({}, nextFlag.keyIndex, 1);
if (!page.warning) if (!page.warning)
@@ -126,6 +128,8 @@ BackendIndexer::writeKeyFlagLedgerAsync(
<< std::to_string(ledgerSequence); << std::to_string(ledgerSequence);
return; return;
} }
BOOST_LOG_TRIVIAL(info)
<< "writeKeyFlagLedger - is not complete";
} }
indexing_ = nextFlag.keyIndex; indexing_ = nextFlag.keyIndex;
auto start = std::chrono::system_clock::now(); auto start = std::chrono::system_clock::now();

View File

@@ -275,11 +275,14 @@ public:
assert(false); assert(false);
throw std::runtime_error("Missing base flag ledger"); throw std::runtime_error("Missing base flag ledger");
} }
BOOST_LOG_TRIVIAL(debug) << __func__ << " recursing"; uint32_t lowerSequence = (ledgerSequence - 1) >>
uint32_t lowerSequence = ledgerSequence >> indexer_.getKeyShift() indexer_.getKeyShift() << indexer_.getKeyShift();
<< indexer_.getKeyShift();
if (lowerSequence < rng->minSequence) if (lowerSequence < rng->minSequence)
lowerSequence = rng->minSequence; lowerSequence = rng->minSequence;
BOOST_LOG_TRIVIAL(debug)
<< __func__ << " recursing. ledgerSequence = "
<< std::to_string(ledgerSequence)
<< " , lowerSequence = " << std::to_string(lowerSequence);
auto lowerPage = fetchLedgerPage(cursor, lowerSequence, limit); auto lowerPage = fetchLedgerPage(cursor, lowerSequence, limit);
std::vector<ripple::uint256> keys; std::vector<ripple::uint256> keys;
std::transform( std::transform(

View File

@@ -844,11 +844,10 @@ create table if not exists account_transactions7 partition of account_transactio
CREATE TABLE IF NOT EXISTS keys ( CREATE TABLE IF NOT EXISTS keys (
ledger_seq bigint NOT NULL, ledger_seq bigint NOT NULL,
key bytea NOT NULL key bytea NOT NULL,
PRIMARY KEY(ledger_seq, key)
); );
CREATE INDEX key_idx ON keys USING btree(ledger_seq, key);
-- account_tx() RPC helper. From the rippled reporting process, only the -- account_tx() RPC helper. From the rippled reporting process, only the
-- parameters without defaults are required. For the parameters with -- parameters without defaults are required. For the parameters with
-- defaults, validation should be done by rippled, such as: -- defaults, validation should be done by rippled, such as:

View File

@@ -640,9 +640,6 @@ PostgresBackend::doFinishWrites() const
BOOST_LOG_TRIVIAL(debug) BOOST_LOG_TRIVIAL(debug)
<< __func__ << " objects size = " << objectsStr.size() << __func__ << " objects size = " << objectsStr.size()
<< " txns size = " << txStr.size(); << " txns size = " << txStr.size();
std::string keysStr = keysBuffer_.str();
if (keysStr.size())
writeConnection_.bulkInsert("keys", keysStr);
} }
auto res = writeConnection_("COMMIT"); auto res = writeConnection_("COMMIT");
if (!res || res.status() != PGRES_COMMAND_OK) if (!res || res.status() != PGRES_COMMAND_OK)
@@ -655,8 +652,6 @@ PostgresBackend::doFinishWrites() const
transactionsBuffer_.clear(); transactionsBuffer_.clear();
objectsBuffer_.str(""); objectsBuffer_.str("");
objectsBuffer_.clear(); objectsBuffer_.clear();
keysBuffer_.str("");
keysBuffer_.clear();
accountTxBuffer_.str(""); accountTxBuffer_.str("");
accountTxBuffer_.clear(); accountTxBuffer_.clear();
numRowsInObjectsBuffer_ = 0; numRowsInObjectsBuffer_ = 0;
@@ -675,8 +670,12 @@ PostgresBackend::writeKeys(
PgQuery& conn = isAsync ? pgQuery : writeConnection_; PgQuery& conn = isAsync ? pgQuery : writeConnection_;
std::stringstream asyncBuffer; std::stringstream asyncBuffer;
std::stringstream& buffer = isAsync ? asyncBuffer : keysBuffer_; std::stringstream& buffer = isAsync ? asyncBuffer : keysBuffer_;
std::string tableName = isAsync ? "keys_temp_async" : "keys_temp";
if (isAsync) if (isAsync)
conn("BEGIN"); conn("BEGIN");
conn(std::string{
"CREATE TABLE " + tableName + " AS SELECT * FROM keys WITH NO DATA"}
.c_str());
size_t numRows = 0; size_t numRows = 0;
for (auto& key : keys) for (auto& key : keys)
{ {
@@ -687,20 +686,25 @@ PostgresBackend::writeKeys(
// When writing in the background, we insert after every 10000 rows // When writing in the background, we insert after every 10000 rows
if ((isAsync && numRows == 10000) || numRows == 100000) if ((isAsync && numRows == 10000) || numRows == 100000)
{ {
conn.bulkInsert("keys", buffer.str()); conn.bulkInsert(tableName.c_str(), buffer.str());
std::stringstream temp; std::stringstream temp;
buffer.swap(temp); buffer.swap(temp);
numRows = 0; numRows = 0;
} }
} }
if (numRows > 0)
conn.bulkInsert(tableName.c_str(), buffer.str());
conn(std::string{
"INSERT INTO keys SELECT * FROM " + tableName +
" ON CONFLICT DO NOTHING"}
.c_str());
conn(std::string{"DROP TABLE " + tableName}.c_str());
if (isAsync) if (isAsync)
{ {
if (numRows > 0)
conn.bulkInsert("keys", buffer.str());
std::stringstream temp;
buffer.swap(temp);
conn("COMMIT"); conn("COMMIT");
} }
std::stringstream temp;
buffer.swap(temp);
return true; return true;
} }
bool bool

13
test.py
View File

@@ -764,6 +764,15 @@ async def fee(ip, port):
print(json.dumps(res,indent=4,sort_keys=True)) print(json.dumps(res,indent=4,sort_keys=True))
except websockets.exceptions.connectionclosederror as e: except websockets.exceptions.connectionclosederror as e:
print(e) print(e)
async def server_info(ip, port):
address = 'ws://' + str(ip) + ':' + str(port)
try:
async with websockets.connect(address) as ws:
await ws.send(json.dumps({"command":"server_info"}))
res = json.loads(await ws.recv())
print(json.dumps(res,indent=4,sort_keys=True))
except websockets.exceptions.connectionclosederror as e:
print(e)
async def ledger_diff(ip, port, base, desired, includeBlobs): async def ledger_diff(ip, port, base, desired, includeBlobs):
address = 'ws://' + str(ip) + ':' + str(port) address = 'ws://' + str(ip) + ':' + str(port)
@@ -785,7 +794,7 @@ async def perf(ip, port):
parser = argparse.ArgumentParser(description='test script for xrpl-reporting') parser = argparse.ArgumentParser(description='test script for xrpl-reporting')
parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee"]) parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info"])
parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--ip', default='127.0.0.1')
parser.add_argument('--port', default='8080') parser.add_argument('--port', default='8080')
@@ -828,6 +837,8 @@ def run(args):
args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1] args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1]
if args.action == "fee": if args.action == "fee":
asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port)) asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port))
elif args.action == "server_info":
asyncio.get_event_loop().run_until_complete(server_info(args.ip, args.port))
elif args.action == "perf": elif args.action == "perf":
asyncio.get_event_loop().run_until_complete( asyncio.get_event_loop().run_until_complete(
perf(args.ip,args.port)) perf(args.ip,args.port))