diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h
index ecd3a7fe..8b6dc4ba 100644
--- a/reporting/BackendInterface.h
+++ b/reporting/BackendInterface.h
@@ -144,6 +144,12 @@ public:
     virtual bool
     finishWrites() const = 0;
 
+    /// Delete data that is only needed for ledgers older than minLedgerToKeep
+    /// @param minLedgerToKeep oldest ledger sequence that is retained
+    /// @return true when the delete completed
+    virtual bool
+    doOnlineDelete(uint32_t minLedgerToKeep) const = 0;
+
     virtual ~BackendInterface()
     {
     }
diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp
index 056aeb04..dcbec44f 100644
--- a/reporting/CassandraBackend.cpp
+++ b/reporting/CassandraBackend.cpp
@@ -265,6 +265,13 @@ CassandraBackend::fetchAllTransactionHashesInLedger(
     } while (result.nextRow());
     return hashes;
 }
+// Online delete is not implemented for the Cassandra backend: always throws
+// std::runtime_error.
+bool
+CassandraBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
+{
+    throw std::runtime_error("doOnlineDelete : unimplemented");
+}
 
 void
 CassandraBackend::open()
diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h
index 259a6401..c50b2712 100644
--- a/reporting/CassandraBackend.h
+++ b/reporting/CassandraBackend.h
@@ -1457,6 +1457,8 @@ public:
 
         syncCv_.wait(lck, [this]() { return finishedAllRequests(); });
     }
+    bool
+    doOnlineDelete(uint32_t minLedgerToKeep) const override;
 
     boost::asio::io_context&
     getIOContext() const
diff --git a/reporting/DBHelpers.h b/reporting/DBHelpers.h
index 1a66553d..2775c065 100644
--- a/reporting/DBHelpers.h
+++ b/reporting/DBHelpers.h
@@ -52,9 +52,31 @@ isOffer(std::string const& object)
     short offer_bytes = (object[1] << 8) | object[2];
     return offer_bytes == 0x006f;
 }
+/// Same check as isOffer(), for an object given as hex text.
+/// @param object hex characters of a serialized ledger object
+/// @return true if decoded bytes 1-2 equal 0x006f; false if object is shorter
+/// than six characters or cannot be decoded
+template <class T>
+inline bool
+isOfferHex(T const& object)
+{
+    // isOffer() reads bytes 1 and 2 of the binary form, so three bytes (six
+    // hex digits) must be decoded. Decoding four digits yields two bytes and
+    // makes (*blob)[2] an out-of-bounds read.
+    if (object.size() < 6)
+        return false;
+    auto blob = ripple::strUnHex(6, object.begin(), object.begin() + 6);
+    if (blob)
+    {
+        short offer_bytes = ((*blob)[1] << 8) | (*blob)[2];
+        return offer_bytes == 0x006f;
+    }
+    return false;
+}
 
+template <class T>
 inline ripple::uint256
-getBook(std::string const& offer)
+getBook(T const& offer)
 {
     ripple::SerialIter it{offer.data(), offer.size()};
     ripple::SLE sle{it, {}};
diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp
index a9220ab9..245a9137 100644
--- a/reporting/PostgresBackend.cpp
+++ b/reporting/PostgresBackend.cpp
@@ -583,5 +583,116 @@ PostgresBackend::finishWrites() const
     numRowsInObjectsBuffer_ = 0;
     return true;
 }
+/// Delete data that is only needed for ledgers older than minLedgerToKeep.
+///
+/// Removes rows of `ledgers` with ledger_seq < minLedgerToKeep, then walks
+/// `objects` one page of keys at a time (descending key order), deleting the
+/// versions of each key that are older than its newest version at or below
+/// minLedgerToKeep, plus matching rows of `books`.
+///
+/// @param minLedgerToKeep oldest ledger sequence that is retained
+/// @return true once every page has been processed
+/// @throws std::runtime_error if a DELETE statement does not report "ok"
+bool
+PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
+{
+    uint32_t limit = 2048;
+    PgQuery pgQuery(pgPool_);
+    {
+        std::stringstream sql;
+        sql << "DELETE FROM ledgers WHERE ledger_seq < "
+            << std::to_string(minLedgerToKeep);
+        auto res = pgQuery(sql.str().data());
+        if (res.msg() != "ok")
+            throw std::runtime_error("Error deleting from ledgers table");
+    }
+
+    // Hex key of the last row of the previous page; empty for the first page.
+    std::string cursor;
+    do
+    {
+        std::stringstream sql;
+        sql << "SELECT DISTINCT ON (key) key,ledger_seq,object FROM objects"
+            << " WHERE ledger_seq <= " << std::to_string(minLedgerToKeep);
+        if (cursor.size())
+            sql << " AND key < \'\\x" << cursor << "\'";
+        sql << " ORDER BY key DESC, ledger_seq DESC"
+            << " LIMIT " << std::to_string(limit);
+        BOOST_LOG_TRIVIAL(trace) << __func__ << sql.str();
+        auto res = pgQuery(sql.str().data());
+        BOOST_LOG_TRIVIAL(debug) << __func__ << "Fetched a page";
+        size_t const numRows = checkResult(res, 3);
+        if (numRows == 0)
+        {
+            // No rows left. Stop explicitly: when the previous page held
+            // exactly `limit` rows the cursor is still set, and without this
+            // the same empty query would be re-issued forever.
+            break;
+        }
+
+        std::stringstream deleteSql;
+        std::stringstream deleteOffersSql;
+        deleteSql << "DELETE FROM objects WHERE (";
+        deleteOffersSql << "DELETE FROM books WHERE (";
+        bool firstOffer = true;
+        for (size_t i = 0; i < numRows; ++i)
+        {
+            // The +2 skips the first two characters of the text value
+            // (presumably the "\x" prefix of Postgres hex bytea output).
+            std::string_view keyView{res.c_str(i, 0) + 2};
+            int64_t sequence = res.asBigInt(i, 1);
+            std::string_view objView{res.c_str(i, 2) + 2};
+            if (i != 0)
+                deleteSql << " OR ";
+
+            // An empty object is treated as a deletion record, so that row is
+            // removed as well (<=); otherwise the selected row is the version
+            // to keep and only strictly older ones go (<).
+            deleteSql << "(key = "
+                      << "\'\\x" << keyView << "\'";
+            if (objView.size() == 0)
+                deleteSql << " AND ledger_seq <= " << std::to_string(sequence);
+            else
+                deleteSql << " AND ledger_seq < " << std::to_string(sequence);
+            deleteSql << ")";
+
+            // For a deleted object the type cannot be read from the (empty)
+            // blob. Rather than fetching an older version for every such key,
+            // assume it may have been an offer and attempt the books delete.
+            bool const deleteOffer = objView.empty() || isOfferHex(objView);
+            if (deleteOffer)
+            {
+                // NOTE(review): keyed on offer_key only, no ledger_seq bound
+                // -- confirm this cannot drop rows a live offer still needs.
+                if (!firstOffer)
+                    deleteOffersSql << " OR ";
+                deleteOffersSql << "( offer_key = "
+                                << "\'\\x" << keyView << "\')";
+                firstOffer = false;
+            }
+        }
+        // A full page means there may be more keys; resume after its last key.
+        if (numRows == limit)
+            cursor = res.c_str(numRows - 1, 0) + 2;
+        else
+            cursor = {};
+        deleteSql << ")";
+        deleteOffersSql << ")";
+        BOOST_LOG_TRIVIAL(trace) << __func__ << deleteSql.str();
+        res = pgQuery(deleteSql.str().data());
+        if (res.msg() != "ok")
+            throw std::runtime_error("Error deleting from objects table");
+        if (!firstOffer)
+        {
+            BOOST_LOG_TRIVIAL(trace) << __func__ << deleteOffersSql.str();
+            res = pgQuery(deleteOffersSql.str().data());
+            if (res.msg() != "ok")
+                throw std::runtime_error("Error deleting from books table");
+        }
+        BOOST_LOG_TRIVIAL(debug)
+            << __func__ << "Deleted a page. Cursor = " << cursor;
+    } while (cursor.size());
+    return true;
+}
 
 } // namespace Backend
diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h
index 9f09ae8a..6653dbe9 100644
--- a/reporting/PostgresBackend.h
+++ b/reporting/PostgresBackend.h
@@ -109,6 +109,9 @@ public:
 
     bool
     finishWrites() const override;
+
+    bool
+    doOnlineDelete(uint32_t minLedgerToKeep) const override;
 };
 } // namespace Backend
 #endif
diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp
index 1c117bdb..ed429dd8 100644
--- a/reporting/ReportingETL.cpp
+++ b/reporting/ReportingETL.cpp
@@ -448,6 +448,28 @@ ReportingETL::runETLPipeline(uint32_t startSequence)
                 publishLedger(lgrInfo);
                 lastPublishedSequence = lgrInfo.seq;
             }
+            // Start an online delete once the stored ledger range spans more
+            // than onlineDeleteInterval_ sequences, unless one is running.
+            auto range = flatMapBackend_->fetchLedgerRange();
+            if (onlineDeleteInterval_ && !deleting_ && range &&
+                range->maxSequence - range->minSequence >
+                    *onlineDeleteInterval_)
+            {
+                deleting_ = true;
+                // Capture the cutoff by value. The handler runs later on the
+                // io_context, after `range` (a local of the enclosing block)
+                // is gone, so capturing it by reference would dangle.
+                uint32_t const minLedgerToKeep =
+                    range->maxSequence - *onlineDeleteInterval_;
+                ioContext_.post([this, minLedgerToKeep]() {
+                    BOOST_LOG_TRIVIAL(info) << "Running online delete";
+                    flatMapBackend_->doOnlineDelete(minLedgerToKeep);
+                    BOOST_LOG_TRIVIAL(info) << "Finished online delete";
+                    // NOTE(review): if doOnlineDelete throws, deleting_ stays
+                    // true and the exception leaves this handler -- confirm.
+                    deleting_ = false;
+                });
+            }
         }
     }};
 
@@ -647,5 +669,7 @@ ReportingETL::ReportingETL(
         startSequence_ = config.at("start_sequence").as_int64();
     if (config.contains("read_only"))
         readOnly_ = config.at("read_only").as_bool();
+    if (config.contains("online_delete"))
+        onlineDeleteInterval_ = config.at("online_delete").as_int64();
 }
 
diff --git a/reporting/ReportingETL.h b/reporting/ReportingETL.h
index c1755249..1873a66a 100644
--- a/reporting/ReportingETL.h
+++ b/reporting/ReportingETL.h
@@ -60,6 +60,8 @@ class ReportingETL
 {
 private:
     std::unique_ptr flatMapBackend_;
+    /// Span of ledger sequences to retain; unset disables online delete
+    std::optional<uint32_t> onlineDeleteInterval_;
     std::thread worker_;
     boost::asio::io_context& ioContext_;
 
@@ -89,6 +91,10 @@ private:
 
     /// Whether the software is stopping
     std::atomic_bool stopping_ = false;
+    /// Whether the software is performing online delete
+    // TODO this needs to live in the database, so different servers can
+    // coordinate deletion
+    std::atomic_bool deleting_ = false;
 
     /// Used to determine when to write to the database during the initial
     /// ledger download. By default, the software downloads an entire ledger and