From 24816c021df1e8971b41fa31aa7e55f9c8db74fa Mon Sep 17 00:00:00 2001 From: CJ Cobb Date: Tue, 11 May 2021 21:14:47 +0000 Subject: [PATCH] online delete for cassandra. doesn't crash, but not sure it works --- reporting/BackendFactory.h | 5 +- reporting/BackendInterface.h | 2 +- reporting/CassandraBackend.cpp | 176 +++++++++++++++++++++++++++++++-- reporting/CassandraBackend.h | 13 ++- reporting/PostgresBackend.cpp | 12 ++- reporting/PostgresBackend.h | 2 +- reporting/ReportingETL.cpp | 3 +- 7 files changed, 197 insertions(+), 16 deletions(-) diff --git a/reporting/BackendFactory.h b/reporting/BackendFactory.h index c4630b0e..5ee48e79 100644 --- a/reporting/BackendFactory.h +++ b/reporting/BackendFactory.h @@ -8,12 +8,15 @@ namespace Backend { std::unique_ptr makeBackend(boost::json::object const& config) { - boost::json::object const& dbConfig = config.at("database").as_object(); + boost::json::object dbConfig = config.at("database").as_object(); auto type = dbConfig.at("type").as_string(); if (boost::iequals(type, "cassandra")) { + if (config.contains("online_delete")) + dbConfig.at(type).as_object()["ttl"] = + config.at("online_delete").as_int64() * 4; auto backend = std::make_unique(dbConfig.at(type).as_object()); return std::move(backend); diff --git a/reporting/BackendInterface.h b/reporting/BackendInterface.h index 17b68520..3fa3a6a5 100644 --- a/reporting/BackendInterface.h +++ b/reporting/BackendInterface.h @@ -399,7 +399,7 @@ public: doFinishWrites() const = 0; virtual bool - doOnlineDelete(uint32_t minLedgerToKeep) const = 0; + doOnlineDelete(uint32_t numLedgersToKeep) const = 0; virtual bool writeKeys( std::unordered_set const& keys, diff --git a/reporting/CassandraBackend.cpp b/reporting/CassandraBackend.cpp index fd812271..56f5a7d1 100644 --- a/reporting/CassandraBackend.cpp +++ b/reporting/CassandraBackend.cpp @@ -726,6 +726,87 @@ struct WriteKeyCallbackData { } }; +struct OnlineDeleteCallbackData +{ + CassandraBackend const& backend; + ripple::uint256 key; + uint32_t ledgerSequence; + std::vector object; + std::condition_variable& cv; + std::atomic_uint32_t& numOutstanding; + std::mutex& mtx; + uint32_t currentRetries = 0; + OnlineDeleteCallbackData( + CassandraBackend const& backend, + ripple::uint256&& key, + uint32_t ledgerSequence, + std::vector&& object, + std::condition_variable& cv, + std::mutex& mtx, + std::atomic_uint32_t& numOutstanding) + : backend(backend) + , key(std::move(key)) + , ledgerSequence(ledgerSequence) + , object(std::move(object)) + , cv(cv) + , mtx(mtx) + , numOutstanding(numOutstanding) + + { + } +}; +void +onlineDeleteCallback(CassFuture* fut, void* cbData); +void +onlineDelete(OnlineDeleteCallbackData& cb) +{ + { + CassandraStatement statement{ + cb.backend.getInsertObjectPreparedStatement()}; + statement.bindBytes(cb.key); + statement.bindInt(cb.ledgerSequence); + statement.bindBytes(cb.object); + + cb.backend.executeAsyncWrite(statement, onlineDeleteCallback, cb, true); + } +} +void +onlineDeleteCallback(CassFuture* fut, void* cbData) +{ + OnlineDeleteCallbackData& requestParams = + *static_cast(cbData); + + CassandraBackend const& backend = requestParams.backend; + auto rc = cass_future_error_code(fut); + if (rc != CASS_OK) + { + // exponential backoff with a max wait of 2^10 ms (about 1 second) + auto wait = std::chrono::milliseconds( + lround(std::pow(2, std::min(10u, requestParams.currentRetries)))); + BOOST_LOG_TRIVIAL(error) + << "ERROR!!! Cassandra insert book error: " << rc << ", " + << cass_error_desc(rc) << ", retrying in " << wait.count() + << " milliseconds"; + ++requestParams.currentRetries; + std::shared_ptr timer = + std::make_shared( + backend.getIOContext(), + std::chrono::steady_clock::now() + wait); + timer->async_wait( + [timer, &requestParams](const boost::system::error_code& error) { + onlineDelete(requestParams); + }); + } + else + { + BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book"; + { + std::lock_guard lck(requestParams.mtx); + --requestParams.numOutstanding; + requestParams.cv.notify_one(); + } + } +} void writeKeyCallback(CassFuture* fut, void* cbData); void @@ -1105,8 +1186,77 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const bool CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const { - throw std::runtime_error("doOnlineDelete : unimplemented"); - return false; + // calculate TTL + // ledgers close roughly every 4 seconds. We double the TTL so that way + // there is a window of time to update the database, to prevent unchanging + // records from being deleted. + auto rng = fetchLedgerRangeNoThrow(); + if (!rng) + return false; + uint32_t minLedger = rng->maxSequence - numLedgersToKeep; + if (minLedger <= rng->minSequence) + return false; + std::condition_variable cv; + std::mutex mtx; + std::vector> cbs; + uint32_t concurrentLimit = 10; + std::atomic_uint32_t numOutstanding = 0; + + // iterate through latest ledger, updating TTL + std::optional cursor; + while (true) + { + try + { + auto [objects, curCursor, warning] = + fetchLedgerPage(cursor, minLedger, 256); + if (warning) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ + << " online delete running but flag ledger is not complete"; + std::this_thread::sleep_for(std::chrono::seconds(10)); + continue; + } + + for (auto& obj : objects) + { + ++numOutstanding; + cbs.push_back(std::make_shared( + *this, + std::move(obj.key), + minLedger, + std::move(obj.blob), + cv, + mtx, + numOutstanding)); + + onlineDelete(*cbs.back()); + std::unique_lock lck(mtx); + BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex"; + cv.wait(lck, [&numOutstanding, concurrentLimit]() { + return numOutstanding < concurrentLimit; + }); + } + BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page"; + cursor = curCursor; + if (!cursor) + break; + } + catch (DatabaseTimeout const& e) + { + BOOST_LOG_TRIVIAL(warning) + << __func__ << " Database timeout fetching keys"; + std::this_thread::sleep_for(std::chrono::seconds(2)); + } + } + std::unique_lock lck(mtx); + cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; }); + CassandraStatement statement{deleteLedgerRange_}; + statement.bindInt(minLedger); + executeSyncWrite(statement); + // update ledger_range + return true; } void @@ -1208,6 +1358,7 @@ CassandraBackend::open(bool readOnly) int threads = config_.contains("threads") ? config_["threads"].as_int64() : std::thread::hardware_concurrency(); + int ttl = config_.contains("ttl") ? config_["ttl"].as_int64() * 2 : 0; rc = cass_cluster_set_num_threads_io(cluster, threads); if (rc != CASS_OK) @@ -1327,7 +1478,8 @@ CassandraBackend::open(bool readOnly) query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "objects" << " ( key blob, sequence bigint, object blob, PRIMARY " "KEY(key, " - "sequence)) WITH CLUSTERING ORDER BY (sequence DESC)"; + "sequence)) WITH CLUSTERING ORDER BY (sequence DESC) AND" + << " default_time_to_live = " << ttl; if (!executeSimpleStatement(query.str())) continue; @@ -1352,7 +1504,8 @@ CassandraBackend::open(bool readOnly) query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" << " ( hash blob PRIMARY KEY, ledger_sequence bigint, " "transaction " - "blob, metadata blob)"; + "blob, metadata blob)" + << " WITH default_time_to_live = " << ttl; if (!executeSimpleStatement(query.str())) continue; @@ -1407,7 +1560,9 @@ CassandraBackend::open(bool readOnly) " hash blob, " "PRIMARY KEY " "(account, seq_idx)) WITH " - "CLUSTERING ORDER BY (seq_idx desc)"; + "CLUSTERING ORDER BY (seq_idx desc)" + << " AND default_time_to_live = " << ttl; + if (!executeSimpleStatement(query.str())) continue; @@ -1419,7 +1574,8 @@ CassandraBackend::open(bool readOnly) query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers" - << " ( sequence bigint PRIMARY KEY, header blob )"; + << " ( sequence bigint PRIMARY KEY, header blob )" + << " WITH default_time_to_live = " << ttl; if (!executeSimpleStatement(query.str())) continue; @@ -1431,7 +1587,8 @@ CassandraBackend::open(bool readOnly) query.str(""); query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes" - << " (hash blob PRIMARY KEY, sequence bigint)"; + << " (hash blob PRIMARY KEY, sequence bigint)" + << " WITH default_time_to_live = " << ttl; if (!executeSimpleStatement(query.str())) continue; @@ -1605,6 +1762,11 @@ CassandraBackend::open(bool readOnly) "(?,null)"; if (!updateLedgerRange_.prepareStatement(query, session_.get())) continue; + query = {}; + query << " update " << tablePrefix << "ledger_range" + << " set sequence = ? where is_latest = false"; + if (!deleteLedgerRange_.prepareStatement(query, session_.get())) + continue; query.str(""); query << " select header from " << tablePrefix diff --git a/reporting/CassandraBackend.h b/reporting/CassandraBackend.h index 925e1570..f9d66acd 100644 --- a/reporting/CassandraBackend.h +++ b/reporting/CassandraBackend.h @@ -166,6 +166,11 @@ public: bindBytes(data.data(), data.size()); } void + bindBytes(std::vector const& data) + { + bindBytes(data.data(), data.size()); + } + void bindBytes(ripple::AccountID const& data) { bindBytes(data.data(), data.size()); @@ -649,6 +654,7 @@ private: CassandraPreparedStatement insertLedgerHeader_; CassandraPreparedStatement insertLedgerHash_; CassandraPreparedStatement updateLedgerRange_; + CassandraPreparedStatement deleteLedgerRange_; CassandraPreparedStatement updateLedgerHeader_; CassandraPreparedStatement selectLedgerBySeq_; CassandraPreparedStatement selectLatestLedger_; @@ -735,6 +741,11 @@ public: { return insertBook2_; } + CassandraPreparedStatement const& + getInsertObjectPreparedStatement() const + { + return insertObject_; + } CassandraPreparedStatement const& getSelectLedgerDiffPreparedStatement() const @@ -1353,7 +1364,7 @@ public: syncCv_.wait(lck, [this]() { return finishedAllRequests(); }); } bool - doOnlineDelete(uint32_t minLedgerToKeep) const override; + doOnlineDelete(uint32_t numLedgersToKeep) const override; boost::asio::io_context& getIOContext() const diff --git a/reporting/PostgresBackend.cpp b/reporting/PostgresBackend.cpp index f20cc8d6..9d9c003c 100644 --- a/reporting/PostgresBackend.cpp +++ b/reporting/PostgresBackend.cpp @@ -874,14 +874,20 @@ PostgresBackend::writeBooks( return true; } bool -PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const +PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const { + auto rng = fetchLedgerRangeNoThrow(); + if (!rng) + return false; + uint32_t minLedger = rng->maxSequence - numLedgersToKeep; + if (minLedger <= rng->minSequence) + return false; uint32_t limit = 2048; PgQuery pgQuery(pgPool_); { std::stringstream sql; sql << "DELETE FROM ledgers WHERE ledger_seq < " - << std::to_string(minLedgerToKeep); + << std::to_string(minLedger); auto res = pgQuery(sql.str().data()); if (res.msg() != "ok") throw std::runtime_error("Error deleting from ledgers table"); @@ -892,7 +898,7 @@ PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const { std::stringstream sql; sql << "SELECT DISTINCT ON (key) key,ledger_seq,object FROM objects" - << " WHERE ledger_seq <= " << std::to_string(minLedgerToKeep); + << " WHERE ledger_seq <= " << std::to_string(minLedger); if (cursor.size()) sql << " AND key < \'\\x" << cursor << "\'"; sql << " ORDER BY key DESC, ledger_seq DESC" diff --git a/reporting/PostgresBackend.h b/reporting/PostgresBackend.h index 27dbcaa9..806a95da 100644 --- a/reporting/PostgresBackend.h +++ b/reporting/PostgresBackend.h @@ -113,7 +113,7 @@ public: doFinishWrites() const override; bool - doOnlineDelete(uint32_t minLedgerToKeep) const override; + doOnlineDelete(uint32_t numLedgersToKeep) const override; bool writeKeys( std::unordered_set const& keys, diff --git a/reporting/ReportingETL.cpp b/reporting/ReportingETL.cpp index 44dd1448..d238a369 100644 --- a/reporting/ReportingETL.cpp +++ b/reporting/ReportingETL.cpp @@ -507,8 +507,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors) deleting_ = true; ioContext_.post([this, &range]() { BOOST_LOG_TRIVIAL(info) << "Running online delete"; - flatMapBackend_->doOnlineDelete( - range->maxSequence - *onlineDeleteInterval_); + flatMapBackend_->doOnlineDelete(*onlineDeleteInterval_); BOOST_LOG_TRIVIAL(info) << "Finished online delete"; deleting_ = false; });