mirror of
				https://github.com/XRPLF/clio.git
				synced 2025-11-04 11:55:51 +00:00 
			
		
		
		
	
							
								
								
									
										120
									
								
								tools/cassandra_delete_range/cassandra_delete_range.go
									
									
									
									
									
										
										
										Normal file → Executable file
									
								
							
							
						
						
									
										120
									
								
								tools/cassandra_delete_range/cassandra_delete_range.go
									
									
									
									
									
										
										
										Normal file → Executable file
									
								
							@@ -5,9 +5,11 @@
 | 
			
		||||
package main
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"bufio"
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"log"
 | 
			
		||||
	"os"
 | 
			
		||||
	"strconv"
 | 
			
		||||
	"strings"
 | 
			
		||||
	"time"
 | 
			
		||||
	"xrplf/clio/cassandra_delete_range/internal/cass"
 | 
			
		||||
@@ -33,7 +35,7 @@ var (
 | 
			
		||||
	deleteBefore          = app.Command("delete-before", "Prunes everything before the given ledger index")
 | 
			
		||||
	deleteBeforeLedgerIdx = deleteBefore.Arg("idx", "Sets the latest ledger_index to keep around (delete everything before this ledger index)").Required().Uint64()
 | 
			
		||||
 | 
			
		||||
	getLedgerRange = app.Command("get-ledger-range", "Fetch the current lender_range table values")
 | 
			
		||||
	getLedgerRange = app.Command("get-ledger-range", "Fetch the current ledger_range table values")
 | 
			
		||||
 | 
			
		||||
	nodesInCluster        = app.Flag("nodes-in-cluster", "Number of nodes in your Scylla cluster").Short('n').Default(fmt.Sprintf("%d", defaultNumberOfNodesInCluster)).Int()
 | 
			
		||||
	coresInNode           = app.Flag("cores-in-node", "Number of cores in each node").Short('c').Default(fmt.Sprintf("%d", defaultNumberOfCoresInNode)).Int()
 | 
			
		||||
@@ -44,6 +46,7 @@ var (
 | 
			
		||||
	clusterCQLVersion     = app.Flag("cql-version", "The CQL version to use").Short('l').Default("3.0.0").String()
 | 
			
		||||
	clusterPageSize       = app.Flag("cluster-page-size", "Page size of results").Short('p').Default("5000").Int()
 | 
			
		||||
	keyspace              = app.Flag("keyspace", "Keyspace to use").Short('k').Default("clio_fh").String()
 | 
			
		||||
	resume                = app.Flag("resume", "Whether to resume deletion from the previous command due to something crashing").Default("false").Bool()
 | 
			
		||||
 | 
			
		||||
	userName = app.Flag("username", "Username to use when connecting to the cluster").String()
 | 
			
		||||
	password = app.Flag("password", "Password to use when connecting to the cluster").String()
 | 
			
		||||
@@ -56,9 +59,11 @@ var (
 | 
			
		||||
	skipLedgerTransactionsTable = app.Flag("skip-ledger-transactions", "Whether to skip deletion from ledger_transactions table").Default("false").Bool()
 | 
			
		||||
	skipLedgersTable            = app.Flag("skip-ledgers", "Whether to skip deletion from ledgers table").Default("false").Bool()
 | 
			
		||||
	skipWriteLatestLedger       = app.Flag("skip-write-latest-ledger", "Whether to skip writing the latest ledger index").Default("false").Bool()
 | 
			
		||||
	skipAccTransactionsTable    = app.Flag("skip-account-transactions", "Whether to skip deletion from account_transactions table").Default("false").Bool()
 | 
			
		||||
 | 
			
		||||
	workerCount = 1                // the calculated number of parallel goroutines the client should run
 | 
			
		||||
	ranges      []*util.TokenRange // the calculated ranges to be executed in parallel
 | 
			
		||||
	workerCount        = 1                // the calculated number of parallel goroutines the client should run
 | 
			
		||||
	ranges             []*util.TokenRange // the calculated ranges to be executed in parallel
 | 
			
		||||
	ledgerOrTokenRange *util.StoredRange  // mapping of startRange -> endRange. Used for resume deletion
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
func main() {
 | 
			
		||||
@@ -70,6 +75,11 @@ func main() {
 | 
			
		||||
		log.Fatal(err)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	cmd := strings.Join(os.Args[1:], " ")
 | 
			
		||||
	if *resume {
 | 
			
		||||
		prepareResume(&cmd)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	clioCass := cass.NewClioCass(&cass.Settings{
 | 
			
		||||
		SkipSuccessorTable:          *skipSuccessorTable,
 | 
			
		||||
		SkipObjectsTable:            *skipObjectsTable,
 | 
			
		||||
@@ -79,8 +89,11 @@ func main() {
 | 
			
		||||
		SkipLedgerTransactionsTable: *skipLedgerHashesTable,
 | 
			
		||||
		SkipLedgersTable:            *skipLedgersTable,
 | 
			
		||||
		SkipWriteLatestLedger:       *skipWriteLatestLedger,
 | 
			
		||||
		SkipAccTransactionsTable:    *skipAccTransactionsTable,
 | 
			
		||||
		WorkerCount:                 workerCount,
 | 
			
		||||
		Ranges:                      ranges}, cluster)
 | 
			
		||||
		Ranges:                      ranges,
 | 
			
		||||
		RangesRead:                  ledgerOrTokenRange,
 | 
			
		||||
		Command:                     cmd}, cluster)
 | 
			
		||||
 | 
			
		||||
	switch command {
 | 
			
		||||
	case deleteAfter.FullCommand():
 | 
			
		||||
@@ -157,6 +170,7 @@ Skip deletion of:
 | 
			
		||||
- diff table                  : %t
 | 
			
		||||
- ledger_transactions table   : %t
 | 
			
		||||
- ledgers table               : %t
 | 
			
		||||
- account_tx table            : %t
 | 
			
		||||
 | 
			
		||||
Will update ledger_range      : %t
 | 
			
		||||
 | 
			
		||||
@@ -179,7 +193,9 @@ Will update ledger_range      : %t
 | 
			
		||||
		*skipDiffTable,
 | 
			
		||||
		*skipLedgerTransactionsTable,
 | 
			
		||||
		*skipLedgersTable,
 | 
			
		||||
		!*skipWriteLatestLedger)
 | 
			
		||||
		*skipAccTransactionsTable,
 | 
			
		||||
		!*skipWriteLatestLedger,
 | 
			
		||||
	)
 | 
			
		||||
 | 
			
		||||
	fmt.Println(runParameters)
 | 
			
		||||
}
 | 
			
		||||
@@ -208,3 +224,97 @@ func prepareDb(dbHosts *string) (*gocql.ClusterConfig, error) {
 | 
			
		||||
 | 
			
		||||
	return cluster, nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func prepareResume(cmd *string) {
 | 
			
		||||
	// format of file continue.txt is
 | 
			
		||||
	/*
 | 
			
		||||
	 Previous user command (must match the same command to resume deletion)
 | 
			
		||||
	 Table name (ie. objects, ledger_hashes etc)
 | 
			
		||||
	 Values of token_ranges (each pair of values seperated line by line)
 | 
			
		||||
	*/
 | 
			
		||||
 | 
			
		||||
	file, err := os.Open("continue.txt")
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatal("continue.txt does not exist. Aborted")
 | 
			
		||||
	}
 | 
			
		||||
	defer file.Close()
 | 
			
		||||
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		log.Fatalf("Failed to open file: %v", err)
 | 
			
		||||
	}
 | 
			
		||||
	scanner := bufio.NewScanner(file)
 | 
			
		||||
	scanner.Scan()
 | 
			
		||||
 | 
			
		||||
	// --resume must be last flag passed; so can check command matches
 | 
			
		||||
	if os.Args[len(os.Args)-1] != "--resume" {
 | 
			
		||||
		log.Fatal("--resume must be the last flag passed")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// get rid of --resume at the end
 | 
			
		||||
	*cmd = strings.Join(os.Args[1:len(os.Args)-1], " ")
 | 
			
		||||
 | 
			
		||||
	// makes sure command that got aborted matches the user command they enter
 | 
			
		||||
	if scanner.Text() != *cmd {
 | 
			
		||||
		log.Fatalf("File continue.txt has %s command stored. \n You provided %s which does not match. \n Aborting...", scanner.Text(), *cmd)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	scanner.Scan()
 | 
			
		||||
	// skip the neccessary tables based on where the program aborted
 | 
			
		||||
	// for example if account_tx, all tables before account_tx
 | 
			
		||||
	// should be already deleted so we skip for deletion
 | 
			
		||||
	tableFound := false
 | 
			
		||||
	switch scanner.Text() {
 | 
			
		||||
	case "account_tx":
 | 
			
		||||
		*skipLedgersTable = true
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case "ledgers":
 | 
			
		||||
		*skipLedgerTransactionsTable = true
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case "ledger_transactions":
 | 
			
		||||
		*skipDiffTable = true
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case "diff":
 | 
			
		||||
		*skipTransactionsTable = true
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case "transactions":
 | 
			
		||||
		*skipLedgerHashesTable = true
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case "ledger_hashes":
 | 
			
		||||
		*skipObjectsTable = true
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case "objects":
 | 
			
		||||
		*skipSuccessorTable = true
 | 
			
		||||
		fallthrough
 | 
			
		||||
	case "successor":
 | 
			
		||||
		tableFound = true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !tableFound {
 | 
			
		||||
		log.Fatalf("Invalid table: %s", scanner.Text())
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	scanner.Scan()
 | 
			
		||||
	rangeRead := make(map[int64]int64)
 | 
			
		||||
 | 
			
		||||
	// now go through all the ledger range and load it to a set
 | 
			
		||||
	for scanner.Scan() {
 | 
			
		||||
		line := scanner.Text()
 | 
			
		||||
		tokenRange := strings.Split(line, ",")
 | 
			
		||||
		if len(tokenRange) != 2 {
 | 
			
		||||
			log.Fatalf("Range is not two integers. %s . Aborting...", tokenRange)
 | 
			
		||||
		}
 | 
			
		||||
		startStr := strings.TrimSpace(tokenRange[0])
 | 
			
		||||
		endStr := strings.TrimSpace(tokenRange[1])
 | 
			
		||||
 | 
			
		||||
		// convert string to int64
 | 
			
		||||
		start, err1 := strconv.ParseInt(startStr, 10, 64)
 | 
			
		||||
		end, err2 := strconv.ParseInt(endStr, 10, 64)
 | 
			
		||||
 | 
			
		||||
		if err1 != nil || err2 != nil {
 | 
			
		||||
			log.Fatalf("Error converting integer: %s, %s", err1, err2)
 | 
			
		||||
		}
 | 
			
		||||
		rangeRead[start] = end
 | 
			
		||||
	}
 | 
			
		||||
	ledgerOrTokenRange = &util.StoredRange{}
 | 
			
		||||
	ledgerOrTokenRange.TokenRange = rangeRead
 | 
			
		||||
}
 | 
			
		||||
 
 | 
			
		||||
@@ -21,8 +21,9 @@ type deleteInfo struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type deleteParams struct {
 | 
			
		||||
	Seq  uint64
 | 
			
		||||
	Blob []byte // hash, key, etc
 | 
			
		||||
	Seq      uint64
 | 
			
		||||
	Blob     []byte // hash, key, etc
 | 
			
		||||
	tnxIndex uint64 //transaction index
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type columnSettings struct {
 | 
			
		||||
@@ -30,6 +31,12 @@ type columnSettings struct {
 | 
			
		||||
	UseBlob bool
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type deleteMethod struct {
 | 
			
		||||
	deleteObject      maybe.Maybe[bool]
 | 
			
		||||
	deleteTransaction maybe.Maybe[bool]
 | 
			
		||||
	deleteGeneral     maybe.Maybe[bool]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
type Settings struct {
 | 
			
		||||
	SkipSuccessorTable          bool
 | 
			
		||||
	SkipObjectsTable            bool
 | 
			
		||||
@@ -39,9 +46,50 @@ type Settings struct {
 | 
			
		||||
	SkipLedgerTransactionsTable bool
 | 
			
		||||
	SkipLedgersTable            bool
 | 
			
		||||
	SkipWriteLatestLedger       bool
 | 
			
		||||
	SkipAccTransactionsTable    bool
 | 
			
		||||
 | 
			
		||||
	WorkerCount int
 | 
			
		||||
	Ranges      []*util.TokenRange
 | 
			
		||||
	RangesRead  *util.StoredRange // Used to resume deletion
 | 
			
		||||
	Command     string
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// markerFileName is the checkpoint file, relative to the working directory,
// that a later run started with --resume reads back.
const markerFileName = "continue.txt"

// Marker writes the resume checkpoint for the table currently being pruned.
// The file holds the user command on the first line, the table name on the
// second line, and one "<start>, <end>" line per recorded range after that.
type Marker struct {
	cmd  string   // command line written as the first line of the checkpoint
	file *os.File // open checkpoint file; nil while no table is entered
}

// NewMarker returns a Marker that will stamp cmd into every checkpoint it
// creates. No file is touched until EnterTable is called.
func NewMarker(cmd string) *Marker {
	return &Marker{cmd: cmd}
}

// CloseMarker closes the checkpoint file if it is open and removes it from
// disk. A nil marker is ignored.
func CloseMarker(m *Marker) {
	if m == nil {
		return
	}
	m.release()
}

// EnterTable starts a fresh checkpoint for table: the file is created (or
// truncated) and the command and table name are written as its first two
// lines. It returns an error if the file cannot be created or the header
// cannot be written; in that case the marker holds no open file.
func (m *Marker) EnterTable(table string) error {
	// A previous table that was never exited must not leak its handle.
	if m.file != nil {
		m.file.Close()
		m.file = nil
	}

	file, err := os.OpenFile(markerFileName, os.O_WRONLY|os.O_TRUNC|os.O_CREATE, 0644)
	if err != nil {
		return fmt.Errorf("failed to create file: %w", err)
	}

	if _, err := fmt.Fprintf(file, "%s\n%s\n", m.cmd, table); err != nil {
		file.Close()
		return fmt.Errorf("failed to write checkpoint header: %w", err)
	}

	m.file = file
	return nil
}

// MarkProgress appends the range x -> y to the checkpoint as "x, y \n".
// It does nothing when no table is entered. A failed write is logged rather
// than returned because the signature carries no error.
func (m *Marker) MarkProgress(x int64, y int64) {
	if m.file == nil {
		return
	}
	if _, err := fmt.Fprintf(m.file, "%d, %d \n", x, y); err != nil {
		log.Printf("ERROR: failed to record progress %d -> %d: %s\n", x, y, err)
	}
}

// ExitTable marks the current table as finished: the checkpoint file is
// closed and deleted. Calling it without a preceding EnterTable is harmless.
func (m *Marker) ExitTable() {
	m.release()
}

// release closes the open checkpoint handle, if any, and deletes the
// checkpoint file. A file that is already gone is not treated as an error.
func (m *Marker) release() {
	if m.file != nil {
		if err := m.file.Close(); err != nil {
			log.Printf("ERROR: failed to close %s: %s\n", markerFileName, err)
		}
		m.file = nil
	}
	if err := os.Remove(markerFileName); err != nil && !os.IsNotExist(err) {
		log.Printf("ERROR: failed to remove %s: %s\n", markerFileName, err)
	}
}
 | 
			
		||||
 | 
			
		||||
type Cass interface {
 | 
			
		||||
@@ -67,7 +115,7 @@ func (c *ClioCass) DeleteBefore(ledgerIdx uint64) {
 | 
			
		||||
 | 
			
		||||
	log.Printf("DB ledger range is %d -> %d\n", firstLedgerIdxInDB, latestLedgerIdxInDB)
 | 
			
		||||
 | 
			
		||||
	if firstLedgerIdxInDB > ledgerIdx {
 | 
			
		||||
	if firstLedgerIdxInDB >= ledgerIdx {
 | 
			
		||||
		log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -98,7 +146,7 @@ func (c *ClioCass) DeleteAfter(ledgerIdx uint64) {
 | 
			
		||||
		log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if latestLedgerIdxInDB < ledgerIdx {
 | 
			
		||||
	if latestLedgerIdxInDB <= ledgerIdx {
 | 
			
		||||
		log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...")
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
@@ -146,7 +194,6 @@ func (c *ClioCass) pruneData(
 | 
			
		||||
	var totalRows uint64
 | 
			
		||||
	var totalDeletes uint64
 | 
			
		||||
 | 
			
		||||
	var info deleteInfo
 | 
			
		||||
	var rowsCount uint64
 | 
			
		||||
	var deleteCount uint64
 | 
			
		||||
	var errCount uint64
 | 
			
		||||
@@ -180,100 +227,156 @@ func (c *ClioCass) pruneData(
 | 
			
		||||
 | 
			
		||||
	// successor queries
 | 
			
		||||
	if !c.settings.SkipSuccessorTable {
 | 
			
		||||
		marker := NewMarker(c.settings.Command)
 | 
			
		||||
		if err := marker.EnterTable("successor"); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.Println("Generating delete queries for successor table")
 | 
			
		||||
		info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx,
 | 
			
		||||
		rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
 | 
			
		||||
			"SELECT key, seq FROM successor WHERE token(key) >= ? AND token(key) <= ?",
 | 
			
		||||
			"DELETE FROM successor WHERE key = ? AND seq = ?", false)
 | 
			
		||||
		log.Printf("Total delete queries: %d\n", len(info.Data))
 | 
			
		||||
			"DELETE FROM successor WHERE key = ? AND seq = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false})
 | 
			
		||||
		log.Printf("Total delete queries: %d\n", deleteCount)
 | 
			
		||||
		log.Printf("Total traversed rows: %d\n\n", rowsCount)
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalRows += rowsCount
 | 
			
		||||
		deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: true})
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalDeletes += deleteCount
 | 
			
		||||
 | 
			
		||||
		marker.ExitTable()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// objects queries
 | 
			
		||||
	if !c.settings.SkipObjectsTable {
 | 
			
		||||
		marker := NewMarker(c.settings.Command)
 | 
			
		||||
		if err := marker.EnterTable("objects"); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.Println("Generating delete queries for objects table")
 | 
			
		||||
		info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx,
 | 
			
		||||
		rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
 | 
			
		||||
			"SELECT key, sequence FROM objects WHERE token(key) >= ? AND token(key) <= ?",
 | 
			
		||||
			"DELETE FROM objects WHERE key = ? AND sequence = ?", true)
 | 
			
		||||
		log.Printf("Total delete queries: %d\n", len(info.Data))
 | 
			
		||||
			"DELETE FROM objects WHERE key = ? AND sequence = ?", deleteMethod{deleteObject: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: true})
 | 
			
		||||
		log.Printf("Total delete queries: %d\n", deleteCount)
 | 
			
		||||
		log.Printf("Total traversed rows: %d\n\n", rowsCount)
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalRows += rowsCount
 | 
			
		||||
		deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: true})
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalDeletes += deleteCount
 | 
			
		||||
 | 
			
		||||
		marker.ExitTable()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// ledger_hashes queries
 | 
			
		||||
	if !c.settings.SkipLedgerHashesTable {
 | 
			
		||||
		marker := NewMarker(c.settings.Command)
 | 
			
		||||
		if err := marker.EnterTable("ledger_hashes"); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.Println("Generating delete queries for ledger_hashes table")
 | 
			
		||||
		info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx,
 | 
			
		||||
		rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
 | 
			
		||||
			"SELECT hash, sequence FROM ledger_hashes WHERE token(hash) >= ? AND token(hash) <= ?",
 | 
			
		||||
			"DELETE FROM ledger_hashes WHERE hash = ?", false)
 | 
			
		||||
		log.Printf("Total delete queries: %d\n", len(info.Data))
 | 
			
		||||
			"DELETE FROM ledger_hashes WHERE hash = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false})
 | 
			
		||||
		log.Printf("Total delete queries: %d\n", deleteCount)
 | 
			
		||||
		log.Printf("Total traversed rows: %d\n\n", rowsCount)
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalRows += rowsCount
 | 
			
		||||
		deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: false})
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalDeletes += deleteCount
 | 
			
		||||
 | 
			
		||||
		marker.ExitTable()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// transactions queries
 | 
			
		||||
	if !c.settings.SkipTransactionsTable {
 | 
			
		||||
		marker := NewMarker(c.settings.Command)
 | 
			
		||||
		if err := marker.EnterTable("transactions"); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.Println("Generating delete queries for transactions table")
 | 
			
		||||
		info, rowsCount, errCount = c.prepareDeleteQueries(fromLedgerIdx, toLedgerIdx,
 | 
			
		||||
		rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
 | 
			
		||||
			"SELECT hash, ledger_sequence FROM transactions WHERE token(hash) >= ? AND token(hash) <= ?",
 | 
			
		||||
			"DELETE FROM transactions WHERE hash = ?", false)
 | 
			
		||||
		log.Printf("Total delete queries: %d\n", len(info.Data))
 | 
			
		||||
			"DELETE FROM transactions WHERE hash = ?", deleteMethod{deleteGeneral: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false})
 | 
			
		||||
		log.Printf("Total delete queries: %d\n", deleteCount)
 | 
			
		||||
		log.Printf("Total traversed rows: %d\n\n", rowsCount)
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalRows += rowsCount
 | 
			
		||||
		deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: true, UseSeq: false})
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalDeletes += deleteCount
 | 
			
		||||
 | 
			
		||||
		marker.ExitTable()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// diff queries
 | 
			
		||||
	if !c.settings.SkipDiffTable {
 | 
			
		||||
		marker := NewMarker(c.settings.Command)
 | 
			
		||||
		if err := marker.EnterTable("diff"); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.Println("Generating delete queries for diff table")
 | 
			
		||||
		info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo,
 | 
			
		||||
			"DELETE FROM diff WHERE seq = ?")
 | 
			
		||||
		log.Printf("Total delete queries: %d\n\n", len(info.Data))
 | 
			
		||||
		deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true})
 | 
			
		||||
		deleteCount, errCount = c.prepareAndExecuteSimpleDeleteQueries(rangeFrom, rangeTo,
 | 
			
		||||
			"DELETE FROM diff WHERE seq = ?", columnSettings{UseBlob: false, UseSeq: true})
 | 
			
		||||
		log.Printf("Total delete queries: %d\n\n", deleteCount)
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalDeletes += deleteCount
 | 
			
		||||
 | 
			
		||||
		marker.ExitTable()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// ledger_transactions queries
 | 
			
		||||
	if !c.settings.SkipLedgerTransactionsTable {
 | 
			
		||||
		marker := NewMarker(c.settings.Command)
 | 
			
		||||
		if err := marker.EnterTable("ledger_transactions"); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.Println("Generating delete queries for ledger_transactions table")
 | 
			
		||||
		info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo,
 | 
			
		||||
			"DELETE FROM ledger_transactions WHERE ledger_sequence = ?")
 | 
			
		||||
		log.Printf("Total delete queries: %d\n\n", len(info.Data))
 | 
			
		||||
		deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true})
 | 
			
		||||
		deleteCount, errCount = c.prepareAndExecuteSimpleDeleteQueries(rangeFrom, rangeTo,
 | 
			
		||||
			"DELETE FROM ledger_transactions WHERE ledger_sequence = ?", columnSettings{UseBlob: false, UseSeq: true})
 | 
			
		||||
		log.Printf("Total delete queries: %d\n\n", deleteCount)
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalDeletes += deleteCount
 | 
			
		||||
 | 
			
		||||
		marker.ExitTable()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// ledgers queries
 | 
			
		||||
	if !c.settings.SkipLedgersTable {
 | 
			
		||||
		log.Println("Generating delete queries for ledgers table")
 | 
			
		||||
		marker := NewMarker(c.settings.Command)
 | 
			
		||||
		if err := marker.EnterTable("ledgers"); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		info = c.prepareSimpleDeleteQueries(rangeFrom, rangeTo,
 | 
			
		||||
			"DELETE FROM ledgers WHERE sequence = ?")
 | 
			
		||||
		log.Printf("Total delete queries: %d\n\n", len(info.Data))
 | 
			
		||||
		deleteCount, errCount = c.performDeleteQueries(&info, columnSettings{UseBlob: false, UseSeq: true})
 | 
			
		||||
		log.Println("Generating delete queries for ledgers table")
 | 
			
		||||
		deleteCount, errCount = c.prepareAndExecuteSimpleDeleteQueries(rangeFrom, rangeTo,
 | 
			
		||||
			"DELETE FROM ledgers WHERE sequence = ?", columnSettings{UseBlob: false, UseSeq: true})
 | 
			
		||||
		log.Printf("Total delete queries: %d\n\n", deleteCount)
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalDeletes += deleteCount
 | 
			
		||||
 | 
			
		||||
		marker.ExitTable()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO: tbd what to do with account_tx as it got tuple for seq_idx
 | 
			
		||||
	// TODO: also, whether we need to take care of nft tables and other stuff like that
 | 
			
		||||
	// account_tx queries
 | 
			
		||||
	if !c.settings.SkipAccTransactionsTable {
 | 
			
		||||
		marker := NewMarker(c.settings.Command)
 | 
			
		||||
		if err := marker.EnterTable("account_tx"); err != nil {
 | 
			
		||||
			return err
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		log.Println("Generating delete queries for account transactions table")
 | 
			
		||||
		rowsCount, deleteCount, errCount = c.prepareAndExecuteDeleteQueries(marker, fromLedgerIdx, toLedgerIdx,
 | 
			
		||||
			"SELECT account, seq_idx FROM account_tx WHERE token(account) >= ? AND token(account) <= ?",
 | 
			
		||||
			"DELETE FROM account_tx WHERE account = ? AND seq_idx = (?, ?)", deleteMethod{deleteTransaction: maybe.Set(true)}, columnSettings{UseBlob: true, UseSeq: false})
 | 
			
		||||
		log.Printf("Total delete queries: %d\n", deleteCount)
 | 
			
		||||
		log.Printf("Total traversed rows: %d\n\n", rowsCount)
 | 
			
		||||
		totalRows += rowsCount
 | 
			
		||||
		totalErrors += errCount
 | 
			
		||||
		totalDeletes += deleteCount
 | 
			
		||||
 | 
			
		||||
		marker.ExitTable()
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	// TODO: take care of nft tables and other stuff like that
 | 
			
		||||
 | 
			
		||||
	if !c.settings.SkipWriteLatestLedger {
 | 
			
		||||
		var (
 | 
			
		||||
@@ -304,27 +407,150 @@ func (c *ClioCass) pruneData(
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClioCass) prepareSimpleDeleteQueries(
 | 
			
		||||
func (c *ClioCass) prepareAndExecuteSimpleDeleteQueries(
 | 
			
		||||
	fromLedgerIdx uint64,
 | 
			
		||||
	toLedgerIdx uint64,
 | 
			
		||||
	deleteQueryTemplate string,
 | 
			
		||||
) deleteInfo {
 | 
			
		||||
	colSettings columnSettings,
 | 
			
		||||
) (uint64, uint64) {
 | 
			
		||||
	var totalDeletes uint64
 | 
			
		||||
	var totalErrors uint64
 | 
			
		||||
 | 
			
		||||
	var info = deleteInfo{Query: deleteQueryTemplate}
 | 
			
		||||
 | 
			
		||||
	for i := fromLedgerIdx; i <= toLedgerIdx; i++ {
 | 
			
		||||
		info.Data = append(info.Data, deleteParams{Seq: i})
 | 
			
		||||
	if session, err := c.clusterConfig.CreateSession(); err == nil {
 | 
			
		||||
		defer session.Close()
 | 
			
		||||
		for i := fromLedgerIdx; i <= toLedgerIdx; i++ {
 | 
			
		||||
			info.Data = append(info.Data, deleteParams{Seq: i})
 | 
			
		||||
			// for every 1000 queries in data, delete
 | 
			
		||||
			if len(info.Data) == 1000 {
 | 
			
		||||
				_, err := c.performDeleteQueries(&info, session, colSettings)
 | 
			
		||||
				atomic.AddUint64(&totalDeletes, uint64(len(info.Data)))
 | 
			
		||||
				atomic.AddUint64(&totalErrors, err)
 | 
			
		||||
				info = deleteInfo{Query: deleteQueryTemplate}
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
		// delete the rest of queries if exists
 | 
			
		||||
		if len(info.Data) > 0 {
 | 
			
		||||
			_, err := c.performDeleteQueries(&info, session, colSettings)
 | 
			
		||||
			atomic.AddUint64(&totalDeletes, uint64(len(info.Data)))
 | 
			
		||||
			atomic.AddUint64(&totalErrors, err)
 | 
			
		||||
		}
 | 
			
		||||
	} else {
 | 
			
		||||
		log.Printf("ERROR: %s\n", err)
 | 
			
		||||
		fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
 | 
			
		||||
		atomic.AddUint64(&totalErrors, 1)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return info
 | 
			
		||||
	return totalDeletes, totalErrors
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClioCass) prepareDeleteQueries(
 | 
			
		||||
func (c *ClioCass) prepareDefaultDelete(
 | 
			
		||||
	scanner gocql.Scanner,
 | 
			
		||||
	info *deleteInfo,
 | 
			
		||||
	fromLedgerIdx maybe.Maybe[uint64],
 | 
			
		||||
	toLedgerIdx maybe.Maybe[uint64],
 | 
			
		||||
	rowsRetrieved *uint64,
 | 
			
		||||
) bool {
 | 
			
		||||
	for scanner.Next() {
 | 
			
		||||
		var key []byte
 | 
			
		||||
		var seq uint64
 | 
			
		||||
 | 
			
		||||
		err := scanner.Scan(&key, &seq)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			*rowsRetrieved++
 | 
			
		||||
 | 
			
		||||
			// only grab the rows that are in the correct range of sequence numbers
 | 
			
		||||
			if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= seq {
 | 
			
		||||
				info.Data = append(info.Data, deleteParams{Seq: seq, Blob: key})
 | 
			
		||||
			} else if toLedgerIdx.HasValue() && seq <= toLedgerIdx.Value() {
 | 
			
		||||
				info.Data = append(info.Data, deleteParams{Seq: seq, Blob: key})
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClioCass) prepareObjectDelete(
 | 
			
		||||
	scanner gocql.Scanner,
 | 
			
		||||
	info *deleteInfo,
 | 
			
		||||
	fromLedgerIdx maybe.Maybe[uint64],
 | 
			
		||||
	toLedgerIdx maybe.Maybe[uint64],
 | 
			
		||||
	rowsRetrieved *uint64,
 | 
			
		||||
) bool {
 | 
			
		||||
	var previousKey []byte
 | 
			
		||||
	var foundLastValid bool
 | 
			
		||||
	var keepLastValid = true
 | 
			
		||||
 | 
			
		||||
	for scanner.Next() {
 | 
			
		||||
		var key []byte
 | 
			
		||||
		var seq uint64
 | 
			
		||||
 | 
			
		||||
		err := scanner.Scan(&key, &seq)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			*rowsRetrieved++
 | 
			
		||||
 | 
			
		||||
			if keepLastValid && !slices.Equal(previousKey, key) {
 | 
			
		||||
				previousKey = key
 | 
			
		||||
				foundLastValid = false
 | 
			
		||||
			}
 | 
			
		||||
 | 
			
		||||
			// only grab the rows that are in the correct range of sequence numbers
 | 
			
		||||
			if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= seq {
 | 
			
		||||
				info.Data = append(info.Data, deleteParams{Seq: seq, Blob: key})
 | 
			
		||||
			} else if toLedgerIdx.HasValue() {
 | 
			
		||||
				if seq <= toLedgerIdx.Value() && (!keepLastValid || foundLastValid) {
 | 
			
		||||
					info.Data = append(info.Data, deleteParams{Seq: seq, Blob: key})
 | 
			
		||||
				} else if seq <= toLedgerIdx.Value()+1 {
 | 
			
		||||
					foundLastValid = true
 | 
			
		||||
				}
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClioCass) prepareAccTxnDelete(
 | 
			
		||||
	scanner gocql.Scanner,
 | 
			
		||||
	info *deleteInfo,
 | 
			
		||||
	fromLedgerIdx maybe.Maybe[uint64],
 | 
			
		||||
	toLedgerIdx maybe.Maybe[uint64],
 | 
			
		||||
	rowsRetrieved *uint64,
 | 
			
		||||
) bool {
 | 
			
		||||
	for scanner.Next() {
 | 
			
		||||
		var key []byte
 | 
			
		||||
		var ledgerIndex, txnIndex uint64
 | 
			
		||||
 | 
			
		||||
		// account_tx/nft table has seq_idx frozen<tuple<bigint, bigint>>
 | 
			
		||||
		err := scanner.Scan(&key, &ledgerIndex, &txnIndex)
 | 
			
		||||
		if err == nil {
 | 
			
		||||
			*rowsRetrieved++
 | 
			
		||||
 | 
			
		||||
			// only grab the rows that are in the correct range of sequence numbers
 | 
			
		||||
			if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= ledgerIndex {
 | 
			
		||||
				info.Data = append(info.Data, deleteParams{Seq: ledgerIndex, Blob: key, tnxIndex: txnIndex})
 | 
			
		||||
			} else if toLedgerIdx.HasValue() && ledgerIndex <= toLedgerIdx.Value() {
 | 
			
		||||
				info.Data = append(info.Data, deleteParams{Seq: ledgerIndex, Blob: key, tnxIndex: txnIndex})
 | 
			
		||||
			}
 | 
			
		||||
		} else {
 | 
			
		||||
			return false
 | 
			
		||||
		}
 | 
			
		||||
	}
 | 
			
		||||
	return true
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClioCass) prepareAndExecuteDeleteQueries(
 | 
			
		||||
	marker *Marker,
 | 
			
		||||
	fromLedgerIdx maybe.Maybe[uint64],
 | 
			
		||||
	toLedgerIdx maybe.Maybe[uint64],
 | 
			
		||||
	queryTemplate string,
 | 
			
		||||
	deleteQueryTemplate string,
 | 
			
		||||
	keepLastValid bool,
 | 
			
		||||
) (deleteInfo, uint64, uint64) {
 | 
			
		||||
	method deleteMethod,
 | 
			
		||||
	colSettings columnSettings,
 | 
			
		||||
) (uint64, uint64, uint64) {
 | 
			
		||||
	rangesChannel := make(chan *util.TokenRange, len(c.settings.Ranges))
 | 
			
		||||
	for i := range c.settings.Ranges {
 | 
			
		||||
		rangesChannel <- c.settings.Ranges[i]
 | 
			
		||||
@@ -332,24 +558,12 @@ func (c *ClioCass) prepareDeleteQueries(
 | 
			
		||||
 | 
			
		||||
	close(rangesChannel)
 | 
			
		||||
 | 
			
		||||
	outChannel := make(chan deleteParams)
 | 
			
		||||
	var info = deleteInfo{Query: deleteQueryTemplate}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		total := uint64(0)
 | 
			
		||||
		for params := range outChannel {
 | 
			
		||||
			total += 1
 | 
			
		||||
			if total%1000 == 0 {
 | 
			
		||||
				log.Printf("... %d queries ...\n", total)
 | 
			
		||||
			}
 | 
			
		||||
			info.Data = append(info.Data, params)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	var sessionCreationWaitGroup sync.WaitGroup
 | 
			
		||||
	var totalRows uint64
 | 
			
		||||
	var totalDeletes uint64
 | 
			
		||||
	var totalErrors uint64
 | 
			
		||||
	counter := uint64(1000)
 | 
			
		||||
 | 
			
		||||
	wg.Add(c.settings.WorkerCount)
 | 
			
		||||
	sessionCreationWaitGroup.Add(c.settings.WorkerCount)
 | 
			
		||||
@@ -368,58 +582,66 @@ func (c *ClioCass) prepareDeleteQueries(
 | 
			
		||||
				preparedQuery := session.Query(q)
 | 
			
		||||
 | 
			
		||||
				for r := range rangesChannel {
 | 
			
		||||
					if c.settings.RangesRead != nil {
 | 
			
		||||
						if value, exists := c.settings.RangesRead.TokenRange[r.StartRange]; exists {
 | 
			
		||||
							// Check for end range
 | 
			
		||||
							if value == r.EndRange {
 | 
			
		||||
								marker.MarkProgress(r.StartRange, r.EndRange)
 | 
			
		||||
								continue
 | 
			
		||||
							}
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					preparedQuery.Bind(r.StartRange, r.EndRange)
 | 
			
		||||
 | 
			
		||||
					var pageState []byte
 | 
			
		||||
					var rowsRetrieved uint64
 | 
			
		||||
 | 
			
		||||
					var previousKey []byte
 | 
			
		||||
					var foundLastValid bool
 | 
			
		||||
					var info = deleteInfo{Query: deleteQueryTemplate}
 | 
			
		||||
 | 
			
		||||
					for {
 | 
			
		||||
						iter := preparedQuery.PageSize(c.clusterConfig.PageSize).PageState(pageState).Iter()
 | 
			
		||||
						nextPageState := iter.PageState()
 | 
			
		||||
						scanner := iter.Scanner()
 | 
			
		||||
 | 
			
		||||
						for scanner.Next() {
 | 
			
		||||
							var key []byte
 | 
			
		||||
							var seq uint64
 | 
			
		||||
						var prepareDeleteResult bool
 | 
			
		||||
 | 
			
		||||
							err = scanner.Scan(&key, &seq)
 | 
			
		||||
							if err == nil {
 | 
			
		||||
								rowsRetrieved++
 | 
			
		||||
						// query object table first as it is the largest table by far
 | 
			
		||||
						if method.deleteObject.HasValue() && method.deleteObject.Value() {
 | 
			
		||||
							prepareDeleteResult = c.prepareObjectDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved)
 | 
			
		||||
						} else if method.deleteTransaction.HasValue() && method.deleteTransaction.Value() {
 | 
			
		||||
							prepareDeleteResult = c.prepareAccTxnDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved)
 | 
			
		||||
						} else if method.deleteGeneral.HasValue() && method.deleteGeneral.Value() {
 | 
			
		||||
							prepareDeleteResult = c.prepareDefaultDelete(scanner, &info, fromLedgerIdx, toLedgerIdx, &rowsRetrieved)
 | 
			
		||||
						}
 | 
			
		||||
 | 
			
		||||
								if keepLastValid && !slices.Equal(previousKey, key) {
 | 
			
		||||
									previousKey = key
 | 
			
		||||
									foundLastValid = false
 | 
			
		||||
								}
 | 
			
		||||
 | 
			
		||||
								// only grab the rows that are in the correct range of sequence numbers
 | 
			
		||||
								if fromLedgerIdx.HasValue() && fromLedgerIdx.Value() <= seq {
 | 
			
		||||
									outChannel <- deleteParams{Seq: seq, Blob: key}
 | 
			
		||||
								} else if toLedgerIdx.HasValue() {
 | 
			
		||||
									if seq < toLedgerIdx.Value() && (!keepLastValid || foundLastValid) {
 | 
			
		||||
										outChannel <- deleteParams{Seq: seq, Blob: key}
 | 
			
		||||
									} else if seq <= toLedgerIdx.Value()+1 {
 | 
			
		||||
										foundLastValid = true
 | 
			
		||||
									}
 | 
			
		||||
								}
 | 
			
		||||
							} else {
 | 
			
		||||
								log.Printf("ERROR: page iteration failed: %s\n", err)
 | 
			
		||||
								fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [from=%d][to=%d][pagestate=%x]", queryTemplate, r.StartRange, r.EndRange, pageState))
 | 
			
		||||
								atomic.AddUint64(&totalErrors, 1)
 | 
			
		||||
							}
 | 
			
		||||
						if !prepareDeleteResult {
 | 
			
		||||
							log.Printf("ERROR: page iteration failed: %s\n", err)
 | 
			
		||||
							fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [from=%d][to=%d][pagestate=%x]", queryTemplate, r.StartRange, r.EndRange, pageState))
 | 
			
		||||
							atomic.AddUint64(&totalErrors, 1)
 | 
			
		||||
						}
 | 
			
		||||
 | 
			
		||||
						if len(nextPageState) == 0 {
 | 
			
		||||
							// Checks for delete queries after iterating all pages
 | 
			
		||||
							if len(info.Data) > 0 {
 | 
			
		||||
								_, numErr := c.performDeleteQueries(&info, session, colSettings)
 | 
			
		||||
								atomic.AddUint64(&totalErrors, numErr)
 | 
			
		||||
								atomic.AddUint64(&totalDeletes, uint64(len(info.Data)))
 | 
			
		||||
								if totalDeletes >= counter {
 | 
			
		||||
									log.Printf("... deleted %d queries ...", counter)
 | 
			
		||||
									counter += 1000
 | 
			
		||||
								}
 | 
			
		||||
								// reset back to the deleted query template after finishing executing delete
 | 
			
		||||
								info = deleteInfo{Query: deleteQueryTemplate}
 | 
			
		||||
							}
 | 
			
		||||
							break
 | 
			
		||||
						}
 | 
			
		||||
 | 
			
		||||
						pageState = nextPageState
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
					marker.MarkProgress(r.StartRange, r.EndRange)
 | 
			
		||||
					atomic.AddUint64(&totalRows, rowsRetrieved)
 | 
			
		||||
				}
 | 
			
		||||
				// after finishing deletion of one table, set to nil, because we continue to delete normally now
 | 
			
		||||
				c.settings.RangesRead = nil
 | 
			
		||||
			} else {
 | 
			
		||||
				log.Printf("ERROR: %s\n", err)
 | 
			
		||||
				fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
 | 
			
		||||
@@ -429,9 +651,7 @@ func (c *ClioCass) prepareDeleteQueries(
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
	close(outChannel)
 | 
			
		||||
 | 
			
		||||
	return info, totalRows, totalErrors
 | 
			
		||||
	return totalRows, totalDeletes, totalErrors
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClioCass) splitDeleteWork(info *deleteInfo) [][]deleteParams {
 | 
			
		||||
@@ -461,7 +681,7 @@ func (c *ClioCass) splitDeleteWork(info *deleteInfo) [][]deleteParams {
 | 
			
		||||
	return chunks
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (c *ClioCass) performDeleteQueries(info *deleteInfo, colSettings columnSettings) (uint64, uint64) {
 | 
			
		||||
func (c *ClioCass) performDeleteQueries(info *deleteInfo, session *gocql.Session, colSettings columnSettings) (uint64, uint64) {
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	var sessionCreationWaitGroup sync.WaitGroup
 | 
			
		||||
	var totalDeletes uint64
 | 
			
		||||
@@ -485,43 +705,35 @@ func (c *ClioCass) performDeleteQueries(info *deleteInfo, colSettings columnSett
 | 
			
		||||
		go func(number int, q string, bc int) {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
 | 
			
		||||
			var session *gocql.Session
 | 
			
		||||
			var err error
 | 
			
		||||
			if session, err = c.clusterConfig.CreateSession(); err == nil {
 | 
			
		||||
				defer session.Close()
 | 
			
		||||
			sessionCreationWaitGroup.Done()
 | 
			
		||||
			sessionCreationWaitGroup.Wait()
 | 
			
		||||
			preparedQuery := session.Query(q)
 | 
			
		||||
 | 
			
		||||
				sessionCreationWaitGroup.Done()
 | 
			
		||||
				sessionCreationWaitGroup.Wait()
 | 
			
		||||
				preparedQuery := session.Query(q)
 | 
			
		||||
 | 
			
		||||
				for chunk := range chunksChannel {
 | 
			
		||||
					for _, r := range chunk {
 | 
			
		||||
						if bc == 2 {
 | 
			
		||||
							preparedQuery.Bind(r.Blob, r.Seq)
 | 
			
		||||
						} else if bc == 1 {
 | 
			
		||||
							if colSettings.UseSeq {
 | 
			
		||||
								preparedQuery.Bind(r.Seq)
 | 
			
		||||
							} else if colSettings.UseBlob {
 | 
			
		||||
								preparedQuery.Bind(r.Blob)
 | 
			
		||||
							}
 | 
			
		||||
			for chunk := range chunksChannel {
 | 
			
		||||
				for _, r := range chunk {
 | 
			
		||||
					if bc == 3 {
 | 
			
		||||
						preparedQuery.Bind(r.Blob, r.Seq, r.tnxIndex)
 | 
			
		||||
					} else if bc == 2 {
 | 
			
		||||
						preparedQuery.Bind(r.Blob, r.Seq)
 | 
			
		||||
					} else if bc == 1 {
 | 
			
		||||
						if colSettings.UseSeq {
 | 
			
		||||
							preparedQuery.Bind(r.Seq)
 | 
			
		||||
						} else if colSettings.UseBlob {
 | 
			
		||||
							preparedQuery.Bind(r.Blob)
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
 | 
			
		||||
						if err := preparedQuery.Exec(); err != nil {
 | 
			
		||||
							log.Printf("DELETE ERROR: %s\n", err)
 | 
			
		||||
							fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq))
 | 
			
		||||
							atomic.AddUint64(&totalErrors, 1)
 | 
			
		||||
						} else {
 | 
			
		||||
							atomic.AddUint64(&totalDeletes, 1)
 | 
			
		||||
							if atomic.LoadUint64(&totalDeletes)%10000 == 0 {
 | 
			
		||||
								log.Printf("... %d deletes ...\n", totalDeletes)
 | 
			
		||||
							}
 | 
			
		||||
					if err := preparedQuery.Exec(); err != nil {
 | 
			
		||||
						log.Printf("DELETE ERROR: %s\n", err)
 | 
			
		||||
						fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq))
 | 
			
		||||
						atomic.AddUint64(&totalErrors, 1)
 | 
			
		||||
					} else {
 | 
			
		||||
						atomic.AddUint64(&totalDeletes, 1)
 | 
			
		||||
						if atomic.LoadUint64(&totalDeletes)%10000 == 0 {
 | 
			
		||||
							log.Printf("... %d deletes ...\n", totalDeletes)
 | 
			
		||||
						}
 | 
			
		||||
					}
 | 
			
		||||
				}
 | 
			
		||||
			} else {
 | 
			
		||||
				log.Printf("ERROR: %s\n", err)
 | 
			
		||||
				fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
 | 
			
		||||
				atomic.AddUint64(&totalErrors, 1)
 | 
			
		||||
			}
 | 
			
		||||
		}(i, query, bindCount)
 | 
			
		||||
	}
 | 
			
		||||
 
 | 
			
		||||
@@ -14,6 +14,12 @@ type TokenRange struct {
 | 
			
		||||
	EndRange   int64
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// StoredRange holds the token ranges that have already been read and deleted.
// The ranges are not stored as an array of startRange/endRange pairs because
// finding a range would then be an O(n) lookup.
 | 
			
		||||
// Instead they are stored as a map with key startRange and value endRange, so
// checking a token range is an O(1) lookup.
 | 
			
		||||
type StoredRange struct {
 | 
			
		||||
	TokenRange map[int64]int64 // startRange -> endRange of every range that has been read and deleted
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Shuffle(data []*TokenRange) {
 | 
			
		||||
	for i := 1; i < len(data); i++ {
 | 
			
		||||
		r := rand.Intn(i + 1)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user