Cassandra data removal tool (#1142)

Fixes #1143

Author:    Alex Kremer
Committed: 2024-01-30 13:27:42 +00:00 (via GitHub)
Parent:    df27c4e629
Commit:    3fda74e3f7

5 changed files with 591 additions and 1499 deletions


@@ -1,183 +0,0 @@
#!/usr/bin/python3
import argparse
from datetime import datetime


def getTime(line):
    bracketOpen = line.find("[")
    bracketClose = line.find("]")
    timestampSub = line[bracketOpen + 1:bracketClose]
    timestamp = datetime.strptime(timestampSub, '%Y-%m-%d %H:%M:%S.%f')
    return timestamp.timestamp()


def parseAccountTx(filename):
    with open(filename) as f:
        totalProcTime = 0.0
        totalTxnTime = 0.0
        numCalls = 0
        for line in f:
            if "executed stored_procedure" in line:
                idx = line.find("in ") + 3
                idx2 = line.find("num")
                procTime = float(line[idx:idx2])
                totalProcTime += procTime
            if "fetchTransactions fetched" in line:
                idx = line.find("took ") + 5
                txnTime = float(line[idx:])
                totalTxnTime += txnTime
                numCalls += 1
        print(totalProcTime)
        print(totalProcTime / numCalls)
        print(totalTxnTime)
        print(totalTxnTime / numCalls)
def parseLogs(filename, interval, minTxnCount=0):
    with open(filename) as f:
        totalTime = 0
        totalTxns = 0
        totalObjs = 0
        totalLoadTime = 0
        start = 0
        end = 0
        totalLedgers = 0

        intervalTime = 0
        intervalTxns = 0
        intervalObjs = 0
        intervalLoadTime = 0
        intervalStart = 0
        intervalEnd = 0
        intervalLedgers = 0
        ledgersPerSecond = 0

        print("ledgers, transactions, objects, loadTime, loadTime/ledger, ledgers/sec, txns/sec, objs/sec")
        for line in f:
            if "Load phase" in line:
                sequenceIdx = line.find("Sequence : ")
                hashIdx = line.find(" Hash :")
                sequence = line[sequenceIdx + len("Sequence : "):hashIdx]
                txnCountSubstr = "txn count = "
                objCountSubstr = ". object count = "
                loadTimeSubstr = ". load time = "
                txnsSubstr = ". load txns per second = "
                objsSubstr = ". load objs per second = "
                txnCountIdx = line.find(txnCountSubstr)
                objCountIdx = line.find(objCountSubstr)
                loadTimeIdx = line.find(loadTimeSubstr)
                txnsIdx = line.find(txnsSubstr)
                objsIdx = line.find(objsSubstr)
                txnCount = line[txnCountIdx + len(txnCountSubstr):objCountIdx]
                objCount = line[objCountIdx + len(objCountSubstr):loadTimeIdx]
                loadTime = line[loadTimeIdx + len(loadTimeSubstr):txnsIdx]
                txnsPerSecond = line[txnsIdx + len(txnsSubstr):objsIdx]
                objsPerSecond = line[objsIdx + len(objsSubstr):-1]
                if int(txnCount) >= minTxnCount:
                    totalTime += float(loadTime)
                    totalTxns += float(txnCount)
                    totalObjs += float(objCount)
                    intervalTime += float(loadTime)
                    intervalTxns += float(txnCount)
                    intervalObjs += float(objCount)
                    totalLoadTime += float(loadTime)
                    intervalLoadTime += float(loadTime)
                    if start == 0:
                        start = getTime(line)
                    prevEnd = end
                    end = getTime(line)
                    if intervalStart == 0:
                        intervalStart = getTime(line)
                    intervalEnd = getTime(line)
                    totalLedgers += 1
                    intervalLedgers += 1
                    ledgersPerSecond = 0
                    if end != start:
                        ledgersPerSecond = float(totalLedgers) / float(end - start)
                    intervalLedgersPerSecond = 0
                    if intervalEnd != intervalStart:
                        intervalLedgersPerSecond = float(intervalLedgers) / float(intervalEnd - intervalStart)
                    if int(sequence) % interval == 0:
                        # print("Sequence = " + sequence + " : [time, txCount, objCount, txPerSec, objsPerSec]")
                        # print(loadTime + " , "
                        #       + txnCount + " , "
                        #       + objCount + " , "
                        #       + txnsPerSecond + " , "
                        #       + objsPerSecond)
                        # print("Interval Aggregate ( " + str(interval) + " ) [ledgers, txns, objects, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]: ")
                        print(str(intervalLedgers) + " , "
                              + str(intervalTxns) + " , "
                              + str(intervalObjs) + " , "
                              + str(intervalLoadTime) + " , "
                              + str(intervalLoadTime / intervalLedgers) + " , "
                              + str(intervalLedgers / intervalLoadTime) + " , "
                              + str(intervalTxns / intervalLoadTime) + " , "
                              + str(intervalObjs / intervalLoadTime))
                        # print("Total Aggregate: [ledgers, txns, objects, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]")
                        # print(str(totalLedgers) + " , "
                        #       + str(totalTxns) + " , "
                        #       + str(totalObjs) + " , "
                        #       + str(end - start) + " , "
                        #       + str(ledgersPerSecond) + " , "
                        #       + str(totalLoadTime / totalLedgers) + " , "
                        #       + str(totalTxns / totalTime) + " , "
                        #       + str(totalObjs / totalTime))
                        intervalTime = 0
                        intervalTxns = 0
                        intervalObjs = 0
                        intervalStart = 0
                        intervalEnd = 0
                        intervalLedgers = 0
                        intervalLoadTime = 0
        print("Total Aggregate: [ledgers, elapsedTime, ledgersPerSec, avgLoadTime, txPerSec, objsPerSec]")
        print(totalLedgers)
        print(totalLoadTime)
        print(str(totalLedgers) + " : "
              + str(end - start) + " : "
              + str(ledgersPerSecond) + " : "
              + str(totalLoadTime / totalLedgers) + " : "
              + str(totalTxns / totalTime) + " : "
              + str(totalObjs / totalTime))
parser = argparse.ArgumentParser(description='parses logs')
parser.add_argument("--filename")
parser.add_argument("--interval", default=100000)
parser.add_argument("--minTxnCount", default=0)
parser.add_argument("--account_tx", action="store_true")
args = parser.parse_args()


def run(args):
    if args.account_tx:
        parseAccountTx(args.filename)
    else:
        parseLogs(args.filename, int(args.interval), int(args.minTxnCount))


run(args)
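For reference, a hypothetical invocation of this (now removed) helper, assuming it was saved as parse_logs.py (the actual filename is not shown in this diff):

    python3 parse_logs.py --filename clio.log --interval 100000 --minTxnCount 10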

test.py: 1316 changes (diff suppressed because it is too large)


@@ -0,0 +1,13 @@
module xrplf/clio/cassandra_delete_range

go 1.21.6

require (
    github.com/alecthomas/kingpin/v2 v2.4.0 // indirect
    github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 // indirect
    github.com/gocql/gocql v1.6.0 // indirect
    github.com/golang/snappy v0.0.3 // indirect
    github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed // indirect
    github.com/xhit/go-str2duration/v2 v2.1.0 // indirect
    gopkg.in/inf.v0 v0.9.1 // indirect
)


@@ -0,0 +1,26 @@
github.com/alecthomas/kingpin/v2 v2.4.0 h1:f48lwail6p8zpO1bC4TxtqACaGqHYA22qkHjHpqDjYY=
github.com/alecthomas/kingpin/v2 v2.4.0/go.mod h1:0gyi0zQnjuFk8xrkNKamJoyUo382HRL7ATRpFZCw6tE=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137 h1:s6gZFSlWYmbqAuRjVTiNNhvNRfY2Wxp9nhfyel4rklc=
github.com/alecthomas/units v0.0.0-20211218093645-b94a6e3cc137/go.mod h1:OMCwj8VM1Kc9e19TLln2VL61YJF0x1XFtfdL4JdbSyE=
github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k=
github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869/go.mod h1:Ekp36dRnpXw/yCqJaO+ZrUyxD+3VXMFFr56k5XYrpB4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/gocql/gocql v1.6.0 h1:IdFdOTbnpbd0pDhl4REKQDM+Q0SzKXQ1Yh+YZZ8T/qU=
github.com/gocql/gocql v1.6.0/go.mod h1:3gM2c4D3AnkISwBxGnMMsS8Oy4y2lhbPRsH4xnJrHG8=
github.com/golang/snappy v0.0.3 h1:fHPg5GQYlCeLIPB9BZqMVR5nR9A+IM5zcgeTdjMYmLA=
github.com/golang/snappy v0.0.3/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed h1:5upAirOpQc1Q53c0bnx2ufif5kANL7bfZWcc6VJWJd8=
github.com/hailocab/go-hostpool v0.0.0-20160125115350-e80d13ce29ed/go.mod h1:tMWxXQ9wFIaZeTI9F+hmhFiGpFmhOHzyShyFUhRm0H4=
github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/xhit/go-str2duration/v2 v2.1.0 h1:lxklc02Drh6ynqX+DdPyp5pCKLUQpRT8bp8Ydu2Bstc=
github.com/xhit/go-str2duration/v2 v2.1.0/go.mod h1:ohY8p+0f07DiV6Em5LKB0s2YpLtXVyJfNt1+BlmyAsU=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=


@@ -0,0 +1,552 @@
//
// Based off of https://github.com/scylladb/scylla-code-samples/blob/master/efficient_full_table_scan_example_code/efficient_full_table_scan.go
//

package main

import (
    "fmt"
    "log"
    "math"
    "math/rand"
    "os"
    "strings"
    "sync"
    "sync/atomic"
    "time"

    "github.com/alecthomas/kingpin/v2"
    "github.com/gocql/gocql"
)
const (
    defaultNumberOfNodesInCluster = 3
    defaultNumberOfCoresInNode    = 8
    defaultSmudgeFactor           = 3
)

var (
    clusterHosts      = kingpin.Arg("hosts", "Your Scylla nodes' IP addresses, comma separated (e.g. 192.168.1.1,192.168.1.2,192.168.1.3)").Required().String()
    earliestLedgerIdx = kingpin.Flag("ledgerIdx", "Sets the earliest ledger_index to keep untouched").Short('i').Required().Uint64()

    nodesInCluster        = kingpin.Flag("nodes-in-cluster", "Number of nodes in your Scylla cluster").Short('n').Default(fmt.Sprintf("%d", defaultNumberOfNodesInCluster)).Int()
    coresInNode           = kingpin.Flag("cores-in-node", "Number of cores in each node").Short('c').Default(fmt.Sprintf("%d", defaultNumberOfCoresInNode)).Int()
    smudgeFactor          = kingpin.Flag("smudge-factor", "Yet another factor to make parallelism cooler").Short('s').Default(fmt.Sprintf("%d", defaultSmudgeFactor)).Int()
    clusterConsistency    = kingpin.Flag("consistency", "Cluster consistency level. Use 'localone' for multi-DC").Short('o').Default("localquorum").String()
    clusterTimeout        = kingpin.Flag("timeout", "Maximum duration for query execution, in milliseconds").Short('t').Default("15000").Int()
    clusterNumConnections = kingpin.Flag("cluster-number-of-connections", "Number of connections per host per session (in our case, per thread)").Short('b').Default("1").Int()
    clusterCQLVersion     = kingpin.Flag("cql-version", "The CQL version to use").Short('l').Default("3.0.0").String()
    clusterPageSize       = kingpin.Flag("cluster-page-size", "Page size of results").Short('p').Default("5000").Int()
    keyspace              = kingpin.Flag("keyspace", "Keyspace to use").Short('k').Default("clio_fh").String()
    userName              = kingpin.Flag("username", "Username to use when connecting to the cluster").String()
    password              = kingpin.Flag("password", "Password to use when connecting to the cluster").String()

    numberOfParallelClientThreads = 1             // the calculated number of parallel threads the client should run
    ranges                        []*tokenRange   // the calculated ranges to be executed in parallel
)
type tokenRange struct {
    StartRange int64
    EndRange   int64
}

type deleteParams struct {
    Seq  uint64
    Blob []byte // hash, key, etc.
}

type columnSettings struct {
    UseSeq  bool
    UseBlob bool
}

type deleteInfo struct {
    Query string
    Data  []deleteParams
}
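// getTokenRanges splits the full Murmur3 token ring (the signed 64-bit token
// space used by Cassandra/ScyllaDB) into numberOfParallelClientThreads*100
// contiguous, evenly sized ranges so the scan can be spread across workers.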
func getTokenRanges() []*tokenRange {
    var n = numberOfParallelClientThreads
    var m = int64(n * 100)
    var maxSize uint64 = math.MaxInt64 * 2
    var rangeSize = maxSize / uint64(m)

    var start int64 = math.MinInt64
    var end int64
    var shouldBreak = false

    var ranges = make([]*tokenRange, m)
    for i := int64(0); i < m; i++ {
        end = start + int64(rangeSize)
        if start > 0 && end < 0 {
            end = math.MaxInt64
            shouldBreak = true
        }

        ranges[i] = &tokenRange{StartRange: start, EndRange: end}

        if shouldBreak {
            break
        }

        start = end + 1
    }

    return ranges
}
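// splitDeleteWork divides the collected delete parameters into roughly equal
// chunks, one per parallel client thread; inputs smaller than the thread
// count are returned as a single chunk.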
func splitDeleteWork(info *deleteInfo) [][]deleteParams {
    var n = numberOfParallelClientThreads
    var chunkSize = len(info.Data) / n
    var chunks [][]deleteParams

    if len(info.Data) == 0 {
        return chunks
    }

    if chunkSize < 1 {
        chunks = append(chunks, info.Data)
        return chunks
    }

    for i := 0; i < len(info.Data); i += chunkSize {
        end := i + chunkSize
        if end > len(info.Data) {
            end = len(info.Data)
        }

        chunks = append(chunks, info.Data[i:end])
    }

    return chunks
}
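// shuffle randomizes the order of the token ranges (a Fisher-Yates pass) so
// that concurrent workers spread their load across the cluster rather than
// all scanning neighbouring ranges at the same time.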
func shuffle(data []*tokenRange) {
    for i := 1; i < len(data); i++ {
        r := rand.Intn(i + 1)
        if i != r {
            data[r], data[i] = data[i], data[r]
        }
    }
}
func getConsistencyLevel(consistencyValue string) gocql.Consistency {
    switch consistencyValue {
    case "any":
        return gocql.Any
    case "one":
        return gocql.One
    case "two":
        return gocql.Two
    case "three":
        return gocql.Three
    case "quorum":
        return gocql.Quorum
    case "all":
        return gocql.All
    case "localquorum":
        return gocql.LocalQuorum
    case "eachquorum":
        return gocql.EachQuorum
    case "localone":
        return gocql.LocalOne
    default:
        return gocql.One
    }
}
func main() {
    kingpin.Parse()

    numberOfParallelClientThreads = (*nodesInCluster) * (*coresInNode) * (*smudgeFactor)
    ranges = getTokenRanges()
    shuffle(ranges)

    hosts := strings.Split(*clusterHosts, ",")

    cluster := gocql.NewCluster(hosts...)
    cluster.Consistency = getConsistencyLevel(*clusterConsistency)
    cluster.Timeout = time.Duration(*clusterTimeout) * time.Millisecond
    cluster.NumConns = *clusterNumConnections
    cluster.CQLVersion = *clusterCQLVersion
    cluster.PageSize = *clusterPageSize
    cluster.Keyspace = *keyspace

    if *userName != "" {
        cluster.Authenticator = gocql.PasswordAuthenticator{
            Username: *userName,
            Password: *password,
        }
    }

    if *earliestLedgerIdx == 0 {
        log.Println("Please specify the ledger index to delete from")
        return
    }

    runParameters := fmt.Sprintf(`
Execution Parameters:
=====================

Range to be deleted        : %d -> latest
Scylla cluster nodes       : %s
Keyspace                   : %s
Consistency                : %s
Timeout (ms)               : %d
Connections per host       : %d
CQL Version                : %s
Page size                  : %d

# of parallel threads      : %d
# of ranges to be executed : %d
`, *earliestLedgerIdx, *clusterHosts, *keyspace, *clusterConsistency, cluster.Timeout.Milliseconds(), *clusterNumConnections, *clusterCQLVersion, *clusterPageSize, numberOfParallelClientThreads, len(ranges))

    fmt.Println(runParameters)

    log.Printf("Will delete everything after ledger index %d (exclusive) up to the latest\n", *earliestLedgerIdx)
    log.Println("WARNING: Please make sure that there are no Clio writers operating on the DB while this script is running")
    log.Println("Are you sure you want to continue? (y/n)")

    var continueFlag string
    if fmt.Scanln(&continueFlag); continueFlag != "y" {
        log.Println("Aborting...")
        return
    }

    startTime := time.Now().UTC()

    earliestLedgerIdxInDB, latestLedgerIdxInDB, err := getLedgerRange(cluster)
    if err != nil {
        log.Fatal(err)
    }

    if earliestLedgerIdxInDB > *earliestLedgerIdx {
        log.Fatal("Earliest ledger index in DB is greater than the one specified. Aborting...")
    }

    if latestLedgerIdxInDB < *earliestLedgerIdx {
        log.Fatal("Latest ledger index in DB is smaller than the one specified. Aborting...")
    }

    if err := deleteLedgerData(cluster, *earliestLedgerIdx+1, latestLedgerIdxInDB); err != nil {
        log.Fatal(err)
    }

    fmt.Printf("Total Execution Time: %s\n\n", time.Since(startTime))
    fmt.Println("NOTE: Cassandra/ScyllaDB only writes tombstones. You need to run compaction to free up disk space.")
}
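// getLedgerRange reads the sequences of the first and the latest ledger from
// Clio's ledger_range table, which holds exactly two rows keyed by is_latest.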
func getLedgerRange(cluster *gocql.ClusterConfig) (uint64, uint64, error) {
    var (
        firstLedgerIdx  uint64
        latestLedgerIdx uint64
    )

    session, err := cluster.CreateSession()
    if err != nil {
        log.Fatal(err)
    }
    defer session.Close()

    if err := session.Query("select sequence from ledger_range where is_latest = ?", false).Scan(&firstLedgerIdx); err != nil {
        return 0, 0, err
    }

    if err := session.Query("select sequence from ledger_range where is_latest = ?", true).Scan(&latestLedgerIdx); err != nil {
        return 0, 0, err
    }

    log.Printf("DB ledger range is %d:%d\n", firstLedgerIdx, latestLedgerIdx)
    return firstLedgerIdx, latestLedgerIdx, nil
}
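// deleteLedgerData removes all rows newer than fromLedgerIdx from every table
// the tool currently supports: scan-based deletes for successor, diff,
// objects, ledger_hashes and transactions, and per-sequence deletes for
// ledger_transactions and ledgers. It finishes by moving the is_latest marker
// in ledger_range back to fromLedgerIdx-1.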
func deleteLedgerData(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, toLedgerIdx uint64) error {
    var totalErrors uint64
    var totalRows uint64
    var totalDeletes uint64

    var info deleteInfo
    var rowsCount uint64
    var deleteCount uint64
    var errCount uint64

    log.Printf("Start scanning and removing data for %d -> latest (%d according to ledger_range table)\n\n", fromLedgerIdx, toLedgerIdx)

    // successor queries
    log.Println("Generating delete queries for successor table")
    info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
        "SELECT key, seq FROM successor WHERE token(key) >= %s AND token(key) <= %s",
        "DELETE FROM successor WHERE key = ? AND seq = ?")
    log.Printf("Total delete queries: %d\n", len(info.Data))
    log.Printf("Total traversed rows: %d\n\n", rowsCount)
    totalErrors += errCount
    totalRows += rowsCount
    deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
    totalErrors += errCount
    totalDeletes += deleteCount

    // diff queries
    log.Println("Generating delete queries for diff table")
    info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
        "SELECT key, seq FROM diff WHERE token(seq) >= %s AND token(seq) <= %s",
        "DELETE FROM diff WHERE key = ? AND seq = ?")
    log.Printf("Total delete queries: %d\n", len(info.Data))
    log.Printf("Total traversed rows: %d\n\n", rowsCount)
    totalErrors += errCount
    totalRows += rowsCount
    deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
    totalErrors += errCount
    totalDeletes += deleteCount

    // objects queries
    log.Println("Generating delete queries for objects table")
    info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
        "SELECT key, sequence FROM objects WHERE token(key) >= %s AND token(key) <= %s",
        "DELETE FROM objects WHERE key = ? AND sequence = ?")
    log.Printf("Total delete queries: %d\n", len(info.Data))
    log.Printf("Total traversed rows: %d\n\n", rowsCount)
    totalErrors += errCount
    totalRows += rowsCount
    deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: true})
    totalErrors += errCount
    totalDeletes += deleteCount

    // ledger_hashes queries
    log.Println("Generating delete queries for ledger_hashes table")
    info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
        "SELECT hash, sequence FROM ledger_hashes WHERE token(hash) >= %s AND token(hash) <= %s",
        "DELETE FROM ledger_hashes WHERE hash = ?")
    log.Printf("Total delete queries: %d\n", len(info.Data))
    log.Printf("Total traversed rows: %d\n\n", rowsCount)
    totalErrors += errCount
    totalRows += rowsCount
    deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false})
    totalErrors += errCount
    totalDeletes += deleteCount

    // transactions queries
    log.Println("Generating delete queries for transactions table")
    info, rowsCount, errCount = prepareDeleteQueries(cluster, fromLedgerIdx,
        "SELECT hash, ledger_sequence FROM transactions WHERE token(hash) >= %s AND token(hash) <= %s",
        "DELETE FROM transactions WHERE hash = ?")
    log.Printf("Total delete queries: %d\n", len(info.Data))
    log.Printf("Total traversed rows: %d\n\n", rowsCount)
    totalErrors += errCount
    totalRows += rowsCount
    deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: true, UseSeq: false})
    totalErrors += errCount
    totalDeletes += deleteCount

    // ledger_transactions queries
    log.Println("Generating delete queries for ledger_transactions table")
    info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
        "DELETE FROM ledger_transactions WHERE ledger_sequence = ?")
    log.Printf("Total delete queries: %d\n\n", len(info.Data))
    deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true})
    totalErrors += errCount
    totalDeletes += deleteCount

    // ledgers queries
    log.Println("Generating delete queries for ledgers table")
    info = prepareSimpleDeleteQueries(fromLedgerIdx, toLedgerIdx,
        "DELETE FROM ledgers WHERE sequence = ?")
    log.Printf("Total delete queries: %d\n\n", len(info.Data))
    deleteCount, errCount = performDeleteQueries(cluster, &info, columnSettings{UseBlob: false, UseSeq: true})
    totalErrors += errCount
    totalDeletes += deleteCount

    // TODO: tbd what to do with account_tx as it got tuple for seq_idx
    // TODO: also, whether we need to take care of nft tables and other stuff like that

    if err := updateLedgerRange(cluster, fromLedgerIdx-1); err != nil {
        log.Printf("ERROR failed updating ledger range: %s\n", err)
        return err
    }

    log.Printf("TOTAL ERRORS: %d\n", totalErrors)
    log.Printf("TOTAL ROWS TRAVERSED: %d\n", totalRows)
    log.Printf("TOTAL DELETES: %d\n\n", totalDeletes)

    log.Printf("Completed deletion for %d -> %d\n\n", fromLedgerIdx, toLedgerIdx)

    return nil
}
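// prepareSimpleDeleteQueries generates one delete per ledger sequence in the
// given range, for tables whose partition key is the sequence itself, so no
// table scan is needed.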
func prepareSimpleDeleteQueries(fromLedgerIdx uint64, toLedgerIdx uint64, deleteQueryTemplate string) deleteInfo {
    var info = deleteInfo{Query: deleteQueryTemplate}

    // Note: we deliberately add 1 extra ledger to make sure we delete any data Clio might have written
    // if it crashed or was stopped in the middle of writing just before it wrote ledger_range.
    for i := fromLedgerIdx; i <= toLedgerIdx+1; i++ {
        info.Data = append(info.Data, deleteParams{Seq: i})
    }

    return info
}
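// prepareDeleteQueries scans the given table in parallel, token range by
// token range, and collects the (key, sequence) pairs whose sequence falls
// into the range being deleted. It returns the collected delete parameters
// along with the number of traversed rows and the number of errors.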
func prepareDeleteQueries(cluster *gocql.ClusterConfig, fromLedgerIdx uint64, queryTemplate string, deleteQueryTemplate string) (deleteInfo, uint64, uint64) {
    rangesChannel := make(chan *tokenRange, len(ranges))
    for i := range ranges {
        rangesChannel <- ranges[i]
    }

    close(rangesChannel)

    outChannel := make(chan deleteParams)
    var info = deleteInfo{Query: deleteQueryTemplate}

    collectorDone := make(chan struct{})
    go func() {
        for params := range outChannel {
            info.Data = append(info.Data, params)
        }
        close(collectorDone)
    }()

    var wg sync.WaitGroup
    var sessionCreationWaitGroup sync.WaitGroup
    var totalRows uint64
    var totalErrors uint64

    wg.Add(numberOfParallelClientThreads)
    sessionCreationWaitGroup.Add(numberOfParallelClientThreads)

    for i := 0; i < numberOfParallelClientThreads; i++ {
        go func() {
            defer wg.Done()

            var session *gocql.Session
            var err error
            if session, err = cluster.CreateSession(); err == nil {
                defer session.Close()

                sessionCreationWaitGroup.Done()
                sessionCreationWaitGroup.Wait()

                preparedQueryString := fmt.Sprintf(queryTemplate, "?", "?")
                preparedQuery := session.Query(preparedQueryString)

                for r := range rangesChannel {
                    preparedQuery.Bind(r.StartRange, r.EndRange)
                    iter := preparedQuery.Iter()

                    var key []byte
                    var seq uint64
                    var rowsRetrieved uint64

                    for iter.Scan(&key, &seq) {
                        rowsRetrieved++

                        // only grab the rows that are in the correct range of sequence numbers
                        if fromLedgerIdx <= seq {
                            outChannel <- deleteParams{Seq: seq, Blob: key}
                        }
                    }

                    if err := iter.Close(); err != nil {
                        log.Printf("ERROR: iteration failed: %s\n", err)
                        fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [from=%d][to=%d]", preparedQueryString, r.StartRange, r.EndRange))
                        atomic.AddUint64(&totalErrors, 1)
                    }

                    atomic.AddUint64(&totalRows, rowsRetrieved)
                }
            } else {
                log.Printf("ERROR: %s\n", err)
                fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
                atomic.AddUint64(&totalErrors, 1)
            }
        }()
    }

    wg.Wait()
    close(outChannel)
    <-collectorDone // wait for the collector goroutine to finish appending before info is read

    return info, totalRows, totalErrors
}
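// performDeleteQueries executes the prepared delete statement for every
// collected row, in parallel chunks, binding the blob and/or sequence columns
// according to colSettings. It returns the number of successful deletes and
// the number of errors.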
func performDeleteQueries(cluster *gocql.ClusterConfig, info *deleteInfo, colSettings columnSettings) (uint64, uint64) {
    var wg sync.WaitGroup
    var sessionCreationWaitGroup sync.WaitGroup
    var totalDeletes uint64
    var totalErrors uint64

    chunks := splitDeleteWork(info)
    chunksChannel := make(chan []deleteParams, len(chunks))
    for i := range chunks {
        chunksChannel <- chunks[i]
    }

    close(chunksChannel)

    wg.Add(len(chunks))
    sessionCreationWaitGroup.Add(len(chunks))

    for i := 0; i < len(chunks); i++ {
        go func() {
            defer wg.Done()

            var session *gocql.Session
            var err error
            if session, err = cluster.CreateSession(); err == nil {
                defer session.Close()

                sessionCreationWaitGroup.Done()
                sessionCreationWaitGroup.Wait()

                preparedQuery := session.Query(info.Query)
                var bindCount = strings.Count(info.Query, "?")

                for chunk := range chunksChannel {
                    for _, r := range chunk {
                        if bindCount == 2 {
                            preparedQuery.Bind(r.Blob, r.Seq)
                        } else if bindCount == 1 {
                            if colSettings.UseSeq {
                                preparedQuery.Bind(r.Seq)
                            } else if colSettings.UseBlob {
                                preparedQuery.Bind(r.Blob)
                            }
                        }

                        if err := preparedQuery.Exec(); err != nil {
                            log.Printf("DELETE ERROR: %s\n", err)
                            fmt.Fprintf(os.Stderr, "FAILED QUERY: %s\n", fmt.Sprintf("%s [blob=0x%x][seq=%d]", info.Query, r.Blob, r.Seq))
                            atomic.AddUint64(&totalErrors, 1)
                        } else {
                            atomic.AddUint64(&totalDeletes, 1)
                        }
                    }
                }
            } else {
                log.Printf("ERROR: %s\n", err)
                fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
                atomic.AddUint64(&totalErrors, 1)
            }
        }()
    }

    wg.Wait()

    return totalDeletes, totalErrors
}
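// updateLedgerRange rewrites the is_latest = true row of ledger_range so that
// Clio sees the trimmed database as ending at ledgerIndex.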
func updateLedgerRange(cluster *gocql.ClusterConfig, ledgerIndex uint64) error {
    log.Printf("Updating latest ledger to %d\n", ledgerIndex)

    if session, err := cluster.CreateSession(); err == nil {
        defer session.Close()

        query := "UPDATE ledger_range SET sequence = ? WHERE is_latest = ?"
        preparedQuery := session.Query(query, ledgerIndex, true)
        if err := preparedQuery.Exec(); err != nil {
            fmt.Fprintf(os.Stderr, "FAILED QUERY: %s [seq=%d][true]\n", query, ledgerIndex)
            return err
        }
    } else {
        fmt.Fprintf(os.Stderr, "FAILED TO CREATE SESSION: %s\n", err)
        return err
    }

    return nil
}
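
For reference, a hypothetical invocation of the tool (the binary name, host addresses and ledger index are placeholders; the positional hosts argument and the flags correspond to the kingpin definitions above):

    ./cassandra_delete_range --ledgerIdx 2000000 --keyspace clio_fh --consistency localquorum 10.0.0.1,10.0.0.2,10.0.0.3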