diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index 56f5a7d1..865e6799 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -1359,6 +1359,16 @@ CassandraBackend::open(bool readOnly) ? config_["threads"].as_int64() : std::thread::hardware_concurrency(); int ttl = config_.contains("ttl") ? config_["ttl"].as_int64() * 2 : 0; + int keysTtl = 0, keysIncr = pow(2, indexer_.getKeyShift()) * 4 * 2; + while (keysTtl < ttl) + { + keysTtl += keysIncr; + } + int booksTtl = 0, booksIncr = pow(2, indexer_.getBookShift()) * 4 * 2; + while (booksTtl < ttl) + { + booksTtl += booksIncr; + } rc = cass_cluster_set_num_threads_io(cluster, threads); if (rc != CASS_OK) @@ -1530,7 +1540,9 @@ CassandraBackend::open(bool readOnly) query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "keys" << " ( sequence bigint, key blob, PRIMARY KEY " - "(sequence, key))"; + "(sequence, key))" + " WITH default_time_to_live = " + << keysTtl; if (!executeSimpleStatement(query.str())) continue; @@ -1546,7 +1558,8 @@ CassandraBackend::open(bool readOnly) "blob>, PRIMARY KEY " "((book, sequence), quality_key)) WITH CLUSTERING ORDER BY " "(quality_key " - "ASC)"; + "ASC) AND default_time_to_live = " + << booksTtl; if (!executeSimpleStatement(query.str())) continue; query.str(""); diff --git a/reporting/Pg.cpp b/reporting/Pg.cpp index fc7bfe40..7f082b5d 100644 --- a/reporting/Pg.cpp +++ b/reporting/Pg.cpp @@ -242,8 +242,10 @@ Pg::bulkInsert(char const* table, std::string const& records) { // https://www.postgresql.org/docs/12/libpq-copy.html#LIBPQ-COPY-SEND assert(conn_.get()); - static auto copyCmd = boost::format(R"(COPY %s FROM stdin)"); - auto res = query(boost::str(copyCmd % table).c_str()); + auto copyCmd = boost::format(R"(COPY %s FROM stdin)"); + auto formattedCmd = boost::str(copyCmd % table); + BOOST_LOG_TRIVIAL(info) << __func__ << " " << formattedCmd; + auto res = query(formattedCmd.c_str()); if (!res || res.status() != 
PGRES_COPY_IN) { std::stringstream ss; @@ -284,7 +286,8 @@ Pg::bulkInsert(char const* table, std::string const& records) { std::stringstream ss; ss << "bulkInsert to " << table - << ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status; + << ". PQputCopyEnd status not PGRES_COMMAND_OK: " << status + << " message = " << PQerrorMessage(conn_.get()); disconnect(); BOOST_LOG_TRIVIAL(error) << __func__ << " " << records; throw std::runtime_error(ss.str()); @@ -750,11 +753,12 @@ CREATE TABLE IF NOT EXISTS ledgers ( CREATE TABLE IF NOT EXISTS objects ( key bytea NOT NULL, - ledger_seq bigint NOT NULL, - object bytea, - PRIMARY KEY(key, ledger_seq) + ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, + object bytea ) PARTITION BY RANGE (ledger_seq); +CREATE INDEX IF NOT EXISTS objects_idx ON objects USING btree(key, ledger_seq); + create table if not exists objects1 partition of objects for values from (0) to (10000000); create table if not exists objects2 partition of objects for values from (10000000) to (20000000); create table if not exists objects3 partition of objects for values from (20000000) to (30000000); @@ -772,7 +776,7 @@ CREATE INDEX IF NOT EXISTS ledgers_ledger_hash_idx ON ledgers -- cascade here based on ledger_seq. CREATE TABLE IF NOT EXISTS transactions ( hash bytea NOT NULL, - ledger_seq bigint NOT NULL , + ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, transaction bytea NOT NULL, metadata bytea NOT NULL ) PARTITION BY RANGE(ledger_seq); @@ -791,7 +795,7 @@ create index if not exists tx_by_lgr_seq on transactions using hash (ledger_seq) -- ledger table cascade here based on ledger_seq. 
CREATE TABLE IF NOT EXISTS account_transactions ( account bytea NOT NULL, - ledger_seq bigint NOT NULL , + ledger_seq bigint NOT NULL REFERENCES ledgers ON DELETE CASCADE, transaction_index bigint NOT NULL, hash bytea NOT NULL, PRIMARY KEY (account, ledger_seq, transaction_index, hash) @@ -815,7 +819,7 @@ CREATE TABLE IF NOT EXISTS books ( CREATE INDEX book_idx ON books using btree(ledger_seq, book, offer_key); CREATE TABLE IF NOT EXISTS keys ( - ledger_seq bigint NOT NULL, + ledger_seq bigint NOT NULL, key bytea NOT NULL ); diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index 9d9c003c..87385324 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -884,6 +884,44 @@ PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const return false; uint32_t limit = 2048; PgQuery pgQuery(pgPool_); + pgQuery("SET statement_timeout TO 0"); + std::optional<ripple::uint256> cursor; + while (true) + { + try + { + auto [objects, curCursor, warning] = + fetchLedgerPage(cursor, minLedger, 256); + if (warning) + { + BOOST_LOG_TRIVIAL(warning) << __func__ + << " online delete running but " "flag ledger is not complete"; + std::this_thread::sleep_for(std::chrono::seconds(10)); + continue; + } + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + std::stringstream objectsBuffer; + + for (auto& obj : objects) + { + objectsBuffer << "\\\\x" << ripple::strHex(obj.key) << '\t' + << std::to_string(minLedger) << '\t' << "\\\\x" + << ripple::strHex(obj.blob) << '\n'; + } + pgQuery.bulkInsert("objects", objectsBuffer.str()); + cursor = curCursor; + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + BOOST_LOG_TRIVIAL(info) << __func__ << " finished inserting into objects"; { std::stringstream sql; sql << "DELETE FROM ledgers WHERE ledger_seq < " @@ -892,90 +930,22 @@ 
PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const if (res.msg() != "ok") throw std::runtime_error("Error deleting from ledgers table"); } - - std::string cursor; - do { std::stringstream sql; - sql << "SELECT DISTINCT ON (key) key,ledger_seq,object FROM objects" - << " WHERE ledger_seq <= " << std::to_string(minLedger); - if (cursor.size()) - sql << " AND key < \'\\x" << cursor << "\'"; - sql << " ORDER BY key DESC, ledger_seq DESC" - << " LIMIT " << std::to_string(limit); - BOOST_LOG_TRIVIAL(trace) << __func__ << sql.str(); + sql << "DELETE FROM keys WHERE ledger_seq < " + << std::to_string(minLedger); auto res = pgQuery(sql.str().data()); - BOOST_LOG_TRIVIAL(debug) << __func__ << "Fetched a page"; - if (size_t numRows = checkResult(res, 3)) - { - std::stringstream deleteSql; - std::stringstream deleteOffersSql; - deleteSql << "DELETE FROM objects WHERE ("; - deleteOffersSql << "DELETE FROM books WHERE ("; - bool firstOffer = true; - for (size_t i = 0; i < numRows; ++i) - { - std::string_view keyView{res.c_str(i, 0) + 2}; - int64_t sequence = res.asBigInt(i, 1); - std::string_view objView{res.c_str(i, 2) + 2}; - if (i != 0) - deleteSql << " OR "; - - deleteSql << "(key = " - << "\'\\x" << keyView << "\'"; - if (objView.size() == 0) - deleteSql << " AND ledger_seq <= " - << std::to_string(sequence); - else - deleteSql << " AND ledger_seq < " - << std::to_string(sequence); - deleteSql << ")"; - bool deleteOffer = false; - if (objView.size()) - { - deleteOffer = isOfferHex(objView); - } - else - { - // This is rather unelegant. For a deleted object, we - // don't know its type just from the key (or do we?). - // So, we just assume it is an offer and try to delete - // it. The alternative is to read the actual object out - // of the db from before it was deleted. 
This could - // result in a lot of individual reads though, so we - // chose to just delete - deleteOffer = true; - } - if (deleteOffer) - { - if (!firstOffer) - deleteOffersSql << " OR "; - deleteOffersSql << "( offer_key = " - << "\'\\x" << keyView << "\')"; - firstOffer = false; - } - } - if (numRows == limit) - cursor = res.c_str(numRows - 1, 0) + 2; - else - cursor = {}; - deleteSql << ")"; - deleteOffersSql << ")"; - BOOST_LOG_TRIVIAL(trace) << __func__ << deleteSql.str(); - res = pgQuery(deleteSql.str().data()); - if (res.msg() != "ok") - throw std::runtime_error("Error deleting from objects table"); - if (!firstOffer) - { - BOOST_LOG_TRIVIAL(trace) << __func__ << deleteOffersSql.str(); - res = pgQuery(deleteOffersSql.str().data()); - if (res.msg() != "ok") - throw std::runtime_error("Error deleting from books table"); - } - BOOST_LOG_TRIVIAL(debug) - << __func__ << "Deleted a page. Cursor = " << cursor; - } - } while (cursor.size()); + if (res.msg() != "ok") + throw std::runtime_error("Error deleting from keys table"); + } + { + std::stringstream sql; + sql << "DELETE FROM books WHERE ledger_seq < " + << std::to_string(minLedger); + auto res = pgQuery(sql.str().data()); + if (res.msg() != "ok") + throw std::runtime_error("Error deleting from books table"); + } return true; }