diff --git a/handlers/ServerInfo.cpp b/handlers/ServerInfo.cpp index 18120969..0d064c86 100644 --- a/handlers/ServerInfo.cpp +++ b/handlers/ServerInfo.cpp @@ -37,7 +37,7 @@ doServerInfo( cur = keyIndex->keyIndex; auto page = backend.fetchLedgerPage({}, cur, 1); boost::json::object entry; - entry["complete"] = page.warning.has_value(); + entry["complete"] = !page.warning.has_value(); entry["sequence"] = cur; indexes.emplace_back(entry); cur = cur + 1; diff --git a/reporting/BackendIndexer.cpp b/reporting/BackendIndexer.cpp index 42097645..8b30e20e 100644 --- a/reporting/BackendIndexer.cpp +++ b/reporting/BackendIndexer.cpp @@ -114,6 +114,8 @@ BackendIndexer::writeKeyFlagLedgerAsync( try { { + BOOST_LOG_TRIVIAL(info) + << "writeKeyFlagLedger - checking for complete..."; auto page = backend.fetchLedgerPage({}, nextFlag.keyIndex, 1); if (!page.warning) @@ -126,6 +128,8 @@ BackendIndexer::writeKeyFlagLedgerAsync( << std::to_string(ledgerSequence); return; } + BOOST_LOG_TRIVIAL(info) + << "writeKeyFlagLedger - is not complete"; } indexing_ = nextFlag.keyIndex; auto start = std::chrono::system_clock::now(); diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index 8b6d4814..aa01ae17 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -275,11 +275,14 @@ public: assert(false); throw std::runtime_error("Missing base flag ledger"); } - BOOST_LOG_TRIVIAL(debug) << __func__ << " recursing"; - uint32_t lowerSequence = ledgerSequence >> indexer_.getKeyShift() - << indexer_.getKeyShift(); + uint32_t lowerSequence = (ledgerSequence - 1) >> + indexer_.getKeyShift() << indexer_.getKeyShift(); if (lowerSequence < rng->minSequence) lowerSequence = rng->minSequence; + BOOST_LOG_TRIVIAL(debug) + << __func__ << " recursing. ledgerSequence = " + << std::to_string(ledgerSequence) + << " , lowerSequence = " << std::to_string(lowerSequence); auto lowerPage = fetchLedgerPage(cursor, lowerSequence, limit); std::vector keys; std::transform( diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index e9ce37e6..c5e351b2 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -844,11 +844,10 @@ create table if not exists account_transactions7 partition of account_transactio CREATE TABLE IF NOT EXISTS keys ( ledger_seq bigint NOT NULL, - key bytea NOT NULL + key bytea NOT NULL, + PRIMARY KEY(ledger_seq, key) ); -CREATE INDEX key_idx ON keys USING btree(ledger_seq, key); - -- account_tx() RPC helper. From the rippled reporting process, only the -- parameters without defaults are required. For the parameters with -- defaults, validation should be done by rippled, such as: diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 06441fac..b474558b 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -640,9 +640,6 @@ PostgresBackend::doFinishWrites() const BOOST_LOG_TRIVIAL(debug) << __func__ << " objects size = " << objectsStr.size() << " txns size = " << txStr.size(); - std::string keysStr = keysBuffer_.str(); - if (keysStr.size()) - writeConnection_.bulkInsert("keys", keysStr); } auto res = writeConnection_("COMMIT"); if (!res || res.status() != PGRES_COMMAND_OK) @@ -655,8 +652,6 @@ PostgresBackend::doFinishWrites() const transactionsBuffer_.clear(); objectsBuffer_.str(""); objectsBuffer_.clear(); - keysBuffer_.str(""); - keysBuffer_.clear(); accountTxBuffer_.str(""); accountTxBuffer_.clear(); numRowsInObjectsBuffer_ = 0; @@ -675,8 +670,12 @@ PostgresBackend::writeKeys( PgQuery& conn = isAsync ? pgQuery : writeConnection_; std::stringstream asyncBuffer; std::stringstream& buffer = isAsync ? asyncBuffer : keysBuffer_; + std::string tableName = isAsync ? "keys_temp_async" : "keys_temp"; if (isAsync) conn("BEGIN"); + conn(std::string{ + "CREATE TABLE " + tableName + " AS SELECT * FROM keys WITH NO DATA"} + .c_str()); size_t numRows = 0; for (auto& key : keys) { @@ -687,20 +686,25 @@ PostgresBackend::writeKeys( // When writing in the background, we insert after every 10000 rows if ((isAsync && numRows == 10000) || numRows == 100000) { - conn.bulkInsert("keys", buffer.str()); + conn.bulkInsert(tableName.c_str(), buffer.str()); std::stringstream temp; buffer.swap(temp); numRows = 0; } } + if (numRows > 0) + conn.bulkInsert(tableName.c_str(), buffer.str()); + conn(std::string{ + "INSERT INTO keys SELECT * FROM " + tableName + + " ON CONFLICT DO NOTHING"} + .c_str()); + conn(std::string{"DROP TABLE " + tableName}.c_str()); if (isAsync) { - if (numRows > 0) - conn.bulkInsert("keys", buffer.str()); - std::stringstream temp; - buffer.swap(temp); conn("COMMIT"); } + std::stringstream temp; + buffer.swap(temp); return true; } bool diff --git a/test.py b/test.py index 72d296ab..83066d1d 100755 --- a/test.py +++ b/test.py @@ -764,6 +764,15 @@ async def fee(ip, port): print(json.dumps(res,indent=4,sort_keys=True)) except websockets.exceptions.connectionclosederror as e: print(e) +async def server_info(ip, port): + address = 'ws://' + str(ip) + ':' + str(port) + try: + async with websockets.connect(address) as ws: + await ws.send(json.dumps({"command":"server_info"})) + res = json.loads(await ws.recv()) + print(json.dumps(res,indent=4,sort_keys=True)) + except websockets.exceptions.connectionclosederror as e: + print(e) async def ledger_diff(ip, port, base, desired, includeBlobs): address = 'ws://' + str(ip) + ':' + str(port) @@ -785,7 +794,7 @@ async def perf(ip, port): parser = argparse.ArgumentParser(description='test script for xrpl-reporting') -parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee"]) +parser.add_argument('action', choices=["account_info", "tx", "txs","account_tx", "account_tx_full","ledger_data", "ledger_data_full", "book_offers","ledger","ledger_range","ledger_entry", "ledgers", "ledger_entries","account_txs","account_infos","account_txs_full","book_offerses","ledger_diff","perf","fee","server_info"]) parser.add_argument('--ip', default='127.0.0.1') parser.add_argument('--port', default='8080') @@ -828,6 +837,8 @@ def run(args): args.ledger = asyncio.get_event_loop().run_until_complete(ledger_range(args.ip, args.port))[1] if args.action == "fee": asyncio.get_event_loop().run_until_complete(fee(args.ip, args.port)) + elif args.action == "server_info": + asyncio.get_event_loop().run_until_complete(server_info(args.ip, args.port)) elif args.action == "perf": asyncio.get_event_loop().run_until_complete( perf(args.ip,args.port))