Fix paging bug in range deletion tool (#1150)

Alex Kremer
2024-02-02 16:29:17 +00:00
committed by GitHub
parent ecfe5e84e5
commit 8f89a5913d


@@ -42,8 +42,17 @@ var (
userName = kingpin.Flag("username", "Username to use when connecting to the cluster").String()
password = kingpin.Flag("password", "Password to use when connecting to the cluster").String()
numberOfParallelClientThreads = 1 // the calculated number of parallel threads the client should run
ranges []*tokenRange // the calculated ranges to be executed in parallel
skipSuccessorTable = kingpin.Flag("skip-successor", "Whether to skip deletion from successor table").Default("false").Bool()
skipObjectsTable = kingpin.Flag("skip-objects", "Whether to skip deletion from objects table").Default("false").Bool()
skipLedgerHashesTable = kingpin.Flag("skip-ledger-hashes", "Whether to skip deletion from ledger_hashes table").Default("false").Bool()
skipTransactionsTable = kingpin.Flag("skip-transactions", "Whether to skip deletion from transactions table").Default("false").Bool()
skipDiffTable = kingpin.Flag("skip-diff", "Whether to skip deletion from diff table").Default("false").Bool()
skipLedgerTransactionsTable = kingpin.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool()
skipLedgersTable = kingpin.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool()
skipWriteLatestLedger = kingpin.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool()
workerCount = 1 // the calculated number of parallel goroutines the client should run
ranges []*tokenRange // the calculated ranges to be executed in parallel
)
type tokenRange struct {
@@ -67,7 +76,7 @@ type deleteInfo struct {
}
func getTokenRanges() []*tokenRange {
var n = numberOfParallelClientThreads
var n = workerCount
var m = int64(n * 100)
var maxSize uint64 = math.MaxInt64 * 2
var rangeSize = maxSize / uint64(m)
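
For context, getTokenRanges splits the full int64 token ring (math.MinInt64 to math.MaxInt64) into workerCount * 100 contiguous ranges that can be scanned in parallel. The following is only a minimal sketch of that split, assuming the tokenRange struct defined in this file (with int64 StartRange/EndRange fields) and the math import; it is not the tool's exact loop:

	// Sketch: carve the int64 token ring into m contiguous ranges.
	func sketchTokenRanges(workerCount int) []*tokenRange {
		m := int64(workerCount * 100)
		var maxSize uint64 = math.MaxInt64 * 2 // width of the int64 token ring
		rangeSize := maxSize / uint64(m)

		var result []*tokenRange
		start := int64(math.MinInt64)
		for i := int64(0); i < m; i++ {
			end := int64(math.MaxInt64) // last range runs to the top of the ring
			if i != m-1 {
				end = start + int64(rangeSize)
			}
			result = append(result, &tokenRange{StartRange: start, EndRange: end})
			start = end + 1
		}
		return result
	}
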
@@ -98,7 +107,7 @@ func getTokenRanges() []*tokenRange {
}
func splitDeleteWork(info *deleteInfo) [][]deleteParams {
var n = numberOfParallelClientThreads
var n = workerCount
var chunkSize = len(info.Data) / n
var chunks [][]deleteParams
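
splitDeleteWork divides the collected delete parameters into roughly workerCount chunks, which the delete workers later drain from a channel. A hedged sketch of that chunking (the remainder handling here is illustrative; deleteParams is the type defined in this file):

	// Sketch: split the work into ~workerCount slices of similar size.
	func sketchSplitWork(data []deleteParams, workerCount int) [][]deleteParams {
		chunkSize := len(data) / workerCount
		if chunkSize == 0 {
			chunkSize = 1 // fewer items than workers: one item per chunk
		}
		var chunks [][]deleteParams
		for start := 0; start < len(data); start += chunkSize {
			end := start + chunkSize
			if end > len(data) {
				end = len(data)
			}
			chunks = append(chunks, data[start:end])
		}
		return chunks
	}
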
@@ -159,9 +168,10 @@ func getConsistencyLevel(consistencyValue string) gocql.Consistency {
}
func main() {
log.SetOutput(os.Stdout)
kingpin.Parse()
numberOfParallelClientThreads = (*nodesInCluster) * (*coresInNode) * (*smudgeFactor)
workerCount = (*nodesInCluster) * (*coresInNode) * (*smudgeFactor)
ranges = getTokenRanges()
shuffle(ranges)
@@ -202,7 +212,36 @@ Page size : %d
# of parallel threads : %d
# of ranges to be executed : %d
`, *earliestLedgerIdx, *clusterHosts, *keyspace, *clusterConsistency, cluster.Timeout/1000/1000, *clusterNumConnections, *clusterCQLVersion, *clusterPageSize, numberOfParallelClientThreads, len(ranges))
Skip deletion of:
- successor table : %t
- objects table : %t
- ledger_hashes table : %t
- transactions table : %t
- diff table : %t
- ledger_transactions table : %t
- ledgers table : %t
Will write latest ledger : %t
`,
*earliestLedgerIdx,
*clusterHosts,
*keyspace,
*clusterConsistency,
cluster.Timeout/1000/1000,
*clusterNumConnections,
*clusterCQLVersion,
*clusterPageSize,
workerCount,
len(ranges),
*skipSuccessorTable,
*skipObjectsTable,
*skipLedgerHashesTable,
*skipTransactionsTable,
*skipDiffTable,
*skipLedgerTransactionsTable,
*skipLedgersTable,
!*skipWriteLatestLedger)
fmt.Println(runParameters)
@@ -277,94 +316,108 @@ func deleteLedgerData(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, toLedg
log.Printf("Start scanning and removing data for %d -> latest (%d according to ledger_range table)\n\n", fromLedgerIdx, toLedgerIdx)
// successor queries
log.Println("Generating delete queries for successor table")
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
"SELECT key, seq FROM successor WHERE token(key) >= %s AND token(key) <= %s",
"DELETE FROM successor WHERE key = ? AND seq = ?")
log.Printf("Total delete queries: %d\n", len(info.Data))
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
totalRows += rowsCount
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
// diff queries
log.Println("Generating delete queries for diff table")
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
"SELECT key, seq FROM diff WHERE token(seq) >= %s AND token(seq) <= %s",
"DELETE FROM diff WHERE key = ? AND seq = ?")
log.Printf("Total delete queries: %d\n", len(info.Data))
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
totalRows += rowsCount
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
if !*skipSuccessorTable {
log.Println("Generating delete queries for successor table")
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
"SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?",
"DELETE FROM successor WHERE key = ? AND seq = ?")
log.Printf("Total delete queries: %d\n", len(info.Data))
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
totalRows += rowsCount
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
}
// objects queries
log.Println("Generating delete queries for objects table")
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
"SELECT key, sequence FROM objects WHERE token(key) >= %s AND token(key) <= %s",
"DELETE FROM objects WHERE key = ? AND sequence = ?")
log.Printf("Total delete queries: %d\n", len(info.Data))
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
totalRows += rowsCount
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
if !*skipObjectsTable {
log.Println("Generating delete queries for objects table")
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
"SELECT key, sequence FROM objects WHERE token(key) >= ? AND token(key) <= ?",
"DELETE FROM objects WHERE key = ? AND sequence = ?")
log.Printf("Total delete queries: %d\n", len(info.Data))
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
totalRows += rowsCount
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
}
// ledger_hashes queries
log.Println("Generating delete queries for ledger_hashes table")
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
"SELECT hash, sequence FROM ledger_hashes WHERE token(hash) >= %s AND token(hash) <= %s",
"DELETE FROM ledger_hashes WHERE hash = ?")
log.Printf("Total delete queries: %d\n", len(info.Data))
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
totalRows += rowsCount
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false})
totalErrors += errCount
totalDeletes += deleteCount
if !*skipLedgerHashesTable {
log.Println("Generating delete queries for ledger_hashes table")
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
"SELECT hash, sequence FROM ledger_hashes WHERE token(hash) >= ? AND token(hash) <= ?",
"DELETE FROM ledger_hashes WHERE hash = ?")
log.Printf("Total delete queries: %d\n", len(info.Data))
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
totalRows += rowsCount
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false})
totalErrors += errCount
totalDeletes += deleteCount
}
// transactions queries
log.Println("Generating delete queries for transactions table")
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
"SELECT hash, ledger_sequence FROM transactions WHERE token(hash) >= %s AND token(hash) <= %s",
"DELETE FROM transactions WHERE hash = ?")
log.Printf("Total delete queries: %d\n", len(info.Data))
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
totalRows += rowsCount
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false})
totalErrors += errCount
totalDeletes += deleteCount
if !*skipTransactionsTable {
log.Println("Generating delete queries for transactions table")
info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
"SELECT hash, ledger_sequence FROM transactions WHERE token(hash) >= ? AND token(hash) <= ?",
"DELETE FROM transactions WHERE hash = ?")
log.Printf("Total delete queries: %d\n", len(info.Data))
log.Printf("Total traversed rows: %d\n\n", rowsCount)
totalErrors += errCount
totalRows += rowsCount
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false})
totalErrors += errCount
totalDeletes += deleteCount
}
// diff queries
if !*skipDiffTable {
log.Println("Generating delete queries for diff table")
info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
"DELETE FROM diff WHERE seq = ?")
log.Printf("Total delete queries: %d\n\n", len(info.Data))
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
}
// ledger_transactions queries
log.Println("Generating delete queries for ledger_transactions table")
info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
"DELETE FROM ledger_transactions WHERE ledger_sequence = ?")
log.Printf("Total delete queries: %d\n\n", len(info.Data))
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
if !*skipLedgerTransactionsTable {
log.Println("Generating delete queries for ledger_transactions table")
info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
"DELETE FROM ledger_transactions WHERE ledger_sequence = ?")
log.Printf("Total delete queries: %d\n\n", len(info.Data))
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
}
// ledgers queries
log.Println("Generating delete queries for ledgers table")
info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
"DELETE FROM ledgers WHERE sequence = ?")
log.Printf("Total delete queries: %d\n\n", len(info.Data))
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
if !*skipLedgersTable {
log.Println("Generating delete queries for ledgers table")
info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
"DELETE FROM ledgers WHERE sequence = ?")
log.Printf("Total delete queries: %d\n\n", len(info.Data))
deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true})
totalErrors += errCount
totalDeletes += deleteCount
}
// TODO: tbd what to do with account_tx as it got tuple for seq_idx
// TODO: also, whether we need to take care of nft tables and other stuff like that
if err := updateLedgerRange(cluster, fromLedgerIdx-1); err != nil {
log.Printf("ERROR failed updating ledger range: %s\n", err)
return err
if !*skipWriteLatestLedger {
if err := updateLedgerRange(cluster, fromLedgerIdx-1); err != nil {
log.Printf("ERROR failed updating ledger range: %s\n", err)
return err
}
log.Printf("Updated latest ledger to %d in ledger_range table\n\n", fromLedgerIdx-1)
}
log.Printf("TOTAL ERRORS: %d\n", totalErrors)
@@ -410,11 +463,11 @@ func prepareDeleteQueries(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, qu
var totalRows uint64
var totalErrors uint64
wg.Add(numberOfParallelClientThreads)
sessionCreationWaitGroup.Add(numberOfParallelClientThreads)
wg.Add(workerCount)
sessionCreationWaitGroup.Add(workerCount)
for i := 0; i < numberOfParallelClientThreads; i++ {
go func() {
for i := 0; i < workerCount; i++ {
go func(q string) {
defer wg.Done()
var session *gocql.Session
@@ -424,30 +477,42 @@ func prepareDeleteQueries(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, qu
sessionCreationWaitGroup.Done()
sessionCreationWaitGroup.Wait()
preparedQueryString := fmt.Sprintf(queryTemplate, "?", "?")
preparedQuery := session.Query(preparedQueryString)
preparedQuery := session.Query(q)
for r := range rangesChannel {
preparedQuery.Bind(r.StartRange, r.EndRange)
iter := preparedQuery.Iter()
var pageState []byte
var rowsRetrieved uint64
var key []byte
var seq uint64
var rowsRetrieved uint64
for iter.Scan(&key, &seq) {
rowsRetrieved++
for {
iter := preparedQuery.PageSize(*clusterPageSize).PageState(pageState).Iter()
nextPageState := iter.PageState()
scanner := iter.Scanner()
// only grab the rows that are in the correct range of sequence numbers
if fromLedgerIdx <= seq {
outChannel <- deleteParams{Seq: seq, Blob: key}
for scanner.Next() {
err = scanner.Scan(&key, &seq)
if err == nil {
rowsRetrieved++
// only grab the rows that are in the correct range of sequence numbers
if fromLedgerIdx <= seq {
outChannel <- deleteParams{Seq: seq, Blob: key}
}
} else {
log.Printf("ERROR: page iteration failed: %s\n", err)
fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [from=%d][to=%d][pagestate=%x]", queryTemplate, r.StartRange, r.EndRange, pageState))
atomic.AddUint64(&totalErrors, 1)
}
}
}
if err := iter.Close(); err != nil {
log.Printf("ERROR: iteration failed: %s\n", err)
fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [from=%d][to=%d]", preparedQueryString, r.StartRange, r.EndRange))
atomic.AddUint64(&totalErrors, 1)
if len(nextPageState) == 0 {
break
}
pageState = nextPageState
}
atomic.AddUint64(&totalRows, rowsRetrieved)
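
This hunk is the core of the paging fix: instead of relying on a single Iter over each token range, the query is now driven page by page, carrying the page state forward until the cluster reports no further pages. A minimal, self-contained sketch of that gocql idiom, assuming the github.com/gocql/gocql driver; the table and column names are only illustrative:

	// Sketch: manually page through one token range using PageState.
	func pageThroughRange(session *gocql.Session, pageSize int, start, end int64) error {
		q := session.Query(
			"SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?",
			start, end)

		var pageState []byte
		for {
			iter := q.PageSize(pageSize).PageState(pageState).Iter()
			nextPageState := iter.PageState() // state of the page after this one

			scanner := iter.Scanner()
			for scanner.Next() {
				var key []byte
				var seq uint64
				if err := scanner.Scan(&key, &seq); err != nil {
					return err
				}
				// process (key, seq) here; the tool additionally filters on fromLedgerIdx <= seq
			}
			if err := scanner.Err(); err != nil {
				return err
			}

			if len(nextPageState) == 0 {
				break // no more pages for this token range
			}
			pageState = nextPageState
		}
		return nil
	}
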
@@ -457,7 +522,7 @@ func prepareDeleteQueries(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, qu
fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
atomic.AddUint64(&totalErrors, 1)
}
}()
}(queryTemplate)
}
wg.Wait()
@@ -480,11 +545,14 @@ func performDeleteQueries(cluster *gocql.ClusterConfig, info *deleteInfo, colSet
close(chunksChannel)
wg.Add(len(chunks))
sessionCreationWaitGroup.Add(len(chunks))
wg.Add(workerCount)
sessionCreationWaitGroup.Add(workerCount)
for i := 0; i < len(chunks); i++ {
go func() {
query := info.Query
bindCount := strings.Count(query, "?")
for i := 0; i < workerCount; i++ {
go func(number int, q string, bc int) {
defer wg.Done()
var session *gocql.Session
@@ -494,15 +562,13 @@ func performDeleteQueries(cluster *gocql.ClusterConfig, info *deleteInfo, colSet
sessionCreationWaitGroup.Done()
sessionCreationWaitGroup.Wait()
preparedQuery := session.Query(info.Query)
var bindCount = strings.Count(info.Query, "?")
preparedQuery := session.Query(q)
for chunk := range chunksChannel {
for _, r := range chunk {
if bindCount == 2 {
if bc == 2 {
preparedQuery.Bind(r.Blob, r.Seq)
} else if bindCount == 1 {
} else if bc == 1 {
if colSettings.UseSeq {
preparedQuery.Bind(r.Seq)
} else if colSettings.UseBlob {
@@ -524,7 +590,7 @@ func performDeleteQueries(cluster *gocql.ClusterConfig, info *deleteInfo, colSet
fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
atomic.AddUint64(&totalErrors, 1)
}
}()
}(i, query, bindCount)
}
wg.Wait()
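
A side effect of the same change is that each worker goroutine now receives its loop index, the query string, and the bind-marker count as arguments instead of capturing shared variables from the enclosing scope, so every worker operates on its own stable copies. A small self-contained sketch of that pattern (the query text is just an example):

	package main

	import (
		"fmt"
		"strings"
		"sync"
	)

	func main() {
		const workerCount = 4
		query := "DELETE FROM ledgers WHERE sequence = ?"
		bindCount := strings.Count(query, "?")

		var wg sync.WaitGroup
		wg.Add(workerCount)
		for i := 0; i < workerCount; i++ {
			go func(number int, q string, bc int) {
				defer wg.Done()
				// each goroutine sees its own copies of number, q and bc
				fmt.Printf("worker %d: %q (%d bind markers)\n", number, q, bc)
			}(i, query, bindCount)
		}
		wg.Wait()
	}
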