diff --git a/tools/cassandra_delete_range/cassandra_delete_range.go b/tools/cassandra_delete_range/cassandra_delete_range.go new file mode 100644 index 00000000..c5941b9a --- /dev/null +++ b/tools/cassandra_delete_range/cassandra_delete_range.go @@ -0,0 +1,210 @@ +// +// Based off of https://github.com/scylladb/scylla-code-samples/blob/master/efficient_full_table_scan_example_code/efficient_full_table_scan.go +// + +package main + +import ( + "fmt" + "log" + "os" + "strings" + "time" + "xrplf/clio/cassandra_delete_range/internal/cass" + "xrplf/clio/cassandra_delete_range/internal/util" + + "github.com/alecthomas/kingpin/v2" + "github.com/gocql/gocql" +) + +const ( + defaultNumberOfNodesInCluster = 3 + defaultNumberOfCoresInNode = 8 + defaultSmudgeFactor = 3 +) + +var ( + app = kingpin.New("cassandra_delete_range", "A tool that prunes data from the Clio DB.") + hosts = app.Flag("hosts", "Your Scylla nodes IP addresses, comma separated (i.e. 192.168.1.1,192.168.1.2,192.168.1.3)").Required().String() + + deleteAfter = app.Command("delete-after", "Prunes from the given ledger index until the end") + deleteAfterLedgerIdx = deleteAfter.Arg("idx", "Sets the earliest ledger_index to keep untouched (delete everything after this ledger index)").Required().Uint64() + + deleteBefore = app.Command("delete-before", "Prunes everything before the given ledger index") + deleteBeforeLedgerIdx = deleteBefore.Arg("idx", "Sets the latest ledger_index to keep around (delete everything before this ledger index)").Required().Uint64() + + getLedgerRange = app.Command("get-ledger-range", "Fetch the current lender_range table values") + + nodesInCluster = app.Flag("nodes-in-cluster", "Number of nodes in your Scylla cluster").Short('n').Default(fmt.Sprintf("%d", defaultNumberOfNodesInCluster)).Int() + coresInNode = app.Flag("cores-in-node", "Number of cores in each node").Short('c').Default(fmt.Sprintf("%d", defaultNumberOfCoresInNode)).Int() + smudgeFactor = app.Flag("smudge-factor", "Yet 
another factor to make parallelism cooler").Short('s').Default(fmt.Sprintf("%d", defaultSmudgeFactor)).Int() + clusterConsistency = app.Flag("consistency", "Cluster consistency level. Use 'localone' for multi DC").Short('o').Default("localquorum").String() + clusterTimeout = app.Flag("timeout", "Maximum duration for query execution in millisecond").Short('t').Default("15000").Int() + clusterNumConnections = app.Flag("cluster-number-of-connections", "Number of connections per host per session (in our case, per thread)").Short('b').Default("1").Int() + clusterCQLVersion = app.Flag("cql-version", "The CQL version to use").Short('l').Default("3.0.0").String() + clusterPageSize = app.Flag("cluster-page-size", "Page size of results").Short('p').Default("5000").Int() + keyspace = app.Flag("keyspace", "Keyspace to use").Short('k').Default("clio_fh").String() + + userName = app.Flag("username", "Username to use when connecting to the cluster").String() + password = app.Flag("password", "Password to use when connecting to the cluster").String() + + skipSuccessorTable = app.Flag("skip-successor", "Whether to skip deletion from successor table").Default("false").Bool() + skipObjectsTable = app.Flag("skip-objects", "Whether to skip deletion from objects table").Default("false").Bool() + skipLedgerHashesTable = app.Flag("skip-ledger-hashes", "Whether to skip deletion from ledger_hashes table").Default("false").Bool() + skipTransactionsTable = app.Flag("skip-transactions", "Whether to skip deletion from transactions table").Default("false").Bool() + skipDiffTable = app.Flag("skip-diff", "Whether to skip deletion from diff table").Default("false").Bool() + skipLedgerTransactionsTable = app.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool() + skipLedgersTable = app.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool() + skipWriteLatestLedger = app.Flag("skip-write-latest-ledger", 
"Whether to skip writing the latest ledger index").Default("false").Bool() + + workerCount = 1 // the calculated number of parallel goroutines the client should run + ranges []*util.TokenRange // the calculated ranges to be executed in parallel +) + +func main() { + log.SetOutput(os.Stdout) + + command := kingpin.MustParse(app.Parse(os.Args[1:])) + cluster, err := prepareDb(hosts) + if err != nil { + log.Fatal(err) + } + + clioCass := cass.NewClioCass(&cass.Settings{ + SkipSuccessorTable: *skipSuccessorTable, + SkipObjectsTable: *skipObjectsTable, + SkipLedgerHashesTable: *skipLedgerHashesTable, + SkipTransactionsTable: *skipTransactionsTable, + SkipDiffTable: *skipDiffTable, + SkipLedgerTransactionsTable: *skipLedgerTransactionsTable, + SkipLedgersTable: *skipLedgersTable, + SkipWriteLatestLedger: *skipWriteLatestLedger, + WorkerCount: workerCount, + Ranges: ranges}, cluster) + + switch command { + case deleteAfter.FullCommand(): + if *deleteAfterLedgerIdx == 0 { + log.Println("Please specify ledger index to delete from") + return + } + + displayParams("delete-after", hosts, cluster.Timeout/1000/1000, *deleteAfterLedgerIdx) + log.Printf("Will delete everything after ledger index %d (exclusive) and till latest\n", *deleteAfterLedgerIdx) + log.Println("WARNING: Please make sure that there are no Clio writers operating on the DB while this script is running") + + if !util.PromptContinue() { + log.Fatal("Aborted") + } + + startTime := time.Now().UTC() + clioCass.DeleteAfter(*deleteAfterLedgerIdx) + + fmt.Printf("Total Execution Time: %s\n\n", time.Since(startTime)) + fmt.Println("NOTE: Cassandra/ScyllaDB only writes tombstones. 
You need to run compaction to free up disk space.") + + case deleteBefore.FullCommand(): + if *deleteBeforeLedgerIdx == 0 { + log.Println("Please specify ledger index to delete until") + return + } + + displayParams("delete-before", hosts, cluster.Timeout/1000/1000, *deleteBeforeLedgerIdx) + log.Printf("Will delete everything before ledger index %d (exclusive)\n", *deleteBeforeLedgerIdx) + log.Println("WARNING: Please make sure that there are no Clio writers operating on the DB while this script is running") + + if !util.PromptContinue() { + log.Fatal("Aborted") + } + + startTime := time.Now().UTC() + clioCass.DeleteBefore(*deleteBeforeLedgerIdx) + + fmt.Printf("Total Execution Time: %s\n\n", time.Since(startTime)) + fmt.Println("NOTE: Cassandra/ScyllaDB only writes tombstones. You need to run compaction to free up disk space.") + case getLedgerRange.FullCommand(): + from, to, err := clioCass.GetLedgerRange() + if err != nil { + log.Fatal(err) + } + + fmt.Printf("Range: %d -> %d\n", from, to) + } +} + +func displayParams(command string, hosts *string, timeout time.Duration, ledgerIdx uint64) { + runParameters := fmt.Sprintf(` +Execution Parameters: +===================== + +Command : %s +Ledger index : %d +Scylla cluster nodes : %s +Keyspace : %s +Consistency : %s +Timeout (ms) : %d +Connections per host : %d +CQL Version : %s +Page size : %d +# of parallel threads : %d +# of ranges to be executed : %d + +Skip deletion of: +- successor table : %t +- objects table : %t +- ledger_hashes table : %t +- transactions table : %t +- diff table : %t +- ledger_transactions table : %t +- ledgers table : %t + +Will update ledger_range : %t + +`, + command, + ledgerIdx, + *hosts, + *keyspace, + *clusterConsistency, + timeout, + *clusterNumConnections, + *clusterCQLVersion, + *clusterPageSize, + workerCount, + len(ranges), + *skipSuccessorTable || command == "delete-before", + *skipObjectsTable, + *skipLedgerHashesTable, + *skipTransactionsTable, + *skipDiffTable, + 
*skipLedgerTransactionsTable, + *skipLedgersTable, + !*skipWriteLatestLedger) + + fmt.Println(runParameters) +} + +func prepareDb(dbHosts *string) (*gocql.ClusterConfig, error) { + workerCount = (*nodesInCluster) * (*coresInNode) * (*smudgeFactor) + ranges = util.GetTokenRanges(workerCount) + util.Shuffle(ranges) + + hosts := strings.Split(*dbHosts, ",") + + cluster := gocql.NewCluster(hosts...) + cluster.Consistency = util.GetConsistencyLevel(*clusterConsistency) + cluster.Timeout = time.Duration(*clusterTimeout * 1000 * 1000) + cluster.NumConns = *clusterNumConnections + cluster.CQLVersion = *clusterCQLVersion + cluster.PageSize = *clusterPageSize + cluster.Keyspace = *keyspace + + if *userName != "" { + cluster.Authenticator = gocql.PasswordAuthenticator{ + Username: *userName, + Password: *password, + } + } + + return cluster, nil +} diff --git a/tools/cassandra_delete_range/go.mod b/tools/cassandra_delete_range/go.mod index 6725dca0..5403680c 100644 --- a/tools/cassandra_delete_range/go.mod +++ b/tools/cassandra_delete_range/go.mod @@ -3,9 +3,13 @@ module xrplf/clio/cassandra_delete_range go 1.21.6 require ( - github.com/alecthomas/kingpin/v2 v2.4.0 // indirect + github.com/alecthomas/kingpin/v2 v2.4.0 + github.com/gocql/gocql v1.6.0 + github.com/pmorelli92/maybe v1.1.0 +) + +require ( github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect - github.com/gocql/gocql v1.6.0 // indirect github.com/golang/snappy v0.0.3 // indirect github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect github.com/xhit/go-str2duration/v2 v2.1.0 // indirect diff --git a/tools/cassandra_delete_range/go.sum b/tools/cassandra_delete_range/go.sum index 732b9e4e..d28f7104 100644 --- a/tools/cassandra_delete_range/go.sum +++ b/tools/cassandra_delete_range/go.sum @@ -2,25 +2,38 @@ github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjH github.com/alecthomas/kingpin/v2 v2.4.0/go.mod 
h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc= github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE= +github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= +github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/gocql/gocql v1.6.0 h1:IdFdOTbnpbd0pDhl4REKQDM+Q0SzKXQ1Yh+YZZ8T/qU= github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8= github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA= github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8= github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/pmezard/go-difflib 
v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmorelli92/maybe v1.1.0 h1:uyV6NLF4453AQARZ6rKpJNzc9PBsQmpGDtUonhxInPU= +github.com/pmorelli92/maybe v1.1.0/go.mod h1:5PrW2+fo4/j/LMX6HT49Hb3/HOKv1tbodkzgy4lEopA= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8= +github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc= github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tools/cassandra_delete_range/internal/cass/cass.go b/tools/cassandra_delete_range/internal/cass/cass.go new file mode 100644 index 00000000..f60fe482 --- /dev/null +++ b/tools/cassandra_delete_range/internal/cass/cass.go @@ -0,0 +1,564 @@ +package cass + +import ( + "fmt" + "log" + "os" + "slices" + "strconv" + "strings" + "sync" + "sync/atomic" + "xrplf/clio/cassandra_delete_range/internal/util" + + "github.com/gocql/gocql" + "github.com/pmorelli92/maybe" +) + +type deleteInfo struct { + Query string + Data []deleteParams +} + +type 
deleteParams struct { + Seq uint64 + Blob []byte // hash, key, etc +} + +type columnSettings struct { + UseSeq bool + UseBlob bool +} + +type Settings struct { + SkipSuccessorTable bool + SkipObjectsTable bool + SkipLedgerHashesTable bool + SkipTransactionsTable bool + SkipDiffTable bool + SkipLedgerTransactionsTable bool + SkipLedgersTable bool + SkipWriteLatestLedger bool + + WorkerCount int + Ranges []*util.TokenRange +} + +type Cass interface { + GetLedgerRange() (uint64, uint64, error) + DeleteBefore(ledgerIdx uint64) + DeleteAfter(ledgerIdx uint64) +} + +type ClioCass struct { + settings *Settings + clusterConfig *gocql.ClusterConfig +} + +func NewClioCass(settings *Settings, cluster *gocql.ClusterConfig) *ClioCass { + return &ClioCass{settings, cluster} +} + +func (c *ClioCass) DeleteBefore(ledgerIdx uint64) { + firstLedgerIdxInDB, latestLedgerIdxInDB, err := c.GetLedgerRange() + if err != nil { + log.Fatal(err) + } + + log.Printf("DB ledger range is %d -> %d\n", firstLedgerIdxInDB, latestLedgerIdxInDB) + + if firstLedgerIdxInDB > ledgerIdx { + log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...") + } + + if latestLedgerIdxInDB < ledgerIdx { + log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...") + } + + var ( + from maybe.Maybe[uint64] // not used + to maybe.Maybe[uint64] = maybe.Set(ledgerIdx - 1) + ) + + c.settings.SkipSuccessorTable = true // skip successor update until we know how to do it + if err := c.pruneData(from, to, firstLedgerIdxInDB, latestLedgerIdxInDB); err != nil { + log.Fatal(err) + } +} + +func (c *ClioCass) DeleteAfter(ledgerIdx uint64) { + firstLedgerIdxInDB, latestLedgerIdxInDB, err := c.GetLedgerRange() + if err != nil { + log.Fatal(err) + } + + log.Printf("DB ledger range is %d -> %d\n", firstLedgerIdxInDB, latestLedgerIdxInDB) + + if firstLedgerIdxInDB > ledgerIdx { + log.Fatal("Earliest ledger index in DB is greater than the one specified. 
Aborting...") + } + + if latestLedgerIdxInDB < ledgerIdx { + log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...") + } + + var ( + from maybe.Maybe[uint64] = maybe.Set(ledgerIdx + 1) + to maybe.Maybe[uint64] // not used + ) + + if err := c.pruneData(from, to, firstLedgerIdxInDB, latestLedgerIdxInDB); err != nil { + log.Fatal(err) + } +} + +func (c *ClioCass) GetLedgerRange() (uint64, uint64, error) { + var ( + firstLedgerIdx uint64 + latestLedgerIdx uint64 + ) + + session, err := c.clusterConfig.CreateSession() + if err != nil { + log.Fatal(err) + } + + defer session.Close() + + if err := session.Query("SELECT sequence FROM ledger_range WHERE is_latest = ?", false).Scan(&firstLedgerIdx); err != nil { + return 0, 0, err + } + + if err := session.Query("SELECT sequence FROM ledger_range WHERE is_latest = ?", true).Scan(&latestLedgerIdx); err != nil { + return 0, 0, err + } + + return firstLedgerIdx, latestLedgerIdx, nil +} + +func (c *ClioCass) pruneData( + fromLedgerIdx maybe.Maybe[uint64], + toLedgerIdx maybe.Maybe[uint64], + firstLedgerIdxInDB uint64, + latestLedgerIdxInDB uint64, +) error { + var totalErrors uint64 + var totalRows uint64 + var totalDeletes uint64 + + var info deleteInfo + var rowsCount uint64 + var deleteCount uint64 + var errCount uint64 + + // calculate range of simple delete queries + var ( + rangeFrom uint64 = firstLedgerIdxInDB + rangeTo uint64 = latestLedgerIdxInDB + ) + + if fromLedgerIdx.HasValue() { + rangeFrom = fromLedgerIdx.Value() + } + + if toLedgerIdx.HasValue() { + rangeTo = toLedgerIdx.Value() + } + + // calculate and print deletion plan + fromStr := "beginning" + if fromLedgerIdx.HasValue() { + fromStr = strconv.Itoa(int(fromLedgerIdx.Value())) + } + + toStr := "latest" + if toLedgerIdx.HasValue() { + toStr = strconv.Itoa(int(toLedgerIdx.Value())) + } + + log.Printf("Start scanning and removing data for %s -> %s\n\n", fromStr, toStr) + + // successor queries + if !c.settings.SkipSuccessorTable { + 
log.Println("Generating delete queries for successor table") + info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx, + "SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?", + "DELETE FROM successor WHERE key = ? AND seq = ?", false) + log.Printf("Total delete queries: %d\n", len(info.Data)) + log.Printf("Total traversed rows: %d\n\n", rowsCount) + totalErrors += errCount + totalRows += rowsCount + deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: true}) + totalErrors += errCount + totalDeletes += deleteCount + } + + // objects queries + if !c.settings.SkipObjectsTable { + log.Println("Generating delete queries for objects table") + info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx, + "SELECT key, sequence FROM objects WHERE token(key) >= ? AND token(key) <= ?", + "DELETE FROM objects WHERE key = ? AND sequence = ?", true) + log.Printf("Total delete queries: %d\n", len(info.Data)) + log.Printf("Total traversed rows: %d\n\n", rowsCount) + totalErrors += errCount + totalRows += rowsCount + deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: true}) + totalErrors += errCount + totalDeletes += deleteCount + } + + // ledger_hashes queries + if !c.settings.SkipLedgerHashesTable { + log.Println("Generating delete queries for ledger_hashes table") + info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx, + "SELECT hash, sequence FROM ledger_hashes WHERE token(hash) >= ? 
AND token(hash) <= ?", + "DELETE FROM ledger_hashes WHERE hash = ?", false) + log.Printf("Total delete queries: %d\n", len(info.Data)) + log.Printf("Total traversed rows: %d\n\n", rowsCount) + totalErrors += errCount + totalRows += rowsCount + deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: false}) + totalErrors += errCount + totalDeletes += deleteCount + } + + // transactions queries + if !c.settings.SkipTransactionsTable { + log.Println("Generating delete queries for transactions table") + info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx, + "SELECT hash, ledger_sequence FROM transactions WHERE token(hash) >= ? AND token(hash) <= ?", + "DELETE FROM transactions WHERE hash = ?", false) + log.Printf("Total delete queries: %d\n", len(info.Data)) + log.Printf("Total traversed rows: %d\n\n", rowsCount) + totalErrors += errCount + totalRows += rowsCount + deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: false}) + totalErrors += errCount + totalDeletes += deleteCount + } + + // diff queries + if !c.settings.SkipDiffTable { + log.Println("Generating delete queries for diff table") + info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo, + "DELETE FROM diff WHERE seq = ?") + log.Printf("Total delete queries: %d\n\n", len(info.Data)) + deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true}) + totalErrors += errCount + totalDeletes += deleteCount + } + + // ledger_transactions queries + if !c.settings.SkipLedgerTransactionsTable { + log.Println("Generating delete queries for ledger_transactions table") + info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo, + "DELETE FROM ledger_transactions WHERE ledger_sequence = ?") + log.Printf("Total delete queries: %d\n\n", len(info.Data)) + deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true}) + totalErrors += errCount + 
totalDeletes += deleteCount + } + + // ledgers queries + if !c.settings.SkipLedgersTable { + log.Println("Generating delete queries for ledgers table") + + info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo, + "DELETE FROM ledgers WHERE sequence = ?") + log.Printf("Total delete queries: %d\n\n", len(info.Data)) + deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true}) + totalErrors += errCount + totalDeletes += deleteCount + } + + // TODO: tbd what to do with account_tx as it got tuple for seq_idx + // TODO: also, whether we need to take care of nft tables and other stuff like that + + if !c.settings.SkipWriteLatestLedger { + var ( + first maybe.Maybe[uint64] + last maybe.Maybe[uint64] + ) + + if fromLedgerIdx.HasValue() { + last = maybe.Set(fromLedgerIdx.Value() - 1) + } + + if toLedgerIdx.HasValue() { + first = maybe.Set(toLedgerIdx.Value() + 1) + } + + if err := c.updateLedgerRange(first, last); err != nil { + log.Printf("ERROR failed updating ledger range: %s\n", err) + return err + } + } + + log.Printf("TOTAL ERRORS: %d\n", totalErrors) + log.Printf("TOTAL ROWS TRAVERSED: %d\n", totalRows) + log.Printf("TOTAL DELETES: %d\n\n", totalDeletes) + + log.Printf("Completed deletion for %s -> %s\n\n", fromStr, toStr) + + return nil +} + +func (c *ClioCass) prepareSimpleDeleteQueries( + fromLedgerIdx uint64, + toLedgerIdx uint64, + deleteQueryTemplate string, +) deleteInfo { + var info = deleteInfo{Query: deleteQueryTemplate} + + for i := fromLedgerIdx; i <= toLedgerIdx; i++ { + info.Data = append(info.Data, deleteParams{Seq: i}) + } + + return info +} + +func (c *ClioCass) prepareDeleteQueries( + fromLedgerIdx maybe.Maybe[uint64], + toLedgerIdx maybe.Maybe[uint64], + queryTemplate string, + deleteQueryTemplate string, + keepLastValid bool, +) (deleteInfo, uint64, uint64) { + rangesChannel := make(chan *util.TokenRange, len(c.settings.Ranges)) + for i := range c.settings.Ranges { + rangesChannel <- c.settings.Ranges[i] + 
} + + close(rangesChannel) + + outChannel := make(chan deleteParams) + var info = deleteInfo{Query: deleteQueryTemplate} + + go func() { + total := uint64(0) + for params := range outChannel { + total += 1 + if total%1000 == 0 { + log.Printf("... %d queries ...\n", total) + } + info.Data = append(info.Data, params) + } + }() + + var wg sync.WaitGroup + var sessionCreationWaitGroup sync.WaitGroup + var totalRows uint64 + var totalErrors uint64 + + wg.Add(c.settings.WorkerCount) + sessionCreationWaitGroup.Add(c.settings.WorkerCount) + + for i := 0; i < c.settings.WorkerCount; i++ { + go func(q string) { + defer wg.Done() + + var session *gocql.Session + var err error + if session, err = c.clusterConfig.CreateSession(); err == nil { + defer session.Close() + + sessionCreationWaitGroup.Done() + sessionCreationWaitGroup.Wait() + preparedQuery := session.Query(q) + + for r := range rangesChannel { + preparedQuery.Bind(r.StartRange, r.EndRange) + + var pageState []byte + var rowsRetrieved uint64 + + var previousKey []byte + var foundLastValid bool + + for { + iter := preparedQuery.PageSize(c.clusterConfig.PageSize).PageState(pageState).Iter() + nextPageState := iter.PageState() + scanner := iter.Scanner() + + for scanner.Next() { + var key []byte + var seq uint64 + + err = scanner.Scan(&key, &seq) + if err == nil { + rowsRetrieved++ + + if keepLastValid && !slices.Equal(previousKey, key) { + previousKey = key + foundLastValid = false + } + + // only grab the rows that are in the correct range of sequence numbers + if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= seq { + outChannel <- deleteParams{Seq: seq, Blob: key} + } else if toLedgerIdx.HasValue() { + if seq < toLedgerIdx.Value() && (!keepLastValid || foundLastValid) { + outChannel <- deleteParams{Seq: seq, Blob: key} + } else if seq <= toLedgerIdx.Value()+1 { + foundLastValid = true + } + } + } else { + log.Printf("ERROR: page iteration failed: %s\n", err) + fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", 
fmt.Sprintf("%s [from=%d][to=%d][pagestate=%x]", queryTemplate, r.StartRange, r.EndRange, pageState)) + atomic.AddUint64(&totalErrors, 1) + } + } + + if len(nextPageState) == 0 { + break + } + + pageState = nextPageState + } + + atomic.AddUint64(&totalRows, rowsRetrieved) + } + } else { + log.Printf("ERROR: %s\n", err) + fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err) + atomic.AddUint64(&totalErrors, 1) + } + }(queryTemplate) + } + + wg.Wait() + close(outChannel) + + return info, totalRows, totalErrors +} + +func (c *ClioCass) splitDeleteWork(info *deleteInfo) [][]deleteParams { + var n = c.settings.WorkerCount + var chunkSize = len(info.Data) / n + var chunks [][]deleteParams + + if len(info.Data) == 0 { + return chunks + } + + if chunkSize < 1 { + chunks = append(chunks, info.Data) + return chunks + } + + for i := 0; i < len(info.Data); i += chunkSize { + end := i + chunkSize + + if end > len(info.Data) { + end = len(info.Data) + } + + chunks = append(chunks, info.Data[i:end]) + } + + return chunks +} + +func (c *ClioCass) performDeleteQueries(info *deleteInfo, colSettings columnSettings) (uint64, uint64) { + var wg sync.WaitGroup + var sessionCreationWaitGroup sync.WaitGroup + var totalDeletes uint64 + var totalErrors uint64 + + chunks := c.splitDeleteWork(info) + chunksChannel := make(chan []deleteParams, len(chunks)) + for i := range chunks { + chunksChannel <- chunks[i] + } + + close(chunksChannel) + + wg.Add(c.settings.WorkerCount) + sessionCreationWaitGroup.Add(c.settings.WorkerCount) + + query := info.Query + bindCount := strings.Count(query, "?") + + for i := 0; i < c.settings.WorkerCount; i++ { + go func(number int, q string, bc int) { + defer wg.Done() + + var session *gocql.Session + var err error + if session, err = c.clusterConfig.CreateSession(); err == nil { + defer session.Close() + + sessionCreationWaitGroup.Done() + sessionCreationWaitGroup.Wait() + preparedQuery := session.Query(q) + + for chunk := range chunksChannel { + for _, r 
:= range chunk { + if bc == 2 { + preparedQuery.Bind(r.Blob, r.Seq) + } else if bc == 1 { + if colSettings.UseSeq { + preparedQuery.Bind(r.Seq) + } else if colSettings.UseBlob { + preparedQuery.Bind(r.Blob) + } + } + + if err := preparedQuery.Exec(); err != nil { + log.Printf("DELETE ERROR: %s\n", err) + fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq)) + atomic.AddUint64(&totalErrors, 1) + } else { + atomic.AddUint64(&totalDeletes, 1) + if atomic.LoadUint64(&totalDeletes)%10000 == 0 { + log.Printf("... %d deletes ...\n", totalDeletes) + } + } + } + } + } else { + log.Printf("ERROR: %s\n", err) + fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err) + atomic.AddUint64(&totalErrors, 1) + } + }(i, query, bindCount) + } + + wg.Wait() + return totalDeletes, totalErrors +} + +func (c *ClioCass) updateLedgerRange(newStartLedger maybe.Maybe[uint64], newEndLedger maybe.Maybe[uint64]) error { + if session, err := c.clusterConfig.CreateSession(); err == nil { + defer session.Close() + + query := "UPDATE ledger_range SET sequence = ? WHERE is_latest = ?" 
+ + if newEndLedger.HasValue() { + log.Printf("Updating ledger range end to %d\n", newEndLedger.Value()) + + preparedQuery := session.Query(query, newEndLedger.Value(), true) + if err := preparedQuery.Exec(); err != nil { + fmt.Fprintf(os.Stderr, "FAILED QUERY: %s [seq=%d][true]\n", query, newEndLedger.Value()) + return err + } + } + + if newStartLedger.HasValue() { + log.Printf("Updating ledger range start to %d\n", newStartLedger.Value()) + + preparedQuery := session.Query(query, newStartLedger.Value(), false) + if err := preparedQuery.Exec(); err != nil { + fmt.Fprintf(os.Stderr, "FAILED QUERY: %s [seq=%d][false]\n", query, newStartLedger.Value()) + return err + } + } + } else { + fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err) + return err + } + + return nil +} diff --git a/tools/cassandra_delete_range/internal/util/util.go b/tools/cassandra_delete_range/internal/util/util.go new file mode 100644 index 00000000..9a2aa1a4 --- /dev/null +++ b/tools/cassandra_delete_range/internal/util/util.go @@ -0,0 +1,91 @@ +package util + +import ( + "fmt" + "log" + "math" + "math/rand" + + "github.com/gocql/gocql" +) + +type TokenRange struct { + StartRange int64 + EndRange int64 +} + +func Shuffle(data []*TokenRange) { + for i := 1; i < len(data); i++ { + r := rand.Intn(i + 1) + if i != r { + data[r], data[i] = data[i], data[r] + } + } +} + +func PromptContinue() bool { + var continueFlag string + + log.Println("Are you sure you want to continue? 
(y/n)") + if fmt.Scanln(&continueFlag); continueFlag != "y" { + return false + } + + return true +} + +func GetTokenRanges(workerCount int) []*TokenRange { + var n = workerCount + var m = int64(n * 100) + var maxSize uint64 = math.MaxInt64 * 2 + var rangeSize = maxSize / uint64(m) + + var start int64 = math.MinInt64 + var end int64 + var shouldBreak = false + + var ranges = make([]*TokenRange, m) + + for i := int64(0); i < m; i++ { + end = start + int64(rangeSize) + if start > 0 && end < 0 { + end = math.MaxInt64 + shouldBreak = true + } + + ranges[i] = &TokenRange{StartRange: start, EndRange: end} + + if shouldBreak { + break + } + + start = end + 1 + } + + return ranges +} + +func GetConsistencyLevel(consistencyValue string) gocql.Consistency { + switch consistencyValue { + case "any": + return gocql.Any + case "one": + return gocql.One + case "two": + return gocql.Two + case "three": + return gocql.Three + case "quorum": + return gocql.Quorum + case "all": + return gocql.All + case "localquorum": + return gocql.LocalQuorum + case "eachquorum": + return gocql.EachQuorum + case "localone": + return gocql.LocalOne + default: + return gocql.One + } +} diff --git a/tools/cassandra_delete_range/main.go b/tools/cassandra_delete_range/main.go deleted file mode 100644 index ef4ebc10..00000000 --- a/tools/cassandra_delete_range/main.go +++ /dev/null @@ -1,619 +0,0 @@ -// -// Based off of https://github.com/scylladb/scylla-code-samples/blob/master/efficient_full_table_scan_example_code/efficient_full_table_scan.go -// - -package main - -import ( - "fmt" - "log" - "math" - "math/rand" - "os" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/alecthomas/kingpin/v2" - "github.com/gocql/gocql" -) - -const ( - defaultNumberOfNodesInCluster = 3 - defaultNumberOfCoresInNode = 8 - defaultSmudgeFactor = 3 -) - -var ( - clusterHosts = kingpin.Arg("hosts", "Your Scylla nodes IP addresses, comma separated (i.e. 
192.168.1.1,192.168.1.2,192.168.1.3)").Required().String() - earliestLedgerIdx = kingpin.Flag("ledgerIdx", "Sets the earliest ledger_index to keep untouched").Short('i').Required().Uint64() - - nodesInCluster = kingpin.Flag("nodes-in-cluster", "Number of nodes in your Scylla cluster").Short('n').Default(fmt.Sprintf("%d", defaultNumberOfNodesInCluster)).Int() - coresInNode = kingpin.Flag("cores-in-node", "Number of cores in each node").Short('c').Default(fmt.Sprintf("%d", defaultNumberOfCoresInNode)).Int() - smudgeFactor = kingpin.Flag("smudge-factor", "Yet another factor to make parallelism cooler").Short('s').Default(fmt.Sprintf("%d", defaultSmudgeFactor)).Int() - clusterConsistency = kingpin.Flag("consistency", "Cluster consistency level. Use 'localone' for multi DC").Short('o').Default("localquorum").String() - clusterTimeout = kingpin.Flag("timeout", "Maximum duration for query execution in millisecond").Short('t').Default("15000").Int() - clusterNumConnections = kingpin.Flag("cluster-number-of-connections", "Number of connections per host per session (in our case, per thread)").Short('b').Default("1").Int() - clusterCQLVersion = kingpin.Flag("cql-version", "The CQL version to use").Short('l').Default("3.0.0").String() - clusterPageSize = kingpin.Flag("cluster-page-size", "Page size of results").Short('p').Default("5000").Int() - keyspace = kingpin.Flag("keyspace", "Keyspace to use").Short('k').Default("clio_fh").String() - - userName = kingpin.Flag("username", "Username to use when connecting to the cluster").String() - password = kingpin.Flag("password", "Password to use when connecting to the cluster").String() - - skipSuccessorTable = kingpin.Flag("skip-successor", "Whether to skip deletion from successor table").Default("false").Bool() - skipObjectsTable = kingpin.Flag("skip-objects", "Whether to skip deletion from objects table").Default("false").Bool() - skipLedgerHashesTable = kingpin.Flag("skip-ledger-hashes", "Whether to skip deletion from 
ledger_hashes table").Default("false").Bool() - skipTransactionsTable = kingpin.Flag("skip-transactions", "Whether to skip deletion from transactions table").Default("false").Bool() - skipDiffTable = kingpin.Flag("skip-diff", "Whether to skip deletion from diff table").Default("false").Bool() - skipLedgerTransactionsTable = kingpin.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool() - skipLedgersTable = kingpin.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool() - skipWriteLatestLedger = kingpin.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool() - - workerCount = 1 // the calculated number of parallel goroutines the client should run - ranges []*tokenRange // the calculated ranges to be executed in parallel -) - -type tokenRange struct { - StartRange int64 - EndRange int64 -} - -type deleteParams struct { - Seq uint64 - Blob []byte // hash, key, etc -} - -type columnSettings struct { - UseSeq bool - UseBlob bool -} - -type deleteInfo struct { - Query string - Data []deleteParams -} - -func getTokenRanges() []*tokenRange { - var n = workerCount - var m = int64(n * 100) - var maxSize uint64 = math.MaxInt64 * 2 - var rangeSize = maxSize / uint64(m) - - var start int64 = math.MinInt64 - var end int64 - var shouldBreak = false - - var ranges = make([]*tokenRange, m) - - for i := int64(0); i < m; i++ { - end = start + int64(rangeSize) - if start > 0 && end < 0 { - end = math.MaxInt64 - shouldBreak = true - } - - ranges[i] = &tokenRange{StartRange: start, EndRange: end} - - if shouldBreak { - break - } - - start = end + 1 - } - - return ranges -} - -func splitDeleteWork(info *deleteInfo) [][]deleteParams { - var n = workerCount - var chunkSize = len(info.Data) / n - var chunks [][]deleteParams - - if len(info.Data) == 0 { - return chunks - } - - if chunkSize < 1 { - chunks = append(chunks, info.Data) - return 
chunks - } - - for i := 0; i < len(info.Data); i += chunkSize { - end := i + chunkSize - - if end > len(info.Data) { - end = len(info.Data) - } - - chunks = append(chunks, info.Data[i:end]) - } - - return chunks -} - -func shuffle(data []*tokenRange) { - for i := 1; i < len(data); i++ { - r := rand.Intn(i + 1) - if i != r { - data[r], data[i] = data[i], data[r] - } - } -} - -func getConsistencyLevel(consistencyValue string) gocql.Consistency { - switch consistencyValue { - case "any": - return gocql.Any - case "one": - return gocql.One - case "two": - return gocql.Two - case "three": - return gocql.Three - case "quorum": - return gocql.Quorum - case "all": - return gocql.All - case "localquorum": - return gocql.LocalQuorum - case "eachquorum": - return gocql.EachQuorum - case "localone": - return gocql.LocalOne - default: - return gocql.One - } -} - -func main() { - log.SetOutput(os.Stdout) - kingpin.Parse() - - workerCount = (*nodesInCluster) * (*coresInNode) * (*smudgeFactor) - ranges = getTokenRanges() - shuffle(ranges) - - hosts := strings.Split(*clusterHosts, ",") - - cluster := gocql.NewCluster(hosts...) 
- cluster.Consistency = getConsistencyLevel(*clusterConsistency) - cluster.Timeout = time.Duration(*clusterTimeout * 1000 * 1000) - cluster.NumConns = *clusterNumConnections - cluster.CQLVersion = *clusterCQLVersion - cluster.PageSize = *clusterPageSize - cluster.Keyspace = *keyspace - - if *userName != "" { - cluster.Authenticator = gocql.PasswordAuthenticator{ - Username: *userName, - Password: *password, - } - } - - if *earliestLedgerIdx == 0 { - log.Println("Please specify ledger index to delete from") - return - } - - runParameters := fmt.Sprintf(` -Execution Parameters: -===================== - -Range to be deleted : %d -> latest -Scylla cluster nodes : %s -Keyspace : %s -Consistency : %s -Timeout (ms) : %d -Connections per host : %d -CQL Version : %s -Page size : %d -# of parallel threads : %d -# of ranges to be executed : %d - -Skip deletion of: -- successor table : %t -- objects table : %t -- ledger_hashes table : %t -- transactions table : %t -- diff table : %t -- ledger_transactions table : %t -- ledgers table : %t - -Will rite latest ledger : %t - -`, - *earliestLedgerIdx, - *clusterHosts, - *keyspace, - *clusterConsistency, - cluster.Timeout/1000/1000, - *clusterNumConnections, - *clusterCQLVersion, - *clusterPageSize, - workerCount, - len(ranges), - *skipSuccessorTable, - *skipObjectsTable, - *skipLedgerHashesTable, - *skipTransactionsTable, - *skipDiffTable, - *skipLedgerTransactionsTable, - *skipLedgersTable, - !*skipWriteLatestLedger) - - fmt.Println(runParameters) - - log.Printf("Will delete everything after ledger index %d (exclusive) and till latest\n", *earliestLedgerIdx) - log.Println("WARNING: Please make sure that there are no Clio writers operating on the DB while this script is running") - log.Println("Are you sure you want to continue? 
(y/n)") - - var continueFlag string - if fmt.Scanln(&continueFlag); continueFlag != "y" { - log.Println("Aborting...") - return - } - - startTime := time.Now().UTC() - - earliestLedgerIdxInDB, latestLedgerIdxInDB, err := getLedgerRange(cluster) - if err != nil { - log.Fatal(err) - } - - if earliestLedgerIdxInDB > *earliestLedgerIdx { - log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...") - } - - if latestLedgerIdxInDB < *earliestLedgerIdx { - log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...") - } - - if err := deleteLedgerData(cluster, *earliestLedgerIdx+1, latestLedgerIdxInDB); err != nil { - log.Fatal(err) - } - - fmt.Printf("Total Execution Time: %s\n\n", time.Since(startTime)) - fmt.Println("NOTE: Cassandra/ScyllaDB only writes tombstones. You need to run compaction to free up disk space.") -} - -func getLedgerRange(cluster *gocql.ClusterConfig) (uint64, uint64, error) { - var ( - firstLedgerIdx uint64 - latestLedgerIdx uint64 - ) - - session, err := cluster.CreateSession() - if err != nil { - log.Fatal(err) - } - - defer session.Close() - - if err := session.Query("select sequence from ledger_range where is_latest = ?", false).Scan(&firstLedgerIdx); err != nil { - return 0, 0, err - } - - if err := session.Query("select sequence from ledger_range where is_latest = ?", true).Scan(&latestLedgerIdx); err != nil { - return 0, 0, err - } - - log.Printf("DB ledger range is %d:%d\n", firstLedgerIdx, latestLedgerIdx) - return firstLedgerIdx, latestLedgerIdx, nil -} - -func deleteLedgerData(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, toLedgerIdx uint64) error { - var totalErrors uint64 - var totalRows uint64 - var totalDeletes uint64 - - var info deleteInfo - var rowsCount uint64 - var deleteCount uint64 - var errCount uint64 - - log.Printf("Start scanning and removing data for %d -> latest (%d according to ledger_range table)\n\n", fromLedgerIdx, toLedgerIdx) - - // successor queries - if 
!*skipSuccessorTable { - log.Println("Generating delete queries for successor table") - info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx, - "SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?", - "DELETE FROM successor WHERE key = ? AND seq = ?") - log.Printf("Total delete queries: %d\n", len(info.Data)) - log.Printf("Total traversed rows: %d\n\n", rowsCount) - totalErrors += errCount - totalRows += rowsCount - deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true}) - totalErrors += errCount - totalDeletes += deleteCount - } - - // objects queries - if !*skipObjectsTable { - log.Println("Generating delete queries for objects table") - info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx, - "SELECT key, sequence FROM objects WHERE token(key) >= ? AND token(key) <= ?", - "DELETE FROM objects WHERE key = ? AND sequence = ?") - log.Printf("Total delete queries: %d\n", len(info.Data)) - log.Printf("Total traversed rows: %d\n\n", rowsCount) - totalErrors += errCount - totalRows += rowsCount - deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true}) - totalErrors += errCount - totalDeletes += deleteCount - } - - // ledger_hashes queries - if !*skipLedgerHashesTable { - log.Println("Generating delete queries for ledger_hashes table") - info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx, - "SELECT hash, sequence FROM ledger_hashes WHERE token(hash) >= ? 
AND token(hash) <= ?", - "DELETE FROM ledger_hashes WHERE hash = ?") - log.Printf("Total delete queries: %d\n", len(info.Data)) - log.Printf("Total traversed rows: %d\n\n", rowsCount) - totalErrors += errCount - totalRows += rowsCount - deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false}) - totalErrors += errCount - totalDeletes += deleteCount - } - - // transactions queries - if !*skipTransactionsTable { - log.Println("Generating delete queries for transactions table") - info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx, - "SELECT hash, ledger_sequence FROM transactions WHERE token(hash) >= ? AND token(hash) <= ?", - "DELETE FROM transactions WHERE hash = ?") - log.Printf("Total delete queries: %d\n", len(info.Data)) - log.Printf("Total traversed rows: %d\n\n", rowsCount) - totalErrors += errCount - totalRows += rowsCount - deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false}) - totalErrors += errCount - totalDeletes += deleteCount - } - - // diff queries - if !*skipDiffTable { - log.Println("Generating delete queries for diff table") - info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx, - "DELETE FROM diff WHERE seq = ?") - log.Printf("Total delete queries: %d\n\n", len(info.Data)) - deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true}) - totalErrors += errCount - totalDeletes += deleteCount - } - - // ledger_transactions queries - if !*skipLedgerTransactionsTable { - log.Println("Generating delete queries for ledger_transactions table") - info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx, - "DELETE FROM ledger_transactions WHERE ledger_sequence = ?") - log.Printf("Total delete queries: %d\n\n", len(info.Data)) - deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true}) - totalErrors += errCount - totalDeletes += 
deleteCount - } - - // ledgers queries - if !*skipLedgersTable { - log.Println("Generating delete queries for ledgers table") - info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx, - "DELETE FROM ledgers WHERE sequence = ?") - log.Printf("Total delete queries: %d\n\n", len(info.Data)) - deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true}) - totalErrors += errCount - totalDeletes += deleteCount - } - - // TODO: tbd what to do with account_tx as it got tuple for seq_idx - // TODO: also, whether we need to take care of nft tables and other stuff like that - - if !*skipWriteLatestLedger { - if err := updateLedgerRange(cluster, fromLedgerIdx-1); err != nil { - log.Printf("ERROR failed updating ledger range: %s\n", err) - return err - } - - log.Printf("Updated latest ledger to %d in ledger_range table\n\n", fromLedgerIdx-1) - } - - log.Printf("TOTAL ERRORS: %d\n", totalErrors) - log.Printf("TOTAL ROWS TRAVERSED: %d\n", totalRows) - log.Printf("TOTAL DELETES: %d\n\n", totalDeletes) - - log.Printf("Completed deletion for %d -> %d\n\n", fromLedgerIdx, toLedgerIdx) - - return nil -} - -func prepareSimpleDeleteQueries(fromLedgerIdx uint64, toLedgerIdx uint64, deleteQueryTemplate string) deleteInfo { - var info = deleteInfo{Query: deleteQueryTemplate} - - // Note: we deliberately add 1 extra ledger to make sure we delete any data Clio might have written - // if it crashed or was stopped in the middle of writing just before it wrote ledger_range. 
- for i := fromLedgerIdx; i <= toLedgerIdx+1; i++ { - info.Data = append(info.Data, deleteParams{Seq: i}) - } - - return info -} - -func prepareDeleteQueries(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, queryTemplate string, deleteQueryTemplate string) (deleteInfo, uint64, uint64) { - rangesChannel := make(chan *tokenRange, len(ranges)) - for i := range ranges { - rangesChannel <- ranges[i] - } - - close(rangesChannel) - - outChannel := make(chan deleteParams) - var info = deleteInfo{Query: deleteQueryTemplate} - - go func() { - for params := range outChannel { - info.Data = append(info.Data, params) - } - }() - - var wg sync.WaitGroup - var sessionCreationWaitGroup sync.WaitGroup - var totalRows uint64 - var totalErrors uint64 - - wg.Add(workerCount) - sessionCreationWaitGroup.Add(workerCount) - - for i := 0; i < workerCount; i++ { - go func(q string) { - defer wg.Done() - - var session *gocql.Session - var err error - if session, err = cluster.CreateSession(); err == nil { - defer session.Close() - - sessionCreationWaitGroup.Done() - sessionCreationWaitGroup.Wait() - preparedQuery := session.Query(q) - - for r := range rangesChannel { - preparedQuery.Bind(r.StartRange, r.EndRange) - - var pageState []byte - var rowsRetrieved uint64 - - for { - iter := preparedQuery.PageSize(*clusterPageSize).PageState(pageState).Iter() - nextPageState := iter.PageState() - scanner := iter.Scanner() - - for scanner.Next() { - var key []byte - var seq uint64 - - err = scanner.Scan(&key, &seq) - if err == nil { - rowsRetrieved++ - - // only grab the rows that are in the correct range of sequence numbers - if fromLedgerIdx <= seq { - outChannel <- deleteParams{Seq: seq, Blob: key} - } - } else { - log.Printf("ERROR: page iteration failed: %s\n", err) - fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [from=%d][to=%d][pagestate=%x]", queryTemplate, r.StartRange, r.EndRange, pageState)) - atomic.AddUint64(&totalErrors, 1) - } - } - - if len(nextPageState) == 0 { - 
break - } - - pageState = nextPageState - } - - atomic.AddUint64(&totalRows, rowsRetrieved) - } - } else { - log.Printf("ERROR: %s\n", err) - fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err) - atomic.AddUint64(&totalErrors, 1) - } - }(queryTemplate) - } - - wg.Wait() - close(outChannel) - - return info, totalRows, totalErrors -} - -func performDeleteQueries(cluster *gocql.ClusterConfig, info *deleteInfo, colSettings columnSettings) (uint64, uint64) { - var wg sync.WaitGroup - var sessionCreationWaitGroup sync.WaitGroup - var totalDeletes uint64 - var totalErrors uint64 - - chunks := splitDeleteWork(info) - chunksChannel := make(chan []deleteParams, len(chunks)) - for i := range chunks { - chunksChannel <- chunks[i] - } - - close(chunksChannel) - - wg.Add(workerCount) - sessionCreationWaitGroup.Add(workerCount) - - query := info.Query - bindCount := strings.Count(query, "?") - - for i := 0; i < workerCount; i++ { - go func(number int, q string, bc int) { - defer wg.Done() - - var session *gocql.Session - var err error - if session, err = cluster.CreateSession(); err == nil { - defer session.Close() - - sessionCreationWaitGroup.Done() - sessionCreationWaitGroup.Wait() - preparedQuery := session.Query(q) - - for chunk := range chunksChannel { - for _, r := range chunk { - if bc == 2 { - preparedQuery.Bind(r.Blob, r.Seq) - } else if bc == 1 { - if colSettings.UseSeq { - preparedQuery.Bind(r.Seq) - } else if colSettings.UseBlob { - preparedQuery.Bind(r.Blob) - } - } - - if err := preparedQuery.Exec(); err != nil { - log.Printf("DELETE ERROR: %s\n", err) - fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq)) - atomic.AddUint64(&totalErrors, 1) - } else { - atomic.AddUint64(&totalDeletes, 1) - } - } - } - } else { - log.Printf("ERROR: %s\n", err) - fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err) - atomic.AddUint64(&totalErrors, 1) - } - }(i, query, bindCount) - } - - wg.Wait() - return 
totalDeletes, totalErrors -} - -func updateLedgerRange(cluster *gocql.ClusterConfig, ledgerIndex uint64) error { - log.Printf("Updating latest ledger to %d\n", ledgerIndex) - - if session, err := cluster.CreateSession(); err == nil { - defer session.Close() - - query := "UPDATE ledger_range SET sequence = ? WHERE is_latest = ?" - preparedQuery := session.Query(query, ledgerIndex, true) - if err := preparedQuery.Exec(); err != nil { - fmt.Fprintf(os.Stderr, "FAILED QUERY: %s [seq=%d][true]\n", query, ledgerIndex) - return err - } - } else { - fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err) - return err - } - - return nil -}