Files
clio/tools/cassandra_delete_range/cassandra_delete_range.go
Ayaz Salikhov 9c92a2b51b style: Use pre-commit tool and add simple config (#2029)
I started with really simple pre-commit hooks and will add more on top.

Important files:
- `.pre-commit-config.yaml` - the config for pre-commit
- `.github/workflows/pre-commit.yml` - runs pre-commit hooks in branches
and `develop`
- `.github/workflows/pre-commit-autoupdate.yml` - auto-updates pre-commit
hooks once a month
2025-04-24 17:59:43 +01:00

347 lines
13 KiB
Go

//
// Based off of https://github.com/scylladb/scylla-code-samples/blob/master/efficient_full_table_scan_example_code/efficient_full_table_scan.go
//
package main
import (
"bufio"
"fmt"
"log"
"os"
"strconv"
"strings"
"time"
"xrplf/clio/cassandra_delete_range/internal/cass"
"xrplf/clio/cassandra_delete_range/internal/util"
"github.com/alecthomas/kingpin/v2"
"github.com/gocql/gocql"
)
// Default values for the --nodes-in-cluster, --cores-in-node and
// --smudge-factor flags. prepareDb multiplies the three flag values together
// to obtain the worker count.
const (
// Default for --nodes-in-cluster.
defaultNumberOfNodesInCluster = 3
// Default for --cores-in-node.
defaultNumberOfCoresInNode = 8
// Default for --smudge-factor.
defaultSmudgeFactor = 3
)
// Command-line definition (kingpin) and the package-level state derived from it.
// Every flag variable is a pointer that kingpin fills in when main parses the
// arguments.
var (
// Application and the required list of cluster hosts.
app = kingpin.New("cassandra_delete_range", "A tool that prunes data from the Clio DB.")
hosts = app.Flag("hosts", "Your Scylla nodes IP addresses, comma separated (i.e. 192.168.1.1,192.168.1.2,192.168.1.3)").Required().String()
// Sub-commands and their positional ledger index arguments.
deleteAfter = app.Command("delete-after", "Prunes from the given ledger index until the end")
deleteAfterLedgerIdx = deleteAfter.Arg("idx", "Sets the earliest ledger_index to keep untouched (delete everything after this ledger index)").Required().Uint64()
deleteBefore = app.Command("delete-before", "Prunes everything before the given ledger index")
deleteBeforeLedgerIdx = deleteBefore.Arg("idx", "Sets the latest ledger_index to keep around (delete everything before this ledger index)").Required().Uint64()
getLedgerRange = app.Command("get-ledger-range", "Fetch the current ledger_range table values")
// Parallelism inputs; prepareDb multiplies them to compute workerCount.
nodesInCluster = app.Flag("nodes-in-cluster", "Number of nodes in your Scylla cluster").Short('n').Default(fmt.Sprintf("%d", defaultNumberOfNodesInCluster)).Int()
coresInNode = app.Flag("cores-in-node", "Number of cores in each node").Short('c').Default(fmt.Sprintf("%d", defaultNumberOfCoresInNode)).Int()
smudgeFactor = app.Flag("smudge-factor", "Yet another factor to make parallelism cooler").Short('s').Default(fmt.Sprintf("%d", defaultSmudgeFactor)).Int()
// Cluster connection settings; prepareDb copies them into the gocql cluster config.
clusterConsistency = app.Flag("consistency", "Cluster consistency level. Use 'localone' for multi DC").Short('o').Default("localquorum").String()
clusterTimeout = app.Flag("timeout", "Maximum duration for query execution in millisecond").Short('t').Default("15000").Int()
clusterNumConnections = app.Flag("cluster-number-of-connections", "Number of connections per host per session (in our case, per thread)").Short('b').Default("1").Int()
clusterCQLVersion = app.Flag("cql-version", "The CQL version to use").Short('l').Default("3.0.0").String()
clusterPageSize = app.Flag("cluster-page-size", "Page size of results").Short('p').Default("5000").Int()
keyspace = app.Flag("keyspace", "Keyspace to use").Short('k').Default("clio_fh").String()
// When set, main calls prepareResume, which reads continue.txt before deletion starts.
resume = app.Flag("resume", "Whether to resume deletion from the previous command due to something crashing").Default("false").Bool()
// Optional credentials; prepareDb only installs an authenticator when userName is non-empty.
userName = app.Flag("username", "Username to use when connecting to the cluster").String()
password = app.Flag("password", "Password to use when connecting to the cluster").String()
// Per-table skip switches. They are passed to cass.Settings in main and may
// also be switched on by prepareResume when resuming.
skipSuccessorTable = app.Flag("skip-successor", "Whether to skip deletion from successor table").Default("false").Bool()
skipObjectsTable = app.Flag("skip-objects", "Whether to skip deletion from objects table").Default("false").Bool()
skipLedgerHashesTable = app.Flag("skip-ledger-hashes", "Whether to skip deletion from ledger_hashes table").Default("false").Bool()
skipTransactionsTable = app.Flag("skip-transactions", "Whether to skip deletion from transactions table").Default("false").Bool()
skipDiffTable = app.Flag("skip-diff", "Whether to skip deletion from diff table").Default("false").Bool()
skipLedgerTransactionsTable = app.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool()
skipLedgersTable = app.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool()
skipWriteLatestLedger = app.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool()
skipAccTransactionsTable = app.Flag("skip-account-transactions", "Whether to skip deletion from account_transactions table").Default("false").Bool()
skipNFTokenTable = app.Flag("skip-nf-tokens", "Whether to skip deletion from nf_tokens table").Default("false").Bool()
skipIssuerNFTokenTable = app.Flag("skip-issuer-nf-tokens-v2", "Whether to skip deletion from issuer_nf_tokens_v2 table").Default("false").Bool()
skipNFTokenURITable = app.Flag("skip-nf-tokens-uri", "Whether to skip deletion from nf_token_uris table").Default("false").Bool()
skipNFTokenTransactionsTable = app.Flag("skip-nf-token-transactions", "Whether to skip deletion from nf_token_transactions table").Default("false").Bool()
// State computed at runtime: workerCount and ranges are set by prepareDb,
// ledgerOrTokenRange by prepareResume (it stays nil when --resume is not used).
workerCount = 1 // the calculated number of parallel goroutines the client should run
ranges []*util.TokenRange // the calculated ranges to be executed in parallel
ledgerOrTokenRange *util.StoredRange // mapping of startRange -> endRange. Used for resume deletion
)
// main parses the command line, prepares the cluster configuration and runs
// the selected sub-command: delete-after, delete-before or get-ledger-range.
//
// Both delete commands print the effective parameters, ask the user for
// confirmation and report the total execution time once finished.
func main() {
	log.SetOutput(os.Stdout)

	command := kingpin.MustParse(app.Parse(os.Args[1:]))

	cluster, err := prepareDb(hosts)
	if err != nil {
		log.Fatal(err)
	}

	cmd := strings.Join(os.Args[1:], " ")
	if *resume {
		// Validates continue.txt, may switch on skip flags for tables that are
		// already done, and rewrites cmd without the trailing --resume.
		prepareResume(&cmd)
	}

	clioCass := cass.NewClioCass(&cass.Settings{
		SkipSuccessorTable:  *skipSuccessorTable,
		SkipObjectsTable:    *skipObjectsTable,
		SkipLedgerHashesTable: *skipLedgerHashesTable,
		SkipTransactionsTable: *skipTransactionsTable,
		SkipDiffTable:       *skipDiffTable,
		// Must come from its own flag; it was previously wired to
		// skipLedgerHashesTable, which made --skip-ledger-transactions a no-op.
		SkipLedgerTransactionsTable:  *skipLedgerTransactionsTable,
		SkipLedgersTable:             *skipLedgersTable,
		SkipWriteLatestLedger:        *skipWriteLatestLedger,
		SkipAccTransactionsTable:     *skipAccTransactionsTable,
		SkipNFTokenTable:             *skipNFTokenTable,
		SkipIssuerNFTokenTable:       *skipIssuerNFTokenTable,
		SkipNFTokenURITable:          *skipNFTokenURITable,
		SkipNFTokenTransactionsTable: *skipNFTokenTransactionsTable,
		WorkerCount:                  workerCount,
		Ranges:                       ranges,
		RangesRead:                   ledgerOrTokenRange,
		Command:                      cmd,
	}, cluster)

	switch command {
	case deleteAfter.FullCommand():
		if *deleteAfterLedgerIdx == 0 {
			log.Println("Please specify ledger index to delete from")
			return
		}

		// cluster.Timeout is in nanoseconds; displayParams prints milliseconds.
		displayParams("delete-after", hosts, cluster.Timeout/1000/1000, *deleteAfterLedgerIdx)

		log.Printf("Will delete everything after ledger index %d (exclusive) and till latest\n", *deleteAfterLedgerIdx)
		log.Println("WARNING: Please make sure that there are no Clio writers operating on the DB while this script is running")

		if !util.PromptContinue() {
			log.Fatal("Aborted")
		}

		startTime := time.Now().UTC()
		clioCass.DeleteAfter(*deleteAfterLedgerIdx)

		fmt.Printf("Total Execution Time: %s\n\n", time.Since(startTime))
		fmt.Println("NOTE: Cassandra/ScyllaDB only writes tombstones. You need to run compaction to free up disk space.")

	case deleteBefore.FullCommand():
		if *deleteBeforeLedgerIdx == 0 {
			log.Println("Please specify ledger index to delete until")
			return
		}

		// cluster.Timeout is in nanoseconds; displayParams prints milliseconds.
		displayParams("delete-before", hosts, cluster.Timeout/1000/1000, *deleteBeforeLedgerIdx)

		log.Printf("Will delete everything before ledger index %d (exclusive)\n", *deleteBeforeLedgerIdx)
		log.Println("WARNING: Please make sure that there are no Clio writers operating on the DB while this script is running")

		if !util.PromptContinue() {
			log.Fatal("Aborted")
		}

		startTime := time.Now().UTC()
		clioCass.DeleteBefore(*deleteBeforeLedgerIdx)

		fmt.Printf("Total Execution Time: %s\n\n", time.Since(startTime))
		fmt.Println("NOTE: Cassandra/ScyllaDB only writes tombstones. You need to run compaction to free up disk space.")

	case getLedgerRange.FullCommand():
		from, to, err := clioCass.GetLedgerRange()
		if err != nil {
			log.Fatal(err)
		}

		fmt.Printf("Range: %d -> %d\n", from, to)
	}
}
func displayParams(command string, hosts *string, timeout time.Duration, ledgerIdx uint64) {
runParameters := fmt.Sprintf(`
Execution Parameters:
=====================
Command : %s
Ledger index : %d
Scylla cluster nodes : %s
Keyspace : %s
Consistency : %s
Timeout (ms) : %d
Connections per host : %d
CQL Version : %s
Page size : %d
# of parallel threads : %d
# of ranges to be executed : %d
Skip deletion of:
- successor table : %t
- objects table : %t
- ledger_hashes table : %t
- transactions table : %t
- diff table : %t
- ledger_transactions table : %t
- ledgers table : %t
- account_tx table : %t
- nf_tokens table : %t
- issuer_nf_tokens_v2 table : %t
- nf_token_uris table : %t
- nf_token_transactions table : %t
Will update ledger_range : %t
`,
command,
ledgerIdx,
*hosts,
*keyspace,
*clusterConsistency,
timeout,
*clusterNumConnections,
*clusterCQLVersion,
*clusterPageSize,
workerCount,
len(ranges),
*skipSuccessorTable || command == "delete-before",
*skipObjectsTable,
*skipLedgerHashesTable,
*skipTransactionsTable,
*skipDiffTable,
*skipLedgerTransactionsTable,
*skipLedgersTable,
*skipAccTransactionsTable,
*skipNFTokenTable,
*skipIssuerNFTokenTable,
*skipNFTokenURITable,
*skipNFTokenTransactionsTable,
!*skipWriteLatestLedger,
)
fmt.Println(runParameters)
}
// prepareDb computes the parallelism settings and builds the gocql cluster
// configuration from the command-line flags.
//
// As a side effect it sets the package-level workerCount (nodes * cores *
// smudge factor) and ranges (the shuffled token ranges for that worker
// count). dbHosts is the comma-separated list of node addresses. The returned
// error is currently always nil.
func prepareDb(dbHosts *string) (*gocql.ClusterConfig, error) {
	workerCount = (*nodesInCluster) * (*coresInNode) * (*smudgeFactor)

	ranges = util.GetTokenRanges(workerCount)
	util.Shuffle(ranges)

	hostList := strings.Split(*dbHosts, ",")

	cluster := gocql.NewCluster(hostList...)
	cluster.Consistency = util.GetConsistencyLevel(*clusterConsistency)
	// Convert to time.Duration before scaling: multiplying in int first
	// overflows on platforms where int is 32 bits (15000 ms is already 1.5e10 ns).
	cluster.Timeout = time.Duration(*clusterTimeout) * time.Millisecond
	cluster.NumConns = *clusterNumConnections
	cluster.CQLVersion = *clusterCQLVersion
	cluster.PageSize = *clusterPageSize
	cluster.Keyspace = *keyspace

	// Credentials are only applied when a user name was supplied.
	if *userName != "" {
		cluster.Authenticator = gocql.PasswordAuthenticator{
			Username: *userName,
			Password: *password,
		}
	}

	return cluster, nil
}
// prepareResume restores the state of an interrupted deletion from
// continue.txt.
//
// It requires --resume to be the last command-line argument, checks that the
// remaining arguments match the command stored in the file, switches on the
// skip flags selected by the stored table name, and loads the stored token
// ranges into ledgerOrTokenRange. On return *cmd holds the command line
// without the trailing --resume. Any problem terminates the process through
// log.Fatal/log.Fatalf.
func prepareResume(cmd *string) {
	// Layout of continue.txt:
	//   - previous user command (must match the current command to resume)
	//   - table name (e.g. objects, ledger_hashes, ...)
	//   - token range pairs "start,end", one pair per line
	file, err := os.Open("continue.txt")
	if os.IsNotExist(err) {
		log.Fatal("continue.txt does not exist. Aborted")
	}
	if err != nil {
		// Any other failure (permissions, I/O, ...) is reported as such
		// instead of being mislabelled as a missing file.
		log.Fatalf("Failed to open file: %v", err)
	}
	defer file.Close()

	scanner := bufio.NewScanner(file)
	scanner.Scan()

	// --resume must be the last flag passed so the rest of the arguments can
	// be compared against the stored command.
	if os.Args[len(os.Args)-1] != "--resume" {
		log.Fatal("--resume must be the last flag passed")
	}

	// Drop the trailing --resume.
	*cmd = strings.Join(os.Args[1:len(os.Args)-1], " ")

	// The aborted command must match the command the user entered now.
	if scanner.Text() != *cmd {
		log.Fatalf("File continue.txt has %s command stored. \n You provided %s which does not match. \n Aborting...", scanner.Text(), *cmd)
	}

	scanner.Scan()

	// Skip the necessary tables based on where the program aborted: for
	// example, if it stopped at account_tx, every table handled before
	// account_tx is treated as already deleted. Each case falls through so
	// that all earlier tables are skipped as well.
	// NOTE(review): there is no case for issuer_nf_tokens_v2 and
	// skipIssuerNFTokenTable is never set here — confirm this is intended.
	tableFound := false
	switch scanner.Text() {
	case "nf_token_transactions":
		*skipNFTokenURITable = true
		fallthrough
	case "nf_token_uris":
		*skipNFTokenTable = true
		fallthrough
	case "nf_tokens":
		*skipAccTransactionsTable = true
		fallthrough
	case "account_tx":
		*skipLedgersTable = true
		fallthrough
	case "ledgers":
		*skipLedgerTransactionsTable = true
		fallthrough
	case "ledger_transactions":
		*skipDiffTable = true
		fallthrough
	case "diff":
		*skipTransactionsTable = true
		fallthrough
	case "transactions":
		*skipLedgerHashesTable = true
		fallthrough
	case "ledger_hashes":
		*skipObjectsTable = true
		fallthrough
	case "objects":
		*skipSuccessorTable = true
		fallthrough
	case "successor":
		tableFound = true
	}

	if !tableFound {
		log.Fatalf("Invalid table: %s", scanner.Text())
	}

	// NOTE(review): this consumes and discards one line before the ranges are
	// read — presumably a line the writer of continue.txt emits after the
	// table name; confirm against the code that produces the file.
	scanner.Scan()

	rangeRead := make(map[int64]int64)

	// Load every stored "start,end" pair.
	for scanner.Scan() {
		line := scanner.Text()
		tokenRange := strings.Split(line, ",")
		if len(tokenRange) != 2 {
			log.Fatalf("Range is not two integers. %s . Aborting...", tokenRange)
		}

		startStr := strings.TrimSpace(tokenRange[0])
		endStr := strings.TrimSpace(tokenRange[1])

		start, err1 := strconv.ParseInt(startStr, 10, 64)
		end, err2 := strconv.ParseInt(endStr, 10, 64)
		if err1 != nil || err2 != nil {
			// %v renders a nil error as <nil> rather than %!s(<nil>).
			log.Fatalf("Error converting integer: %v, %v", err1, err2)
		}

		rangeRead[start] = end
	}

	// A read error ends the loop just like EOF does; do not resume from a
	// silently truncated range list.
	if err := scanner.Err(); err != nil {
		log.Fatalf("Failed to read continue.txt: %v", err)
	}

	ledgerOrTokenRange = &util.StoredRange{}
	ledgerOrTokenRange.TokenRange = rangeRead
}