mirror of https://github.com/XRPLF/clio.git
synced 2025-11-04 20:05:51 +00:00
committed by Sergey Kuznetsov
parent 9cb1e06c8e
commit 6d20f39f67

210  tools/cassandra_delete_range/cassandra_delete_range.go  Normal file
@@ -0,0 +1,210 @@
//
// Based off of https://github.com/scylladb/scylla-code-samples/blob/master/efficient_full_table_scan_example_code/efficient_full_table_scan.go
//

package main

import (
    "fmt"
    "log"
    "os"
    "strings"
    "time"
    "xrplf/clio/cassandra_delete_range/internal/cass"
    "xrplf/clio/cassandra_delete_range/internal/util"

    "github.com/alecthomas/kingpin/v2"
    "github.com/gocql/gocql"
)

const (
    defaultNumberOfNodesInCluster = 3
    defaultNumberOfCoresInNode    = 8
    defaultSmudgeFactor           = 3
)

var (
    app   = kingpin.New("cassandra_delete_range", "A tool that prunes data from the Clio DB.")
    hosts = app.Flag("hosts", "Your Scylla nodes IP addresses, comma separated (i.e. 192.168.1.1,192.168.1.2,192.168.1.3)").Required().String()

    deleteAfter          = app.Command("delete-after", "Prunes from the given ledger index until the end")
    deleteAfterLedgerIdx = deleteAfter.Arg("idx", "Sets the earliest ledger_index to keep untouched (delete everything after this ledger index)").Required().Uint64()

    deleteBefore          = app.Command("delete-before", "Prunes everything before the given ledger index")
    deleteBeforeLedgerIdx = deleteBefore.Arg("idx", "Sets the latest ledger_index to keep around (delete everything before this ledger index)").Required().Uint64()

    getLedgerRange = app.Command("get-ledger-range", "Fetch the current ledger_range table values")

    nodesInCluster        = app.Flag("nodes-in-cluster", "Number of nodes in your Scylla cluster").Short('n').Default(fmt.Sprintf("%d", defaultNumberOfNodesInCluster)).Int()
    coresInNode           = app.Flag("cores-in-node", "Number of cores in each node").Short('c').Default(fmt.Sprintf("%d", defaultNumberOfCoresInNode)).Int()
    smudgeFactor          = app.Flag("smudge-factor", "Yet another factor to make parallelism cooler").Short('s').Default(fmt.Sprintf("%d", defaultSmudgeFactor)).Int()
    clusterConsistency    = app.Flag("consistency", "Cluster consistency level. Use 'localone' for multi DC").Short('o').Default("localquorum").String()
    clusterTimeout        = app.Flag("timeout", "Maximum duration for query execution in milliseconds").Short('t').Default("15000").Int()
    clusterNumConnections = app.Flag("cluster-number-of-connections", "Number of connections per host per session (in our case, per thread)").Short('b').Default("1").Int()
    clusterCQLVersion     = app.Flag("cql-version", "The CQL version to use").Short('l').Default("3.0.0").String()
    clusterPageSize       = app.Flag("cluster-page-size", "Page size of results").Short('p').Default("5000").Int()
    keyspace              = app.Flag("keyspace", "Keyspace to use").Short('k').Default("clio_fh").String()

    userName = app.Flag("username", "Username to use when connecting to the cluster").String()
    password = app.Flag("password", "Password to use when connecting to the cluster").String()

    skipSuccessorTable          = app.Flag("skip-successor", "Whether to skip deletion from successor table").Default("false").Bool()
    skipObjectsTable            = app.Flag("skip-objects", "Whether to skip deletion from objects table").Default("false").Bool()
    skipLedgerHashesTable       = app.Flag("skip-ledger-hashes", "Whether to skip deletion from ledger_hashes table").Default("false").Bool()
    skipTransactionsTable       = app.Flag("skip-transactions", "Whether to skip deletion from transactions table").Default("false").Bool()
    skipDiffTable               = app.Flag("skip-diff", "Whether to skip deletion from diff table").Default("false").Bool()
    skipLedgerTransactionsTable = app.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool()
    skipLedgersTable            = app.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool()
    skipWriteLatestLedger       = app.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool()

    workerCount = 1                // the calculated number of parallel goroutines the client should run
    ranges      []*util.TokenRange // the calculated ranges to be executed in parallel
)

func main() {
    log.SetOutput(os.Stdout)

    command := kingpin.MustParse(app.Parse(os.Args[1:]))
    cluster, err := prepareDb(hosts)
    if err != nil {
        log.Fatal(err)
    }

    clioCass := cass.NewClioCass(&cass.Settings{
        SkipSuccessorTable:          *skipSuccessorTable,
        SkipObjectsTable:            *skipObjectsTable,
        SkipLedgerHashesTable:       *skipLedgerHashesTable,
        SkipTransactionsTable:       *skipTransactionsTable,
        SkipDiffTable:               *skipDiffTable,
        SkipLedgerTransactionsTable: *skipLedgerTransactionsTable,
        SkipLedgersTable:            *skipLedgersTable,
        SkipWriteLatestLedger:       *skipWriteLatestLedger,
        WorkerCount:                 workerCount,
        Ranges:                      ranges}, cluster)

    switch command {
    case deleteAfter.FullCommand():
        if *deleteAfterLedgerIdx == 0 {
            log.Println("Please specify ledger index to delete from")
            return
        }

        displayParams("delete-after", hosts, cluster.Timeout/1000/1000, *deleteAfterLedgerIdx)
        log.Printf("Will delete everything after ledger index %d (exclusive) and till latest\n", *deleteAfterLedgerIdx)
        log.Println("WARNING: Please make sure that there are no Clio writers operating on the DB while this script is running")

        if !util.PromptContinue() {
            log.Fatal("Aborted")
        }

        startTime := time.Now().UTC()
        clioCass.DeleteAfter(*deleteAfterLedgerIdx)

        fmt.Printf("Total Execution Time: %s\n\n", time.Since(startTime))
        fmt.Println("NOTE: Cassandra/ScyllaDB only writes tombstones. You need to run compaction to free up disk space.")

    case deleteBefore.FullCommand():
        if *deleteBeforeLedgerIdx == 0 {
            log.Println("Please specify ledger index to delete until")
            return
        }

        displayParams("delete-before", hosts, cluster.Timeout/1000/1000, *deleteBeforeLedgerIdx)
        log.Printf("Will delete everything before ledger index %d (exclusive)\n", *deleteBeforeLedgerIdx)
        log.Println("WARNING: Please make sure that there are no Clio writers operating on the DB while this script is running")

        if !util.PromptContinue() {
            log.Fatal("Aborted")
        }

        startTime := time.Now().UTC()
        clioCass.DeleteBefore(*deleteBeforeLedgerIdx)

        fmt.Printf("Total Execution Time: %s\n\n", time.Since(startTime))
        fmt.Println("NOTE: Cassandra/ScyllaDB only writes tombstones. You need to run compaction to free up disk space.")

    case getLedgerRange.FullCommand():
        from, to, err := clioCass.GetLedgerRange()
        if err != nil {
            log.Fatal(err)
        }

        fmt.Printf("Range: %d -> %d\n", from, to)
    }
}

func displayParams(command string, hosts *string, timeout time.Duration, ledgerIdx uint64) {
    runParameters := fmt.Sprintf(`
Execution Parameters:
=====================

Command                     : %s
Ledger index                : %d
Scylla cluster nodes        : %s
Keyspace                    : %s
Consistency                 : %s
Timeout (ms)                : %d
Connections per host        : %d
CQL Version                 : %s
Page size                   : %d
# of parallel threads       : %d
# of ranges to be executed  : %d

Skip deletion of:
- successor table           : %t
- objects table             : %t
- ledger_hashes table       : %t
- transactions table        : %t
- diff table                : %t
- ledger_transactions table : %t
- ledgers table             : %t

Will update ledger_range    : %t

`,
        command,
        ledgerIdx,
        *hosts,
        *keyspace,
        *clusterConsistency,
        timeout,
        *clusterNumConnections,
        *clusterCQLVersion,
        *clusterPageSize,
        workerCount,
        len(ranges),
        *skipSuccessorTable || command == "delete-before",
        *skipObjectsTable,
        *skipLedgerHashesTable,
        *skipTransactionsTable,
        *skipDiffTable,
        *skipLedgerTransactionsTable,
        *skipLedgersTable,
        !*skipWriteLatestLedger)

    fmt.Println(runParameters)
}

func prepareDb(dbHosts *string) (*gocql.ClusterConfig, error) {
    workerCount = (*nodesInCluster) * (*coresInNode) * (*smudgeFactor)
    ranges = util.GetTokenRanges(workerCount)
    util.Shuffle(ranges)

    hosts := strings.Split(*dbHosts, ",")

    cluster := gocql.NewCluster(hosts...)
    cluster.Consistency = util.GetConsistencyLevel(*clusterConsistency)
    cluster.Timeout = time.Duration(*clusterTimeout * 1000 * 1000)
    cluster.NumConns = *clusterNumConnections
    cluster.CQLVersion = *clusterCQLVersion
    cluster.PageSize = *clusterPageSize
    cluster.Keyspace = *keyspace

    if *userName != "" {
        cluster.Authenticator = gocql.PasswordAuthenticator{
            Username: *userName,
            Password: *password,
        }
    }

    return cluster, nil
}
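A side note on prepareDb above: `cluster.Timeout = time.Duration(*clusterTimeout * 1000 * 1000)` converts the millisecond flag to nanoseconds by hand. A minimal standalone sketch (not part of the commit) showing the hand-rolled conversion agrees with the usual unit-constant idiom:

package main

import (
    "fmt"
    "time"
)

func main() {
    clusterTimeout := 15000 // milliseconds, mirroring the flag's default

    // Hand-rolled conversion as used in prepareDb: ms -> ns.
    manual := time.Duration(clusterTimeout * 1000 * 1000)

    // Equivalent idiom: scale by the unit constant.
    idiomatic := time.Duration(clusterTimeout) * time.Millisecond

    fmt.Println(manual == idiomatic, manual) // true 15s
}

Scaling by time.Millisecond reads more clearly and avoids the bare magic numbers, but both forms produce the same duration.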
tools/cassandra_delete_range/go.mod
@@ -3,9 +3,13 @@ module xrplf/clio/cassandra_delete_range
 go 1.21.6
 
 require (
-	github.com/alecthomas/kingpin/v2 v2.4.0 // indirect
+	github.com/alecthomas/kingpin/v2 v2.4.0
+	github.com/gocql/gocql v1.6.0
+	github.com/pmorelli92/maybe v1.1.0
+)
+
+require (
 	github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
-	github.com/gocql/gocql v1.6.0 // indirect
 	github.com/golang/snappy v0.0.3 // indirect
 	github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
 	github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
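For context on this go.mod hunk: kingpin moves out of the `// indirect` set and becomes a direct requirement, joined by gocql and pmorelli92/maybe, while the second require block keeps the transitive (`// indirect`) dependencies that `go mod tidy` maintains separately.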
tools/cassandra_delete_range/go.sum
@@ -2,25 +2,38 @@ github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjH
 github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
 github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
+github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY=
 github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
+github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY=
 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/gocql/gocql v1.6.0 h1:IdFdOTbnpbd0pDhl4REKQDM+Q0SzKXQ1Yh+YZZ8T/qU=
 github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
 github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
 github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
 github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
 github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
 github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/pmorelli92/maybe v1.1.0 h1:uyV6NLF4453AQARZ6rKpJNzc9PBsQmpGDtUonhxInPU=
+github.com/pmorelli92/maybe v1.1.0/go.mod h1:5PrW2+fo4/j/LMX6HT49Hb3/HOKv1tbodkzgy4lEopA=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/stretchr/testify v1.8.2 h1:+h33VjcLVPDHtOdpUCuF+7gSuG3yGIftsP1YvFihtJ8=
+github.com/stretchr/testify v1.8.2/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4=
 github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
 github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
 gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
+gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

564  tools/cassandra_delete_range/internal/cass/cass.go  Normal file
@@ -0,0 +1,564 @@
package cass

import (
    "fmt"
    "log"
    "os"
    "slices"
    "strconv"
    "strings"
    "sync"
    "sync/atomic"
    "xrplf/clio/cassandra_delete_range/internal/util"

    "github.com/gocql/gocql"
    "github.com/pmorelli92/maybe"
)

type deleteInfo struct {
    Query string
    Data  []deleteParams
}

type deleteParams struct {
    Seq  uint64
    Blob []byte // hash, key, etc
}

type columnSettings struct {
    UseSeq  bool
    UseBlob bool
}

type Settings struct {
    SkipSuccessorTable          bool
    SkipObjectsTable            bool
    SkipLedgerHashesTable       bool
    SkipTransactionsTable       bool
    SkipDiffTable               bool
    SkipLedgerTransactionsTable bool
    SkipLedgersTable            bool
    SkipWriteLatestLedger       bool

    WorkerCount int
    Ranges      []*util.TokenRange
}

type Cass interface {
    GetLedgerRange() (uint64, uint64, error)
    DeleteBefore(ledgerIdx uint64)
    DeleteAfter(ledgerIdx uint64)
}

type ClioCass struct {
    settings      *Settings
    clusterConfig *gocql.ClusterConfig
}

func NewClioCass(settings *Settings, cluster *gocql.ClusterConfig) *ClioCass {
    return &ClioCass{settings, cluster}
}

func (c *ClioCass) DeleteBefore(ledgerIdx uint64) {
    firstLedgerIdxInDB, latestLedgerIdxInDB, err := c.GetLedgerRange()
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("DB ledger range is %d -> %d\n", firstLedgerIdxInDB, latestLedgerIdxInDB)

    if firstLedgerIdxInDB > ledgerIdx {
        log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...")
    }

    if latestLedgerIdxInDB < ledgerIdx {
        log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...")
    }

    var (
        from maybe.Maybe[uint64] // not used
        to   maybe.Maybe[uint64] = maybe.Set(ledgerIdx - 1)
    )

    c.settings.SkipSuccessorTable = true // skip successor update until we know how to do it
    if err := c.pruneData(from, to, firstLedgerIdxInDB, latestLedgerIdxInDB); err != nil {
        log.Fatal(err)
    }
}

func (c *ClioCass) DeleteAfter(ledgerIdx uint64) {
    firstLedgerIdxInDB, latestLedgerIdxInDB, err := c.GetLedgerRange()
    if err != nil {
        log.Fatal(err)
    }

    log.Printf("DB ledger range is %d -> %d\n", firstLedgerIdxInDB, latestLedgerIdxInDB)

    if firstLedgerIdxInDB > ledgerIdx {
        log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...")
    }

    if latestLedgerIdxInDB < ledgerIdx {
        log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...")
    }

    var (
        from maybe.Maybe[uint64] = maybe.Set(ledgerIdx + 1)
        to   maybe.Maybe[uint64] // not used
    )

    if err := c.pruneData(from, to, firstLedgerIdxInDB, latestLedgerIdxInDB); err != nil {
        log.Fatal(err)
    }
}

func (c *ClioCass) GetLedgerRange() (uint64, uint64, error) {
    var (
        firstLedgerIdx  uint64
        latestLedgerIdx uint64
    )

    session, err := c.clusterConfig.CreateSession()
    if err != nil {
        log.Fatal(err)
    }

    defer session.Close()

    if err := session.Query("SELECT sequence FROM ledger_range WHERE is_latest = ?", false).Scan(&firstLedgerIdx); err != nil {
        return 0, 0, err
    }

    if err := session.Query("SELECT sequence FROM ledger_range WHERE is_latest = ?", true).Scan(&latestLedgerIdx); err != nil {
        return 0, 0, err
    }

    return firstLedgerIdx, latestLedgerIdx, nil
}
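A note on GetLedgerRange above: it relies on Clio's two-row ledger_range convention, where the row with is_latest = false stores the first available sequence and the row with is_latest = true stores the latest one. updateLedgerRange further down writes through the same two-row scheme.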
func (c *ClioCass) pruneData(
    fromLedgerIdx maybe.Maybe[uint64],
    toLedgerIdx maybe.Maybe[uint64],
    firstLedgerIdxInDB uint64,
    latestLedgerIdxInDB uint64,
) error {
    var totalErrors uint64
    var totalRows uint64
    var totalDeletes uint64

    var info deleteInfo
    var rowsCount uint64
    var deleteCount uint64
    var errCount uint64

    // calculate range of simple delete queries
    var (
        rangeFrom uint64 = firstLedgerIdxInDB
        rangeTo   uint64 = latestLedgerIdxInDB
    )

    if fromLedgerIdx.HasValue() {
        rangeFrom = fromLedgerIdx.Value()
    }

    if toLedgerIdx.HasValue() {
        rangeTo = toLedgerIdx.Value()
    }

    // calculate and print deletion plan
    fromStr := "beginning"
    if fromLedgerIdx.HasValue() {
        fromStr = strconv.Itoa(int(fromLedgerIdx.Value()))
    }

    toStr := "latest"
    if toLedgerIdx.HasValue() {
        toStr = strconv.Itoa(int(toLedgerIdx.Value()))
    }

    log.Printf("Start scanning and removing data for %s -> %s\n\n", fromStr, toStr)

    // successor queries
    if !c.settings.SkipSuccessorTable {
        log.Println("Generating delete queries for successor table")
        info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx,
            "SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?",
            "DELETE FROM successor WHERE key = ? AND seq = ?", false)
        log.Printf("Total delete queries: %d\n", len(info.Data))
        log.Printf("Total traversed rows: %d\n\n", rowsCount)
        totalErrors += errCount
        totalRows += rowsCount
        deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: true})
        totalErrors += errCount
        totalDeletes += deleteCount
    }

    // objects queries
    if !c.settings.SkipObjectsTable {
        log.Println("Generating delete queries for objects table")
        info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx,
            "SELECT key, sequence FROM objects WHERE token(key) >= ? AND token(key) <= ?",
            "DELETE FROM objects WHERE key = ? AND sequence = ?", true)
        log.Printf("Total delete queries: %d\n", len(info.Data))
        log.Printf("Total traversed rows: %d\n\n", rowsCount)
        totalErrors += errCount
        totalRows += rowsCount
        deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: true})
        totalErrors += errCount
        totalDeletes += deleteCount
    }

    // ledger_hashes queries
    if !c.settings.SkipLedgerHashesTable {
        log.Println("Generating delete queries for ledger_hashes table")
        info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx,
            "SELECT hash, sequence FROM ledger_hashes WHERE token(hash) >= ? AND token(hash) <= ?",
            "DELETE FROM ledger_hashes WHERE hash = ?", false)
        log.Printf("Total delete queries: %d\n", len(info.Data))
        log.Printf("Total traversed rows: %d\n\n", rowsCount)
        totalErrors += errCount
        totalRows += rowsCount
        deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: false})
        totalErrors += errCount
        totalDeletes += deleteCount
    }

    // transactions queries
    if !c.settings.SkipTransactionsTable {
        log.Println("Generating delete queries for transactions table")
        info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx,
            "SELECT hash, ledger_sequence FROM transactions WHERE token(hash) >= ? AND token(hash) <= ?",
            "DELETE FROM transactions WHERE hash = ?", false)
        log.Printf("Total delete queries: %d\n", len(info.Data))
        log.Printf("Total traversed rows: %d\n\n", rowsCount)
        totalErrors += errCount
        totalRows += rowsCount
        deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: false})
        totalErrors += errCount
        totalDeletes += deleteCount
    }

    // diff queries
    if !c.settings.SkipDiffTable {
        log.Println("Generating delete queries for diff table")
        info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo,
            "DELETE FROM diff WHERE seq = ?")
        log.Printf("Total delete queries: %d\n\n", len(info.Data))
        deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true})
        totalErrors += errCount
        totalDeletes += deleteCount
    }

    // ledger_transactions queries
    if !c.settings.SkipLedgerTransactionsTable {
        log.Println("Generating delete queries for ledger_transactions table")
        info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo,
            "DELETE FROM ledger_transactions WHERE ledger_sequence = ?")
        log.Printf("Total delete queries: %d\n\n", len(info.Data))
        deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true})
        totalErrors += errCount
        totalDeletes += deleteCount
    }

    // ledgers queries
    if !c.settings.SkipLedgersTable {
        log.Println("Generating delete queries for ledgers table")

        info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo,
            "DELETE FROM ledgers WHERE sequence = ?")
        log.Printf("Total delete queries: %d\n\n", len(info.Data))
        deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true})
        totalErrors += errCount
        totalDeletes += deleteCount
    }

    // TODO: tbd what to do with account_tx as it got tuple for seq_idx
    // TODO: also, whether we need to take care of nft tables and other stuff like that

    if !c.settings.SkipWriteLatestLedger {
        var (
            first maybe.Maybe[uint64]
            last  maybe.Maybe[uint64]
        )

        if fromLedgerIdx.HasValue() {
            last = maybe.Set(fromLedgerIdx.Value() - 1)
        }

        if toLedgerIdx.HasValue() {
            first = maybe.Set(toLedgerIdx.Value() + 1)
        }

        if err := c.updateLedgerRange(first, last); err != nil {
            log.Printf("ERROR failed updating ledger range: %s\n", err)
            return err
        }
    }

    log.Printf("TOTAL ERRORS: %d\n", totalErrors)
    log.Printf("TOTAL ROWS TRAVERSED: %d\n", totalRows)
    log.Printf("TOTAL DELETES: %d\n\n", totalDeletes)

    log.Printf("Completed deletion for %s -> %s\n\n", fromStr, toStr)

    return nil
}
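pruneData above treats its two maybe.Maybe bounds as an open-ended interval: delete-after supplies only from, delete-before supplies only to, and an unset bound means unbounded on that side. A standalone sketch of the pattern, with a hypothetical inRange helper on the same pmorelli92/maybe API used above:

package main

import (
    "fmt"

    "github.com/pmorelli92/maybe"
)

// inRange reports whether seq falls inside the pruning interval;
// an unset bound means "unbounded" on that side.
func inRange(seq uint64, from, to maybe.Maybe[uint64]) bool {
    if from.HasValue() && seq < from.Value() {
        return false
    }
    if to.HasValue() && seq > to.Value() {
        return false
    }
    return true
}

func main() {
    var from maybe.Maybe[uint64] // unset: no lower bound
    to := maybe.Set(uint64(99))  // e.g. delete-before 100 prunes everything up to 99

    fmt.Println(inRange(42, from, to), inRange(100, from, to)) // true false
}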
func (c *ClioCass) prepareSimpleDeleteQueries(
    fromLedgerIdx uint64,
    toLedgerIdx uint64,
    deleteQueryTemplate string,
) deleteInfo {
    var info = deleteInfo{Query: deleteQueryTemplate}

    for i := fromLedgerIdx; i <= toLedgerIdx; i++ {
        info.Data = append(info.Data, deleteParams{Seq: i})
    }

    return info
}

func (c *ClioCass) prepareDeleteQueries(
    fromLedgerIdx maybe.Maybe[uint64],
    toLedgerIdx maybe.Maybe[uint64],
    queryTemplate string,
    deleteQueryTemplate string,
    keepLastValid bool,
) (deleteInfo, uint64, uint64) {
    rangesChannel := make(chan *util.TokenRange, len(c.settings.Ranges))
    for i := range c.settings.Ranges {
        rangesChannel <- c.settings.Ranges[i]
    }

    close(rangesChannel)

    outChannel := make(chan deleteParams)
    var info = deleteInfo{Query: deleteQueryTemplate}

    collectorDone := make(chan struct{})
    go func() {
        defer close(collectorDone) // signal once every received param is appended
        total := uint64(0)
        for params := range outChannel {
            total += 1
            if total%1000 == 0 {
                log.Printf("... %d queries ...\n", total)
            }
            info.Data = append(info.Data, params)
        }
    }()

    var wg sync.WaitGroup
    var sessionCreationWaitGroup sync.WaitGroup
    var totalRows uint64
    var totalErrors uint64

    wg.Add(c.settings.WorkerCount)
    sessionCreationWaitGroup.Add(c.settings.WorkerCount)

    for i := 0; i < c.settings.WorkerCount; i++ {
        go func(q string) {
            defer wg.Done()

            var session *gocql.Session
            var err error
            if session, err = c.clusterConfig.CreateSession(); err == nil {
                defer session.Close()

                sessionCreationWaitGroup.Done()
                sessionCreationWaitGroup.Wait()
                preparedQuery := session.Query(q)

                for r := range rangesChannel {
                    preparedQuery.Bind(r.StartRange, r.EndRange)

                    var pageState []byte
                    var rowsRetrieved uint64

                    var previousKey []byte
                    var foundLastValid bool

                    for {
                        iter := preparedQuery.PageSize(c.clusterConfig.PageSize).PageState(pageState).Iter()
                        nextPageState := iter.PageState()
                        scanner := iter.Scanner()

                        for scanner.Next() {
                            var key []byte
                            var seq uint64

                            err = scanner.Scan(&key, &seq)
                            if err == nil {
                                rowsRetrieved++

                                if keepLastValid && !slices.Equal(previousKey, key) {
                                    previousKey = key
                                    foundLastValid = false
                                }

                                // only grab the rows that are in the correct range of sequence numbers
                                if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= seq {
                                    outChannel <- deleteParams{Seq: seq, Blob: key}
                                } else if toLedgerIdx.HasValue() {
                                    if seq < toLedgerIdx.Value() && (!keepLastValid || foundLastValid) {
                                        outChannel <- deleteParams{Seq: seq, Blob: key}
                                    } else if seq <= toLedgerIdx.Value()+1 {
                                        foundLastValid = true
                                    }
                                }
                            } else {
                                log.Printf("ERROR: page iteration failed: %s\n", err)
                                fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [from=%d][to=%d][pagestate=%x]", queryTemplate, r.StartRange, r.EndRange, pageState))
                                atomic.AddUint64(&totalErrors, 1)
                            }
                        }

                        if len(nextPageState) == 0 {
                            break
                        }

                        pageState = nextPageState
                    }

                    atomic.AddUint64(&totalRows, rowsRetrieved)
                }
            } else {
                log.Printf("ERROR: %s\n", err)
                fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
                atomic.AddUint64(&totalErrors, 1)
            }
        }(queryTemplate)
    }

    wg.Wait()
    close(outChannel)
    <-collectorDone // make sure the collector has finished before info is read

    return info, totalRows, totalErrors
}

func (c *ClioCass) splitDeleteWork(info *deleteInfo) [][]deleteParams {
    var n = c.settings.WorkerCount
    var chunkSize = len(info.Data) / n
    var chunks [][]deleteParams

    if len(info.Data) == 0 {
        return chunks
    }

    if chunkSize < 1 {
        chunks = append(chunks, info.Data)
        return chunks
    }

    for i := 0; i < len(info.Data); i += chunkSize {
        end := i + chunkSize

        if end > len(info.Data) {
            end = len(info.Data)
        }

        chunks = append(chunks, info.Data[i:end])
    }

    return chunks
}

func (c *ClioCass) performDeleteQueries(info *deleteInfo, colSettings columnSettings) (uint64, uint64) {
    var wg sync.WaitGroup
    var sessionCreationWaitGroup sync.WaitGroup
    var totalDeletes uint64
    var totalErrors uint64

    chunks := c.splitDeleteWork(info)
    chunksChannel := make(chan []deleteParams, len(chunks))
    for i := range chunks {
        chunksChannel <- chunks[i]
    }

    close(chunksChannel)

    wg.Add(c.settings.WorkerCount)
    sessionCreationWaitGroup.Add(c.settings.WorkerCount)

    query := info.Query
    bindCount := strings.Count(query, "?")

    for i := 0; i < c.settings.WorkerCount; i++ {
        go func(number int, q string, bc int) {
            defer wg.Done()

            var session *gocql.Session
            var err error
            if session, err = c.clusterConfig.CreateSession(); err == nil {
                defer session.Close()

                sessionCreationWaitGroup.Done()
                sessionCreationWaitGroup.Wait()
                preparedQuery := session.Query(q)

                for chunk := range chunksChannel {
                    for _, r := range chunk {
                        if bc == 2 {
                            preparedQuery.Bind(r.Blob, r.Seq)
                        } else if bc == 1 {
                            if colSettings.UseSeq {
                                preparedQuery.Bind(r.Seq)
                            } else if colSettings.UseBlob {
                                preparedQuery.Bind(r.Blob)
                            }
                        }

                        if err := preparedQuery.Exec(); err != nil {
                            log.Printf("DELETE ERROR: %s\n", err)
                            fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq))
                            atomic.AddUint64(&totalErrors, 1)
                        } else {
                            atomic.AddUint64(&totalDeletes, 1)
                            if atomic.LoadUint64(&totalDeletes)%10000 == 0 {
                                log.Printf("... %d deletes ...\n", totalDeletes)
                            }
                        }
                    }
                }
            } else {
                log.Printf("ERROR: %s\n", err)
                fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
                atomic.AddUint64(&totalErrors, 1)
            }
        }(i, query, bindCount)
    }

    wg.Wait()
    return totalDeletes, totalErrors
}

func (c *ClioCass) updateLedgerRange(newStartLedger maybe.Maybe[uint64], newEndLedger maybe.Maybe[uint64]) error {
    if session, err := c.clusterConfig.CreateSession(); err == nil {
        defer session.Close()

        query := "UPDATE ledger_range SET sequence = ? WHERE is_latest = ?"

        if newEndLedger.HasValue() {
            log.Printf("Updating ledger range end to %d\n", newEndLedger.Value())

            preparedQuery := session.Query(query, newEndLedger.Value(), true)
            if err := preparedQuery.Exec(); err != nil {
                fmt.Fprintf(os.Stderr, "FAILED QUERY: %s [seq=%d][true]\n", query, newEndLedger.Value())
                return err
            }
        }

        if newStartLedger.HasValue() {
            log.Printf("Updating ledger range start to %d\n", newStartLedger.Value())

            preparedQuery := session.Query(query, newStartLedger.Value(), false)
            if err := preparedQuery.Exec(); err != nil {
                fmt.Fprintf(os.Stderr, "FAILED QUERY: %s [seq=%d][false]\n", query, newStartLedger.Value())
                return err
            }
        }
    } else {
        fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
        return err
    }

    return nil
}
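The chunking in splitDeleteWork above determines how performDeleteQueries spreads deletes across workers: fewer rows than workers yields a single chunk, and otherwise integer division leaves a shorter remainder chunk at the tail, which the channel-based consumption absorbs. A self-contained sketch of the same slicing, using toy int data instead of deleteParams:

package main

import "fmt"

// split divides data into roughly worker-count chunks, mirroring splitDeleteWork:
// fewer items than workers yields a single chunk; otherwise the final chunk
// holds whatever the integer division leaves over.
func split(data []int, workers int) [][]int {
    var chunks [][]int
    if len(data) == 0 {
        return chunks
    }
    chunkSize := len(data) / workers
    if chunkSize < 1 {
        return [][]int{data}
    }
    for i := 0; i < len(data); i += chunkSize {
        end := i + chunkSize
        if end > len(data) {
            end = len(data)
        }
        chunks = append(chunks, data[i:end])
    }
    return chunks
}

func main() {
    fmt.Println(split([]int{1, 2, 3, 4, 5, 6, 7}, 3)) // [[1 2] [3 4] [5 6] [7]]
}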
91  tools/cassandra_delete_range/internal/util/util.go  Normal file
@@ -0,0 +1,91 @@
package util

import (
    "fmt"
    "log"
    "math"
    "math/rand"

    "github.com/gocql/gocql"
)

type TokenRange struct {
    StartRange int64
    EndRange   int64
}

func Shuffle(data []*TokenRange) {
    for i := 1; i < len(data); i++ {
        r := rand.Intn(i + 1)
        if i != r {
            data[r], data[i] = data[i], data[r]
        }
    }
}

func PromptContinue() bool {
    var continueFlag string

    log.Println("Are you sure you want to continue? (y/n)")
    if fmt.Scanln(&continueFlag); continueFlag != "y" {
        return false
    }

    return true
}

func GetTokenRanges(workerCount int) []*TokenRange {
    var n = workerCount
    var m = int64(n * 100)
    var maxSize uint64 = math.MaxInt64 * 2
    var rangeSize = maxSize / uint64(m)

    var start int64 = math.MinInt64
    var end int64
    var shouldBreak = false

    var ranges = make([]*TokenRange, m)

    for i := int64(0); i < m; i++ {
        end = start + int64(rangeSize)
        if start > 0 && end < 0 {
            end = math.MaxInt64
            shouldBreak = true
        }

        ranges[i] = &TokenRange{StartRange: start, EndRange: end}

        if shouldBreak {
            break
        }

        start = end + 1
    }

    return ranges
}

func GetConsistencyLevel(consistencyValue string) gocql.Consistency {
    switch consistencyValue {
    case "any":
        return gocql.Any
    case "one":
        return gocql.One
    case "two":
        return gocql.Two
    case "three":
        return gocql.Three
    case "quorum":
        return gocql.Quorum
    case "all":
        return gocql.All
    case "localquorum":
        return gocql.LocalQuorum
    case "eachquorum":
        return gocql.EachQuorum
    case "localone":
        return gocql.LocalOne
    default:
        return gocql.One
    }
}
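GetTokenRanges above slices the full Murmur3 token ring (the int64 space used by Cassandra/Scylla partitioners) into workerCount*100 contiguous spans, clamping the final span when the addition overflows. A standalone check of that coverage invariant, using the same arithmetic on a toy struct:

package main

import (
    "fmt"
    "math"
)

type tokenRange struct{ start, end int64 }

func main() {
    const workers = 72 // e.g. 3 nodes * 8 cores * smudge factor 3
    m := int64(workers * 100)
    rangeSize := (uint64(math.MaxInt64) * 2) / uint64(m)

    start := int64(math.MinInt64)
    var ranges []tokenRange
    for i := int64(0); i < m; i++ {
        end := start + int64(rangeSize)
        if start > 0 && end < 0 { // signed overflow: clamp the final range
            end = math.MaxInt64
        }
        ranges = append(ranges, tokenRange{start, end})
        if end == math.MaxInt64 {
            break
        }
        start = end + 1
    }

    first, last := ranges[0], ranges[len(ranges)-1]
    // Expect: starts at MinInt64, ends at MaxInt64, m ranges in total.
    fmt.Println(first.start == math.MinInt64, last.end == math.MaxInt64, len(ranges))
}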
@@ -1,619 +0,0 @@
//
// Based off of https://github.com/scylladb/scylla-code-samples/blob/master/efficient_full_table_scan_example_code/efficient_full_table_scan.go
//

package main

import (
    "fmt"
    "log"
    "math"
    "math/rand"
    "os"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/alecthomas/kingpin/v2"
    "github.com/gocql/gocql"
)

const (
    defaultNumberOfNodesInCluster = 3
    defaultNumberOfCoresInNode    = 8
    defaultSmudgeFactor           = 3
)

var (
    clusterHosts      = kingpin.Arg("hosts", "Your Scylla nodes IP addresses, comma separated (i.e. 192.168.1.1,192.168.1.2,192.168.1.3)").Required().String()
    earliestLedgerIdx = kingpin.Flag("ledgerIdx", "Sets the earliest ledger_index to keep untouched").Short('i').Required().Uint64()

    nodesInCluster        = kingpin.Flag("nodes-in-cluster", "Number of nodes in your Scylla cluster").Short('n').Default(fmt.Sprintf("%d", defaultNumberOfNodesInCluster)).Int()
    coresInNode           = kingpin.Flag("cores-in-node", "Number of cores in each node").Short('c').Default(fmt.Sprintf("%d", defaultNumberOfCoresInNode)).Int()
    smudgeFactor          = kingpin.Flag("smudge-factor", "Yet another factor to make parallelism cooler").Short('s').Default(fmt.Sprintf("%d", defaultSmudgeFactor)).Int()
    clusterConsistency    = kingpin.Flag("consistency", "Cluster consistency level. Use 'localone' for multi DC").Short('o').Default("localquorum").String()
    clusterTimeout        = kingpin.Flag("timeout", "Maximum duration for query execution in millisecond").Short('t').Default("15000").Int()
    clusterNumConnections = kingpin.Flag("cluster-number-of-connections", "Number of connections per host per session (in our case, per thread)").Short('b').Default("1").Int()
    clusterCQLVersion     = kingpin.Flag("cql-version", "The CQL version to use").Short('l').Default("3.0.0").String()
    clusterPageSize       = kingpin.Flag("cluster-page-size", "Page size of results").Short('p').Default("5000").Int()
    keyspace              = kingpin.Flag("keyspace", "Keyspace to use").Short('k').Default("clio_fh").String()

    userName = kingpin.Flag("username", "Username to use when connecting to the cluster").String()
    password = kingpin.Flag("password", "Password to use when connecting to the cluster").String()

    skipSuccessorTable          = kingpin.Flag("skip-successor", "Whether to skip deletion from successor table").Default("false").Bool()
    skipObjectsTable            = kingpin.Flag("skip-objects", "Whether to skip deletion from objects table").Default("false").Bool()
    skipLedgerHashesTable       = kingpin.Flag("skip-ledger-hashes", "Whether to skip deletion from ledger_hashes table").Default("false").Bool()
    skipTransactionsTable       = kingpin.Flag("skip-transactions", "Whether to skip deletion from transactions table").Default("false").Bool()
    skipDiffTable               = kingpin.Flag("skip-diff", "Whether to skip deletion from diff table").Default("false").Bool()
    skipLedgerTransactionsTable = kingpin.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool()
    skipLedgersTable            = kingpin.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool()
    skipWriteLatestLedger       = kingpin.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool()

    workerCount = 1             // the calculated number of parallel goroutines the client should run
    ranges      []*tokenRange   // the calculated ranges to be executed in parallel
)

type tokenRange struct {
    StartRange int64
    EndRange   int64
}

type deleteParams struct {
    Seq  uint64
    Blob []byte // hash, key, etc
}

type columnSettings struct {
    UseSeq  bool
    UseBlob bool
}

type deleteInfo struct {
    Query string
    Data  []deleteParams
}

func getTokenRanges() []*tokenRange {
    var n = workerCount
    var m = int64(n * 100)
    var maxSize uint64 = math.MaxInt64 * 2
    var rangeSize = maxSize / uint64(m)

    var start int64 = math.MinInt64
    var end int64
    var shouldBreak = false

    var ranges = make([]*tokenRange, m)

    for i := int64(0); i < m; i++ {
        end = start + int64(rangeSize)
        if start > 0 && end < 0 {
            end = math.MaxInt64
            shouldBreak = true
        }

        ranges[i] = &tokenRange{StartRange: start, EndRange: end}

        if shouldBreak {
            break
        }

        start = end + 1
    }

    return ranges
}

func splitDeleteWork(info *deleteInfo) [][]deleteParams {
    var n = workerCount
    var chunkSize = len(info.Data) / n
    var chunks [][]deleteParams

    if len(info.Data) == 0 {
        return chunks
    }

    if chunkSize < 1 {
        chunks = append(chunks, info.Data)
        return chunks
    }

    for i := 0; i < len(info.Data); i += chunkSize {
        end := i + chunkSize

        if end > len(info.Data) {
            end = len(info.Data)
        }

        chunks = append(chunks, info.Data[i:end])
    }

    return chunks
}

func shuffle(data []*tokenRange) {
    for i := 1; i < len(data); i++ {
        r := rand.Intn(i + 1)
        if i != r {
            data[r], data[i] = data[i], data[r]
        }
    }
}

func getConsistencyLevel(consistencyValue string) gocql.Consistency {
    switch consistencyValue {
    case "any":
        return gocql.Any
    case "one":
        return gocql.One
    case "two":
        return gocql.Two
    case "three":
        return gocql.Three
    case "quorum":
        return gocql.Quorum
    case "all":
        return gocql.All
    case "localquorum":
        return gocql.LocalQuorum
    case "eachquorum":
        return gocql.EachQuorum
    case "localone":
        return gocql.LocalOne
    default:
        return gocql.One
    }
}

func main() {
    log.SetOutput(os.Stdout)
    kingpin.Parse()

    workerCount = (*nodesInCluster) * (*coresInNode) * (*smudgeFactor)
    ranges = getTokenRanges()
    shuffle(ranges)

    hosts := strings.Split(*clusterHosts, ",")

    cluster := gocql.NewCluster(hosts...)
    cluster.Consistency = getConsistencyLevel(*clusterConsistency)
    cluster.Timeout = time.Duration(*clusterTimeout * 1000 * 1000)
    cluster.NumConns = *clusterNumConnections
    cluster.CQLVersion = *clusterCQLVersion
    cluster.PageSize = *clusterPageSize
    cluster.Keyspace = *keyspace

    if *userName != "" {
        cluster.Authenticator = gocql.PasswordAuthenticator{
            Username: *userName,
            Password: *password,
        }
    }

    if *earliestLedgerIdx == 0 {
        log.Println("Please specify ledger index to delete from")
        return
    }

    runParameters := fmt.Sprintf(`
Execution Parameters:
=====================

Range to be deleted         : %d -> latest
Scylla cluster nodes        : %s
Keyspace                    : %s
Consistency                 : %s
Timeout (ms)                : %d
Connections per host        : %d
CQL Version                 : %s
Page size                   : %d
# of parallel threads       : %d
# of ranges to be executed  : %d

Skip deletion of:
- successor table           : %t
- objects table             : %t
- ledger_hashes table       : %t
- transactions table        : %t
- diff table                : %t
- ledger_transactions table : %t
- ledgers table             : %t

Will write latest ledger    : %t
|
|
||||||
|
|
||||||
`,
|
|
||||||
*earliestLedgerIdx,
|
|
||||||
*clusterHosts,
|
|
||||||
*keyspace,
|
|
||||||
*clusterConsistency,
|
|
||||||
cluster.Timeout/1000/1000,
|
|
||||||
*clusterNumConnections,
|
|
||||||
*clusterCQLVersion,
|
|
||||||
*clusterPageSize,
|
|
||||||
workerCount,
|
|
||||||
len(ranges),
|
|
||||||
*skipSuccessorTable,
|
|
||||||
*skipObjectsTable,
|
|
||||||
*skipLedgerHashesTable,
|
|
||||||
*skipTransactionsTable,
|
|
||||||
*skipDiffTable,
|
|
||||||
*skipLedgerTransactionsTable,
|
|
||||||
*skipLedgersTable,
|
|
||||||
!*skipWriteLatestLedger)
|
|
||||||
|
|
||||||
fmt.Println(runParameters)
|
|
||||||
|
|
||||||
log.Printf("Will delete everything after ledger index %d (exclusive) and till latest\n", *earliestLedgerIdx)
|
|
||||||
log.Println("WARNING: Please make sure that there are no Clio writers operating on the DB while this script is running")
|
|
||||||
log.Println("Are you sure you want to continue? (y/n)")
|
|
||||||
|
|
||||||
var continueFlag string
|
|
||||||
if fmt.Scanln(&continueFlag); continueFlag != "y" {
|
|
||||||
log.Println("Aborting...")
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
startTime := time.Now().UTC()
|
|
||||||
|
|
||||||
earliestLedgerIdxInDB, latestLedgerIdxInDB, err := getLedgerRange(cluster)
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if earliestLedgerIdxInDB > *earliestLedgerIdx {
|
|
||||||
log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...")
|
|
||||||
}
|
|
||||||
|
|
||||||
if latestLedgerIdxInDB < *earliestLedgerIdx {
|
|
||||||
log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...")
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := deleteLedgerData(cluster, *earliestLedgerIdx+1, latestLedgerIdxInDB); err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("Total Execution Time: %s\n\n", time.Since(startTime))
|
|
||||||
fmt.Println("NOTE: Cassandra/ScyllaDB only writes tombstones. You need to run compaction to free up disk space.")
|
|
||||||
}
|
|
||||||
|
|
||||||
func getLedgerRange(cluster *gocql.ClusterConfig) (uint64, uint64, error) {
|
|
||||||
var (
|
|
||||||
firstLedgerIdx uint64
|
|
||||||
latestLedgerIdx uint64
|
|
||||||
)
|
|
||||||
|
|
||||||
session, err := cluster.CreateSession()
|
|
||||||
if err != nil {
|
|
||||||
log.Fatal(err)
|
|
||||||
}
|
|
||||||
|
|
||||||
defer session.Close()
|
|
||||||
|
|
||||||
if err := session.Query("select sequence from ledger_range where is_latest = ?", false).Scan(&firstLedgerIdx); err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := session.Query("select sequence from ledger_range where is_latest = ?", true).Scan(&latestLedgerIdx); err != nil {
|
|
||||||
return 0, 0, err
|
|
||||||
}
|
|
||||||
|
|
||||||
log.Printf("DB ledger range is %d:%d\n", firstLedgerIdx, latestLedgerIdx)
|
|
||||||
return firstLedgerIdx, latestLedgerIdx, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func deleteLedgerData(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, toLedgerIdx uint64) error {
|
|
||||||
var totalErrors uint64
|
|
||||||
var totalRows uint64
|
|
||||||
var totalDeletes uint64
|
|
||||||
|
|
||||||
var info deleteInfo
|
|
||||||
var rowsCount uint64
|
|
||||||
var deleteCount uint64
|
|
||||||
var errCount uint64
|
|
||||||
|
|
||||||
log.Printf("Start scanning and removing data for %d -> latest (%d according to ledger_range table)\n\n", fromLedgerIdx, toLedgerIdx)
|
|
||||||
|
|
||||||
// successor queries
|
|
||||||
if !*skipSuccessorTable {
|
|
||||||
log.Println("Generating delete queries for successor table")
|
|
||||||
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
|
|
||||||
"SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?",
|
|
||||||
"DELETE FROM successor WHERE key = ? AND seq = ?")
|
|
||||||
log.Printf("Total delete queries: %d\n", len(info.Data))
|
|
||||||
log.Printf("Total traversed rows: %d\n\n", rowsCount)
|
|
||||||
totalErrors += errCount
|
|
||||||
totalRows += rowsCount
|
|
||||||
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
|
|
||||||
totalErrors += errCount
|
|
||||||
totalDeletes += deleteCount
|
|
||||||
}
|
|
||||||
|
|
||||||
// objects queries
|
|
||||||
if !*skipObjectsTable {
|
|
||||||
log.Println("Generating delete queries for objects table")
|
|
||||||
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
|
|
||||||
"SELECT key, sequence FROM objects WHERE token(key) >= ? AND token(key) <= ?",
|
|
||||||
"DELETE FROM objects WHERE key = ? AND sequence = ?")
|
|
||||||
log.Printf("Total delete queries: %d\n", len(info.Data))
|
|
||||||
log.Printf("Total traversed rows: %d\n\n", rowsCount)
|
|
||||||
totalErrors += errCount
|
|
||||||
totalRows += rowsCount
|
|
||||||
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
|
|
||||||
totalErrors += errCount
|
|
||||||
totalDeletes += deleteCount
|
|
||||||
}
|
|
||||||
|
|
||||||
// ledger_hashes queries
|
|
||||||
if !*skipLedgerHashesTable {
|
|
||||||
log.Println("Generating delete queries for ledger_hashes table")
|
|
||||||
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
|
|
||||||
"SELECT hash, sequence FROM ledger_hashes WHERE token(hash) >= ? AND token(hash) <= ?",
|
|
||||||
"DELETE FROM ledger_hashes WHERE hash = ?")
|
|
||||||
log.Printf("Total delete queries: %d\n", len(info.Data))
|
|
||||||
log.Printf("Total traversed rows: %d\n\n", rowsCount)
|
|
||||||
totalErrors += errCount
|
|
||||||
totalRows += rowsCount
|
|
||||||
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false})
|
|
||||||
totalErrors += errCount
|
|
||||||
totalDeletes += deleteCount
|
|
||||||
}
|
|
||||||
|
|
||||||
	// transactions queries
	if !*skipTransactionsTable {
		log.Println("Generating delete queries for transactions table")
		info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
			"SELECT hash, ledger_sequence FROM transactions WHERE token(hash) >= ? AND token(hash) <= ?",
			"DELETE FROM transactions WHERE hash = ?")
		log.Printf("Total delete queries: %d\n", len(info.Data))
		log.Printf("Total traversed rows: %d\n\n", rowsCount)
		totalErrors += errCount
		totalRows += rowsCount

		deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false})
		totalErrors += errCount
		totalDeletes += deleteCount
	}

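	// The remaining tables are keyed by ledger sequence alone, so the rows to
	// delete are known upfront and no token-range scan is needed.
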
	// diff queries
	if !*skipDiffTable {
		log.Println("Generating delete queries for diff table")
		info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
			"DELETE FROM diff WHERE seq = ?")
		log.Printf("Total delete queries: %d\n\n", len(info.Data))

		deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
		totalErrors += errCount
		totalDeletes += deleteCount
	}

	// ledger_transactions queries
	if !*skipLedgerTransactionsTable {
		log.Println("Generating delete queries for ledger_transactions table")
		info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
			"DELETE FROM ledger_transactions WHERE ledger_sequence = ?")
		log.Printf("Total delete queries: %d\n\n", len(info.Data))

		deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true})
		totalErrors += errCount
		totalDeletes += deleteCount
	}

	// ledgers queries
	if !*skipLedgersTable {
		log.Println("Generating delete queries for ledgers table")
		info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
			"DELETE FROM ledgers WHERE sequence = ?")
		log.Printf("Total delete queries: %d\n\n", len(info.Data))

		deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true})
		totalErrors += errCount
		totalDeletes += deleteCount
	}

	// TODO: decide what to do with account_tx, as it uses a tuple for seq_idx
	// TODO: decide whether the NFT tables and similar also need to be handled

	if !*skipWriteLatestLedger {
		if err := updateLedgerRange(cluster, fromLedgerIdx-1); err != nil {
			log.Printf("ERROR failed updating ledger range: %s\n", err)
			return err
		}

		log.Printf("Updated latest ledger to %d in ledger_range table\n\n", fromLedgerIdx-1)
	}

	log.Printf("TOTAL ERRORS: %d\n", totalErrors)
	log.Printf("TOTAL ROWS TRAVERSED: %d\n", totalRows)
	log.Printf("TOTAL DELETES: %d\n\n", totalDeletes)

	log.Printf("Completed deletion for %d -> %d\n\n", fromLedgerIdx, toLedgerIdx)

	return nil
}

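// prepareSimpleDeleteQueries builds one delete per ledger sequence in
// [fromLedgerIdx, toLedgerIdx+1] for tables keyed by sequence alone.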
func prepareSimpleDeleteQueries(fromLedgerIdx uint64, toLedgerIdx uint64, deleteQueryTemplate string) deleteInfo {
	var info = deleteInfo{Query: deleteQueryTemplate}

	// Note: we deliberately add 1 extra ledger to make sure we delete any data Clio might have written
	// if it crashed or was stopped in the middle of writing just before it wrote ledger_range.
	for i := fromLedgerIdx; i <= toLedgerIdx+1; i++ {
		info.Data = append(info.Data, deleteParams{Seq: i})
	}

	return info
}

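// prepareDeleteQueries scans the table's full token range in parallel and
// collects the (key, sequence) pairs at or above fromLedgerIdx as delete
// parameters. It returns the collected delete info, the number of rows
// traversed, and the number of errors encountered.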
func prepareDeleteQueries(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, queryTemplate string, deleteQueryTemplate string) (deleteInfo, uint64, uint64) {
	rangesChannel := make(chan *tokenRange, len(ranges))
	for i := range ranges {
		rangesChannel <- ranges[i]
	}

	close(rangesChannel)

	outChannel := make(chan deleteParams)
	var info = deleteInfo{Query: deleteQueryTemplate}

	// Collect the delete parameters produced by the workers; appenderDone
	// guarantees every received value is appended before info is returned.
	appenderDone := make(chan struct{})
	go func() {
		defer close(appenderDone)
		for params := range outChannel {
			info.Data = append(info.Data, params)
		}
	}()

	var wg sync.WaitGroup
	var sessionCreationWaitGroup sync.WaitGroup
	var totalRows uint64
	var totalErrors uint64

	wg.Add(workerCount)
	sessionCreationWaitGroup.Add(workerCount)

	for i := 0; i < workerCount; i++ {
		go func(q string) {
			defer wg.Done()

			var session *gocql.Session
			var err error
			if session, err = cluster.CreateSession(); err == nil {
				defer session.Close()

				// Wait until every worker has a session before issuing queries.
				sessionCreationWaitGroup.Done()
				sessionCreationWaitGroup.Wait()
				preparedQuery := session.Query(q)

				for r := range rangesChannel {
					preparedQuery.Bind(r.StartRange, r.EndRange)

					var pageState []byte
					var rowsRetrieved uint64

					for {
						iter := preparedQuery.PageSize(*clusterPageSize).PageState(pageState).Iter()
						nextPageState := iter.PageState()
						scanner := iter.Scanner()

						for scanner.Next() {
							var key []byte
							var seq uint64

							err = scanner.Scan(&key, &seq)
							if err == nil {
								rowsRetrieved++

								// only grab the rows that are in the correct range of sequence numbers
								if fromLedgerIdx <= seq {
									outChannel <- deleteParams{Seq: seq, Blob: key}
								}
							} else {
								log.Printf("ERROR: page iteration failed: %s\n", err)
								fmt.Fprintf(os.Stderr, "FAILED QUERY: %s [from=%d][to=%d][pagestate=%x]\n", queryTemplate, r.StartRange, r.EndRange, pageState)
								atomic.AddUint64(&totalErrors, 1)
							}
						}

						if len(nextPageState) == 0 {
							break
						}

						pageState = nextPageState
					}

					atomic.AddUint64(&totalRows, rowsRetrieved)
				}
			} else {
				log.Printf("ERROR: %s\n", err)
				fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
				atomic.AddUint64(&totalErrors, 1)
			}
		}(queryTemplate)
	}

	wg.Wait()
	close(outChannel)
	<-appenderDone

	return info, totalRows, totalErrors
}

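// performDeleteQueries executes the prepared delete statements in parallel
// chunks. Depending on the number of bind markers in the query, it binds
// blob+sequence, sequence only, or blob only, as directed by colSettings.
// It returns the number of successful deletes and the number of errors.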
func performDeleteQueries(cluster *gocql.ClusterConfig, info *deleteInfo, colSettings columnSettings) (uint64, uint64) {
	var wg sync.WaitGroup
	var sessionCreationWaitGroup sync.WaitGroup
	var totalDeletes uint64
	var totalErrors uint64

	chunks := splitDeleteWork(info)
	chunksChannel := make(chan []deleteParams, len(chunks))
	for i := range chunks {
		chunksChannel <- chunks[i]
	}

	close(chunksChannel)

	wg.Add(workerCount)
	sessionCreationWaitGroup.Add(workerCount)

	query := info.Query
	bindCount := strings.Count(query, "?")

	for i := 0; i < workerCount; i++ {
		go func(number int, q string, bc int) {
			defer wg.Done()

			var session *gocql.Session
			var err error
			if session, err = cluster.CreateSession(); err == nil {
				defer session.Close()

				sessionCreationWaitGroup.Done()
				sessionCreationWaitGroup.Wait()
				preparedQuery := session.Query(q)

				for chunk := range chunksChannel {
					for _, r := range chunk {
						// Two markers mean the table is keyed by (blob, seq);
						// one marker binds whichever column the settings select.
						if bc == 2 {
							preparedQuery.Bind(r.Blob, r.Seq)
						} else if bc == 1 {
							if colSettings.UseSeq {
								preparedQuery.Bind(r.Seq)
							} else if colSettings.UseBlob {
								preparedQuery.Bind(r.Blob)
							}
						}

						if err := preparedQuery.Exec(); err != nil {
							log.Printf("DELETE ERROR: %s\n", err)
							fmt.Fprintf(os.Stderr, "FAILED QUERY: %s [blob=0x%x][seq=%d]\n", info.Query, r.Blob, r.Seq)
							atomic.AddUint64(&totalErrors, 1)
						} else {
							atomic.AddUint64(&totalDeletes, 1)
						}
					}
				}
			} else {
				log.Printf("ERROR: %s\n", err)
				fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
				atomic.AddUint64(&totalErrors, 1)
			}
		}(i, query, bindCount)
	}

	wg.Wait()
	return totalDeletes, totalErrors
}

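// updateLedgerRange writes the new latest ledger index into the ledger_range
// table so that Clio observes the pruned upper bound.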
func updateLedgerRange(cluster *gocql.ClusterConfig, ledgerIndex uint64) error {
	log.Printf("Updating latest ledger to %d\n", ledgerIndex)

	if session, err := cluster.CreateSession(); err == nil {
		defer session.Close()

		query := "UPDATE ledger_range SET sequence = ? WHERE is_latest = ?"
		preparedQuery := session.Query(query, ledgerIndex, true)
		if err := preparedQuery.Exec(); err != nil {
			fmt.Fprintf(os.Stderr, "FAILED QUERY: %s [seq=%d][true]\n", query, ledgerIndex)
			return err
		}
	} else {
		fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
		return err
	}

	return nil
}

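// Example invocation (hypothetical addresses and ledger index):
//
//	cassandra_delete_range --hosts 192.168.1.1,192.168.1.2,192.168.1.3 \
//		--keyspace clio_fh delete-before 2000000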