online delete for cassandra. doesn't crash, but not sure it works

This commit is contained in:
CJ Cobb
2021-05-11 21:14:47 +00:00
parent ca886fe2c8
commit 24816c021d
7 changed files with 197 additions and 16 deletions

View File

@@ -8,12 +8,15 @@ namespace Backend {
 std::unique_ptr<BackendInterface>
 makeBackend(boost::json::object const& config)
 {
-    boost::json::object const& dbConfig = config.at("database").as_object();
+    boost::json::object dbConfig = config.at("database").as_object();
     auto type = dbConfig.at("type").as_string();
     if (boost::iequals(type, "cassandra"))
     {
+        if (config.contains("online_delete"))
+            dbConfig.at(type).as_object()["ttl"] =
+                config.at("online_delete").as_int64() * 4;
         auto backend =
             std::make_unique<CassandraBackend>(dbConfig.at(type).as_object());
         return std::move(backend);
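
For context, the factory turns the top-level online_delete value (a number of ledgers to keep) into a ttl entry in the Cassandra section of the config, at roughly 4 seconds per ledger. A minimal standalone sketch of that flow, using an entirely illustrative config (only online_delete, database.type, and the nested Cassandra object are implied by the change above):

#include <boost/json.hpp>
#include <iostream>

int main()
{
    // Illustrative config; real deployments carry many more fields.
    boost::json::object config =
        boost::json::parse(R"({
            "database": {
                "type": "cassandra",
                "cassandra": { "contact_points": "127.0.0.1" }
            },
            "online_delete": 1000000
        })")
            .as_object();

    // Mirrors makeBackend: copy the database section so it can be mutated,
    // then stamp a ttl (in seconds) derived from the ledger count.
    boost::json::object dbConfig = config.at("database").as_object();
    auto type = dbConfig.at("type").as_string();
    if (config.contains("online_delete"))
        dbConfig.at(type).as_object()["ttl"] =
            config.at("online_delete").as_int64() * 4;

    std::cout << dbConfig.at(type).as_object().at("ttl").as_int64()
              << " seconds\n";  // prints 4000000
}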

View File

@@ -399,7 +399,7 @@ public:
     doFinishWrites() const = 0;
     virtual bool
-    doOnlineDelete(uint32_t minLedgerToKeep) const = 0;
+    doOnlineDelete(uint32_t numLedgersToKeep) const = 0;
     virtual bool
     writeKeys(
         std::unordered_set<ripple::uint256> const& keys,

View File

@@ -726,6 +726,87 @@ struct WriteKeyCallbackData
     {
     }
 };
+struct OnlineDeleteCallbackData
+{
+    CassandraBackend const& backend;
+    ripple::uint256 key;
+    uint32_t ledgerSequence;
+    std::vector<unsigned char> object;
+    std::condition_variable& cv;
+    std::atomic_uint32_t& numOutstanding;
+    std::mutex& mtx;
+    uint32_t currentRetries = 0;
+    OnlineDeleteCallbackData(
+        CassandraBackend const& backend,
+        ripple::uint256&& key,
+        uint32_t ledgerSequence,
+        std::vector<unsigned char>&& object,
+        std::condition_variable& cv,
+        std::mutex& mtx,
+        std::atomic_uint32_t& numOutstanding)
+        : backend(backend)
+        , key(std::move(key))
+        , ledgerSequence(ledgerSequence)
+        , object(std::move(object))
+        , cv(cv)
+        , mtx(mtx)
+        , numOutstanding(numOutstanding)
+    {
+    }
+};
+void
+onlineDeleteCallback(CassFuture* fut, void* cbData);
+void
+onlineDelete(OnlineDeleteCallbackData& cb)
+{
+    {
+        CassandraStatement statement{
+            cb.backend.getInsertObjectPreparedStatement()};
+        statement.bindBytes(cb.key);
+        statement.bindInt(cb.ledgerSequence);
+        statement.bindBytes(cb.object);
+        cb.backend.executeAsyncWrite(statement, onlineDeleteCallback, cb, true);
+    }
+}
+void
+onlineDeleteCallback(CassFuture* fut, void* cbData)
+{
+    OnlineDeleteCallbackData& requestParams =
+        *static_cast<OnlineDeleteCallbackData*>(cbData);
+    CassandraBackend const& backend = requestParams.backend;
+    auto rc = cass_future_error_code(fut);
+    if (rc != CASS_OK)
+    {
+        // exponential backoff with a max wait of 2^10 ms (about 1 second)
+        auto wait = std::chrono::milliseconds(
+            lround(std::pow(2, std::min(10u, requestParams.currentRetries))));
+        BOOST_LOG_TRIVIAL(error)
+            << "ERROR!!! Cassandra insert book error: " << rc << ", "
+            << cass_error_desc(rc) << ", retrying in " << wait.count()
+            << " milliseconds";
+        ++requestParams.currentRetries;
+        std::shared_ptr<boost::asio::steady_timer> timer =
+            std::make_shared<boost::asio::steady_timer>(
+                backend.getIOContext(),
+                std::chrono::steady_clock::now() + wait);
+        timer->async_wait(
+            [timer, &requestParams](const boost::system::error_code& error) {
+                onlineDelete(requestParams);
+            });
+    }
+    else
+    {
+        BOOST_LOG_TRIVIAL(trace) << __func__ << " Successfully inserted a book";
+        {
+            std::lock_guard lck(requestParams.mtx);
+            --requestParams.numOutstanding;
+            requestParams.cv.notify_one();
+        }
+    }
+}
 void
 writeKeyCallback(CassFuture* fut, void* cbData);
 void
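
The failure path above re-arms the write after an exponentially growing delay, keeping the steady_timer alive by capturing its own shared_ptr in the completion handler. A minimal standalone sketch of that pattern, assuming nothing beyond Boost.Asio (RequestState and doWrite are illustrative names, and failure is simulated rather than coming from the Cassandra driver):

#include <boost/asio.hpp>
#include <algorithm>
#include <chrono>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <memory>

struct RequestState
{
    uint32_t currentRetries = 0;
};

void
doWrite(boost::asio::io_context& ioc, RequestState& state)
{
    // Simulate a driver error for the first few attempts.
    bool failed = state.currentRetries < 3;
    if (!failed)
    {
        std::cout << "write succeeded after " << state.currentRetries
                  << " retries\n";
        return;
    }
    // Exponential backoff, capped at 2^10 ms, like the callback above.
    auto wait = std::chrono::milliseconds(std::lround(
        std::pow(2, std::min<uint32_t>(10, state.currentRetries))));
    ++state.currentRetries;
    auto timer = std::make_shared<boost::asio::steady_timer>(
        ioc, std::chrono::steady_clock::now() + wait);
    // The handler captures the shared_ptr so the timer outlives this scope.
    timer->async_wait([timer, &ioc, &state](boost::system::error_code const&) {
        doWrite(ioc, state);
    });
}

int main()
{
    boost::asio::io_context ioc;
    RequestState state;
    doWrite(ioc, state);
    ioc.run();  // returns once no timers remain outstanding
}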
@@ -1105,8 +1186,77 @@ CassandraBackend::runIndexer(uint32_t ledgerSequence) const
 bool
 CassandraBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
 {
-    throw std::runtime_error("doOnlineDelete : unimplemented");
-    return false;
+    // calculate TTL
+    // ledgers close roughly every 4 seconds. We double the TTL so that way
+    // there is a window of time to update the database, to prevent unchanging
+    // records from being deleted.
+    auto rng = fetchLedgerRangeNoThrow();
+    if (!rng)
+        return false;
+    uint32_t minLedger = rng->maxSequence - numLedgersToKeep;
+    if (minLedger <= rng->minSequence)
+        return false;
+    std::condition_variable cv;
+    std::mutex mtx;
+    std::vector<std::shared_ptr<OnlineDeleteCallbackData>> cbs;
+    uint32_t concurrentLimit = 10;
+    std::atomic_uint32_t numOutstanding = 0;
+    // iterate through latest ledger, updating TTL
+    std::optional<ripple::uint256> cursor;
+    while (true)
+    {
+        try
+        {
+            auto [objects, curCursor, warning] =
+                fetchLedgerPage(cursor, minLedger, 256);
+            if (warning)
+            {
+                BOOST_LOG_TRIVIAL(warning)
+                    << __func__
+                    << " online delete running but flag ledger is not complete";
+                std::this_thread::sleep_for(std::chrono::seconds(10));
+                continue;
+            }
+            for (auto& obj : objects)
+            {
+                ++numOutstanding;
+                cbs.push_back(std::make_shared<OnlineDeleteCallbackData>(
+                    *this,
+                    std::move(obj.key),
+                    minLedger,
+                    std::move(obj.blob),
+                    cv,
+                    mtx,
+                    numOutstanding));
+                onlineDelete(*cbs.back());
+                std::unique_lock<std::mutex> lck(mtx);
+                BOOST_LOG_TRIVIAL(trace) << __func__ << "Got the mutex";
+                cv.wait(lck, [&numOutstanding, concurrentLimit]() {
+                    return numOutstanding < concurrentLimit;
+                });
+            }
+            BOOST_LOG_TRIVIAL(debug) << __func__ << " fetched a page";
+            cursor = curCursor;
+            if (!cursor)
+                break;
+        }
+        catch (DatabaseTimeout const& e)
+        {
+            BOOST_LOG_TRIVIAL(warning)
+                << __func__ << " Database timeout fetching keys";
+            std::this_thread::sleep_for(std::chrono::seconds(2));
+        }
+    }
+    std::unique_lock<std::mutex> lck(mtx);
+    cv.wait(lck, [&numOutstanding]() { return numOutstanding == 0; });
+    CassandraStatement statement{deleteLedgerRange_};
+    statement.bindInt(minLedger);
+    executeSyncWrite(statement);
+    // update ledger_range
+    return true;
 }
 void
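
The loop above throttles itself with an atomic in-flight counter and a condition_variable: each submission bumps the counter, each completion callback decrements it and notifies, and the submitter blocks whenever the count reaches concurrentLimit. A self-contained sketch of that handshake, with plain std::thread standing in for the driver's asynchronous writes:

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
    std::condition_variable cv;
    std::mutex mtx;
    std::atomic_uint32_t numOutstanding = 0;
    uint32_t const concurrentLimit = 10;
    std::vector<std::thread> inflight;

    for (int i = 0; i < 100; ++i)
    {
        ++numOutstanding;
        // "Completion callback": decrement under the lock and wake the submitter.
        inflight.emplace_back([&]() {
            std::lock_guard lck(mtx);
            --numOutstanding;
            cv.notify_one();
        });
        // Submitter blocks until we are back under the limit.
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [&]() { return numOutstanding < concurrentLimit; });
    }

    // Final drain, mirroring the wait before deleteLedgerRange_ is executed.
    {
        std::unique_lock<std::mutex> lck(mtx);
        cv.wait(lck, [&]() { return numOutstanding == 0; });
    }
    for (auto& t : inflight)
        t.join();
    std::cout << "all writes drained\n";
}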
@@ -1208,6 +1358,7 @@ CassandraBackend::open(bool readOnly)
     int threads = config_.contains("threads")
         ? config_["threads"].as_int64()
         : std::thread::hardware_concurrency();
+    int ttl = config_.contains("ttl") ? config_["ttl"].as_int64() * 2 : 0;
     rc = cass_cluster_set_num_threads_io(cluster, threads);
     if (rc != CASS_OK)
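
Putting the two factors together: makeBackend sets ttl to online_delete * 4 (about one ledger every 4 seconds), and open doubles that again for default_time_to_live, leaving the update window the comment in doOnlineDelete describes. A quick back-of-the-envelope with an illustrative online_delete value:

#include <cstdint>
#include <iostream>

int main()
{
    int64_t onlineDelete = 1'000'000;  // ledgers to keep (example value)
    int64_t ttl = onlineDelete * 4;    // set by makeBackend, in seconds
    int64_t defaultTtl = ttl * 2;      // doubled here before table creation
    std::cout << defaultTtl << " s ~= " << defaultTtl / 86400 << " days\n";
    // prints: 8000000 s ~= 92 days
}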
@@ -1327,7 +1478,8 @@ CassandraBackend::open(bool readOnly)
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "objects" query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "objects"
<< " ( key blob, sequence bigint, object blob, PRIMARY " << " ( key blob, sequence bigint, object blob, PRIMARY "
"KEY(key, " "KEY(key, "
"sequence)) WITH CLUSTERING ORDER BY (sequence DESC)"; "sequence)) WITH CLUSTERING ORDER BY (sequence DESC) AND"
<< " default_time_to_live = " << ttl;
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
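
With a non-zero ttl, the generated DDL ends up as printed below; since default_time_to_live applies to every newly written row, the re-inserts performed by doOnlineDelete restart the expiry clock on the current version of each object, while versions that are never re-written age out. A small snippet that just prints the statement the code above assembles (prefix and ttl values are illustrative):

#include <iostream>
#include <sstream>
#include <string>

int main()
{
    std::string tablePrefix = "example";  // placeholder prefix
    int ttl = 8000000;  // illustrative, see the arithmetic above
    std::stringstream query;
    query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "objects"
          << " ( key blob, sequence bigint, object blob, PRIMARY "
             "KEY(key, "
             "sequence)) WITH CLUSTERING ORDER BY (sequence DESC) AND"
          << " default_time_to_live = " << ttl;
    std::cout << query.str() << "\n";
}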
@@ -1352,7 +1504,8 @@ CassandraBackend::open(bool readOnly)
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions" query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "transactions"
<< " ( hash blob PRIMARY KEY, ledger_sequence bigint, " << " ( hash blob PRIMARY KEY, ledger_sequence bigint, "
"transaction " "transaction "
"blob, metadata blob)"; "blob, metadata blob)"
<< " WITH default_time_to_live = " << ttl;
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
@@ -1407,7 +1560,9 @@ CassandraBackend::open(bool readOnly)
" hash blob, " " hash blob, "
"PRIMARY KEY " "PRIMARY KEY "
"(account, seq_idx)) WITH " "(account, seq_idx)) WITH "
"CLUSTERING ORDER BY (seq_idx desc)"; "CLUSTERING ORDER BY (seq_idx desc)"
<< " AND default_time_to_live = " << ttl;
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
@@ -1419,7 +1574,8 @@ CassandraBackend::open(bool readOnly)
query.str(""); query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers" query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledgers"
<< " ( sequence bigint PRIMARY KEY, header blob )"; << " ( sequence bigint PRIMARY KEY, header blob )"
<< " WITH default_time_to_live = " << ttl;
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
@@ -1431,7 +1587,8 @@ CassandraBackend::open(bool readOnly)
query.str(""); query.str("");
query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes" query << "CREATE TABLE IF NOT EXISTS " << tablePrefix << "ledger_hashes"
<< " (hash blob PRIMARY KEY, sequence bigint)"; << " (hash blob PRIMARY KEY, sequence bigint)"
<< " WITH default_time_to_live = " << ttl;
if (!executeSimpleStatement(query.str())) if (!executeSimpleStatement(query.str()))
continue; continue;
@@ -1605,6 +1762,11 @@ CassandraBackend::open(bool readOnly)
"(?,null)"; "(?,null)";
if (!updateLedgerRange_.prepareStatement(query, session_.get())) if (!updateLedgerRange_.prepareStatement(query, session_.get()))
continue; continue;
query = {};
query << " update " << tablePrefix << "ledger_range"
<< " set sequence = ? where is_latest = false";
if (!deleteLedgerRange_.prepareStatement(query, session_.get()))
continue;
query.str(""); query.str("");
query << " select header from " << tablePrefix query << " select header from " << tablePrefix

View File

@@ -166,6 +166,11 @@ public:
         bindBytes(data.data(), data.size());
     }
     void
+    bindBytes(std::vector<unsigned char> const& data)
+    {
+        bindBytes(data.data(), data.size());
+    }
+    void
     bindBytes(ripple::AccountID const& data)
     {
         bindBytes(data.data(), data.size());
@@ -649,6 +654,7 @@ private:
     CassandraPreparedStatement insertLedgerHeader_;
     CassandraPreparedStatement insertLedgerHash_;
     CassandraPreparedStatement updateLedgerRange_;
+    CassandraPreparedStatement deleteLedgerRange_;
     CassandraPreparedStatement updateLedgerHeader_;
     CassandraPreparedStatement selectLedgerBySeq_;
     CassandraPreparedStatement selectLatestLedger_;
@@ -735,6 +741,11 @@ public:
     {
         return insertBook2_;
     }
+    CassandraPreparedStatement const&
+    getInsertObjectPreparedStatement() const
+    {
+        return insertObject_;
+    }
     CassandraPreparedStatement const&
     getSelectLedgerDiffPreparedStatement() const
@@ -1353,7 +1364,7 @@ public:
         syncCv_.wait(lck, [this]() { return finishedAllRequests(); });
     }
     bool
-    doOnlineDelete(uint32_t minLedgerToKeep) const override;
+    doOnlineDelete(uint32_t numLedgersToKeep) const override;
     boost::asio::io_context&
     getIOContext() const

View File

@@ -874,14 +874,20 @@ PostgresBackend::writeBooks(
     return true;
 }
 bool
-PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
+PostgresBackend::doOnlineDelete(uint32_t numLedgersToKeep) const
 {
+    auto rng = fetchLedgerRangeNoThrow();
+    if (!rng)
+        return false;
+    uint32_t minLedger = rng->maxSequence - numLedgersToKeep;
+    if (minLedger <= rng->minSequence)
+        return false;
     uint32_t limit = 2048;
     PgQuery pgQuery(pgPool_);
     {
         std::stringstream sql;
         sql << "DELETE FROM ledgers WHERE ledger_seq < "
-            << std::to_string(minLedgerToKeep);
+            << std::to_string(minLedger);
         auto res = pgQuery(sql.str().data());
         if (res.msg() != "ok")
             throw std::runtime_error("Error deleting from ledgers table");
@@ -892,7 +898,7 @@ PostgresBackend::doOnlineDelete(uint32_t minLedgerToKeep) const
     {
         std::stringstream sql;
         sql << "SELECT DISTINCT ON (key) key,ledger_seq,object FROM objects"
-            << " WHERE ledger_seq <= " << std::to_string(minLedgerToKeep);
+            << " WHERE ledger_seq <= " << std::to_string(minLedger);
         if (cursor.size())
             sql << " AND key < \'\\x" << cursor << "\'";
         sql << " ORDER BY key DESC, ledger_seq DESC"

View File

@@ -113,7 +113,7 @@ public:
     doFinishWrites() const override;
     bool
-    doOnlineDelete(uint32_t minLedgerToKeep) const override;
+    doOnlineDelete(uint32_t numLedgersToKeep) const override;
     bool
     writeKeys(
         std::unordered_set<ripple::uint256> const& keys,

View File

@@ -507,8 +507,7 @@ ReportingETL::runETLPipeline(uint32_t startSequence, int numExtractors)
             deleting_ = true;
             ioContext_.post([this, &range]() {
                 BOOST_LOG_TRIVIAL(info) << "Running online delete";
-                flatMapBackend_->doOnlineDelete(
-                    range->maxSequence - *onlineDeleteInterval_);
+                flatMapBackend_->doOnlineDelete(*onlineDeleteInterval_);
                 BOOST_LOG_TRIVIAL(info) << "Finished online delete";
                 deleting_ = false;
             });
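
The ETL side simply posts the delete onto its io_context and uses a flag to keep runs from overlapping, now passing the configured interval straight through since the backend computes the cut-off itself. A rough standalone sketch of that dispatch pattern (the backend call is replaced by a comment, and all names other than doOnlineDelete are illustrative):

#include <boost/asio.hpp>
#include <atomic>
#include <cstdint>
#include <iostream>
#include <thread>

int main()
{
    boost::asio::io_context ioc;
    auto work = boost::asio::make_work_guard(ioc);
    std::thread runner{[&ioc]() { ioc.run(); }};

    std::atomic_bool deleting = false;
    uint32_t onlineDeleteInterval = 1'000'000;  // ledgers to keep (example)

    if (!deleting)
    {
        deleting = true;
        boost::asio::post(ioc, [&]() {
            std::cout << "Running online delete, keeping "
                      << onlineDeleteInterval << " ledgers\n";
            // flatMapBackend_->doOnlineDelete(onlineDeleteInterval);
            std::cout << "Finished online delete\n";
            deleting = false;
        });
    }

    work.reset();   // let io_context::run return once the posted work is done
    runner.join();
}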